Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 26d3fd2f

History | View | Annotate | Download (48.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable-msg=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import time
32
import errno
33
import weakref
34
import logging
35
import heapq
36

    
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import compat
40

    
41

    
42
_EXCLUSIVE_TEXT = "exclusive"
43
_SHARED_TEXT = "shared"
44

    
45
_DEFAULT_PRIORITY = 0
46

    
47

    
48
def ssynchronized(mylock, shared=0):
49
  """Shared Synchronization decorator.
50

51
  Calls the function holding the given lock, either in exclusive or shared
52
  mode. It requires the passed lock to be a SharedLock (or support its
53
  semantics).
54

55
  @type mylock: lockable object or string
56
  @param mylock: lock to acquire or class member name of the lock to acquire
57

58
  """
59
  def wrap(fn):
60
    def sync_function(*args, **kwargs):
61
      if isinstance(mylock, basestring):
62
        assert args, "cannot ssynchronize on non-class method: self not found"
63
        # args[0] is "self"
64
        lock = getattr(args[0], mylock)
65
      else:
66
        lock = mylock
67
      lock.acquire(shared=shared)
68
      try:
69
        return fn(*args, **kwargs)
70
      finally:
71
        lock.release()
72
    return sync_function
73
  return wrap
74

    
75

    
76
class RunningTimeout(object):
77
  """Class to calculate remaining timeout when doing several operations.
78

79
  """
80
  __slots__ = [
81
    "_allow_negative",
82
    "_start_time",
83
    "_time_fn",
84
    "_timeout",
85
    ]
86

    
87
  def __init__(self, timeout, allow_negative, _time_fn=time.time):
88
    """Initializes this class.
89

90
    @type timeout: float
91
    @param timeout: Timeout duration
92
    @type allow_negative: bool
93
    @param allow_negative: Whether to return values below zero
94
    @param _time_fn: Time function for unittests
95

96
    """
97
    object.__init__(self)
98

    
99
    if timeout is not None and timeout < 0.0:
100
      raise ValueError("Timeout must not be negative")
101

    
102
    self._timeout = timeout
103
    self._allow_negative = allow_negative
104
    self._time_fn = _time_fn
105

    
106
    self._start_time = None
107

    
108
  def Remaining(self):
109
    """Returns the remaining timeout.
110

111
    """
112
    if self._timeout is None:
113
      return None
114

    
115
    # Get start time on first calculation
116
    if self._start_time is None:
117
      self._start_time = self._time_fn()
118

    
119
    # Calculate remaining time
120
    remaining_timeout = self._start_time + self._timeout - self._time_fn()
121

    
122
    if not self._allow_negative:
123
      # Ensure timeout is always >= 0
124
      return max(0.0, remaining_timeout)
125

    
126
    return remaining_timeout
127

    
128

    
129
class _SingleNotifyPipeConditionWaiter(object):
130
  """Helper class for SingleNotifyPipeCondition
131

132
  """
133
  __slots__ = [
134
    "_fd",
135
    "_poller",
136
    ]
137

    
138
  def __init__(self, poller, fd):
139
    """Constructor for _SingleNotifyPipeConditionWaiter
140

141
    @type poller: select.poll
142
    @param poller: Poller object
143
    @type fd: int
144
    @param fd: File descriptor to wait for
145

146
    """
147
    object.__init__(self)
148
    self._poller = poller
149
    self._fd = fd
150

    
151
  def __call__(self, timeout):
152
    """Wait for something to happen on the pipe.
153

154
    @type timeout: float or None
155
    @param timeout: Timeout for waiting (can be None)
156

157
    """
158
    running_timeout = RunningTimeout(timeout, True)
159

    
160
    while True:
161
      remaining_time = running_timeout.Remaining()
162

    
163
      if remaining_time is not None:
164
        if remaining_time < 0.0:
165
          break
166

    
167
        # Our calculation uses seconds, poll() wants milliseconds
168
        remaining_time *= 1000
169

    
170
      try:
171
        result = self._poller.poll(remaining_time)
172
      except EnvironmentError, err:
173
        if err.errno != errno.EINTR:
174
          raise
175
        result = None
176

    
177
      # Check whether we were notified
178
      if result and result[0][0] == self._fd:
179
        break
180

    
181

    
182
class _BaseCondition(object):
183
  """Base class containing common code for conditions.
184

185
  Some of this code is taken from python's threading module.
186

187
  """
188
  __slots__ = [
189
    "_lock",
190
    "acquire",
191
    "release",
192
    "_is_owned",
193
    "_acquire_restore",
194
    "_release_save",
195
    ]
196

    
197
  def __init__(self, lock):
198
    """Constructor for _BaseCondition.
199

200
    @type lock: threading.Lock
201
    @param lock: condition base lock
202

203
    """
204
    object.__init__(self)
205

    
206
    try:
207
      self._release_save = lock._release_save
208
    except AttributeError:
209
      self._release_save = self._base_release_save
210
    try:
211
      self._acquire_restore = lock._acquire_restore
212
    except AttributeError:
213
      self._acquire_restore = self._base_acquire_restore
214
    try:
215
      self._is_owned = lock._is_owned
216
    except AttributeError:
217
      self._is_owned = self._base_is_owned
218

    
219
    self._lock = lock
220

    
221
    # Export the lock's acquire() and release() methods
222
    self.acquire = lock.acquire
223
    self.release = lock.release
224

    
225
  def _base_is_owned(self):
226
    """Check whether lock is owned by current thread.
227

228
    """
229
    if self._lock.acquire(0):
230
      self._lock.release()
231
      return False
232
    return True
233

    
234
  def _base_release_save(self):
235
    self._lock.release()
236

    
237
  def _base_acquire_restore(self, _):
238
    self._lock.acquire()
239

    
240
  def _check_owned(self):
241
    """Raise an exception if the current thread doesn't own the lock.
242

243
    """
244
    if not self._is_owned():
245
      raise RuntimeError("cannot work with un-aquired lock")
246

    
247

    
248
class SingleNotifyPipeCondition(_BaseCondition):
249
  """Condition which can only be notified once.
250

251
  This condition class uses pipes and poll, internally, to be able to wait for
252
  notification with a timeout, without resorting to polling. It is almost
253
  compatible with Python's threading.Condition, with the following differences:
254
    - notifyAll can only be called once, and no wait can happen after that
255
    - notify is not supported, only notifyAll
256

257
  """
258

    
259
  __slots__ = [
260
    "_poller",
261
    "_read_fd",
262
    "_write_fd",
263
    "_nwaiters",
264
    "_notified",
265
    ]
266

    
267
  _waiter_class = _SingleNotifyPipeConditionWaiter
268

    
269
  def __init__(self, lock):
270
    """Constructor for SingleNotifyPipeCondition
271

272
    """
273
    _BaseCondition.__init__(self, lock)
274
    self._nwaiters = 0
275
    self._notified = False
276
    self._read_fd = None
277
    self._write_fd = None
278
    self._poller = None
279

    
280
  def _check_unnotified(self):
281
    """Throws an exception if already notified.
282

283
    """
284
    if self._notified:
285
      raise RuntimeError("cannot use already notified condition")
286

    
287
  def _Cleanup(self):
288
    """Cleanup open file descriptors, if any.
289

290
    """
291
    if self._read_fd is not None:
292
      os.close(self._read_fd)
293
      self._read_fd = None
294

    
295
    if self._write_fd is not None:
296
      os.close(self._write_fd)
297
      self._write_fd = None
298
    self._poller = None
299

    
300
  def wait(self, timeout=None):
301
    """Wait for a notification.
302

303
    @type timeout: float or None
304
    @param timeout: Waiting timeout (can be None)
305

306
    """
307
    self._check_owned()
308
    self._check_unnotified()
309

    
310
    self._nwaiters += 1
311
    try:
312
      if self._poller is None:
313
        (self._read_fd, self._write_fd) = os.pipe()
314
        self._poller = select.poll()
315
        self._poller.register(self._read_fd, select.POLLHUP)
316

    
317
      wait_fn = self._waiter_class(self._poller, self._read_fd)
318
      state = self._release_save()
319
      try:
320
        # Wait for notification
321
        wait_fn(timeout)
322
      finally:
323
        # Re-acquire lock
324
        self._acquire_restore(state)
325
    finally:
326
      self._nwaiters -= 1
327
      if self._nwaiters == 0:
328
        self._Cleanup()
329

    
330
  def notifyAll(self): # pylint: disable-msg=C0103
331
    """Close the writing side of the pipe to notify all waiters.
332

333
    """
334
    self._check_owned()
335
    self._check_unnotified()
336
    self._notified = True
337
    if self._write_fd is not None:
338
      os.close(self._write_fd)
339
      self._write_fd = None
340

    
341

    
342
class PipeCondition(_BaseCondition):
343
  """Group-only non-polling condition with counters.
344

345
  This condition class uses pipes and poll, internally, to be able to wait for
346
  notification with a timeout, without resorting to polling. It is almost
347
  compatible with Python's threading.Condition, but only supports notifyAll and
348
  non-recursive locks. As an additional features it's able to report whether
349
  there are any waiting threads.
350

351
  """
352
  __slots__ = [
353
    "_waiters",
354
    "_single_condition",
355
    ]
356

    
357
  _single_condition_class = SingleNotifyPipeCondition
358

    
359
  def __init__(self, lock):
360
    """Initializes this class.
361

362
    """
363
    _BaseCondition.__init__(self, lock)
364
    self._waiters = set()
365
    self._single_condition = self._single_condition_class(self._lock)
366

    
367
  def wait(self, timeout=None):
368
    """Wait for a notification.
369

370
    @type timeout: float or None
371
    @param timeout: Waiting timeout (can be None)
372

373
    """
374
    self._check_owned()
375

    
376
    # Keep local reference to the pipe. It could be replaced by another thread
377
    # notifying while we're waiting.
378
    cond = self._single_condition
379

    
380
    self._waiters.add(threading.currentThread())
381
    try:
382
      cond.wait(timeout)
383
    finally:
384
      self._check_owned()
385
      self._waiters.remove(threading.currentThread())
386

    
387
  def notifyAll(self): # pylint: disable-msg=C0103
388
    """Notify all currently waiting threads.
389

390
    """
391
    self._check_owned()
392
    self._single_condition.notifyAll()
393
    self._single_condition = self._single_condition_class(self._lock)
394

    
395
  def get_waiting(self):
396
    """Returns a list of all waiting threads.
397

398
    """
399
    self._check_owned()
400

    
401
    return self._waiters
402

    
403
  def has_waiting(self):
404
    """Returns whether there are active waiters.
405

406
    """
407
    self._check_owned()
408

    
409
    return bool(self._waiters)
410

    
411

    
412
class _PipeConditionWithMode(PipeCondition):
413
  __slots__ = [
414
    "shared",
415
    ]
416

    
417
  def __init__(self, lock, shared):
418
    """Initializes this class.
419

420
    """
421
    self.shared = shared
422
    PipeCondition.__init__(self, lock)
423

    
424

    
425
class SharedLock(object):
426
  """Implements a shared lock.
427

428
  Multiple threads can acquire the lock in a shared way by calling
429
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
430
  threads can call C{acquire(shared=0)}.
431

432
  Notes on data structures: C{__pending} contains a priority queue (heapq) of
433
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
434
  ...]}. Each per-priority queue contains a normal in-order list of conditions
435
  to be notified when the lock can be acquired. Shared locks are grouped
436
  together by priority and the condition for them is stored in
437
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
438
  references for the per-priority queues indexed by priority for faster access.
439

440
  @type name: string
441
  @ivar name: the name of the lock
442

443
  """
444
  __slots__ = [
445
    "__weakref__",
446
    "__deleted",
447
    "__exc",
448
    "__lock",
449
    "__pending",
450
    "__pending_by_prio",
451
    "__pending_shared",
452
    "__shr",
453
    "name",
454
    ]
455

    
456
  __condition_class = _PipeConditionWithMode
457

    
458
  def __init__(self, name, monitor=None):
459
    """Construct a new SharedLock.
460

461
    @param name: the name of the lock
462
    @type monitor: L{LockMonitor}
463
    @param monitor: Lock monitor with which to register
464

465
    """
466
    object.__init__(self)
467

    
468
    self.name = name
469

    
470
    # Internal lock
471
    self.__lock = threading.Lock()
472

    
473
    # Queue containing waiting acquires
474
    self.__pending = []
475
    self.__pending_by_prio = {}
476
    self.__pending_shared = {}
477

    
478
    # Current lock holders
479
    self.__shr = set()
480
    self.__exc = None
481

    
482
    # is this lock in the deleted state?
483
    self.__deleted = False
484

    
485
    # Register with lock monitor
486
    if monitor:
487
      monitor.RegisterLock(self)
488

    
489
  def GetInfo(self, fields):
490
    """Retrieves information for querying locks.
491

492
    @type fields: list of strings
493
    @param fields: List of fields to return
494

495
    """
496
    self.__lock.acquire()
497
    try:
498
      info = []
499

    
500
      # Note: to avoid unintentional race conditions, no references to
501
      # modifiable objects should be returned unless they were created in this
502
      # function.
503
      for fname in fields:
504
        if fname == "name":
505
          info.append(self.name)
506
        elif fname == "mode":
507
          if self.__deleted:
508
            info.append("deleted")
509
            assert not (self.__exc or self.__shr)
510
          elif self.__exc:
511
            info.append(_EXCLUSIVE_TEXT)
512
          elif self.__shr:
513
            info.append(_SHARED_TEXT)
514
          else:
515
            info.append(None)
516
        elif fname == "owner":
517
          if self.__exc:
518
            owner = [self.__exc]
519
          else:
520
            owner = self.__shr
521

    
522
          if owner:
523
            assert not self.__deleted
524
            info.append([i.getName() for i in owner])
525
          else:
526
            info.append(None)
527
        elif fname == "pending":
528
          data = []
529

    
530
          # Sorting instead of copying and using heaq functions for simplicity
531
          for (_, prioqueue) in sorted(self.__pending):
532
            for cond in prioqueue:
533
              if cond.shared:
534
                mode = _SHARED_TEXT
535
              else:
536
                mode = _EXCLUSIVE_TEXT
537

    
538
              # This function should be fast as it runs with the lock held.
539
              # Hence not using utils.NiceSort.
540
              data.append((mode, sorted(i.getName()
541
                                        for i in cond.get_waiting())))
542

    
543
          info.append(data)
544
        else:
545
          raise errors.OpExecError("Invalid query field '%s'" % fname)
546

    
547
      return info
548
    finally:
549
      self.__lock.release()
550

    
551
  def __check_deleted(self):
552
    """Raises an exception if the lock has been deleted.
553

554
    """
555
    if self.__deleted:
556
      raise errors.LockError("Deleted lock %s" % self.name)
557

    
558
  def __is_sharer(self):
559
    """Is the current thread sharing the lock at this time?
560

561
    """
562
    return threading.currentThread() in self.__shr
563

    
564
  def __is_exclusive(self):
565
    """Is the current thread holding the lock exclusively at this time?
566

567
    """
568
    return threading.currentThread() == self.__exc
569

    
570
  def __is_owned(self, shared=-1):
571
    """Is the current thread somehow owning the lock at this time?
572

573
    This is a private version of the function, which presumes you're holding
574
    the internal lock.
575

576
    """
577
    if shared < 0:
578
      return self.__is_sharer() or self.__is_exclusive()
579
    elif shared:
580
      return self.__is_sharer()
581
    else:
582
      return self.__is_exclusive()
583

    
584
  def _is_owned(self, shared=-1):
585
    """Is the current thread somehow owning the lock at this time?
586

587
    @param shared:
588
        - < 0: check for any type of ownership (default)
589
        - 0: check for exclusive ownership
590
        - > 0: check for shared ownership
591

592
    """
593
    self.__lock.acquire()
594
    try:
595
      return self.__is_owned(shared=shared)
596
    finally:
597
      self.__lock.release()
598

    
599
  def _count_pending(self):
600
    """Returns the number of pending acquires.
601

602
    @rtype: int
603

604
    """
605
    self.__lock.acquire()
606
    try:
607
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
608
    finally:
609
      self.__lock.release()
610

    
611
  def _check_empty(self):
612
    """Checks whether there are any pending acquires.
613

614
    @rtype: bool
615

616
    """
617
    self.__lock.acquire()
618
    try:
619
      # Order is important: __find_first_pending_queue modifies __pending
620
      return not (self.__find_first_pending_queue() or
621
                  self.__pending or
622
                  self.__pending_by_prio or
623
                  self.__pending_shared)
624
    finally:
625
      self.__lock.release()
626

    
627
  def __do_acquire(self, shared):
628
    """Actually acquire the lock.
629

630
    """
631
    if shared:
632
      self.__shr.add(threading.currentThread())
633
    else:
634
      self.__exc = threading.currentThread()
635

    
636
  def __can_acquire(self, shared):
637
    """Determine whether lock can be acquired.
638

639
    """
640
    if shared:
641
      return self.__exc is None
642
    else:
643
      return len(self.__shr) == 0 and self.__exc is None
644

    
645
  def __find_first_pending_queue(self):
646
    """Tries to find the topmost queued entry with pending acquires.
647

648
    Removes empty entries while going through the list.
649

650
    """
651
    while self.__pending:
652
      (priority, prioqueue) = self.__pending[0]
653

    
654
      if not prioqueue:
655
        heapq.heappop(self.__pending)
656
        del self.__pending_by_prio[priority]
657
        assert priority not in self.__pending_shared
658
        continue
659

    
660
      if prioqueue:
661
        return prioqueue
662

    
663
    return None
664

    
665
  def __is_on_top(self, cond):
666
    """Checks whether the passed condition is on top of the queue.
667

668
    The caller must make sure the queue isn't empty.
669

670
    """
671
    return cond == self.__find_first_pending_queue()[0]
672

    
673
  def __acquire_unlocked(self, shared, timeout, priority):
674
    """Acquire a shared lock.
675

676
    @param shared: whether to acquire in shared mode; by default an
677
        exclusive lock will be acquired
678
    @param timeout: maximum waiting time before giving up
679
    @type priority: integer
680
    @param priority: Priority for acquiring lock
681

682
    """
683
    self.__check_deleted()
684

    
685
    # We cannot acquire the lock if we already have it
686
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
687
                                   " %s" % self.name)
