Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 66e884e1

History | View | Annotate | Download (47.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable-msg=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import errno
32
import weakref
33
import logging
34
import heapq
35

    
36
from ganeti import errors
37
from ganeti import utils
38
from ganeti import compat
39

    
40

    
41
_EXCLUSIVE_TEXT = "exclusive"
42
_SHARED_TEXT = "shared"
43

    
44
_DEFAULT_PRIORITY = 0
45

    
46

    
47
def ssynchronized(mylock, shared=0):
48
  """Shared Synchronization decorator.
49

50
  Calls the function holding the given lock, either in exclusive or shared
51
  mode. It requires the passed lock to be a SharedLock (or support its
52
  semantics).
53

54
  @type mylock: lockable object or string
55
  @param mylock: lock to acquire or class member name of the lock to acquire
56

57
  """
58
  def wrap(fn):
59
    def sync_function(*args, **kwargs):
60
      if isinstance(mylock, basestring):
61
        assert args, "cannot ssynchronize on non-class method: self not found"
62
        # args[0] is "self"
63
        lock = getattr(args[0], mylock)
64
      else:
65
        lock = mylock
66
      lock.acquire(shared=shared)
67
      try:
68
        return fn(*args, **kwargs)
69
      finally:
70
        lock.release()
71
    return sync_function
72
  return wrap
73

    
74

    
75
class _SingleNotifyPipeConditionWaiter(object):
76
  """Helper class for SingleNotifyPipeCondition
77

78
  """
79
  __slots__ = [
80
    "_fd",
81
    "_poller",
82
    ]
83

    
84
  def __init__(self, poller, fd):
85
    """Constructor for _SingleNotifyPipeConditionWaiter
86

87
    @type poller: select.poll
88
    @param poller: Poller object
89
    @type fd: int
90
    @param fd: File descriptor to wait for
91

92
    """
93
    object.__init__(self)
94
    self._poller = poller
95
    self._fd = fd
96

    
97
  def __call__(self, timeout):
98
    """Wait for something to happen on the pipe.
99

100
    @type timeout: float or None
101
    @param timeout: Timeout for waiting (can be None)
102

103
    """
104
    running_timeout = utils.RunningTimeout(timeout, True)
105

    
106
    while True:
107
      remaining_time = running_timeout.Remaining()
108

    
109
      if remaining_time is not None:
110
        if remaining_time < 0.0:
111
          break
112

    
113
        # Our calculation uses seconds, poll() wants milliseconds
114
        remaining_time *= 1000
115

    
116
      try:
117
        result = self._poller.poll(remaining_time)
118
      except EnvironmentError, err:
119
        if err.errno != errno.EINTR:
120
          raise
121
        result = None
122

    
123
      # Check whether we were notified
124
      if result and result[0][0] == self._fd:
125
        break
126

    
127

    
128
class _BaseCondition(object):
129
  """Base class containing common code for conditions.
130

131
  Some of this code is taken from python's threading module.
132

133
  """
134
  __slots__ = [
135
    "_lock",
136
    "acquire",
137
    "release",
138
    "_is_owned",
139
    "_acquire_restore",
140
    "_release_save",
141
    ]
142

    
143
  def __init__(self, lock):
144
    """Constructor for _BaseCondition.
145

146
    @type lock: threading.Lock
147
    @param lock: condition base lock
148

149
    """
150
    object.__init__(self)
151

    
152
    try:
153
      self._release_save = lock._release_save
154
    except AttributeError:
155
      self._release_save = self._base_release_save
156
    try:
157
      self._acquire_restore = lock._acquire_restore
158
    except AttributeError:
159
      self._acquire_restore = self._base_acquire_restore
160
    try:
161
      self._is_owned = lock._is_owned
162
    except AttributeError:
163
      self._is_owned = self._base_is_owned
164

    
165
    self._lock = lock
166

    
167
    # Export the lock's acquire() and release() methods
168
    self.acquire = lock.acquire
169
    self.release = lock.release
170

    
171
  def _base_is_owned(self):
172
    """Check whether lock is owned by current thread.
173

174
    """
175
    if self._lock.acquire(0):
176
      self._lock.release()
177
      return False
178
    return True
179

    
180
  def _base_release_save(self):
181
    self._lock.release()
182

    
183
  def _base_acquire_restore(self, _):
184
    self._lock.acquire()
185

    
186
  def _check_owned(self):
187
    """Raise an exception if the current thread doesn't own the lock.
188

189
    """
190
    if not self._is_owned():
191
      raise RuntimeError("cannot work with un-aquired lock")
192

    
193

    
194
class SingleNotifyPipeCondition(_BaseCondition):
195
  """Condition which can only be notified once.
196

197
  This condition class uses pipes and poll, internally, to be able to wait for
198
  notification with a timeout, without resorting to polling. It is almost
199
  compatible with Python's threading.Condition, with the following differences:
200
    - notifyAll can only be called once, and no wait can happen after that
201
    - notify is not supported, only notifyAll
202

203
  """
204

    
205
  __slots__ = [
206
    "_poller",
207
    "_read_fd",
208
    "_write_fd",
209
    "_nwaiters",
210
    "_notified",
211
    ]
212

    
213
  _waiter_class = _SingleNotifyPipeConditionWaiter
214

    
215
  def __init__(self, lock):
216
    """Constructor for SingleNotifyPipeCondition
217

218
    """
219
    _BaseCondition.__init__(self, lock)
220
    self._nwaiters = 0
221
    self._notified = False
222
    self._read_fd = None
223
    self._write_fd = None
224
    self._poller = None
225

    
226
  def _check_unnotified(self):
227
    """Throws an exception if already notified.
228

229
    """
230
    if self._notified:
231
      raise RuntimeError("cannot use already notified condition")
232

    
233
  def _Cleanup(self):
234
    """Cleanup open file descriptors, if any.
235

236
    """
237
    if self._read_fd is not None:
238
      os.close(self._read_fd)
239
      self._read_fd = None
240

    
241
    if self._write_fd is not None:
242
      os.close(self._write_fd)
243
      self._write_fd = None
244
    self._poller = None
245

    
246
  def wait(self, timeout=None):
247
    """Wait for a notification.
248

249
    @type timeout: float or None
250
    @param timeout: Waiting timeout (can be None)
251

252
    """
253
    self._check_owned()
254
    self._check_unnotified()
255

    
256
    self._nwaiters += 1
257
    try:
258
      if self._poller is None:
259
        (self._read_fd, self._write_fd) = os.pipe()
260
        self._poller = select.poll()
261
        self._poller.register(self._read_fd, select.POLLHUP)
262

    
263
      wait_fn = self._waiter_class(self._poller, self._read_fd)
264
      state = self._release_save()
265
      try:
266
        # Wait for notification
267
        wait_fn(timeout)
268
      finally:
269
        # Re-acquire lock
270
        self._acquire_restore(state)
271
    finally:
272
      self._nwaiters -= 1
273
      if self._nwaiters == 0:
274
        self._Cleanup()
275

    
276
  def notifyAll(self): # pylint: disable-msg=C0103
277
    """Close the writing side of the pipe to notify all waiters.
278

279
    """
280
    self._check_owned()
281
    self._check_unnotified()
282
    self._notified = True
283
    if self._write_fd is not None:
284
      os.close(self._write_fd)
285
      self._write_fd = None
286

    
287

    
288
class PipeCondition(_BaseCondition):
289
  """Group-only non-polling condition with counters.
290

291
  This condition class uses pipes and poll, internally, to be able to wait for
292
  notification with a timeout, without resorting to polling. It is almost
293
  compatible with Python's threading.Condition, but only supports notifyAll and
294
  non-recursive locks. As an additional features it's able to report whether
295
  there are any waiting threads.
296

297
  """
298
  __slots__ = [
299
    "_waiters",
300
    "_single_condition",
301
    ]
302

    
303
  _single_condition_class = SingleNotifyPipeCondition
304

    
305
  def __init__(self, lock):
306
    """Initializes this class.
307

308
    """
309
    _BaseCondition.__init__(self, lock)
310
    self._waiters = set()
311
    self._single_condition = self._single_condition_class(self._lock)
312

    
313
  def wait(self, timeout=None):
314
    """Wait for a notification.
315

316
    @type timeout: float or None
317
    @param timeout: Waiting timeout (can be None)
318

319
    """
320
    self._check_owned()
321

    
322
    # Keep local reference to the pipe. It could be replaced by another thread
323
    # notifying while we're waiting.
324
    cond = self._single_condition
325

    
326
    self._waiters.add(threading.currentThread())
327
    try:
328
      cond.wait(timeout)
329
    finally:
330
      self._check_owned()
331
      self._waiters.remove(threading.currentThread())
332

    
333
  def notifyAll(self): # pylint: disable-msg=C0103
334
    """Notify all currently waiting threads.
335

336
    """
337
    self._check_owned()
338
    self._single_condition.notifyAll()
339
    self._single_condition = self._single_condition_class(self._lock)
340

    
341
  def get_waiting(self):
342
    """Returns a list of all waiting threads.
343

344
    """
345
    self._check_owned()
346

    
347
    return self._waiters
348

    
349
  def has_waiting(self):
350
    """Returns whether there are active waiters.
351

352
    """
353
    self._check_owned()
354

    
355
    return bool(self._waiters)
356

    
357

    
358
class _PipeConditionWithMode(PipeCondition):
359
  __slots__ = [
360
    "shared",
361
    ]
362

    
363
  def __init__(self, lock, shared):
364
    """Initializes this class.
365

366
    """
367
    self.shared = shared
368
    PipeCondition.__init__(self, lock)
369

    
370

    
371
class SharedLock(object):
372
  """Implements a shared lock.
373

374
  Multiple threads can acquire the lock in a shared way by calling
375
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
376
  threads can call C{acquire(shared=0)}.
377

378
  Notes on data structures: C{__pending} contains a priority queue (heapq) of
379
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
380
  ...]}. Each per-priority queue contains a normal in-order list of conditions
381
  to be notified when the lock can be acquired. Shared locks are grouped
382
  together by priority and the condition for them is stored in
383
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
384
  references for the per-priority queues indexed by priority for faster access.
385

386
  @type name: string
387
  @ivar name: the name of the lock
388

389
  """
390
  __slots__ = [
391
    "__weakref__",
392
    "__deleted",
393
    "__exc",
394
    "__lock",
395
    "__pending",
396
    "__pending_by_prio",
397
    "__pending_shared",
398
    "__shr",
399
    "name",
400
    ]
401

    
402
  __condition_class = _PipeConditionWithMode
403

    
404
  def __init__(self, name, monitor=None):
405
    """Construct a new SharedLock.
406

407
    @param name: the name of the lock
408
    @type monitor: L{LockMonitor}
409
    @param monitor: Lock monitor with which to register
410

411
    """
412
    object.__init__(self)
413

    
414
    self.name = name
415

    
416
    # Internal lock
417
    self.__lock = threading.Lock()
418

    
419
    # Queue containing waiting acquires
420
    self.__pending = []
421
    self.__pending_by_prio = {}
422
    self.__pending_shared = {}
423

    
424
    # Current lock holders
425
    self.__shr = set()
426
    self.__exc = None
427

    
428
    # is this lock in the deleted state?
429
    self.__deleted = False
430

    
431
    # Register with lock monitor
432
    if monitor:
433
      monitor.RegisterLock(self)
434

    
435
  def GetInfo(self, fields):
436
    """Retrieves information for querying locks.
437

438
    @type fields: list of strings
439
    @param fields: List of fields to return
440

441
    """
442
    self.__lock.acquire()
443
    try:
444
      info = []
445

    
446
      # Note: to avoid unintentional race conditions, no references to
447
      # modifiable objects should be returned unless they were created in this
448
      # function.
449
      for fname in fields:
450
        if fname == "name":
451
          info.append(self.name)
452
        elif fname == "mode":
453
          if self.__deleted:
454
            info.append("deleted")
455
            assert not (self.__exc or self.__shr)
456
          elif self.__exc:
457
            info.append(_EXCLUSIVE_TEXT)
458
          elif self.__shr:
459
            info.append(_SHARED_TEXT)
460
          else:
461
            info.append(None)
462
        elif fname == "owner":
463
          if self.__exc:
464
            owner = [self.__exc]
465
          else:
466
            owner = self.__shr
467

    
468
          if owner:
469
            assert not self.__deleted
470
            info.append([i.getName() for i in owner])
471
          else:
472
            info.append(None)
473
        elif fname == "pending":
474
          data = []
475

    
476
          # Sorting instead of copying and using heaq functions for simplicity
477
          for (_, prioqueue) in sorted(self.__pending):
478
            for cond in prioqueue:
479
              if cond.shared:
480
                mode = _SHARED_TEXT
481
              else:
482
                mode = _EXCLUSIVE_TEXT
483

    
484
              # This function should be fast as it runs with the lock held.
485
              # Hence not using utils.NiceSort.
486
              data.append((mode, sorted(i.getName()
487
                                        for i in cond.get_waiting())))
488

    
489
          info.append(data)
490
        else:
491
          raise errors.OpExecError("Invalid query field '%s'" % fname)
492

    
493
      return info
494
    finally:
495
      self.__lock.release()
496

    
497
  def __check_deleted(self):
498
    """Raises an exception if the lock has been deleted.
499

500
    """
501
    if self.__deleted:
502
      raise errors.LockError("Deleted lock %s" % self.name)
503

    
504
  def __is_sharer(self):
505
    """Is the current thread sharing the lock at this time?
506

507
    """
508
    return threading.currentThread() in self.__shr
509

    
510
  def __is_exclusive(self):
511
    """Is the current thread holding the lock exclusively at this time?
512

513
    """
514
    return threading.currentThread() == self.__exc
515

    
516
  def __is_owned(self, shared=-1):
517
    """Is the current thread somehow owning the lock at this time?
518

519
    This is a private version of the function, which presumes you're holding
520
    the internal lock.
521

522
    """
523
    if shared < 0:
524
      return self.__is_sharer() or self.__is_exclusive()
525
    elif shared:
526
      return self.__is_sharer()
527
    else:
528
      return self.__is_exclusive()
529

    
530
  def _is_owned(self, shared=-1):
531
    """Is the current thread somehow owning the lock at this time?
532

533
    @param shared:
534
        - < 0: check for any type of ownership (default)
535
        - 0: check for exclusive ownership
536
        - > 0: check for shared ownership
537

538
    """
539
    self.__lock.acquire()
540
    try:
541
      return self.__is_owned(shared=shared)
542
    finally:
543
      self.__lock.release()
544

    
545
  def _count_pending(self):
546
    """Returns the number of pending acquires.
547

548
    @rtype: int
549

550
    """
551
    self.__lock.acquire()
552
    try:
553
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
554
    finally:
555
      self.__lock.release()
556

    
557
  def _check_empty(self):
558
    """Checks whether there are any pending acquires.
559

560
    @rtype: bool
561

562
    """
563
    self.__lock.acquire()
564
    try:
565
      # Order is important: __find_first_pending_queue modifies __pending
566
      return not (self.__find_first_pending_queue() or
567
                  self.__pending or
568
                  self.__pending_by_prio or
569
                  self.__pending_shared)
570
    finally:
571
      self.__lock.release()
572

    
573
  def __do_acquire(self, shared):
574
    """Actually acquire the lock.
575

576
    """
577
    if shared:
578
      self.__shr.add(threading.currentThread())
579
    else:
580
      self.__exc = threading.currentThread()
581

    
582
  def __can_acquire(self, shared):
583
    """Determine whether lock can be acquired.
584

585
    """
586
    if shared:
587
      return self.__exc is None
588
    else:
589
      return len(self.__shr) == 0 and self.__exc is None
590

    
591
  def __find_first_pending_queue(self):
592
    """Tries to find the topmost queued entry with pending acquires.
593

594
    Removes empty entries while going through the list.
595

596
    """
597
    while self.__pending:
598
      (priority, prioqueue) = self.__pending[0]
599

    
600
      if not prioqueue:
601
        heapq.heappop(self.__pending)
602
        del self.__pending_by_prio[priority]
603
        assert priority not in self.__pending_shared
604
        continue
605

    
606
      if prioqueue:
607
        return prioqueue
608

    
609
    return None
610

    
611
  def __is_on_top(self, cond):
612
    """Checks whether the passed condition is on top of the queue.
613

614
    The caller must make sure the queue isn't empty.
615

616
    """
617
    return cond == self.__find_first_pending_queue()[0]
618

    
619
  def __acquire_unlocked(self, shared, timeout, priority):
620
    """Acquire a shared lock.
621

622
    @param shared: whether to acquire in shared mode; by default an
623
        exclusive lock will be acquired
624
    @param timeout: maximum waiting time before giving up
625
    @type priority: integer
626
    @param priority: Priority for acquiring lock
627

628
    """
629
    self.__check_deleted()
630

    
631
    # We cannot acquire the lock if we already have it
632
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
633
                                   " %s" % self.name)
