Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ d9c82a4e

History | View | Annotate | Download (48.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable-msg=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import time
32
import errno
33
import weakref
34
import logging
35
import heapq
36

    
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import compat
40

    
41

    
42
_EXCLUSIVE_TEXT = "exclusive"
43
_SHARED_TEXT = "shared"
44

    
45
_DEFAULT_PRIORITY = 0
46

    
47

    
48
def ssynchronized(mylock, shared=0):
49
  """Shared Synchronization decorator.
50

51
  Calls the function holding the given lock, either in exclusive or shared
52
  mode. It requires the passed lock to be a SharedLock (or support its
53
  semantics).
54

55
  @type mylock: lockable object or string
56
  @param mylock: lock to acquire or class member name of the lock to acquire
57

58
  """
59
  def wrap(fn):
60
    def sync_function(*args, **kwargs):
61
      if isinstance(mylock, basestring):
62
        assert args, "cannot ssynchronize on non-class method: self not found"
63
        # args[0] is "self"
64
        lock = getattr(args[0], mylock)
65
      else:
66
        lock = mylock
67
      lock.acquire(shared=shared)
68
      try:
69
        return fn(*args, **kwargs)
70
      finally:
71
        lock.release()
72
    return sync_function
73
  return wrap
74

    
75

    
76
class RunningTimeout(object):
77
  """Class to calculate remaining timeout when doing several operations.
78

79
  """
80
  __slots__ = [
81
    "_allow_negative",
82
    "_start_time",
83
    "_time_fn",
84
    "_timeout",
85
    ]
86

    
87
  def __init__(self, timeout, allow_negative, _time_fn=time.time):
88
    """Initializes this class.
89

90
    @type timeout: float
91
    @param timeout: Timeout duration
92
    @type allow_negative: bool
93
    @param allow_negative: Whether to return values below zero
94
    @param _time_fn: Time function for unittests
95

96
    """
97
    object.__init__(self)
98

    
99
    if timeout is not None and timeout < 0.0:
100
      raise ValueError("Timeout must not be negative")
101

    
102
    self._timeout = timeout
103
    self._allow_negative = allow_negative
104
    self._time_fn = _time_fn
105

    
106
    self._start_time = None
107

    
108
  def Remaining(self):
109
    """Returns the remaining timeout.
110

111
    """
112
    if self._timeout is None:
113
      return None
114

    
115
    # Get start time on first calculation
116
    if self._start_time is None:
117
      self._start_time = self._time_fn()
118

    
119
    # Calculate remaining time
120
    remaining_timeout = self._start_time + self._timeout - self._time_fn()
121

    
122
    if not self._allow_negative:
123
      # Ensure timeout is always >= 0
124
      return max(0.0, remaining_timeout)
125

    
126
    return remaining_timeout
127

    
128

    
129
class _SingleNotifyPipeConditionWaiter(object):
130
  """Helper class for SingleNotifyPipeCondition
131

132
  """
133
  __slots__ = [
134
    "_fd",
135
    "_poller",
136
    ]
137

    
138
  def __init__(self, poller, fd):
139
    """Constructor for _SingleNotifyPipeConditionWaiter
140

141
    @type poller: select.poll
142
    @param poller: Poller object
143
    @type fd: int
144
    @param fd: File descriptor to wait for
145

146
    """
147
    object.__init__(self)
148
    self._poller = poller
149
    self._fd = fd
150

    
151
  def __call__(self, timeout):
152
    """Wait for something to happen on the pipe.
153

154
    @type timeout: float or None
155
    @param timeout: Timeout for waiting (can be None)
156

157
    """
158
    running_timeout = RunningTimeout(timeout, True)
159

    
160
    while True:
161
      remaining_time = running_timeout.Remaining()
162

    
163
      if remaining_time is not None:
164
        if remaining_time < 0.0:
165
          break
166

    
167
        # Our calculation uses seconds, poll() wants milliseconds
168
        remaining_time *= 1000
169

    
170
      try:
171
        result = self._poller.poll(remaining_time)
172
      except EnvironmentError, err:
173
        if err.errno != errno.EINTR:
174
          raise
175
        result = None
176

    
177
      # Check whether we were notified
178
      if result and result[0][0] == self._fd:
179
        break
180

    
181

    
182
class _BaseCondition(object):
183
  """Base class containing common code for conditions.
184

185
  Some of this code is taken from python's threading module.
186

187
  """
188
  __slots__ = [
189
    "_lock",
190
    "acquire",
191
    "release",
192
    "_is_owned",
193
    "_acquire_restore",
194
    "_release_save",
195
    ]
196

    
197
  def __init__(self, lock):
198
    """Constructor for _BaseCondition.
199

200
    @type lock: threading.Lock
201
    @param lock: condition base lock
202

203
    """
204
    object.__init__(self)
205

    
206
    try:
207
      self._release_save = lock._release_save
208
    except AttributeError:
209
      self._release_save = self._base_release_save
210
    try:
211
      self._acquire_restore = lock._acquire_restore
212
    except AttributeError:
213
      self._acquire_restore = self._base_acquire_restore
214
    try:
215
      self._is_owned = lock._is_owned
216
    except AttributeError:
217
      self._is_owned = self._base_is_owned
218

    
219
    self._lock = lock
220

    
221
    # Export the lock's acquire() and release() methods
222
    self.acquire = lock.acquire
223
    self.release = lock.release
224

    
225
  def _base_is_owned(self):
226
    """Check whether lock is owned by current thread.
227

228
    """
229
    if self._lock.acquire(0):
230
      self._lock.release()
231
      return False
232
    return True
233

    
234
  def _base_release_save(self):
235
    self._lock.release()
236

    
237
  def _base_acquire_restore(self, _):
238
    self._lock.acquire()
239

    
240
  def _check_owned(self):
241
    """Raise an exception if the current thread doesn't own the lock.
242

243
    """
244
    if not self._is_owned():
245
      raise RuntimeError("cannot work with un-aquired lock")
246

    
247

    
248
class SingleNotifyPipeCondition(_BaseCondition):
249
  """Condition which can only be notified once.
250

251
  This condition class uses pipes and poll, internally, to be able to wait for
252
  notification with a timeout, without resorting to polling. It is almost
253
  compatible with Python's threading.Condition, with the following differences:
254
    - notifyAll can only be called once, and no wait can happen after that
255
    - notify is not supported, only notifyAll
256

257
  """
258

    
259
  __slots__ = [
260
    "_poller",
261
    "_read_fd",
262
    "_write_fd",
263
    "_nwaiters",
264
    "_notified",
265
    ]
266

    
267
  _waiter_class = _SingleNotifyPipeConditionWaiter
268

    
269
  def __init__(self, lock):
270
    """Constructor for SingleNotifyPipeCondition
271

272
    """
273
    _BaseCondition.__init__(self, lock)
274
    self._nwaiters = 0
275
    self._notified = False
276
    self._read_fd = None
277
    self._write_fd = None
278
    self._poller = None
279

    
280
  def _check_unnotified(self):
281
    """Throws an exception if already notified.
282

283
    """
284
    if self._notified:
285
      raise RuntimeError("cannot use already notified condition")
286

    
287
  def _Cleanup(self):
288
    """Cleanup open file descriptors, if any.
289

290
    """
291
    if self._read_fd is not None:
292
      os.close(self._read_fd)
293
      self._read_fd = None
294

    
295
    if self._write_fd is not None:
296
      os.close(self._write_fd)
297
      self._write_fd = None
298
    self._poller = None
299

    
300
  def wait(self, timeout=None):
301
    """Wait for a notification.
302

303
    @type timeout: float or None
304
    @param timeout: Waiting timeout (can be None)
305

306
    """
307
    self._check_owned()
308
    self._check_unnotified()
309

    
310
    self._nwaiters += 1
311
    try:
312
      if self._poller is None:
313
        (self._read_fd, self._write_fd) = os.pipe()
314
        self._poller = select.poll()
315
        self._poller.register(self._read_fd, select.POLLHUP)
316

    
317
      wait_fn = self._waiter_class(self._poller, self._read_fd)
318
      state = self._release_save()
319
      try:
320
        # Wait for notification
321
        wait_fn(timeout)
322
      finally:
323
        # Re-acquire lock
324
        self._acquire_restore(state)
325
    finally:
326
      self._nwaiters -= 1
327
      if self._nwaiters == 0:
328
        self._Cleanup()
329

    
330
  def notifyAll(self): # pylint: disable-msg=C0103
331
    """Close the writing side of the pipe to notify all waiters.
332

333
    """
334
    self._check_owned()
335
    self._check_unnotified()
336
    self._notified = True
337
    if self._write_fd is not None:
338
      os.close(self._write_fd)
339
      self._write_fd = None
340

    
341

    
342
class PipeCondition(_BaseCondition):
343
  """Group-only non-polling condition with counters.
344

345
  This condition class uses pipes and poll, internally, to be able to wait for
346
  notification with a timeout, without resorting to polling. It is almost
347
  compatible with Python's threading.Condition, but only supports notifyAll and
348
  non-recursive locks. As an additional features it's able to report whether
349
  there are any waiting threads.
350

351
  """
352
  __slots__ = [
353
    "_waiters",
354
    "_single_condition",
355
    ]
356

    
357
  _single_condition_class = SingleNotifyPipeCondition
358

    
359
  def __init__(self, lock):
360
    """Initializes this class.
361

362
    """
363
    _BaseCondition.__init__(self, lock)
364
    self._waiters = set()
365
    self._single_condition = self._single_condition_class(self._lock)
366

    
367
  def wait(self, timeout=None):
368
    """Wait for a notification.
369

370
    @type timeout: float or None
371
    @param timeout: Waiting timeout (can be None)
372

373
    """
374
    self._check_owned()
375

    
376
    # Keep local reference to the pipe. It could be replaced by another thread
377
    # notifying while we're waiting.
378
    cond = self._single_condition
379

    
380
    self._waiters.add(threading.currentThread())
381
    try:
382
      cond.wait(timeout)
383
    finally:
384
      self._check_owned()
385
      self._waiters.remove(threading.currentThread())
386

    
387
  def notifyAll(self): # pylint: disable-msg=C0103
388
    """Notify all currently waiting threads.
389

390
    """
391
    self._check_owned()
392
    self._single_condition.notifyAll()
393
    self._single_condition = self._single_condition_class(self._lock)
394

    
395
  def get_waiting(self):
396
    """Returns a list of all waiting threads.
397

398
    """
399
    self._check_owned()
400

    
401
    return self._waiters
402

    
403
  def has_waiting(self):
404
    """Returns whether there are active waiters.
405

406
    """
407
    self._check_owned()
408

    
409
    return bool(self._waiters)
410

    
411

    
412
class _PipeConditionWithMode(PipeCondition):
413
  __slots__ = [
414
    "shared",
415
    ]
416

    
417
  def __init__(self, lock, shared):
418
    """Initializes this class.
419

420
    """
421
    self.shared = shared
422
    PipeCondition.__init__(self, lock)
423

    
424

    
425
class SharedLock(object):
426
  """Implements a shared lock.
427

428
  Multiple threads can acquire the lock in a shared way by calling
429
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
430
  threads can call C{acquire(shared=0)}.
431

432
  Notes on data structures: C{__pending} contains a priority queue (heapq) of
433
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
434
  ...]}. Each per-priority queue contains a normal in-order list of conditions
435
  to be notified when the lock can be acquired. Shared locks are grouped
436
  together by priority and the condition for them is stored in
437
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
438
  references for the per-priority queues indexed by priority for faster access.
439

440
  @type name: string
441
  @ivar name: the name of the lock
442

443
  """
444
  __slots__ = [
445
    "__weakref__",
446
    "__deleted",
447
    "__exc",
448
    "__lock",
449
    "__pending",
450
    "__pending_by_prio",
451
    "__pending_shared",
452
    "__shr",
453
    "name",
454
    ]
455

    
456
  __condition_class = _PipeConditionWithMode
457

    
458
  def __init__(self, name, monitor=None):
459
    """Construct a new SharedLock.
460

461
    @param name: the name of the lock
462
    @type monitor: L{LockMonitor}
463
    @param monitor: Lock monitor with which to register
464

465
    """
466
    object.__init__(self)
467

    
468
    self.name = name
469

    
470
    # Internal lock
471
    self.__lock = threading.Lock()
472

    
473
    # Queue containing waiting acquires
474
    self.__pending = []
475
    self.__pending_by_prio = {}
476
    self.__pending_shared = {}
477

    
478
    # Current lock holders
479
    self.__shr = set()
480
    self.__exc = None
481

    
482
    # is this lock in the deleted state?
483
    self.__deleted = False
484

    
485
    # Register with lock monitor
486
    if monitor:
487
      monitor.RegisterLock(self)
488

    
489
  def GetInfo(self, fields):
490
    """Retrieves information for querying locks.
491

492
    @type fields: list of strings
493
    @param fields: List of fields to return
494

495
    """
496
    self.__lock.acquire()
497
    try:
498
      info = []
499

    
500
      # Note: to avoid unintentional race conditions, no references to
501
      # modifiable objects should be returned unless they were created in this
502
      # function.
503
      for fname in fields:
504
        if fname == "name":
505
          info.append(self.name)
506
        elif fname == "mode":
507
          if self.__deleted:
508
            info.append("deleted")
509
            assert not (self.__exc or self.__shr)
510
          elif self.__exc:
511
            info.append(_EXCLUSIVE_TEXT)
512
          elif self.__shr:
513
            info.append(_SHARED_TEXT)
514
          else:
515
            info.append(None)
516
        elif fname == "owner":
517
          if self.__exc:
518
            owner = [self.__exc]
519
          else:
520
            owner = self.__shr
521

    
522
          if owner:
523
            assert not self.__deleted
524
            info.append([i.getName() for i in owner])
525
          else:
526
            info.append(None)
527
        elif fname == "pending":
528
          data = []
529

    
530
          # Sorting instead of copying and using heaq functions for simplicity
531
          for (_, prioqueue) in sorted(self.__pending):
532
            for cond in prioqueue:
533
              if cond.shared:
534
                mode = _SHARED_TEXT
535
              else:
536
                mode = _EXCLUSIVE_TEXT
537

    
538
              # This function should be fast as it runs with the lock held.
539
              # Hence not using utils.NiceSort.
540
              data.append((mode, sorted(i.getName()
541
                                        for i in cond.get_waiting())))
542

    
543
          info.append(data)
544
        else:
545
          raise errors.OpExecError("Invalid query field '%s'" % fname)
546

    
547
      return info
548
    finally:
549
      self.__lock.release()
550

    
551
  def __check_deleted(self):
552
    """Raises an exception if the lock has been deleted.
553

554
    """
555
    if self.__deleted:
556
      raise errors.LockError("Deleted lock %s" % self.name)
557

    
558
  def __is_sharer(self):
559
    """Is the current thread sharing the lock at this time?
560

561
    """
562
    return threading.currentThread() in self.__shr
563

    
564
  def __is_exclusive(self):
565
    """Is the current thread holding the lock exclusively at this time?
566

567
    """
568
    return threading.currentThread() == self.__exc
569

    
570
  def __is_owned(self, shared=-1):
571
    """Is the current thread somehow owning the lock at this time?
572

573
    This is a private version of the function, which presumes you're holding
574
    the internal lock.
575

576
    """
577
    if shared < 0:
578
      return self.__is_sharer() or self.__is_exclusive()
579
    elif shared:
580
      return self.__is_sharer()
581
    else:
582
      return self.__is_exclusive()
583

    
584
  def _is_owned(self, shared=-1):
585
    """Is the current thread somehow owning the lock at this time?
586

587
    @param shared:
588
        - < 0: check for any type of ownership (default)
589
        - 0: check for exclusive ownership
590
        - > 0: check for shared ownership
591

592
    """
593
    self.__lock.acquire()
594
    try:
595
      return self.__is_owned(shared=shared)
596
    finally:
597
      self.__lock.release()
598

    
599
  def _count_pending(self):
600
    """Returns the number of pending acquires.
601

602
    @rtype: int
603

604
    """
605
    self.__lock.acquire()
606
    try:
607
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
608
    finally:
609
      self.__lock.release()
610

    
611
  def _check_empty(self):
612
    """Checks whether there are any pending acquires.
613

614
    @rtype: bool
615

616
    """
617
    self.__lock.acquire()
618
    try:
619
      # Order is important: __find_first_pending_queue modifies __pending
620
      return not (self.__find_first_pending_queue() or
621
                  self.__pending or
622
                  self.__pending_by_prio or
623
                  self.__pending_shared)
624
    finally:
625
      self.__lock.release()
626

    
627
  def __do_acquire(self, shared):
628
    """Actually acquire the lock.
629

630
    """
631
    if shared:
632
      self.__shr.add(threading.currentThread())
633
    else:
634
      self.__exc = threading.currentThread()
635

    
636
  def __can_acquire(self, shared):
637
    """Determine whether lock can be acquired.
638

639
    """
640
    if shared:
641
      return self.__exc is None
642
    else:
643
      return len(self.__shr) == 0 and self.__exc is None
644

    
645
  def __find_first_pending_queue(self):
646
    """Tries to find the topmost queued entry with pending acquires.
647

648
    Removes empty entries while going through the list.
649

650
    """
651
    while self.__pending:
652
      (priority, prioqueue) = self.__pending[0]
653

    
654
      if not prioqueue:
655
        heapq.heappop(self.__pending)
656
        del self.__pending_by_prio[priority]
657
        assert priority not in self.__pending_shared
658
        continue
659

    
660
      if prioqueue:
661
        return prioqueue
662

    
663
    return None
664

    
665
  def __is_on_top(self, cond):
666
    """Checks whether the passed condition is on top of the queue.
667

668
    The caller must make sure the queue isn't empty.
669

670
    """
671
    return cond == self.__find_first_pending_queue()[0]
672

    
673
  def __acquire_unlocked(self, shared, timeout, priority):
674
    """Acquire a shared lock.
675

676
    @param shared: whether to acquire in shared mode; by default an
677
        exclusive lock will be acquired
678
    @param timeout: maximum waiting time before giving up
679
    @type priority: integer
680
    @param priority: Priority for acquiring lock
681

682
    """
683
    self.__check_deleted()
684

    
685
    # We cannot acquire the lock if we already have it
686
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
687
                                   " %s" % self.name)