688

    
689
    # Remove empty entries from queue
690
    self.__find_first_pending_queue()
691

    
692
    # Check whether someone else holds the lock or there are pending acquires.
693
    if not self.__pending and self.__can_acquire(shared):
694
      # Apparently not, can acquire lock directly.
695
      self.__do_acquire(shared)
696
      return True
697

    
698
    prioqueue = self.__pending_by_prio.get(priority, None)
699

    
700
    if shared:
701
      # Try to re-use condition for shared acquire
702
      wait_condition = self.__pending_shared.get(priority, None)
703
      assert (wait_condition is None or
704
              (wait_condition.shared and wait_condition in prioqueue))
705
    else:
706
      wait_condition = None
707

    
708
    if wait_condition is None:
709
      if prioqueue is None:
710
        assert priority not in self.__pending_by_prio
711

    
712
        prioqueue = []
713
        heapq.heappush(self.__pending, (priority, prioqueue))
714
        self.__pending_by_prio[priority] = prioqueue
715

    
716
      wait_condition = self.__condition_class(self.__lock, shared)
717
      prioqueue.append(wait_condition)
718

    
719
      if shared:
720
        # Keep reference for further shared acquires on same priority. This is
721
        # better than trying to find it in the list of pending acquires.
722
        assert priority not in self.__pending_shared