634

    
635
    # Remove empty entries from queue
636
    self.__find_first_pending_queue()
637

    
638
    # Check whether someone else holds the lock or there are pending acquires.
639
    if not self.__pending and self.__can_acquire(shared):
640
      # Apparently not, can acquire lock directly.
641
      self.__do_acquire(shared)
642
      return True
643

    
644
    prioqueue = self.__pending_by_prio.get(priority, None)
645

    
646
    if shared:
647
      # Try to re-use condition for shared acquire
648
      wait_condition = self.__pending_shared.get(priority, None)
649
      assert (wait_condition is None or
650
              (wait_condition.shared and wait_condition in prioqueue))
651
    else:
652
      wait_condition = None
653

    
654
    if wait_condition is None:
655
      if prioqueue is None:
656
        assert priority not in self.__pending_by_prio
657

    
658
        prioqueue = []
659
        heapq.heappush(self.__pending, (priority, prioqueue))
660
        self.__pending_by_prio[priority] = prioqueue
661

    
662
      wait_condition = self.__condition_class(self.__lock, shared)
663
      prioqueue.append(wait_condition)
664

    
665
      if shared:
666
        # Keep reference for further shared acquires on same priority. This is
667
        # better than trying to find it in the list of pending acquires.
668
        assert priority not in self.__pending_shared