688

    
689
    # Remove empty entries from queue
690
    self.__find_first_pending_queue()
691

    
692
    # Check whether someone else holds the lock or there are pending acquires.
693
    if not self.__pending and self.__can_acquire(shared):
694
      # Apparently not, can acquire lock directly.
695
      self.__do_acquire(shared)
696
      return True
697

    
698
    prioqueue = self.__pending_by_prio.get(priority, None)
699

    
700
    if shared:
701
      # Try to re-use condition for shared acquire
702
      wait_condition = self.__pending_shared.get(priority, None)
703
      assert (wait_condition is None or
704
              (wait_condition.shared and wait_condition in prioqueue))
705
    else:
706
      wait_condition = None
707

    
708
    if wait_condition is None:
709
      if prioqueue is None:
710
        assert priority not in self.__pending_by_prio
711

    
712
        prioqueue = []
713
        heapq.heappush(self.__pending, (priority, prioqueue))
714
        self.__pending_by_prio[priority] = prioqueue
715

    
716
      wait_condition = self.__condition_class(self.__lock, shared)
717
      prioqueue.append(wait_condition)
718

    
719
      if shared:
720
        # Keep reference for further shared acquires on same priority. This is
721
        # better than trying to find it in the list of pending acquires.
722
        assert priority not in self.__pending_shared