723
        self.__pending_shared[priority] = wait_condition
724

    
725
    try:
726
      # Wait until we become the topmost acquire in the queue or the timeout
727
      # expires.
728
      # TODO: Decrease timeout with spurious notifications
729
      while not (self.__is_on_top(wait_condition) and
730
                 self.__can_acquire(shared)):
731
        # Wait for notification
732
        wait_condition.wait(timeout)
733
        self.__check_deleted()
734

    
735
        # A lot of code assumes blocking acquires always succeed. Loop
736
        # internally for that case.
737
        if timeout is not None:
738
          break
739

    
740
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
741
        self.__do_acquire(shared)
742
        return True
743
    finally:
744
      # Remove condition from queue if there are no more waiters
745
      if not wait_condition.has_waiting():
746
        prioqueue.remove(wait_condition)
747
        if wait_condition.shared:
748
          del self.__pending_shared[priority]
749

    
750
    return False
751

    
752
  def acquire(self, shared=0, timeout=None, priority=_DEFAULT_PRIORITY,
753
              test_notify=None):
754
    """Acquire a shared lock.
755

756
    @type shared: integer (0/1) used as a boolean
757
    @param shared: whether to acquire in shared mode; by default an
758
        exclusive lock will be acquired
759
    @type timeout: float
760
    @param timeout: maximum waiting time before giving up
761
    @type priority: integer
762
    @param priority: Priority for acquiring lock
763
    @type test_notify: callable or None
764
    @param test_notify: Special callback function for unittesting
765

766
    """