669
        self.__pending_shared[priority] = wait_condition
670

    
671
    try:
672
      # Wait until we become the topmost acquire in the queue or the timeout
673
      # expires.
674
      # TODO: Decrease timeout with spurious notifications
675
      while not (self.__is_on_top(wait_condition) and
676
                 self.__can_acquire(shared)):
677
        # Wait for notification
678
        wait_condition.wait(timeout)
679
        self.__check_deleted()
680

    
681
        # A lot of code assumes blocking acquires always succeed. Loop
682
        # internally for that case.
683
        if timeout is not None:
684
          break
685

    
686
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
687
        self.__do_acquire(shared)
688
        return True
689
    finally:
690
      # Remove condition from queue if there are no more waiters
691
      if not wait_condition.has_waiting():
692
        prioqueue.remove(wait_condition)
693
        if wait_condition.shared:
694
          del self.__pending_shared[priority]
695

    
696
    return False
697

    
698
  def acquire(self, shared=0, timeout=None, priority=None,
699
              test_notify=None):
700
    """Acquire a shared lock.
701

702
    @type shared: integer (0/1) used as a boolean
703
    @param shared: whether to acquire in shared mode; by default an
704
        exclusive lock will be acquired
705
    @type timeout: float
706
    @param timeout: maximum waiting time before giving up
707
    @type priority: integer
708
    @param priority: Priority for acquiring lock
709
    @type test_notify: callable or None
710
    @param test_notify: Special callback function for unittesting
711

712
    """