723
        self.__pending_shared[priority] = wait_condition
724

    
725
    try:
726
      # Wait until we become the topmost acquire in the queue or the timeout
727
      # expires.
728
      # TODO: Decrease timeout with spurious notifications
729
      while not (self.__is_on_top(wait_condition) and
730
                 self.__can_acquire(shared)):
731
        # Wait for notification
732
        wait_condition.wait(timeout)
733
        self.__check_deleted()
734

    
735
        # A lot of code assumes blocking acquires always succeed. Loop
736
        # internally for that case.
737
        if timeout is not None:
738
          break
739

    
740
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
741
        self.__do_acquire(shared)
742
        return True
743
    finally:
744
      # Remove condition from queue if there are no more waiters
745
      if not wait_condition.has_waiting():
746
        prioqueue.remove(wait_condition)
747
        if wait_condition.shared:
748
          del self.__pending_shared[priority]
749

    
750
    return False
751

    
752
  def acquire(self, shared=0, timeout=None, priority=None,
753
              test_notify=None):
754
    """Acquire a shared lock.
755

756
    @type shared: integer (0/1) used as a boolean
757
    @param shared: whether to acquire in shared mode; by default an
758
        exclusive lock will be acquired
759
    @type timeout: float
760
    @param timeout: maximum waiting time before giving up
761
    @type priority: integer
762
    @param priority: Priority for acquiring lock
763
    @type test_notify: callable or None
764
    @param test_notify: Special callback function for unittesting
765

766
    """