767
    self.__lock.acquire()
768
    try:
769
      # We already got the lock, notify now
770
      if __debug__ and callable(test_notify):
771
        test_notify()
772

    
773
      return self.__acquire_unlocked(shared, timeout, priority)
774
    finally:
775
      self.__lock.release()
776

    
777
  def release(self):
778
    """Release a Shared Lock.
779

780
    You must have acquired the lock, either in shared or in exclusive mode,
781
    before calling this function.
782

783
    """
784
    self.__lock.acquire()
785
    try:
786
      assert self.__is_exclusive() or self.__is_sharer(), \
787
        "Cannot release non-owned lock"
788

    
789
      # Autodetect release type
790
      if self.__is_exclusive():
791
        self.__exc = None
792
      else:
793
        self.__shr.remove(threading.currentThread())
794

    
795
      # Notify topmost condition in queue
796
      prioqueue = self.__find_first_pending_queue()
797
      if prioqueue:
798
        prioqueue[0].notifyAll()
799

    
800
    finally:
801
      self.__lock.release()
802

    
803
  def delete(self, timeout=None, priority=_DEFAULT_PRIORITY):
804
    """Delete a Shared Lock.
805

806
    This operation will declare the lock for removal. First the lock will be
807
    acquired in exclusive mode if you don't already own it, then the lock
808
    will be put in a state where any future and pending acquire() fail.
809

810
    @type timeout: float
811
    @param timeout: maximum waiting time before giving up
812
    @type priority: integer
813
    @param priority: Priority for acquiring lock
814

815
    """
816
    self.__lock.acquire()
817
    try:
818
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
819

    
820
      self.__check_deleted()
821

    
822
      # The caller is allowed to hold the lock exclusively already.
823
      acquired = self.__is_exclusive()
824

    
825
      if not acquired:
826
        acquired = self.__acquire_unlocked(0, timeout, priority)
827

    
828
        assert self.__is_exclusive() and not self.__is_sharer(), \
829
          "Lock wasn't acquired in exclusive mode"
830

    
831
      if acquired:
832
        self.__deleted = True
833
        self.__exc = None
834

    
835
        assert not (self.__exc or self.__shr), "Found owner during deletion"
836

    
837
        # Notify all acquires. They'll throw an error.
838
        for (_, prioqueue) in self.__pending:
839
          for cond in prioqueue:
840
            cond.notifyAll()
841

    
842
        assert self.__deleted
843

    
844
      return acquired
845
    finally:
846
      self.__lock.release()
847

    
848
  def _release_save(self):
849
    shared = self.__is_sharer()
850
    self.release()
851
    return shared
852

    
853
  def _acquire_restore(self, shared):
854
    self.acquire(shared=shared)
855

    
856

    
857
# Whenever we want to acquire a full LockSet we pass None as the value
858
# to acquire.  Hide this behind this nicely named constant.
859
ALL_SET = None
860

    
861

    
862
class _AcquireTimeout(Exception):
863
  """Internal exception to abort an acquire on a timeout.
864

865
  """
866

    
867

    
868
class LockSet:
869
  """Implements a set of locks.
870

871
  This abstraction implements a set of shared locks for the same resource type,
872
  distinguished by name. The user can lock a subset of the resources and the
873
  LockSet will take care of acquiring the locks always in the same order, thus
874
  preventing deadlock.
875

876
  All the locks needed in the same set must be acquired together, though.
877

878
  @type name: string
879
  @ivar name: the name of the lockset
880

881
  """
882
  def __init__(self, members, name, monitor=None):
883
    """Constructs a new LockSet.
884

885
    @type members: list of strings
886
    @param members: initial members of the set
887
    @type monitor: L{LockMonitor}
888
    @param monitor: Lock monitor with which to register member locks
889

890
    """
891
    assert members is not None, "members parameter is not a list"
892
    self.name = name
893

    
894
    # Lock monitor
895
    self.__monitor = monitor