713
    if priority is None:
714
      priority = _DEFAULT_PRIORITY
715

    
716
    self.__lock.acquire()
717
    try:
718
      # We already got the lock, notify now
719
      if __debug__ and callable(test_notify):
720
        test_notify()
721

    
722
      return self.__acquire_unlocked(shared, timeout, priority)
723
    finally:
724
      self.__lock.release()
725

    
726
  def release(self):
727
    """Release a Shared Lock.
728

729
    You must have acquired the lock, either in shared or in exclusive mode,
730
    before calling this function.
731

732
    """
733
    self.__lock.acquire()
734
    try:
735
      assert self.__is_exclusive() or self.__is_sharer(), \
736
        "Cannot release non-owned lock"
737

    
738
      # Autodetect release type
739
      if self.__is_exclusive():
740
        self.__exc = None
741
      else:
742
        self.__shr.remove(threading.currentThread())
743

    
744
      # Notify topmost condition in queue
745
      prioqueue = self.__find_first_pending_queue()
746
      if prioqueue:
747
        prioqueue[0].notifyAll()
748

    
749
    finally:
750
      self.__lock.release()
751

    
752
  def delete(self, timeout=None, priority=None):
753
    """Delete a Shared Lock.
754

755
    This operation will declare the lock for removal. First the lock will be
756
    acquired in exclusive mode if you don't already own it, then the lock
757
    will be put in a state where any future and pending acquire() fail.
758

759
    @type timeout: float
760
    @param timeout: maximum waiting time before giving up
761
    @type priority: integer
762
    @param priority: Priority for acquiring lock
763

764
    """
765
    if priority is None:
766
      priority = _DEFAULT_PRIORITY
767

    
768
    self.__lock.acquire()
769
    try:
770
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
771

    
772
      self.__check_deleted()
773

    
774
      # The caller is allowed to hold the lock exclusively already.
775
      acquired = self.__is_exclusive()
776

    
777
      if not acquired:
778
        acquired = self.__acquire_unlocked(0, timeout, priority)
779

    
780
        assert self.__is_exclusive() and not self.__is_sharer(), \
781
          "Lock wasn't acquired in exclusive mode"
782

    
783
      if acquired:
784
        self.__deleted = True
785
        self.__exc = None
786

    
787
        assert not (self.__exc or self.__shr), "Found owner during deletion"
788

    
789
        # Notify all acquires. They'll throw an error.
790
        for (_, prioqueue) in self.__pending:
791
          for cond in prioqueue:
792
            cond.notifyAll()
793

    
794
        assert self.__deleted
795

    
796
      return acquired
797
    finally:
798
      self.__lock.release()
799

    
800
  def _release_save(self):
801
    shared = self.__is_sharer()
802
    self.release()
803
    return shared
804

    
805
  def _acquire_restore(self, shared):
806
    self.acquire(shared=shared)
807

    
808

    
809
# Whenever we want to acquire a full LockSet we pass None as the value
810
# to acquire.  Hide this behind this nicely named constant.
811
ALL_SET = None
812

    
813

    
814
class _AcquireTimeout(Exception):
815
  """Internal exception to abort an acquire on a timeout.
816

817
  """
818

    
819

    
820
class LockSet:
821
  """Implements a set of locks.
822

823
  This abstraction implements a set of shared locks for the same resource type,
824
  distinguished by name. The user can lock a subset of the resources and the
825
  LockSet will take care of acquiring the locks always in the same order, thus
826
  preventing deadlock.
827

828
  All the locks needed in the same set must be acquired together, though.
829

830
  @type name: string
831
  @ivar name: the name of the lockset
832

833
  """
834
  def __init__(self, members, name, monitor=None):
835
    """Constructs a new LockSet.
836

837
    @type members: list of strings
838
    @param members: initial members of the set
839
    @type monitor: L{LockMonitor}
840
    @param monitor: Lock monitor with which to register member locks
841

842
    """
843
    assert members is not None, "members parameter is not a list"
844
    self.name = name
845

    
846
    # Lock monitor
847
    self.__monitor = monitor
848

    
849
    # Used internally to guarantee coherency.
850
    self.__lock = SharedLock(name)
851

    
852
    # The lockdict indexes the relationship name -> lock
853
    # The order-of-locking is implied by the alphabetical order of names
854
    self.__lockdict = {}
855

    
856
    for mname in members:
857
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
858
                                          monitor=monitor)
859

    
860
    # The owner dict contains the set of locks each thread owns. For
861
    # performance each thread can access its own key without a global lock on
862
    # this structure. It is paramount though that *no* other type of access is
863
    # done to this structure (eg. no looping over its keys). *_owner helper
864
    # function are defined to guarantee access is correct, but in general never
865
    # do anything different than __owners[threading.currentThread()], or there
866
    # will be trouble.
867
    self.__owners = {}
868

    
869
  def _GetLockName(self, mname):
870
    """Returns the name for a member lock.
871

872
    """