767
    if priority is None:
768
      priority = _DEFAULT_PRIORITY
769

    
770
    self.__lock.acquire()
771
    try:
772
      # We already got the lock, notify now
773
      if __debug__ and callable(test_notify):
774
        test_notify()
775

    
776
      return self.__acquire_unlocked(shared, timeout, priority)
777
    finally:
778
      self.__lock.release()
779

    
780
  def release(self):
781
    """Release a Shared Lock.
782

783
    You must have acquired the lock, either in shared or in exclusive mode,
784
    before calling this function.
785

786
    """
787
    self.__lock.acquire()
788
    try:
789
      assert self.__is_exclusive() or self.__is_sharer(), \
790
        "Cannot release non-owned lock"
791

    
792
      # Autodetect release type
793
      if self.__is_exclusive():
794
        self.__exc = None
795
      else:
796
        self.__shr.remove(threading.currentThread())
797

    
798
      # Notify topmost condition in queue
799
      prioqueue = self.__find_first_pending_queue()
800
      if prioqueue:
801
        prioqueue[0].notifyAll()
802

    
803
    finally:
804
      self.__lock.release()
805

    
806
  def delete(self, timeout=None, priority=None):
807
    """Delete a Shared Lock.
808

809
    This operation will declare the lock for removal. First the lock will be
810
    acquired in exclusive mode if you don't already own it, then the lock
811
    will be put in a state where any future and pending acquire() fail.
812

813
    @type timeout: float
814
    @param timeout: maximum waiting time before giving up
815
    @type priority: integer
816
    @param priority: Priority for acquiring lock
817

818
    """
819
    if priority is None:
820
      priority = _DEFAULT_PRIORITY
821

    
822
    self.__lock.acquire()
823
    try:
824
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
825

    
826
      self.__check_deleted()
827

    
828
      # The caller is allowed to hold the lock exclusively already.
829
      acquired = self.__is_exclusive()
830

    
831
      if not acquired:
832
        acquired = self.__acquire_unlocked(0, timeout, priority)
833

    
834
        assert self.__is_exclusive() and not self.__is_sharer(), \
835
          "Lock wasn't acquired in exclusive mode"
836

    
837
      if acquired:
838
        self.__deleted = True
839
        self.__exc = None
840

    
841
        assert not (self.__exc or self.__shr), "Found owner during deletion"
842

    
843
        # Notify all acquires. They'll throw an error.
844
        for (_, prioqueue) in self.__pending:
845
          for cond in prioqueue:
846
            cond.notifyAll()
847

    
848
        assert self.__deleted
849

    
850
      return acquired
851
    finally:
852
      self.__lock.release()
853

    
854
  def _release_save(self):
855
    shared = self.__is_sharer()
856
    self.release()
857
    return shared
858

    
859
  def _acquire_restore(self, shared):
860
    self.acquire(shared=shared)
861

    
862

    
863
# Whenever we want to acquire a full LockSet we pass None as the value
864
# to acquire.  Hide this behind this nicely named constant.
865
ALL_SET = None
866

    
867

    
868
class _AcquireTimeout(Exception):
869
  """Internal exception to abort an acquire on a timeout.
870

871
  """
872

    
873

    
874
class LockSet:
875
  """Implements a set of locks.
876

877
  This abstraction implements a set of shared locks for the same resource type,
878
  distinguished by name. The user can lock a subset of the resources and the
879
  LockSet will take care of acquiring the locks always in the same order, thus
880
  preventing deadlock.
881

882
  All the locks needed in the same set must be acquired together, though.
883

884
  @type name: string
885
  @ivar name: the name of the lockset
886

887
  """
888
  def __init__(self, members, name, monitor=None):
889
    """Constructs a new LockSet.
890

891
    @type members: list of strings
892
    @param members: initial members of the set
893
    @type monitor: L{LockMonitor}
894
    @param monitor: Lock monitor with which to register member locks
895

896
    """
897
    assert members is not None, "members parameter is not a list"
898
    self.name = name
899

    
900
    # Lock monitor
901
    self.__monitor = monitor
902

    
903
    # Used internally to guarantee coherency.