896

    
897
    # Used internally to guarantee coherency.
898
    self.__lock = SharedLock(name)
899

    
900
    # The lockdict indexes the relationship name -> lock
901
    # The order-of-locking is implied by the alphabetical order of names
902
    self.__lockdict = {}
903

    
904
    for mname in members:
905
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
906
                                          monitor=monitor)
907

    
908
    # The owner dict contains the set of locks each thread owns. For
909
    # performance each thread can access its own key without a global lock on
910
    # this structure. It is paramount though that *no* other type of access is
911
    # done to this structure (eg. no looping over its keys). *_owner helper
912
    # function are defined to guarantee access is correct, but in general never
913
    # do anything different than __owners[threading.currentThread()], or there
914
    # will be trouble.
915
    self.__owners = {}
916

    
917
  def _GetLockName(self, mname):
918
    """Returns the name for a member lock.
919

920
    """
921
    return "%s/%s" % (self.name, mname)
922

    
923
  def _is_owned(self):
924
    """Is the current thread a current level owner?"""
925
    return threading.currentThread() in self.__owners
926

    
927
  def _add_owned(self, name=None):
928
    """Note the current thread owns the given lock"""
929
    if name is None:
930
      if not self._is_owned():
931
        self.__owners[threading.currentThread()] = set()
932
    else:
933
      if self._is_owned():
934
        self.__owners[threading.currentThread()].add(name)
935
      else:
936
        self.__owners[threading.currentThread()] = set([name])
937

    
938
  def _del_owned(self, name=None):
939
    """Note the current thread owns the given lock"""
940

    
941
    assert not (name is None and self.__lock._is_owned()), \
942
           "Cannot hold internal lock when deleting owner status"
943

    
944
    if name is not None:
945
      self.__owners[threading.currentThread()].remove(name)
946

    
947
    # Only remove the key if we don't hold the set-lock as well
948
    if (not self.__lock._is_owned() and
949
        not self.__owners[threading.currentThread()]):
950
      del self.__owners[threading.currentThread()]
951

    
952
  def _list_owned(self):
953
    """Get the set of resource names owned by the current thread"""
954
    if self._is_owned():
955
      return self.__owners[threading.currentThread()].copy()
956
    else:
957
      return set()
958

    
959
  def _release_and_delete_owned(self):
960
    """Release and delete all resources owned by the current thread"""
961
    for lname in self._list_owned():
962
      lock = self.__lockdict[lname]
963
      if lock._is_owned():
964
        lock.release()
965
      self._del_owned(name=lname)
966

    
967
  def __names(self):
968
    """Return the current set of names.
969

970
    Only call this function while holding __lock and don't iterate on the
971
    result after releasing the lock.
972

973
    """
974
    return self.__lockdict.keys()
975

    
976
  def _names(self):
977
    """Return a copy of the current set of elements.
978

979
    Used only for debugging purposes.
980

981
    """
982
    # If we don't already own the set-level lock acquired
983
    # we'll get it and note we need to release it later.
984
    release_lock = False
985
    if not self.__lock._is_owned():
986
      release_lock = True
987
      self.__lock.acquire(shared=1)
988
    try:
989
      result = self.__names()
990
    finally:
991
      if release_lock:
992
        self.__lock.release()
993
    return set(result)
994

    
995
  def acquire(self, names, timeout=None, shared=0, priority=_DEFAULT_PRIORITY,
996
              test_notify=None):
997
    """Acquire a set of resource locks.
998

999
    @type names: list of strings (or string)
1000
    @param names: the names of the locks which shall be acquired
1001
        (special lock names, or instance/node names)
1002
    @type shared: integer (0/1) used as a boolean
1003
    @param shared: whether to acquire in shared mode; by default an
1004
        exclusive lock will be acquired
1005
    @type timeout: float or None
1006
    @param timeout: Maximum time to acquire all locks
1007
    @type priority: integer
1008
    @param priority: Priority for acquiring locks
1009
    @type test_notify: callable or None
1010
    @param test_notify: Special callback function for unittesting
1011

1012
    @return: Set of all locks successfully acquired or None in case of timeout
1013

1014
    @raise errors.LockError: when any lock we try to acquire has
1015
        been deleted before we succeed. In this case none of the
1016
        locks requested will be acquired.
1017

1018
    """
1019
    assert timeout is None or timeout >= 0.0
1020

    
1021
    # Check we don't already own locks at this level
1022
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
1023
                                  " (lockset %s)" % self.name)
1024

    
1025
    # We need to keep track of how long we spent waiting for a lock. The
1026
    # timeout passed to this function is over all lock acquires.
1027
    running_timeout = RunningTimeout(timeout, False)
1028

    
1029
    try:
1030
      if names is not None:
1031
        # Support passing in a single resource to acquire rather than many
1032
        if isinstance(names, basestring):
1033
          names = [names]
1034

    
1035
        return self.__acquire_inner(names, False, shared, priority,
1036
                                    running_timeout.Remaining, test_notify)
1037

    
1038
      else:
1039
        # If no names are given acquire the whole set by not letting new names
1040
        # being added before we release, and getting the current list of names.
1041
        # Some of them may then be deleted later, but we'll cope with this.
1042
        #
1043
        # We'd like to acquire this lock in a shared way, as it's nice if
1044
        # everybody else can use the instances at the same time. If we are
1045
        # acquiring them exclusively though they won't be able to do this
1046
        # anyway, though, so we'll get the list lock exclusively as well in
1047
        # order to be able to do add() on the set while owning it.
1048
        if not self.__lock.acquire(shared=shared, priority=priority,
1049
                                   timeout=running_timeout.Remaining()):
1050
          raise _AcquireTimeout()
1051
        try:
1052
          # note we own the set-lock
1053
          self._add_owned()
1054

    
1055
          return self.__acquire_inner(self.__names(), True, shared, priority,
1056
                                      running_timeout.Remaining, test_notify)
1057
        except:
1058
          # We shouldn't have problems adding the lock to the owners list, but
1059
          # if we did we'll try to release this lock and re-raise exception.
1060
          # Of course something is going to be really wrong, after this.
1061
          self.__lock.release()
1062
          self._del_owned()
1063
          raise
1064

    
1065
    except _AcquireTimeout:
1066
      return None
1067

    
1068
  def __acquire_inner(self, names, want_all, shared, priority,
1069
                      timeout_fn, test_notify):