873
    return "%s/%s" % (self.name, mname)
874

    
875
  def _is_owned(self):
876
    """Is the current thread a current level owner?"""
877
    return threading.currentThread() in self.__owners
878

    
879
  def _add_owned(self, name=None):
880
    """Note the current thread owns the given lock"""
881
    if name is None:
882
      if not self._is_owned():
883
        self.__owners[threading.currentThread()] = set()
884
    else:
885
      if self._is_owned():
886
        self.__owners[threading.currentThread()].add(name)
887
      else:
888
        self.__owners[threading.currentThread()] = set([name])
889

    
890
  def _del_owned(self, name=None):
891
    """Note the current thread owns the given lock"""
892

    
893
    assert not (name is None and self.__lock._is_owned()), \
894
           "Cannot hold internal lock when deleting owner status"
895

    
896
    if name is not None:
897
      self.__owners[threading.currentThread()].remove(name)
898

    
899
    # Only remove the key if we don't hold the set-lock as well
900
    if (not self.__lock._is_owned() and
901
        not self.__owners[threading.currentThread()]):
902
      del self.__owners[threading.currentThread()]
903

    
904
  def _list_owned(self):
905
    """Get the set of resource names owned by the current thread"""
906
    if self._is_owned():
907
      return self.__owners[threading.currentThread()].copy()
908
    else:
909
      return set()
910

    
911
  def _release_and_delete_owned(self):
912
    """Release and delete all resources owned by the current thread"""
913
    for lname in self._list_owned():
914
      lock = self.__lockdict[lname]
915
      if lock._is_owned():
916
        lock.release()
917
      self._del_owned(name=lname)
918

    
919
  def __names(self):
920
    """Return the current set of names.
921

922
    Only call this function while holding __lock and don't iterate on the
923
    result after releasing the lock.
924

925
    """
926
    return self.__lockdict.keys()
927

    
928
  def _names(self):
929
    """Return a copy of the current set of elements.
930

931
    Used only for debugging purposes.
932

933
    """
934
    # If we don't already own the set-level lock acquired
935
    # we'll get it and note we need to release it later.
936
    release_lock = False
937
    if not self.__lock._is_owned():
938
      release_lock = True
939
      self.__lock.acquire(shared=1)
940
    try:
941
      result = self.__names()
942
    finally:
943
      if release_lock:
944
        self.__lock.release()
945
    return set(result)
946

    
947
  def acquire(self, names, timeout=None, shared=0, priority=None,
948
              test_notify=None):
949
    """Acquire a set of resource locks.
950

951
    @type names: list of strings (or string)
952
    @param names: the names of the locks which shall be acquired
953
        (special lock names, or instance/node names)
954
    @type shared: integer (0/1) used as a boolean
955
    @param shared: whether to acquire in shared mode; by default an
956
        exclusive lock will be acquired
957
    @type timeout: float or None
958
    @param timeout: Maximum time to acquire all locks
959
    @type priority: integer
960
    @param priority: Priority for acquiring locks
961
    @type test_notify: callable or None
962
    @param test_notify: Special callback function for unittesting
963

964
    @return: Set of all locks successfully acquired or None in case of timeout
965

966
    @raise errors.LockError: when any lock we try to acquire has
967
        been deleted before we succeed. In this case none of the
968
        locks requested will be acquired.
969

970
    """
971
    assert timeout is None or timeout >= 0.0
972

    
973
    # Check we don't already own locks at this level
974
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
975
                                  " (lockset %s)" % self.name)
976

    
977
    if priority is None:
978
      priority = _DEFAULT_PRIORITY
979

    
980
    # We need to keep track of how long we spent waiting for a lock. The
981
    # timeout passed to this function is over all lock acquires.
982
    running_timeout = utils.RunningTimeout(timeout, False)
983

    
984
    try:
985
      if names is not None:
986
        # Support passing in a single resource to acquire rather than many
987
        if isinstance(names, basestring):
988
          names = [names]
989

    
990
        return self.__acquire_inner(names, False, shared, priority,
991
                                    running_timeout.Remaining, test_notify)
992

    
993
      else:
994
        # If no names are given acquire the whole set by not letting new names
995
        # being added before we release, and getting the current list of names.
996
        # Some of them may then be deleted later, but we'll cope with this.
997
        #
998
        # We'd like to acquire this lock in a shared way, as it's nice if
999
        # everybody else can use the instances at the same time. If we are
1000
        # acquiring them exclusively though they won't be able to do this
1001
        # anyway, though, so we'll get the list lock exclusively as well in
1002
        # order to be able to do add() on the set while owning it.
1003
        if not self.__lock.acquire(shared=shared, priority=priority,
1004
                                   timeout=running_timeout.Remaining()):
1005
          raise _AcquireTimeout()
1006
        try:
1007
          # note we own the set-lock
1008
          self._add_owned()
1009

    
1010
          return self.__acquire_inner(self.__names(), True, shared, priority,
1011
                                      running_timeout.Remaining, test_notify)
1012
        except:
1013
          # We shouldn't have problems adding the lock to the owners list, but
1014
          # if we did we'll try to release this lock and re-raise exception.
1015
          # Of course something is going to be really wrong, after this.
1016
          self.__lock.release()
1017
          self._del_owned()
1018
          raise
1019

    
1020
    except _AcquireTimeout:
1021
      return None
1022

    
1023
  def __acquire_inner(self, names, want_all, shared, priority,
1024
                      timeout_fn, test_notify):
1025
    """Inner logic for acquiring a number of locks.
1026

1027
    @param names: Names of the locks to be acquired
1028
    @param want_all: Whether all locks in the set should be acquired
1029
    @param shared: Whether to acquire in shared mode
1030
    @param timeout_fn: Function returning remaining timeout
1031
    @param priority: Priority for acquiring locks
1032
    @param test_notify: Special callback function for unittesting
1033

1034
    """
1035
    acquire_list = []
1036

    
1037
    # First we look the locks up on __lockdict. We have no way of being sure
1038
    # they will still be there after, but this makes it a lot faster should
1039
    # just one of them be the already wrong. Using a sorted sequence to prevent
1040
    # deadlocks.
1041
    for lname in sorted(utils.UniqueSequence(names)):