904
    self.__lock = SharedLock(name)
905

    
906
    # The lockdict indexes the relationship name -> lock
907
    # The order-of-locking is implied by the alphabetical order of names
908
    self.__lockdict = {}
909

    
910
    for mname in members:
911
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
912
                                          monitor=monitor)
913

    
914
    # The owner dict contains the set of locks each thread owns. For
915
    # performance each thread can access its own key without a global lock on
916
    # this structure. It is paramount though that *no* other type of access is
917
    # done to this structure (eg. no looping over its keys). *_owner helper
918
    # function are defined to guarantee access is correct, but in general never
919
    # do anything different than __owners[threading.currentThread()], or there
920
    # will be trouble.
921
    self.__owners = {}
922

    
923
  def _GetLockName(self, mname):
924
    """Returns the name for a member lock.
925

926
    """
927
    return "%s/%s" % (self.name, mname)
928

    
929
  def _is_owned(self):
930
    """Is the current thread a current level owner?"""
931
    return threading.currentThread() in self.__owners
932

    
933
  def _add_owned(self, name=None):
934
    """Note the current thread owns the given lock"""
935
    if name is None:
936
      if not self._is_owned():
937
        self.__owners[threading.currentThread()] = set()
938
    else:
939
      if self._is_owned():
940
        self.__owners[threading.currentThread()].add(name)
941
      else:
942
        self.__owners[threading.currentThread()] = set([name])
943

    
944
  def _del_owned(self, name=None):
945
    """Note the current thread owns the given lock"""
946

    
947
    assert not (name is None and self.__lock._is_owned()), \
948
           "Cannot hold internal lock when deleting owner status"
949

    
950
    if name is not None:
951
      self.__owners[threading.currentThread()].remove(name)
952

    
953
    # Only remove the key if we don't hold the set-lock as well
954
    if (not self.__lock._is_owned() and
955
        not self.__owners[threading.currentThread()]):
956
      del self.__owners[threading.currentThread()]
957

    
958
  def _list_owned(self):
959
    """Get the set of resource names owned by the current thread"""
960
    if self._is_owned():
961
      return self.__owners[threading.currentThread()].copy()
962
    else:
963
      return set()
964

    
965
  def _release_and_delete_owned(self):
966
    """Release and delete all resources owned by the current thread"""
967
    for lname in self._list_owned():
968
      lock = self.__lockdict[lname]
969
      if lock._is_owned():
970
        lock.release()
971
      self._del_owned(name=lname)
972

    
973
  def __names(self):
974
    """Return the current set of names.
975

976
    Only call this function while holding __lock and don't iterate on the
977
    result after releasing the lock.
978

979
    """
980
    return self.__lockdict.keys()
981

    
982
  def _names(self):
983
    """Return a copy of the current set of elements.
984

985
    Used only for debugging purposes.
986

987
    """
988
    # If we don't already own the set-level lock acquired
989
    # we'll get it and note we need to release it later.
990
    release_lock = False
991
    if not self.__lock._is_owned():
992
      release_lock = True
993
      self.__lock.acquire(shared=1)
994
    try:
995
      result = self.__names()
996
    finally:
997
      if release_lock:
998
        self.__lock.release()
999
    return set(result)
1000

    
1001
  def acquire(self, names, timeout=None, shared=0, priority=None,
1002
              test_notify=None):
1003
    """Acquire a set of resource locks.
1004

1005
    @type names: list of strings (or string)
1006
    @param names: the names of the locks which shall be acquired
1007
        (special lock names, or instance/node names)
1008
    @type shared: integer (0/1) used as a boolean
1009
    @param shared: whether to acquire in shared mode; by default an
1010
        exclusive lock will be acquired
1011
    @type timeout: float or None
1012
    @param timeout: Maximum time to acquire all locks
1013
    @type priority: integer
1014
    @param priority: Priority for acquiring locks
1015
    @type test_notify: callable or None
1016
    @param test_notify: Special callback function for unittesting
1017

1018
    @return: Set of all locks successfully acquired or None in case of timeout
1019

1020
    @raise errors.LockError: when any lock we try to acquire has
1021
        been deleted before we succeed. In this case none of the
1022
        locks requested will be acquired.
1023

1024
    """
1025
    assert timeout is None or timeout >= 0.0
1026

    
1027
    # Check we don't already own locks at this level
1028
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
1029
                                  " (lockset %s)" % self.name)
1030

    
1031
    if priority is None:
1032
      priority = _DEFAULT_PRIORITY
1033

    
1034
    # We need to keep track of how long we spent waiting for a lock. The
1035
    # timeout passed to this function is over all lock acquires.
1036
    running_timeout = RunningTimeout(timeout, False)
1037

    
1038
    try:
1039
      if names is not None:
1040
        # Support passing in a single resource to acquire rather than many
1041
        if isinstance(names, basestring):
1042
          names = [names]
1043

    
1044
        return self.__acquire_inner(names, False, shared, priority,
1045
                                    running_timeout.Remaining, test_notify)
1046

    
1047
      else:
1048
        # If no names are given acquire the whole set by not letting new names
1049
        # being added before we release, and getting the current list of names.
1050
        # Some of them may then be deleted later, but we'll cope with this.
1051
        #
1052
        # We'd like to acquire this lock in a shared way, as it's nice if
1053
        # everybody else can use the instances at the same time. If we are
1054
        # acquiring them exclusively though they won't be able to do this
1055
        # anyway, though, so we'll get the list lock exclusively as well in
1056
        # order to be able to do add() on the set while owning it.
1057
        if not self.__lock.acquire(shared=shared, priority=priority,
1058
                                   timeout=running_timeout.Remaining()):
1059
          raise _AcquireTimeout()
1060
        try:
1061
          # note we own the set-lock
1062
          self._add_owned()
1063

    
1064
          return self.__acquire_inner(self.__names(), True, shared, priority,
1065
                                      running_timeout.Remaining, test_notify)
1066
        except:
1067
          # We shouldn't have problems adding the lock to the owners list, but
1068
          # if we did we'll try to release this lock and re-raise exception.
1069
          # Of course something is going to be really wrong, after this.
1070
          self.__lock.release()
1071
          self._del_owned()
1072
          raise
1073

    
1074
    except _AcquireTimeout:
1075
      return None
1076

    
1077
  def __acquire_inner(self, names, want_all, shared, priority,
1078
                      timeout_fn, test_notify):