1070
    """Inner logic for acquiring a number of locks.
1071

1072
    @param names: Names of the locks to be acquired
1073
    @param want_all: Whether all locks in the set should be acquired
1074
    @param shared: Whether to acquire in shared mode
1075
    @param timeout_fn: Function returning remaining timeout
1076
    @param priority: Priority for acquiring locks
1077
    @param test_notify: Special callback function for unittesting
1078

1079
    """
1080
    acquire_list = []
1081

    
1082
    # First we look the locks up on __lockdict. We have no way of being sure
1083
    # they will still be there after, but this makes it a lot faster should
1084
    # just one of them be the already wrong. Using a sorted sequence to prevent
1085
    # deadlocks.
1086
    for lname in sorted(utils.UniqueSequence(names)):
1087
      try:
1088
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
1089
      except KeyError:
1090
        if want_all:
1091
          # We are acquiring all the set, it doesn't matter if this particular
1092
          # element is not there anymore.
1093
          continue
1094

    
1095
        raise errors.LockError("Non-existing lock %s in set %s" %
1096
                               (lname, self.name))
1097

    
1098
      acquire_list.append((lname, lock))
1099

    
1100
    # This will hold the locknames we effectively acquired.
1101
    acquired = set()
1102

    
1103
    try:
1104
      # Now acquire_list contains a sorted list of resources and locks we
1105
      # want.  In order to get them we loop on this (private) list and
1106
      # acquire() them.  We gave no real guarantee they will still exist till
1107
      # this is done but .acquire() itself is safe and will alert us if the
1108
      # lock gets deleted.
1109
      for (lname, lock) in acquire_list:
1110
        if __debug__ and callable(test_notify):
1111
          test_notify_fn = lambda: test_notify(lname)
1112
        else:
1113
          test_notify_fn = None
1114

    
1115
        timeout = timeout_fn()
1116

    
1117
        try:
1118
          # raises LockError if the lock was deleted
1119
          acq_success = lock.acquire(shared=shared, timeout=timeout,
1120
                                     priority=priority,
1121
                                     test_notify=test_notify_fn)
1122
        except errors.LockError:
1123
          if want_all:
1124
            # We are acquiring all the set, it doesn't matter if this
1125
            # particular element is not there anymore.
1126
            continue
1127

    
1128
          raise errors.LockError("Non-existing lock %s in set %s" %
1129
                                 (lname, self.name))
1130

    
1131
        if not acq_success:
1132
          # Couldn't get lock or timeout occurred
1133
          if timeout is None:
1134
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
1135
            # blocking.
1136
            raise errors.LockError("Failed to get lock %s (set %s)" %
1137
                                   (lname, self.name))
1138

    
1139
          raise _AcquireTimeout()
1140

    
1141
        try:
1142
          # now the lock cannot be deleted, we have it!
1143
          self._add_owned(name=lname)
1144
          acquired.add(lname)
1145

    
1146
        except:
1147
          # We shouldn't have problems adding the lock to the owners list, but
1148
          # if we did we'll try to release this lock and re-raise exception.
1149
          # Of course something is going to be really wrong after this.
1150
          if lock._is_owned():
1151
            lock.release()
1152
          raise
1153

    
1154
    except:
1155
      # Release all owned locks
1156
      self._release_and_delete_owned()
1157
      raise
1158

    
1159
    return acquired
1160

    
1161
  def release(self, names=None):
1162
    """Release a set of resource locks, at the same level.
1163

1164
    You must have acquired the locks, either in shared or in exclusive mode,
1165
    before releasing them.
1166

1167
    @type names: list of strings, or None
1168
    @param names: the names of the locks which shall be released
1169
        (defaults to all the locks acquired at that level).
1170

1171
    """
1172
    assert self._is_owned(), ("release() on lock set %s while not owner" %
1173
                              self.name)
1174

    
1175
    # Support passing in a single resource to release rather than many
1176
    if isinstance(names, basestring):
1177
      names = [names]
1178

    
1179
    if names is None:
1180
      names = self._list_owned()
1181
    else:
1182
      names = set(names)
1183
      assert self._list_owned().issuperset(names), (
1184
               "release() on unheld resources %s (set %s)" %
1185
               (names.difference(self._list_owned()), self.name))
1186

    
1187
    # First of all let's release the "all elements" lock, if set.
1188
    # After this 'add' can work again
1189
    if self.__lock._is_owned():
1190
      self.__lock.release()
1191
      self._del_owned()
1192

    
1193
    for lockname in names:
1194
      # If we are sure the lock doesn't leave __lockdict without being
1195
      # exclusively held we can do this...
1196
      self.__lockdict[lockname].release()
1197
      self._del_owned(name=lockname)
1198

    
1199
  def add(self, names, acquired=0, shared=0):
1200
    """Add a new set of elements to the set
1201

1202
    @type names: list of strings
1203
    @param names: names of the new elements to add
1204
    @type acquired: integer (0/1) used as a boolean
1205
    @param acquired: pre-acquire the new resource?
1206
    @type shared: integer (0/1) used as a boolean
1207
    @param shared: is the pre-acquisition shared?
1208

1209
    """
1210
    # Check we don't already own locks at this level
1211
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1212
      ("Cannot add locks if the set %s is only partially owned, or shared" %
1213
       self.name)
1214

    
1215
    # Support passing in a single resource to add rather than many
1216
    if isinstance(names, basestring):
1217
      names = [names]
1218

    
1219
    # If we don't already own the set-level lock acquired in an exclusive way
1220
    # we'll get it and note we need to release it later.
1221
    release_lock = False
1222
    if not self.__lock._is_owned():
1223
      release_lock = True
1224
      self.__lock.acquire()
1225

    
1226
    try:
1227
      invalid_names = set(self.__names()).intersection(names)
1228
      if invalid_names:
1229
        # This must be an explicit raise, not an assert, because assert is
1230
        # turned off when using optimization, and this can happen because of
1231
        # concurrency even if the user doesn't want it.
1232
        raise errors.LockError("duplicate add(%s) on lockset %s" %
1233
                               (invalid_names, self.name))
1234

    
1235
      for lockname in names:
1236
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1237

    
1238
        if acquired:
1239
          # No need for priority or timeout here as this lock has just been
1240
          # created
1241
          lock.acquire(shared=shared)
1242
          # now the lock cannot be deleted, we have it!