1042
      try:
1043
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
1044
      except KeyError:
1045
        if want_all:
1046
          # We are acquiring all the set, it doesn't matter if this particular
1047
          # element is not there anymore.
1048
          continue
1049

    
1050
        raise errors.LockError("Non-existing lock %s in set %s (it may have"
1051
                               " been removed)" % (lname, self.name))
1052

    
1053
      acquire_list.append((lname, lock))
1054

    
1055
    # This will hold the locknames we effectively acquired.
1056
    acquired = set()
1057

    
1058
    try:
1059
      # Now acquire_list contains a sorted list of resources and locks we
1060
      # want.  In order to get them we loop on this (private) list and
1061
      # acquire() them.  We gave no real guarantee they will still exist till
1062
      # this is done but .acquire() itself is safe and will alert us if the
1063
      # lock gets deleted.
1064
      for (lname, lock) in acquire_list:
1065
        if __debug__ and callable(test_notify):
1066
          test_notify_fn = lambda: test_notify(lname)
1067
        else:
1068
          test_notify_fn = None
1069

    
1070
        timeout = timeout_fn()
1071

    
1072
        try:
1073
          # raises LockError if the lock was deleted
1074
          acq_success = lock.acquire(shared=shared, timeout=timeout,
1075
                                     priority=priority,
1076
                                     test_notify=test_notify_fn)
1077
        except errors.LockError:
1078
          if want_all:
1079
            # We are acquiring all the set, it doesn't matter if this
1080
            # particular element is not there anymore.
1081
            continue
1082

    
1083
          raise errors.LockError("Non-existing lock %s in set %s (it may"
1084
                                 " have been removed)" % (lname, self.name))
1085

    
1086
        if not acq_success:
1087
          # Couldn't get lock or timeout occurred
1088
          if timeout is None:
1089
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
1090
            # blocking.
1091
            raise errors.LockError("Failed to get lock %s (set %s)" %
1092
                                   (lname, self.name))
1093

    
1094
          raise _AcquireTimeout()
1095

    
1096
        try:
1097
          # now the lock cannot be deleted, we have it!
1098
          self._add_owned(name=lname)
1099
          acquired.add(lname)
1100

    
1101
        except:
1102
          # We shouldn't have problems adding the lock to the owners list, but
1103
          # if we did we'll try to release this lock and re-raise exception.
1104
          # Of course something is going to be really wrong after this.
1105
          if lock._is_owned():
1106
            lock.release()
1107
          raise
1108

    
1109
    except:
1110
      # Release all owned locks
1111
      self._release_and_delete_owned()
1112
      raise
1113

    
1114
    return acquired
1115

    
1116
  def release(self, names=None):
1117
    """Release a set of resource locks, at the same level.
1118

1119
    You must have acquired the locks, either in shared or in exclusive mode,
1120
    before releasing them.
1121

1122
    @type names: list of strings, or None
1123
    @param names: the names of the locks which shall be released
1124
        (defaults to all the locks acquired at that level).
1125

1126
    """
1127
    assert self._is_owned(), ("release() on lock set %s while not owner" %
1128
                              self.name)
1129

    
1130
    # Support passing in a single resource to release rather than many
1131
    if isinstance(names, basestring):
1132
      names = [names]
1133

    
1134
    if names is None:
1135
      names = self._list_owned()
1136
    else:
1137
      names = set(names)
1138
      assert self._list_owned().issuperset(names), (
1139
               "release() on unheld resources %s (set %s)" %
1140
               (names.difference(self._list_owned()), self.name))
1141

    
1142
    # First of all let's release the "all elements" lock, if set.
1143
    # After this 'add' can work again
1144
    if self.__lock._is_owned():
1145
      self.__lock.release()
1146
      self._del_owned()
1147

    
1148
    for lockname in names:
1149
      # If we are sure the lock doesn't leave __lockdict without being
1150
      # exclusively held we can do this...
1151
      self.__lockdict[lockname].release()
1152
      self._del_owned(name=lockname)
1153

    
1154
  def add(self, names, acquired=0, shared=0):
1155
    """Add a new set of elements to the set
1156

1157
    @type names: list of strings
1158
    @param names: names of the new elements to add
1159
    @type acquired: integer (0/1) used as a boolean
1160
    @param acquired: pre-acquire the new resource?
1161
    @type shared: integer (0/1) used as a boolean
1162
    @param shared: is the pre-acquisition shared?
1163

1164
    """
1165
    # Check we don't already own locks at this level
1166
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1167
      ("Cannot add locks if the set %s is only partially owned, or shared" %
1168
       self.name)
1169

    
1170
    # Support passing in a single resource to add rather than many
1171
    if isinstance(names, basestring):
1172
      names = [names]
1173

    
1174
    # If we don't already own the set-level lock acquired in an exclusive way
1175
    # we'll get it and note we need to release it later.
1176
    release_lock = False
1177
    if not self.__lock._is_owned():
1178
      release_lock = True
1179
      self.__lock.acquire()
1180

    
1181
    try:
1182
      invalid_names = set(self.__names()).intersection(names)
1183
      if invalid_names:
1184
        # This must be an explicit raise, not an assert, because assert is
1185
        # turned off when using optimization, and this can happen because of
1186
        # concurrency even if the user doesn't want it.
1187
        raise errors.LockError("duplicate add(%s) on lockset %s" %
1188
                               (invalid_names, self.name))
1189

    
1190
      for lockname in names:
1191
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1192

    
1193
        if acquired:
1194
          # No need for priority or timeout here as this lock has just been
1195
          # created
1196
          lock.acquire(shared=shared)
1197
          # now the lock cannot be deleted, we have it!
1198
          try:
1199
            self._add_owned(name=lockname)
1200
          except:
1201
            # We shouldn't have problems adding the lock to the owners list,
1202
            # but if we did we'll try to release this lock and re-raise
1203
            # exception.  Of course something is going to be really wrong,
1204
            # after this.  On the other hand the lock hasn't been added to the
1205
            # __lockdict yet so no other threads should be pending on it. This
1206
            # release is just a safety measure.
1207
            lock.release()
1208
            raise
1209

    
1210
        self.__lockdict[lockname] = lock
1211

    
1212
    finally:
1213
      # Only release __lock if we were not holding it previously.
1214
      if release_lock:
1215
        self.__lock.release()
1216

    
1217
    return True
1218

    
1219
  def remove(self, names):
1220
    """Remove elements from the lock set.
1221

1222
    You can either not hold anything in the lockset or already hold a superset
1223
    of the elements you want to delete, exclusively.
1224

1225
    @type names: list of strings
1226
    @param names: names of the resource to remove.
1227

1228
    @return: a list of locks which we removed; the list is always
1229
        equal to the names list if we were holding all the locks
1230
        exclusively
1231

1232
    """
1233
    # Support passing in a single resource to remove rather than many
1234
    if isinstance(names, basestring):
1235
      names = [names]
1236

    
1237
    # If we own any subset of this lock it must be a superset of what we want
1238
    # to delete. The ownership must also be exclusive, but that will be checked
1239
    # by the lock itself.
1240
    assert not self._is_owned() or self._list_owned().issuperset(names), (
1241
      "remove() on acquired lockset %s while not owning all elements" %
1242
      self.name)
1243

    
1244
    removed = []
1245

    
1246
    for lname in names:
1247
      # Calling delete() acquires the lock exclusively if we don't already own
1248
      # it, and causes all pending and subsequent lock acquires to fail. It's
1249
      # fine to call it out of order because delete() also implies release(),
1250
      # and the assertion above guarantees that if we either already hold
1251
      # everything we want to delete, or we hold none.
1252
      try:
1253
        self.__lockdict[lname].delete()
1254
        removed.append(lname)
1255
      except (KeyError, errors.LockError):
1256
        # This cannot happen if we were already holding it, verify:
1257
        assert not self._is_owned(), ("remove failed while holding lockset %s"
1258
                                      % self.name)
1259
      else:
1260
        # If no LockError was raised we are the ones who deleted the lock.
1261
        # This means we can safely remove it from lockdict, as any further or
1262
        # pending delete() or acquire() will fail (and nobody can have the lock
1263
        # since before our call to delete()).
1264
        #
1265
        # This is done in an else clause because if the exception was thrown
1266
        # it's the job of the one who actually deleted it.
1267
        del self.__lockdict[lname]
1268
        # And let's remove it from our private list if we owned it.
1269
        if self._is_owned():
1270
          self._del_owned(name=lname)
1271

    
1272
    return removed
1273

    
1274

    
1275
# Locking levels, must be acquired in increasing order.
1276
# Current rules are:
1277
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1278
#   acquired before performing any operation, either in shared or in exclusive
1279
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
1280
#   avoided.
1281
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1282
#   If you need more than one node, or more than one instance, acquire them at
1283
#   the same time.
1284
LEVEL_CLUSTER = 0
1285
LEVEL_INSTANCE = 1
1286
LEVEL_NODEGROUP = 2
1287
LEVEL_NODE = 3
1288

    
1289
LEVELS = [LEVEL_CLUSTER,
1290
          LEVEL_INSTANCE,
1291
          LEVEL_NODEGROUP,
1292
          LEVEL_NODE]
1293

    
1294
# Lock levels which are modifiable
1295
LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]
1296

    
1297
LEVEL_NAMES = {
1298
  LEVEL_CLUSTER: "cluster",
1299
  LEVEL_INSTANCE: "instance",
1300
  LEVEL_NODEGROUP: "nodegroup",
1301
  LEVEL_NODE: "node",
1302
  }
1303

    
1304
# Constant for the big ganeti lock
1305
BGL = 'BGL'
1306

    
1307

    
1308
class GanetiLockManager:
1309
  """The Ganeti Locking Library
1310

1311
  The purpose of this small library is to manage locking for ganeti clusters
1312
  in a central place, while at the same time doing dynamic checks against
1313
  possible deadlocks. It will also make it easier to transition to a different
1314
  lock type should we migrate away from python threads.
1315

1316
  """
1317
  _instance = None
1318

    
1319
  def __init__(self, nodes, nodegroups, instances):
1320
    """Constructs a new GanetiLockManager object.
1321

1322
    There should be only a GanetiLockManager object at any time, so this
1323
    function raises an error if this is not the case.
1324

1325
    @param nodes: list of node names
1326
    @param nodegroups: list of nodegroup uuids
1327
    @param instances: list of instance names
1328

1329
    """
1330
    assert self.__class__._instance is None, \
1331
           "double GanetiLockManager instance"
1332

    
1333
    self.__class__._instance = self
1334

    
1335
    self._monitor = LockMonitor()
1336

    
1337
    # The keyring contains all the locks, at their level and in the correct
1338
    # locking order.
1339
    self.__keyring = {
1340
      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
1341
      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
1342
      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor),
1343
      LEVEL_INSTANCE: LockSet(instances, "instances",
1344
                              monitor=self._monitor),
1345
      }
1346

    
1347
  def QueryLocks(self, fields, sync):
1348
    """Queries information from all locks.
1349

1350
    See L{LockMonitor.QueryLocks}.
1351

1352
    """
1353
    return self._monitor.QueryLocks(fields, sync)
1354

    
1355
  def _names(self, level):
1356
    """List the lock names at the given level.
1357

1358
    This can be used for debugging/testing purposes.
1359

1360
    @param level: the level whose list of locks to get
1361

1362
    """
1363
    assert level in LEVELS, "Invalid locking level %s" % level
1364
    return self.__keyring[level]._names()
1365

    
1366
  def _is_owned(self, level):
1367
    """Check whether we are owning locks at the given level
1368

1369
    """
1370
    return self.__keyring[level]._is_owned()
1371

    
1372
  is_owned = _is_owned
1373

    
1374
  def _list_owned(self, level):
1375
    """Get the set of owned locks at the given level
1376

1377
    """
1378
    return self.__keyring[level]._list_owned()
1379

    
1380
  def _upper_owned(self, level):
1381
    """Check that we don't own any lock at a level greater than the given one.
1382

1383
    """
1384
    # This way of checking only works if LEVELS[i] = i, which we check for in
1385
    # the test cases.
1386
    return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1387

    
1388
  def _BGL_owned(self): # pylint: disable-msg=C0103
1389
    """Check if the current thread owns the BGL.
1390

1391
    Both an exclusive or a shared acquisition work.
1392

1393
    """