1079
    """Inner logic for acquiring a number of locks.
1080

1081
    @param names: Names of the locks to be acquired
1082
    @param want_all: Whether all locks in the set should be acquired
1083
    @param shared: Whether to acquire in shared mode
1084
    @param timeout_fn: Function returning remaining timeout
1085
    @param priority: Priority for acquiring locks
1086
    @param test_notify: Special callback function for unittesting
1087

1088
    """
1089
    acquire_list = []
1090

    
1091
    # First we look the locks up on __lockdict. We have no way of being sure
1092
    # they will still be there after, but this makes it a lot faster should
1093
    # just one of them be the already wrong. Using a sorted sequence to prevent
1094
    # deadlocks.
1095
    for lname in sorted(utils.UniqueSequence(names)):
1096
      try:
1097
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
1098
      except KeyError:
1099
        if want_all:
1100
          # We are acquiring all the set, it doesn't matter if this particular
1101
          # element is not there anymore.
1102
          continue
1103

    
1104
        raise errors.LockError("Non-existing lock %s in set %s" %
1105
                               (lname, self.name))
1106

    
1107
      acquire_list.append((lname, lock))
1108

    
1109
    # This will hold the locknames we effectively acquired.
1110
    acquired = set()
1111

    
1112
    try:
1113
      # Now acquire_list contains a sorted list of resources and locks we
1114
      # want.  In order to get them we loop on this (private) list and
1115
      # acquire() them.  We gave no real guarantee they will still exist till
1116
      # this is done but .acquire() itself is safe and will alert us if the
1117
      # lock gets deleted.
1118
      for (lname, lock) in acquire_list:
1119
        if __debug__ and callable(test_notify):
1120
          test_notify_fn = lambda: test_notify(lname)
1121
        else:
1122
          test_notify_fn = None
1123

    
1124
        timeout = timeout_fn()
1125

    
1126
        try:
1127
          # raises LockError if the lock was deleted
1128
          acq_success = lock.acquire(shared=shared, timeout=timeout,
1129
                                     priority=priority,
1130
                                     test_notify=test_notify_fn)
1131
        except errors.LockError:
1132
          if want_all:
1133
            # We are acquiring all the set, it doesn't matter if this
1134
            # particular element is not there anymore.
1135
            continue
1136

    
1137
          raise errors.LockError("Non-existing lock %s in set %s" %
1138
                                 (lname, self.name))
1139

    
1140
        if not acq_success:
1141
          # Couldn't get lock or timeout occurred
1142
          if timeout is None:
1143
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
1144
            # blocking.
1145
            raise errors.LockError("Failed to get lock %s (set %s)" %
1146
                                   (lname, self.name))
1147

    
1148
          raise _AcquireTimeout()
1149

    
1150
        try:
1151
          # now the lock cannot be deleted, we have it!
1152
          self._add_owned(name=lname)
1153
          acquired.add(lname)
1154

    
1155
        except:
1156
          # We shouldn't have problems adding the lock to the owners list, but
1157
          # if we did we'll try to release this lock and re-raise exception.
1158
          # Of course something is going to be really wrong after this.
1159
          if lock._is_owned():
1160
            lock.release()
1161
          raise
1162

    
1163
    except:
1164
      # Release all owned locks
1165
      self._release_and_delete_owned()
1166
      raise
1167

    
1168
    return acquired
1169

    
1170
  def release(self, names=None):
1171
    """Release a set of resource locks, at the same level.
1172

1173
    You must have acquired the locks, either in shared or in exclusive mode,
1174
    before releasing them.
1175

1176
    @type names: list of strings, or None
1177
    @param names: the names of the locks which shall be released
1178
        (defaults to all the locks acquired at that level).
1179

1180
    """
1181
    assert self._is_owned(), ("release() on lock set %s while not owner" %
1182
                              self.name)
1183

    
1184
    # Support passing in a single resource to release rather than many
1185
    if isinstance(names, basestring):
1186
      names = [names]
1187

    
1188
    if names is None:
1189
      names = self._list_owned()
1190
    else:
1191
      names = set(names)
1192
      assert self._list_owned().issuperset(names), (
1193
               "release() on unheld resources %s (set %s)" %
1194
               (names.difference(self._list_owned()), self.name))
1195

    
1196
    # First of all let's release the "all elements" lock, if set.
1197
    # After this 'add' can work again
1198
    if self.__lock._is_owned():
1199
      self.__lock.release()
1200
      self._del_owned()
1201

    
1202
    for lockname in names:
1203
      # If we are sure the lock doesn't leave __lockdict without being
1204
      # exclusively held we can do this...
1205
      self.__lockdict[lockname].release()
1206
      self._del_owned(name=lockname)
1207

    
1208
  def add(self, names, acquired=0, shared=0):
1209
    """Add a new set of elements to the set
1210

1211
    @type names: list of strings
1212
    @param names: names of the new elements to add
1213
    @type acquired: integer (0/1) used as a boolean
1214
    @param acquired: pre-acquire the new resource?
1215
    @type shared: integer (0/1) used as a boolean
1216
    @param shared: is the pre-acquisition shared?
1217

1218
    """
1219
    # Check we don't already own locks at this level
1220
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1221
      ("Cannot add locks if the set %s is only partially owned, or shared" %
1222
       self.name)
1223

    
1224
    # Support passing in a single resource to add rather than many
1225
    if isinstance(names, basestring):
1226
      names = [names]
1227

    
1228
    # If we don't already own the set-level lock acquired in an exclusive way
1229
    # we'll get it and note we need to release it later.
1230
    release_lock = False
1231
    if not self.__lock._is_owned():
1232
      release_lock = True
1233
      self.__lock.acquire()
1234

    
1235
    try:
1236
      invalid_names = set(self.__names()).intersection(names)
1237
      if invalid_names:
1238
        # This must be an explicit raise, not an assert, because assert is
1239
        # turned off when using optimization, and this can happen because of
1240
        # concurrency even if the user doesn't want it.
1241
        raise errors.LockError("duplicate add(%s) on lockset %s" %
1242
                               (invalid_names, self.name))
1243

    
1244
      for lockname in names:
1245
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1246

    
1247
        if acquired:
1248
          # No need for priority or timeout here as this lock has just been
1249
          # created
1250
          lock.acquire(shared=shared)
1251
          # now the lock cannot be deleted, we have it!
1252
          try:
1253
            self._add_owned(name=lockname)
1254
          except:
1255
            # We shouldn't have problems adding the lock to the owners list,
1256
            # but if we did we'll try to release this lock and re-raise
1257
            # exception.  Of course something is going to be really wrong,
1258
            # after this.  On the other hand the lock hasn't been added to the
1259
            # __lockdict yet so no other threads should be pending on it. This
1260
            # release is just a safety measure.
1261
            lock.release()
1262
            raise
1263

    
1264
        self.__lockdict[lockname] = lock
1265

    
1266
    finally:
1267
      # Only release __lock if we were not holding it previously.
1268
      if release_lock:
1269
        self.__lock.release()
1270

    
1271
    return True
1272

    
1273
  def remove(self, names):
1274
    """Remove elements from the lock set.
1275

1276
    You can either not hold anything in the lockset or already hold a superset
1277
    of the elements you want to delete, exclusively.
1278

1279
    @type names: list of strings
1280
    @param names: names of the resource to remove.
1281

1282
    @return: a list of locks which we removed; the list is always
1283
        equal to the names list if we were holding all the locks
1284
        exclusively
1285

1286
    """
1287
    # Support passing in a single resource to remove rather than many
1288
    if isinstance(names, basestring):
1289
      names = [names]
1290

    
1291
    # If we own any subset of this lock it must be a superset of what we want
1292
    # to delete. The ownership must also be exclusive, but that will be checked
1293
    # by the lock itself.
1294
    assert not self._is_owned() or self._list_owned().issuperset(names), (
1295
      "remove() on acquired lockset %s while not owning all elements" %
1296
      self.name)
1297

    
1298
    removed = []
1299

    
1300
    for lname in names:
1301
      # Calling delete() acquires the lock exclusively if we don't already own
1302
      # it, and causes all pending and subsequent lock acquires to fail. It's
1303
      # fine to call it out of order because delete() also implies release(),
1304
      # and the assertion above guarantees that if we either already hold
1305
      # everything we want to delete, or we hold none.
1306
      try:
1307
        self.__lockdict[lname].delete()
1308
        removed.append(lname)
1309
      except (KeyError, errors.LockError):
1310
        # This cannot happen if we were already holding it, verify:
1311
        assert not self._is_owned(), ("remove failed while holding lockset %s"
1312
                                      % self.name)
1313
      else:
1314
        # If no LockError was raised we are the ones who deleted the lock.
1315
        # This means we can safely remove it from lockdict, as any further or
1316
        # pending delete() or acquire() will fail (and nobody can have the lock
1317
        # since before our call to delete()).
1318
        #
1319
        # This is done in an else clause because if the exception was thrown
1320
        # it's the job of the one who actually deleted it.
1321
        del self.__lockdict[lname]
1322
        # And let's remove it from our private list if we owned it.
1323
        if self._is_owned():
1324
          self._del_owned(name=lname)
1325

    
1326
    return removed
1327

    
1328

    
1329
# Locking levels, must be acquired in increasing order.
1330
# Current rules are:
1331
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1332
#   acquired before performing any operation, either in shared or in exclusive
1333
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
1334
#   avoided.
1335
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1336
#   If you need more than one node, or more than one instance, acquire them at
1337
#   the same time.
1338
LEVEL_CLUSTER = 0
1339
LEVEL_INSTANCE = 1
1340
LEVEL_NODE = 2
1341

    
1342
LEVELS = [LEVEL_CLUSTER,
1343
          LEVEL_INSTANCE,
1344
          LEVEL_NODE]
1345

    
1346
# Lock levels which are modifiable
1347
LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1348

    
1349
LEVEL_NAMES = {
1350
  LEVEL_CLUSTER: "cluster",
1351
  LEVEL_INSTANCE: "instance",
1352
  LEVEL_NODE: "node",
1353
  }
1354

    
1355
# Constant for the big ganeti lock
1356
BGL = 'BGL'
1357

    
1358

    
1359
class GanetiLockManager:
1360
  """The Ganeti Locking Library
1361

1362
  The purpose of this small library is to manage locking for ganeti clusters
1363
  in a central place, while at the same time doing dynamic checks against
1364
  possible deadlocks. It will also make it easier to transition to a different
1365
  lock type should we migrate away from python threads.
1366

1367
  """
1368
  _instance = None
1369

    
1370
  def __init__(self, nodes, instances):
1371
    """Constructs a new GanetiLockManager object.
1372

1373
    There should be only a GanetiLockManager object at any time, so this
1374
    function raises an error if this is not the case.
1375

1376
    @param nodes: list of node names
1377
    @param instances: list of instance names
1378

1379
    """
1380
    assert self.__class__._instance is None, \
1381
           "double GanetiLockManager instance"
1382

    
1383
    self.__class__._instance = self
1384

    
1385
    self._monitor = LockMonitor()
1386

    
1387
    # The keyring contains all the locks, at their level and in the correct
1388
    # locking order.
1389
    self.__keyring = {
1390
      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
1391
      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
1392
      LEVEL_INSTANCE: LockSet(instances, "instances",
1393
                              monitor=self._monitor),
1394
      }
1395

    
1396
  def QueryLocks(self, fields, sync):
1397
    """Queries information from all locks.
1398

1399
    See L{LockMonitor.QueryLocks}.
1400

1401
    """
1402
    return self._monitor.QueryLocks(fields, sync)
1403

    
1404
  def _names(self, level):
1405
    """List the lock names at the given level.
1406

1407
    This can be used for debugging/testing purposes.
1408

1409
    @param level: the level whose list of locks to get
1410

1411
    """
1412
    assert level in LEVELS, "Invalid locking level %s" % level
1413
    return self.__keyring[level]._names()
1414

    
1415
  def _is_owned(self, level):
1416
    """Check whether we are owning locks at the given level
1417

1418
    """
1419
    return self.__keyring[level]._is_owned()
1420

    
1421
  is_owned = _is_owned
1422

    
1423
  def _list_owned(self, level):
1424
    """Get the set of owned locks at the given level
1425

1426
    """
1427
    return self.__keyring[level]._list_owned()
1428

    
1429
  def _upper_owned(self, level):
1430
    """Check that we don't own any lock at a level greater than the given one.
1431

1432
    """
1433
    # This way of checking only works if LEVELS[i] = i, which we check for in
1434
    # the test cases.
1435
    return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1436

    