1243
          try:
1244
            self._add_owned(name=lockname)
1245
          except:
1246
            # We shouldn't have problems adding the lock to the owners list,
1247
            # but if we did we'll try to release this lock and re-raise
1248
            # exception.  Of course something is going to be really wrong,
1249
            # after this.  On the other hand the lock hasn't been added to the
1250
            # __lockdict yet so no other threads should be pending on it. This
1251
            # release is just a safety measure.
1252
            lock.release()
1253
            raise
1254

    
1255
        self.__lockdict[lockname] = lock
1256

    
1257
    finally:
1258
      # Only release __lock if we were not holding it previously.
1259
      if release_lock:
1260
        self.__lock.release()
1261

    
1262
    return True
1263

    
1264
  def remove(self, names):
1265
    """Remove elements from the lock set.
1266

1267
    You can either not hold anything in the lockset or already hold a superset
1268
    of the elements you want to delete, exclusively.
1269

1270
    @type names: list of strings
1271
    @param names: names of the resource to remove.
1272

1273
    @return: a list of locks which we removed; the list is always
1274
        equal to the names list if we were holding all the locks
1275
        exclusively
1276

1277
    """
1278
    # Support passing in a single resource to remove rather than many
1279
    if isinstance(names, basestring):
1280
      names = [names]
1281

    
1282
    # If we own any subset of this lock it must be a superset of what we want
1283
    # to delete. The ownership must also be exclusive, but that will be checked
1284
    # by the lock itself.
1285
    assert not self._is_owned() or self._list_owned().issuperset(names), (
1286
      "remove() on acquired lockset %s while not owning all elements" %
1287
      self.name)
1288

    
1289
    removed = []
1290

    
1291
    for lname in names:
1292
      # Calling delete() acquires the lock exclusively if we don't already own
1293
      # it, and causes all pending and subsequent lock acquires to fail. It's
1294
      # fine to call it out of order because delete() also implies release(),
1295
      # and the assertion above guarantees that if we either already hold
1296
      # everything we want to delete, or we hold none.
1297
      try:
1298
        self.__lockdict[lname].delete()
1299
        removed.append(lname)
1300
      except (KeyError, errors.LockError):
1301
        # This cannot happen if we were already holding it, verify:
1302
        assert not self._is_owned(), ("remove failed while holding lockset %s"
1303
                                      % self.name)
1304
      else:
1305
        # If no LockError was raised we are the ones who deleted the lock.
1306
        # This means we can safely remove it from lockdict, as any further or
1307
        # pending delete() or acquire() will fail (and nobody can have the lock
1308
        # since before our call to delete()).
1309
        #
1310
        # This is done in an else clause because if the exception was thrown
1311
        # it's the job of the one who actually deleted it.
1312
        del self.__lockdict[lname]
1313
        # And let's remove it from our private list if we owned it.
1314
        if self._is_owned():
1315
          self._del_owned(name=lname)
1316

    
1317
    return removed
1318

    
1319

    
1320
# Locking levels, must be acquired in increasing order.
1321
# Current rules are:
1322
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1323
#   acquired before performing any operation, either in shared or in exclusive
1324
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
1325
#   avoided.
1326
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1327
#   If you need more than one node, or more than one instance, acquire them at
1328
#   the same time.
1329
LEVEL_CLUSTER = 0
1330
LEVEL_INSTANCE = 1
1331
LEVEL_NODE = 2
1332

    
1333
LEVELS = [LEVEL_CLUSTER,
1334
          LEVEL_INSTANCE,
1335
          LEVEL_NODE]
1336

    
1337
# Lock levels which are modifiable
1338
LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1339

    
1340
LEVEL_NAMES = {
1341
  LEVEL_CLUSTER: "cluster",
1342
  LEVEL_INSTANCE: "instance",
1343
  LEVEL_NODE: "node",
1344
  }
1345

    
1346
# Constant for the big ganeti lock
1347
BGL = 'BGL'
1348

    
1349

    
1350
class GanetiLockManager:
1351
  """The Ganeti Locking Library
1352

1353
  The purpose of this small library is to manage locking for ganeti clusters
1354
  in a central place, while at the same time doing dynamic checks against
1355
  possible deadlocks. It will also make it easier to transition to a different
1356
  lock type should we migrate away from python threads.
1357

1358
  """
1359
  _instance = None
1360

    
1361
  def __init__(self, nodes=None, instances=None):
1362
    """Constructs a new GanetiLockManager object.
1363

1364
    There should be only a GanetiLockManager object at any time, so this
1365
    function raises an error if this is not the case.
1366

1367
    @param nodes: list of node names
1368
    @param instances: list of instance names
1369

1370
    """
1371
    assert self.__class__._instance is None, \
1372
           "double GanetiLockManager instance"
1373

    
1374
    self.__class__._instance = self
1375

    
1376
    self._monitor = LockMonitor()
1377

    
1378
    # The keyring contains all the locks, at their level and in the correct
1379
    # locking order.
1380
    self.__keyring = {
1381
      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
1382
      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
1383
      LEVEL_INSTANCE: LockSet(instances, "instances",
1384
                              monitor=self._monitor),
1385
      }
1386

    
1387
  def QueryLocks(self, fields, sync):
1388
    """Queries information from all locks.
1389

1390
    See L{LockMonitor.QueryLocks}.
1391

1392
    """
1393
    return self._monitor.QueryLocks(fields, sync)
1394

    
1395
  def _names(self, level):
1396
    """List the lock names at the given level.
1397

1398
    This can be used for debugging/testing purposes.
1399

1400
    @param level: the level whose list of locks to get
1401

1402
    """
1403
    assert level in LEVELS, "Invalid locking level %s" % level
1404
    return self.__keyring[level]._names()
1405

    
1406
  def _is_owned(self, level):
1407
    """Check whether we are owning locks at the given level
1408

1409
    """
1410
    return self.__keyring[level]._is_owned()
1411

    
1412
  is_owned = _is_owned
1413

    
1414
  def _list_owned(self, level):
1415
    """Get the set of owned locks at the given level
1416

1417
    """
1418
    return self.__keyring[level]._list_owned()
1419

    
1420
  def _upper_owned(self, level):
1421
    """Check that we don't own any lock at a level greater than the given one.
1422

1423
    """
1424
    # This way of checking only works if LEVELS[i] = i, which we check for in