1394
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1395

    
1396
  @staticmethod
1397
  def _contains_BGL(level, names): # pylint: disable-msg=C0103
1398
    """Check if the level contains the BGL.
1399

1400
    Check if acting on the given level and set of names will change
1401
    the status of the Big Ganeti Lock.
1402

1403
    """
1404
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1405

    
1406
  def acquire(self, level, names, timeout=None, shared=0, priority=None):
1407
    """Acquire a set of resource locks, at the same level.
1408

1409
    @type level: member of locking.LEVELS
1410
    @param level: the level at which the locks shall be acquired
1411
    @type names: list of strings (or string)
1412
    @param names: the names of the locks which shall be acquired
1413
        (special lock names, or instance/node names)
1414
    @type shared: integer (0/1) used as a boolean
1415
    @param shared: whether to acquire in shared mode; by default
1416
        an exclusive lock will be acquired
1417
    @type timeout: float
1418
    @param timeout: Maximum time to acquire all locks
1419
    @type priority: integer
1420
    @param priority: Priority for acquiring lock
1421

1422
    """
1423
    assert level in LEVELS, "Invalid locking level %s" % level
1424

    
1425
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1426
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1427
    # so even if we've migrated we need to at least share the BGL to be
1428
    # compatible with them. Of course if we own the BGL exclusively there's no
1429
    # point in acquiring any other lock, unless perhaps we are half way through
1430
    # the migration of the current opcode.
1431
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1432
            "You must own the Big Ganeti Lock before acquiring any other")
1433

    
1434
    # Check we don't own locks at the same or upper levels.
1435
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1436
           " while owning some at a greater one")
1437

    
1438
    # Acquire the locks in the set.
1439
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
1440
                                         priority=priority)
1441

    
1442
  def release(self, level, names=None):
1443
    """Release a set of resource locks, at the same level.
1444

1445
    You must have acquired the locks, either in shared or in exclusive
1446
    mode, before releasing them.
1447

1448
    @type level: member of locking.LEVELS
1449
    @param level: the level at which the locks shall be released
1450
    @type names: list of strings, or None
1451
    @param names: the names of the locks which shall be released
1452
        (defaults to all the locks acquired at that level)
1453

1454
    """
1455
    assert level in LEVELS, "Invalid locking level %s" % level
1456
    assert (not self._contains_BGL(level, names) or
1457
            not self._upper_owned(LEVEL_CLUSTER)), (
1458
            "Cannot release the Big Ganeti Lock while holding something"
1459
            " at upper levels (%r)" %
1460
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
1461
                              for i in self.__keyring.keys()]), ))
1462

    
1463
    # Release will complain if we don't own the locks already
1464
    return self.__keyring[level].release(names)
1465

    
1466
  def add(self, level, names, acquired=0, shared=0):
1467
    """Add locks at the specified level.
1468

1469
    @type level: member of locking.LEVELS_MOD
1470
    @param level: the level at which the locks shall be added
1471
    @type names: list of strings
1472
    @param names: names of the locks to acquire
1473
    @type acquired: integer (0/1) used as a boolean
1474
    @param acquired: whether to acquire the newly added locks
1475
    @type shared: integer (0/1) used as a boolean
1476
    @param shared: whether the acquisition will be shared
1477

1478
    """
1479
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1480
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1481
           " operations")
1482
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1483
           " while owning some at a greater one")
1484
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1485

    
1486
  def remove(self, level, names):
1487
    """Remove locks from the specified level.
1488

1489
    You must either already own the locks you are trying to remove
1490
    exclusively or not own any lock at an upper level.
1491

1492
    @type level: member of locking.LEVELS_MOD
1493
    @param level: the level at which the locks shall be removed
1494
    @type names: list of strings
1495
    @param names: the names of the locks which shall be removed
1496
        (special lock names, or instance/node names)
1497

1498
    """
1499
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1500
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1501
           " operations")
1502
    # Check we either own the level or don't own anything from here
1503
    # up. LockSet.remove() will check the case in which we don't own
1504
    # all the needed resources, or we have a shared ownership.
1505
    assert self._is_owned(level) or not self._upper_owned(level), (
1506
           "Cannot remove locks at a level while not owning it or"
1507
           " owning some at a greater one")
1508
    return self.__keyring[level].remove(names)
1509

    
1510

    
1511
class LockMonitor(object):
1512
  _LOCK_ATTR = "_lock"
1513

    
1514
  def __init__(self):
1515
    """Initializes this class.
1516

1517
    """
1518
    self._lock = SharedLock("LockMonitor")
1519

    
1520
    # Tracked locks. Weak references are used to avoid issues with circular
1521
    # references and deletion.
1522
    self._locks = weakref.WeakKeyDictionary()
1523

    
1524
  @ssynchronized(_LOCK_ATTR)
1525
  def RegisterLock(self, lock):
1526
    """Registers a new lock.
1527

1528
    """
1529
    logging.debug("Registering lock %s", lock.name)
1530
    assert lock not in self._locks, "Duplicate lock registration"
1531
    assert not compat.any(lock.name == i.name for i in self._locks.keys()), \
1532
           "Found duplicate lock name"
1533
    self._locks[lock] = None
1534

    
1535
  @ssynchronized(_LOCK_ATTR)
1536
  def _GetLockInfo(self, fields):
1537
    """Get information from all locks while the monitor lock is held.
1538

1539
    """
1540
    result = {}
1541

    
1542
    for lock in self._locks.keys():
1543
      assert lock.name not in result, "Found duplicate lock name"
1544
      result[lock.name] = lock.GetInfo(fields)
1545

    
1546
    return result
1547

    
1548
  def QueryLocks(self, fields, sync):
1549
    """Queries information from all locks.
1550

1551
    @type fields: list of strings
1552
    @param fields: List of fields to return
1553
    @type sync: boolean
1554
    @param sync: Whether to operate in synchronous mode
1555

1556
    """
1557
    if sync:
1558
      raise NotImplementedError("Synchronous queries are not implemented")
1559

    
1560
    # Get all data without sorting
1561
    result = self._GetLockInfo(fields)
1562

    
1563
    # Sort by name
1564
    return [result[name] for name in utils.NiceSort(result.keys())]