1437
  def _BGL_owned(self): # pylint: disable-msg=C0103
1438
    """Check if the current thread owns the BGL.
1439

1440
    Both an exclusive or a shared acquisition work.
1441

1442
    """
1443
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1444

    
1445
  @staticmethod
1446
  def _contains_BGL(level, names): # pylint: disable-msg=C0103
1447
    """Check if the level contains the BGL.
1448

1449
    Check if acting on the given level and set of names will change
1450
    the status of the Big Ganeti Lock.
1451

1452
    """
1453
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1454

    
1455
  def acquire(self, level, names, timeout=None, shared=0, priority=None):
1456
    """Acquire a set of resource locks, at the same level.
1457

1458
    @type level: member of locking.LEVELS
1459
    @param level: the level at which the locks shall be acquired
1460
    @type names: list of strings (or string)
1461
    @param names: the names of the locks which shall be acquired
1462
        (special lock names, or instance/node names)
1463
    @type shared: integer (0/1) used as a boolean
1464
    @param shared: whether to acquire in shared mode; by default
1465
        an exclusive lock will be acquired
1466
    @type timeout: float
1467
    @param timeout: Maximum time to acquire all locks
1468
    @type priority: integer
1469
    @param priority: Priority for acquiring lock
1470

1471
    """
1472
    assert level in LEVELS, "Invalid locking level %s" % level
1473

    
1474
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1475
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1476
    # so even if we've migrated we need to at least share the BGL to be
1477
    # compatible with them. Of course if we own the BGL exclusively there's no
1478
    # point in acquiring any other lock, unless perhaps we are half way through
1479
    # the migration of the current opcode.
1480
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1481
            "You must own the Big Ganeti Lock before acquiring any other")
1482

    
1483
    # Check we don't own locks at the same or upper levels.
1484
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1485
           " while owning some at a greater one")
1486

    
1487
    # Acquire the locks in the set.
1488
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
1489
                                         priority=priority)
1490

    
1491
  def release(self, level, names=None):
1492
    """Release a set of resource locks, at the same level.
1493

1494
    You must have acquired the locks, either in shared or in exclusive
1495
    mode, before releasing them.
1496

1497
    @type level: member of locking.LEVELS
1498
    @param level: the level at which the locks shall be released
1499
    @type names: list of strings, or None
1500
    @param names: the names of the locks which shall be released
1501
        (defaults to all the locks acquired at that level)
1502

1503
    """
1504
    assert level in LEVELS, "Invalid locking level %s" % level
1505
    assert (not self._contains_BGL(level, names) or
1506
            not self._upper_owned(LEVEL_CLUSTER)), (
1507
            "Cannot release the Big Ganeti Lock while holding something"
1508
            " at upper levels (%r)" %
1509
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
1510
                              for i in self.__keyring.keys()]), ))
1511

    
1512
    # Release will complain if we don't own the locks already
1513
    return self.__keyring[level].release(names)
1514

    
1515
  def add(self, level, names, acquired=0, shared=0):
1516
    """Add locks at the specified level.
1517

1518
    @type level: member of locking.LEVELS_MOD
1519
    @param level: the level at which the locks shall be added
1520
    @type names: list of strings
1521
    @param names: names of the locks to acquire
1522
    @type acquired: integer (0/1) used as a boolean
1523
    @param acquired: whether to acquire the newly added locks
1524
    @type shared: integer (0/1) used as a boolean
1525
    @param shared: whether the acquisition will be shared
1526

1527
    """
1528
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1529
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1530
           " operations")
1531
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1532
           " while owning some at a greater one")
1533
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1534

    
1535
  def remove(self, level, names):
1536
    """Remove locks from the specified level.
1537

1538
    You must either already own the locks you are trying to remove
1539
    exclusively or not own any lock at an upper level.
1540

1541
    @type level: member of locking.LEVELS_MOD
1542
    @param level: the level at which the locks shall be removed
1543
    @type names: list of strings
1544
    @param names: the names of the locks which shall be removed
1545
        (special lock names, or instance/node names)
1546

1547
    """
1548
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1549
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1550
           " operations")
1551
    # Check we either own the level or don't own anything from here
1552
    # up. LockSet.remove() will check the case in which we don't own
1553
    # all the needed resources, or we have a shared ownership.
1554
    assert self._is_owned(level) or not self._upper_owned(level), (
1555
           "Cannot remove locks at a level while not owning it or"
1556
           " owning some at a greater one")
1557
    return self.__keyring[level].remove(names)
1558

    
1559

    
1560
class LockMonitor(object):
1561
  _LOCK_ATTR = "_lock"
1562

    
1563
  def __init__(self):
1564
    """Initializes this class.
1565

1566
    """
1567
    self._lock = SharedLock("LockMonitor")
1568

    
1569
    # Tracked locks. Weak references are used to avoid issues with circular
1570
    # references and deletion.
1571
    self._locks = weakref.WeakKeyDictionary()
1572

    
1573
  @ssynchronized(_LOCK_ATTR)
1574
  def RegisterLock(self, lock):
1575
    """Registers a new lock.
1576

1577
    """
1578
    logging.debug("Registering lock %s", lock.name)
1579
    assert lock not in self._locks, "Duplicate lock registration"
1580
    assert not compat.any(lock.name == i.name for i in self._locks.keys()), \
1581
           "Found duplicate lock name"
1582
    self._locks[lock] = None
1583

    
1584
  @ssynchronized(_LOCK_ATTR)
1585
  def _GetLockInfo(self, fields):
1586
    """Get information from all locks while the monitor lock is held.
1587

1588
    """
1589
    result = {}
1590

    
1591
    for lock in self._locks.keys():
1592
      assert lock.name not in result, "Found duplicate lock name"
1593
      result[lock.name] = lock.GetInfo(fields)
1594

    
1595
    return result
1596

    
1597
  def QueryLocks(self, fields, sync):
1598
    """Queries information from all locks.
1599

1600
    @type fields: list of strings
1601
    @param fields: List of fields to return
1602
    @type sync: boolean
1603
    @param sync: Whether to operate in synchronous mode
1604

1605
    """
1606
    if sync:
1607
      raise NotImplementedError("Synchronous queries are not implemented")
1608

    
1609
    # Get all data without sorting
1610
    result = self._GetLockInfo(fields)
1611

    
1612
    # Sort by name
1613
    return [result[name] for name in utils.NiceSort(result.keys())]