1425
    # the test cases.
1426
    return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1427

    
1428
  def _BGL_owned(self): # pylint: disable-msg=C0103
1429
    """Check if the current thread owns the BGL.
1430

1431
    Both an exclusive or a shared acquisition work.
1432

1433
    """
1434
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1435

    
1436
  @staticmethod
1437
  def _contains_BGL(level, names): # pylint: disable-msg=C0103
1438
    """Check if the level contains the BGL.
1439

1440
    Check if acting on the given level and set of names will change
1441
    the status of the Big Ganeti Lock.
1442

1443
    """
1444
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1445

    
1446
  def acquire(self, level, names, timeout=None, shared=0):
1447
    """Acquire a set of resource locks, at the same level.
1448

1449
    @type level: member of locking.LEVELS
1450
    @param level: the level at which the locks shall be acquired
1451
    @type names: list of strings (or string)
1452
    @param names: the names of the locks which shall be acquired
1453
        (special lock names, or instance/node names)
1454
    @type shared: integer (0/1) used as a boolean
1455
    @param shared: whether to acquire in shared mode; by default
1456
        an exclusive lock will be acquired
1457
    @type timeout: float
1458
    @param timeout: Maximum time to acquire all locks
1459

1460
    """
1461
    assert level in LEVELS, "Invalid locking level %s" % level
1462

    
1463
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1464
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1465
    # so even if we've migrated we need to at least share the BGL to be
1466
    # compatible with them. Of course if we own the BGL exclusively there's no
1467
    # point in acquiring any other lock, unless perhaps we are half way through
1468
    # the migration of the current opcode.
1469
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1470
            "You must own the Big Ganeti Lock before acquiring any other")
1471

    
1472
    # Check we don't own locks at the same or upper levels.
1473
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1474
           " while owning some at a greater one")
1475

    
1476
    # Acquire the locks in the set.
1477
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
1478

    
1479
  def release(self, level, names=None):
1480
    """Release a set of resource locks, at the same level.
1481

1482
    You must have acquired the locks, either in shared or in exclusive
1483
    mode, before releasing them.
1484

1485
    @type level: member of locking.LEVELS
1486
    @param level: the level at which the locks shall be released
1487
    @type names: list of strings, or None
1488
    @param names: the names of the locks which shall be released
1489
        (defaults to all the locks acquired at that level)
1490

1491
    """
1492
    assert level in LEVELS, "Invalid locking level %s" % level
1493
    assert (not self._contains_BGL(level, names) or
1494
            not self._upper_owned(LEVEL_CLUSTER)), (
1495
            "Cannot release the Big Ganeti Lock while holding something"
1496
            " at upper levels (%r)" %
1497
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
1498
                              for i in self.__keyring.keys()]), ))
1499

    
1500
    # Release will complain if we don't own the locks already
1501
    return self.__keyring[level].release(names)
1502

    
1503
  def add(self, level, names, acquired=0, shared=0):
1504
    """Add locks at the specified level.
1505

1506
    @type level: member of locking.LEVELS_MOD
1507
    @param level: the level at which the locks shall be added
1508
    @type names: list of strings
1509
    @param names: names of the locks to acquire
1510
    @type acquired: integer (0/1) used as a boolean
1511
    @param acquired: whether to acquire the newly added locks
1512
    @type shared: integer (0/1) used as a boolean
1513
    @param shared: whether the acquisition will be shared
1514

1515
    """
1516
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1517
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1518
           " operations")
1519
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1520
           " while owning some at a greater one")
1521
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1522

    
1523
  def remove(self, level, names):
1524
    """Remove locks from the specified level.
1525

1526
    You must either already own the locks you are trying to remove
1527
    exclusively or not own any lock at an upper level.
1528

1529
    @type level: member of locking.LEVELS_MOD
1530
    @param level: the level at which the locks shall be removed
1531
    @type names: list of strings
1532
    @param names: the names of the locks which shall be removed
1533
        (special lock names, or instance/node names)
1534

1535
    """
1536
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1537
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1538
           " operations")
1539
    # Check we either own the level or don't own anything from here
1540
    # up. LockSet.remove() will check the case in which we don't own
1541
    # all the needed resources, or we have a shared ownership.
1542
    assert self._is_owned(level) or not self._upper_owned(level), (
1543
           "Cannot remove locks at a level while not owning it or"
1544
           " owning some at a greater one")
1545
    return self.__keyring[level].remove(names)
1546

    
1547

    
1548
class LockMonitor(object):
1549
  _LOCK_ATTR = "_lock"
1550

    
1551
  def __init__(self):
1552
    """Initializes this class.
1553

1554
    """
1555
    self._lock = SharedLock("LockMonitor")
1556

    
1557
    # Tracked locks. Weak references are used to avoid issues with circular
1558
    # references and deletion.
1559
    self._locks = weakref.WeakKeyDictionary()
1560

    
1561
  @ssynchronized(_LOCK_ATTR)
1562
  def RegisterLock(self, lock):
1563
    """Registers a new lock.
1564

1565
    """
1566
    logging.debug("Registering lock %s", lock.name)
1567
    assert lock not in self._locks, "Duplicate lock registration"
1568
    assert not compat.any(lock.name == i.name for i in self._locks.keys()), \
1569
           "Found duplicate lock name"
1570
    self._locks[lock] = None
1571

    
1572
  @ssynchronized(_LOCK_ATTR)
1573
  def _GetLockInfo(self, fields):
1574
    """Get information from all locks while the monitor lock is held.
1575

1576
    """
1577
    result = {}
1578

    
1579
    for lock in self._locks.keys():
1580
      assert lock.name not in result, "Found duplicate lock name"
1581
      result[lock.name] = lock.GetInfo(fields)
1582

    
1583
    return result
1584

    
1585
  def QueryLocks(self, fields, sync):
1586
    """Queries information from all locks.
1587

1588
    @type fields: list of strings
1589
    @param fields: List of fields to return
1590
    @type sync: boolean
1591
    @param sync: Whether to operate in synchronous mode
1592

1593
    """
1594
    if sync:
1595
      raise NotImplementedError("Synchronous queries are not implemented")
1596

    
1597
    # Get all data without sorting
1598
    result = self._GetLockInfo(fields)
1599

    
1600
    # Sort by name
1601
    return [result[name] for name in utils.NiceSort(result.keys())]