Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 7afca87f

History | View | Annotate | Download (48.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable-msg=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import errno
32
import weakref
33
import logging
34
import heapq
35
import operator
36
import itertools
37

    
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import compat
41
from ganeti import query
42

    
43

    
44
_EXCLUSIVE_TEXT = "exclusive"
45
_SHARED_TEXT = "shared"
46
_DELETED_TEXT = "deleted"
47

    
48
_DEFAULT_PRIORITY = 0
49

    
50

    
51
def ssynchronized(mylock, shared=0):
52
  """Shared Synchronization decorator.
53

54
  Calls the function holding the given lock, either in exclusive or shared
55
  mode. It requires the passed lock to be a SharedLock (or support its
56
  semantics).
57

58
  @type mylock: lockable object or string
59
  @param mylock: lock to acquire or class member name of the lock to acquire
60

61
  """
62
  def wrap(fn):
63
    def sync_function(*args, **kwargs):
64
      if isinstance(mylock, basestring):
65
        assert args, "cannot ssynchronize on non-class method: self not found"
66
        # args[0] is "self"
67
        lock = getattr(args[0], mylock)
68
      else:
69
        lock = mylock
70
      lock.acquire(shared=shared)
71
      try:
72
        return fn(*args, **kwargs)
73
      finally:
74
        lock.release()
75
    return sync_function
76
  return wrap
77

    
78

    
79
class _SingleNotifyPipeConditionWaiter(object):
80
  """Helper class for SingleNotifyPipeCondition
81

82
  """
83
  __slots__ = [
84
    "_fd",
85
    "_poller",
86
    ]
87

    
88
  def __init__(self, poller, fd):
89
    """Constructor for _SingleNotifyPipeConditionWaiter
90

91
    @type poller: select.poll
92
    @param poller: Poller object
93
    @type fd: int
94
    @param fd: File descriptor to wait for
95

96
    """
97
    object.__init__(self)
98
    self._poller = poller
99
    self._fd = fd
100

    
101
  def __call__(self, timeout):
102
    """Wait for something to happen on the pipe.
103

104
    @type timeout: float or None
105
    @param timeout: Timeout for waiting (can be None)
106

107
    """
108
    running_timeout = utils.RunningTimeout(timeout, True)
109

    
110
    while True:
111
      remaining_time = running_timeout.Remaining()
112

    
113
      if remaining_time is not None:
114
        if remaining_time < 0.0:
115
          break
116

    
117
        # Our calculation uses seconds, poll() wants milliseconds
118
        remaining_time *= 1000
119

    
120
      try:
121
        result = self._poller.poll(remaining_time)
122
      except EnvironmentError, err:
123
        if err.errno != errno.EINTR:
124
          raise
125
        result = None
126

    
127
      # Check whether we were notified
128
      if result and result[0][0] == self._fd:
129
        break
130

    
131

    
132
class _BaseCondition(object):
133
  """Base class containing common code for conditions.
134

135
  Some of this code is taken from python's threading module.
136

137
  """
138
  __slots__ = [
139
    "_lock",
140
    "acquire",
141
    "release",
142
    "_is_owned",
143
    "_acquire_restore",
144
    "_release_save",
145
    ]
146

    
147
  def __init__(self, lock):
148
    """Constructor for _BaseCondition.
149

150
    @type lock: threading.Lock
151
    @param lock: condition base lock
152

153
    """
154
    object.__init__(self)
155

    
156
    try:
157
      self._release_save = lock._release_save
158
    except AttributeError:
159
      self._release_save = self._base_release_save
160
    try:
161
      self._acquire_restore = lock._acquire_restore
162
    except AttributeError:
163
      self._acquire_restore = self._base_acquire_restore
164
    try:
165
      self._is_owned = lock._is_owned
166
    except AttributeError:
167
      self._is_owned = self._base_is_owned
168

    
169
    self._lock = lock
170

    
171
    # Export the lock's acquire() and release() methods
172
    self.acquire = lock.acquire
173
    self.release = lock.release
174

    
175
  def _base_is_owned(self):
176
    """Check whether lock is owned by current thread.
177

178
    """
179
    if self._lock.acquire(0):
180
      self._lock.release()
181
      return False
182
    return True
183

    
184
  def _base_release_save(self):
185
    self._lock.release()
186

    
187
  def _base_acquire_restore(self, _):
188
    self._lock.acquire()
189

    
190
  def _check_owned(self):
191
    """Raise an exception if the current thread doesn't own the lock.
192

193
    """
194
    if not self._is_owned():
195
      raise RuntimeError("cannot work with un-aquired lock")
196

    
197

    
198
class SingleNotifyPipeCondition(_BaseCondition):
199
  """Condition which can only be notified once.
200

201
  This condition class uses pipes and poll, internally, to be able to wait for
202
  notification with a timeout, without resorting to polling. It is almost
203
  compatible with Python's threading.Condition, with the following differences:
204
    - notifyAll can only be called once, and no wait can happen after that
205
    - notify is not supported, only notifyAll
206

207
  """
208

    
209
  __slots__ = [
210
    "_poller",
211
    "_read_fd",
212
    "_write_fd",
213
    "_nwaiters",
214
    "_notified",
215
    ]
216

    
217
  _waiter_class = _SingleNotifyPipeConditionWaiter
218

    
219
  def __init__(self, lock):
220
    """Constructor for SingleNotifyPipeCondition
221

222
    """
223
    _BaseCondition.__init__(self, lock)
224
    self._nwaiters = 0
225
    self._notified = False
226
    self._read_fd = None
227
    self._write_fd = None
228
    self._poller = None
229

    
230
  def _check_unnotified(self):
231
    """Throws an exception if already notified.
232

233
    """
234
    if self._notified:
235
      raise RuntimeError("cannot use already notified condition")
236

    
237
  def _Cleanup(self):
238
    """Cleanup open file descriptors, if any.
239

240
    """
241
    if self._read_fd is not None:
242
      os.close(self._read_fd)
243
      self._read_fd = None
244

    
245
    if self._write_fd is not None:
246
      os.close(self._write_fd)
247
      self._write_fd = None
248
    self._poller = None
249

    
250
  def wait(self, timeout=None):
251
    """Wait for a notification.
252

253
    @type timeout: float or None
254
    @param timeout: Waiting timeout (can be None)
255

256
    """
257
    self._check_owned()
258
    self._check_unnotified()
259

    
260
    self._nwaiters += 1
261
    try:
262
      if self._poller is None:
263
        (self._read_fd, self._write_fd) = os.pipe()
264
        self._poller = select.poll()
265
        self._poller.register(self._read_fd, select.POLLHUP)
266

    
267
      wait_fn = self._waiter_class(self._poller, self._read_fd)
268
      state = self._release_save()
269
      try:
270
        # Wait for notification
271
        wait_fn(timeout)
272
      finally:
273
        # Re-acquire lock
274
        self._acquire_restore(state)
275
    finally:
276
      self._nwaiters -= 1
277
      if self._nwaiters == 0:
278
        self._Cleanup()
279

    
280
  def notifyAll(self): # pylint: disable-msg=C0103
281
    """Close the writing side of the pipe to notify all waiters.
282

283
    """
284
    self._check_owned()
285
    self._check_unnotified()
286
    self._notified = True
287
    if self._write_fd is not None:
288
      os.close(self._write_fd)
289
      self._write_fd = None
290

    
291

    
292
class PipeCondition(_BaseCondition):
293
  """Group-only non-polling condition with counters.
294

295
  This condition class uses pipes and poll, internally, to be able to wait for
296
  notification with a timeout, without resorting to polling. It is almost
297
  compatible with Python's threading.Condition, but only supports notifyAll and
298
  non-recursive locks. As an additional features it's able to report whether
299
  there are any waiting threads.
300

301
  """
302
  __slots__ = [
303
    "_waiters",
304
    "_single_condition",
305
    ]
306

    
307
  _single_condition_class = SingleNotifyPipeCondition
308

    
309
  def __init__(self, lock):
310
    """Initializes this class.
311

312
    """
313
    _BaseCondition.__init__(self, lock)
314
    self._waiters = set()
315
    self._single_condition = self._single_condition_class(self._lock)
316

    
317
  def wait(self, timeout=None):
318
    """Wait for a notification.
319

320
    @type timeout: float or None
321
    @param timeout: Waiting timeout (can be None)
322

323
    """
324
    self._check_owned()
325

    
326
    # Keep local reference to the pipe. It could be replaced by another thread
327
    # notifying while we're waiting.
328
    cond = self._single_condition
329

    
330
    self._waiters.add(threading.currentThread())
331
    try:
332
      cond.wait(timeout)
333
    finally:
334
      self._check_owned()
335
      self._waiters.remove(threading.currentThread())
336

    
337
  def notifyAll(self): # pylint: disable-msg=C0103
338
    """Notify all currently waiting threads.
339

340
    """
341
    self._check_owned()
342
    self._single_condition.notifyAll()
343
    self._single_condition = self._single_condition_class(self._lock)
344

    
345
  def get_waiting(self):
346
    """Returns a list of all waiting threads.
347

348
    """
349
    self._check_owned()
350

    
351
    return self._waiters
352

    
353
  def has_waiting(self):
354
    """Returns whether there are active waiters.
355

356
    """
357
    self._check_owned()
358

    
359
    return bool(self._waiters)
360

    
361

    
362
class _PipeConditionWithMode(PipeCondition):
363
  __slots__ = [
364
    "shared",
365
    ]
366

    
367
  def __init__(self, lock, shared):
368
    """Initializes this class.
369

370
    """
371
    self.shared = shared
372
    PipeCondition.__init__(self, lock)
373

    
374

    
375
class SharedLock(object):
376
  """Implements a shared lock.
377

378
  Multiple threads can acquire the lock in a shared way by calling
379
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
380
  threads can call C{acquire(shared=0)}.
381

382
  Notes on data structures: C{__pending} contains a priority queue (heapq) of
383
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
384
  ...]}. Each per-priority queue contains a normal in-order list of conditions
385
  to be notified when the lock can be acquired. Shared locks are grouped
386
  together by priority and the condition for them is stored in
387
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
388
  references for the per-priority queues indexed by priority for faster access.
389

390
  @type name: string
391
  @ivar name: the name of the lock
392

393
  """
394
  __slots__ = [
395
    "__weakref__",
396
    "__deleted",
397
    "__exc",
398
    "__lock",
399
    "__pending",
400
    "__pending_by_prio",
401
    "__pending_shared",
402
    "__shr",
403
    "name",
404
    ]
405

    
406
  __condition_class = _PipeConditionWithMode
407

    
408
  def __init__(self, name, monitor=None):
409
    """Construct a new SharedLock.
410

411
    @param name: the name of the lock
412
    @type monitor: L{LockMonitor}
413
    @param monitor: Lock monitor with which to register
414

415
    """
416
    object.__init__(self)
417

    
418
    self.name = name
419

    
420
    # Internal lock
421
    self.__lock = threading.Lock()
422

    
423
    # Queue containing waiting acquires
424
    self.__pending = []
425
    self.__pending_by_prio = {}
426
    self.__pending_shared = {}
427

    
428
    # Current lock holders
429
    self.__shr = set()
430
    self.__exc = None
431

    
432
    # is this lock in the deleted state?
433
    self.__deleted = False
434

    
435
    # Register with lock monitor
436
    if monitor:
437
      monitor.RegisterLock(self)
438

    
439
  def GetInfo(self, requested):
440
    """Retrieves information for querying locks.
441

442
    @type requested: set
443
    @param requested: Requested information, see C{query.LQ_*}
444

445
    """
446
    self.__lock.acquire()
447
    try:
448
      # Note: to avoid unintentional race conditions, no references to
449
      # modifiable objects should be returned unless they were created in this
450
      # function.
451
      mode = None
452
      owner_names = None
453

    
454
      if query.LQ_MODE in requested:
455
        if self.__deleted:
456
          mode = _DELETED_TEXT
457
          assert not (self.__exc or self.__shr)
458
        elif self.__exc:
459
          mode = _EXCLUSIVE_TEXT
460
        elif self.__shr:
461
          mode = _SHARED_TEXT
462

    
463
      # Current owner(s) are wanted
464
      if query.LQ_OWNER in requested:
465
        if self.__exc:
466
          owner = [self.__exc]
467
        else:
468
          owner = self.__shr
469

    
470
        if owner:
471
          assert not self.__deleted
472
          owner_names = [i.getName() for i in owner]
473

    
474
      # Pending acquires are wanted
475
      if query.LQ_PENDING in requested:
476
        pending = []
477

    
478
        # Sorting instead of copying and using heaq functions for simplicity
479
        for (_, prioqueue) in sorted(self.__pending):
480
          for cond in prioqueue:
481
            if cond.shared:
482
              pendmode = _SHARED_TEXT
483
            else:
484
              pendmode = _EXCLUSIVE_TEXT
485

    
486
            # List of names will be sorted in L{query._GetLockPending}
487
            pending.append((pendmode, [i.getName()
488
                                       for i in cond.get_waiting()]))
489
      else:
490
        pending = None
491

    
492
      return (self.name, mode, owner_names, pending)
493
    finally:
494
      self.__lock.release()
495

    
496
  def __check_deleted(self):
497
    """Raises an exception if the lock has been deleted.
498

499
    """
500
    if self.__deleted:
501
      raise errors.LockError("Deleted lock %s" % self.name)
502

    
503
  def __is_sharer(self):
504
    """Is the current thread sharing the lock at this time?
505

506
    """
507
    return threading.currentThread() in self.__shr
508

    
509
  def __is_exclusive(self):
510
    """Is the current thread holding the lock exclusively at this time?
511

512
    """
513
    return threading.currentThread() == self.__exc
514

    
515
  def __is_owned(self, shared=-1):
516
    """Is the current thread somehow owning the lock at this time?
517

518
    This is a private version of the function, which presumes you're holding
519
    the internal lock.
520

521
    """
522
    if shared < 0:
523
      return self.__is_sharer() or self.__is_exclusive()
524
    elif shared:
525
      return self.__is_sharer()
526
    else:
527
      return self.__is_exclusive()
528

    
529
  def _is_owned(self, shared=-1):
530
    """Is the current thread somehow owning the lock at this time?
531

532
    @param shared:
533
        - < 0: check for any type of ownership (default)
534
        - 0: check for exclusive ownership
535
        - > 0: check for shared ownership
536

537
    """
538
    self.__lock.acquire()
539
    try:
540
      return self.__is_owned(shared=shared)
541
    finally:
542
      self.__lock.release()
543

    
544
  def _count_pending(self):
545
    """Returns the number of pending acquires.
546

547
    @rtype: int
548

549
    """
550
    self.__lock.acquire()
551
    try:
552
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
553
    finally:
554
      self.__lock.release()
555

    
556
  def _check_empty(self):
557
    """Checks whether there are any pending acquires.
558

559
    @rtype: bool
560

561
    """
562
    self.__lock.acquire()
563
    try:
564
      # Order is important: __find_first_pending_queue modifies __pending
565
      return not (self.__find_first_pending_queue() or
566
                  self.__pending or
567
                  self.__pending_by_prio or
568
                  self.__pending_shared)
569
    finally:
570
      self.__lock.release()
571

    
572
  def __do_acquire(self, shared):
573
    """Actually acquire the lock.
574

575
    """
576
    if shared:
577
      self.__shr.add(threading.currentThread())
578
    else:
579
      self.__exc = threading.currentThread()
580

    
581
  def __can_acquire(self, shared):
582
    """Determine whether lock can be acquired.
583

584
    """
585
    if shared:
586
      return self.__exc is None
587
    else:
588
      return len(self.__shr) == 0 and self.__exc is None
589

    
590
  def __find_first_pending_queue(self):
591
    """Tries to find the topmost queued entry with pending acquires.
592

593
    Removes empty entries while going through the list.
594

595
    """
596
    while self.__pending:
597
      (priority, prioqueue) = self.__pending[0]
598

    
599
      if not prioqueue:
600
        heapq.heappop(self.__pending)
601
        del self.__pending_by_prio[priority]
602
        assert priority not in self.__pending_shared
603
        continue
604

    
605
      if prioqueue:
606
        return prioqueue
607

    
608
    return None
609

    
610
  def __is_on_top(self, cond):
611
    """Checks whether the passed condition is on top of the queue.
612

613
    The caller must make sure the queue isn't empty.
614

615
    """
616
    return cond == self.__find_first_pending_queue()[0]
617

    
618
  def __acquire_unlocked(self, shared, timeout, priority):
619
    """Acquire a shared lock.
620

621
    @param shared: whether to acquire in shared mode; by default an
622
        exclusive lock will be acquired
623
    @param timeout: maximum waiting time before giving up
624
    @type priority: integer
625
    @param priority: Priority for acquiring lock
626

627
    """
628
    self.__check_deleted()
629

    
630
    # We cannot acquire the lock if we already have it
631
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
632
                                   " %s" % self.name)
633

    
634
    # Remove empty entries from queue
635
    self.__find_first_pending_queue()
636

    
637
    # Check whether someone else holds the lock or there are pending acquires.
638
    if not self.__pending and self.__can_acquire(shared):
639
      # Apparently not, can acquire lock directly.
640
      self.__do_acquire(shared)
641
      return True
642

    
643
    prioqueue = self.__pending_by_prio.get(priority, None)
644

    
645
    if shared:
646
      # Try to re-use condition for shared acquire
647
      wait_condition = self.__pending_shared.get(priority, None)
648
      assert (wait_condition is None or
649
              (wait_condition.shared and wait_condition in prioqueue))
650
    else:
651
      wait_condition = None
652

    
653
    if wait_condition is None:
654
      if prioqueue is None:
655
        assert priority not in self.__pending_by_prio
656

    
657
        prioqueue = []
658
        heapq.heappush(self.__pending, (priority, prioqueue))
659
        self.__pending_by_prio[priority] = prioqueue
660

    
661
      wait_condition = self.__condition_class(self.__lock, shared)
662
      prioqueue.append(wait_condition)
663

    
664
      if shared:
665
        # Keep reference for further shared acquires on same priority. This is
666
        # better than trying to find it in the list of pending acquires.
667
        assert priority not in self.__pending_shared
668
        self.__pending_shared[priority] = wait_condition
669

    
670
    try:
671
      # Wait until we become the topmost acquire in the queue or the timeout
672
      # expires.
673
      # TODO: Decrease timeout with spurious notifications
674
      while not (self.__is_on_top(wait_condition) and
675
                 self.__can_acquire(shared)):
676
        # Wait for notification
677
        wait_condition.wait(timeout)
678
        self.__check_deleted()
679

    
680
        # A lot of code assumes blocking acquires always succeed. Loop
681
        # internally for that case.
682
        if timeout is not None:
683
          break
684

    
685
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
686
        self.__do_acquire(shared)
687
        return True
688
    finally:
689
      # Remove condition from queue if there are no more waiters
690
      if not wait_condition.has_waiting():
691
        prioqueue.remove(wait_condition)
692
        if wait_condition.shared:
693
          del self.__pending_shared[priority]
694

    
695
    return False
696

    
697
  def acquire(self, shared=0, timeout=None, priority=None,
698
              test_notify=None):
699
    """Acquire a shared lock.
700

701
    @type shared: integer (0/1) used as a boolean
702
    @param shared: whether to acquire in shared mode; by default an
703
        exclusive lock will be acquired
704
    @type timeout: float
705
    @param timeout: maximum waiting time before giving up
706
    @type priority: integer
707
    @param priority: Priority for acquiring lock
708
    @type test_notify: callable or None
709
    @param test_notify: Special callback function for unittesting
710

711
    """
712
    if priority is None:
713
      priority = _DEFAULT_PRIORITY
714

    
715
    self.__lock.acquire()
716
    try:
717
      # We already got the lock, notify now
718
      if __debug__ and callable(test_notify):
719
        test_notify()
720

    
721
      return self.__acquire_unlocked(shared, timeout, priority)
722
    finally:
723
      self.__lock.release()
724

    
725
  def release(self):
726
    """Release a Shared Lock.
727

728
    You must have acquired the lock, either in shared or in exclusive mode,
729
    before calling this function.
730

731
    """
732
    self.__lock.acquire()
733
    try:
734
      assert self.__is_exclusive() or self.__is_sharer(), \
735
        "Cannot release non-owned lock"
736

    
737
      # Autodetect release type
738
      if self.__is_exclusive():
739
        self.__exc = None
740
      else:
741
        self.__shr.remove(threading.currentThread())
742

    
743
      # Notify topmost condition in queue
744
      prioqueue = self.__find_first_pending_queue()
745
      if prioqueue:
746
        prioqueue[0].notifyAll()
747

    
748
    finally:
749
      self.__lock.release()
750

    
751
  def delete(self, timeout=None, priority=None):
752
    """Delete a Shared Lock.
753

754
    This operation will declare the lock for removal. First the lock will be
755
    acquired in exclusive mode if you don't already own it, then the lock
756
    will be put in a state where any future and pending acquire() fail.
757

758
    @type timeout: float
759
    @param timeout: maximum waiting time before giving up
760
    @type priority: integer
761
    @param priority: Priority for acquiring lock
762

763
    """
764
    if priority is None:
765
      priority = _DEFAULT_PRIORITY
766

    
767
    self.__lock.acquire()
768
    try:
769
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
770

    
771
      self.__check_deleted()
772

    
773
      # The caller is allowed to hold the lock exclusively already.
774
      acquired = self.__is_exclusive()
775

    
776
      if not acquired:
777
        acquired = self.__acquire_unlocked(0, timeout, priority)
778

    
779
        assert self.__is_exclusive() and not self.__is_sharer(), \
780
          "Lock wasn't acquired in exclusive mode"
781

    
782
      if acquired:
783
        self.__deleted = True
784
        self.__exc = None
785

    
786
        assert not (self.__exc or self.__shr), "Found owner during deletion"
787

    
788
        # Notify all acquires. They'll throw an error.
789
        for (_, prioqueue) in self.__pending:
790
          for cond in prioqueue:
791
            cond.notifyAll()
792

    
793
        assert self.__deleted
794

    
795
      return acquired
796
    finally:
797
      self.__lock.release()
798

    
799
  def _release_save(self):
800
    shared = self.__is_sharer()
801
    self.release()
802
    return shared
803

    
804
  def _acquire_restore(self, shared):
805
    self.acquire(shared=shared)
806

    
807

    
808
# Whenever we want to acquire a full LockSet we pass None as the value
809
# to acquire.  Hide this behind this nicely named constant.
810
ALL_SET = None
811

    
812

    
813
class _AcquireTimeout(Exception):
814
  """Internal exception to abort an acquire on a timeout.
815

816
  """
817

    
818

    
819
class LockSet:
820
  """Implements a set of locks.
821

822
  This abstraction implements a set of shared locks for the same resource type,
823
  distinguished by name. The user can lock a subset of the resources and the
824
  LockSet will take care of acquiring the locks always in the same order, thus
825
  preventing deadlock.
826

827
  All the locks needed in the same set must be acquired together, though.
828

829
  @type name: string
830
  @ivar name: the name of the lockset
831

832
  """
833
  def __init__(self, members, name, monitor=None):
834
    """Constructs a new LockSet.
835

836
    @type members: list of strings
837
    @param members: initial members of the set
838
    @type monitor: L{LockMonitor}
839
    @param monitor: Lock monitor with which to register member locks
840

841
    """
842
    assert members is not None, "members parameter is not a list"
843
    self.name = name
844

    
845
    # Lock monitor
846
    self.__monitor = monitor
847

    
848
    # Used internally to guarantee coherency.
849
    self.__lock = SharedLock(name)
850

    
851
    # The lockdict indexes the relationship name -> lock
852
    # The order-of-locking is implied by the alphabetical order of names
853
    self.__lockdict = {}
854

    
855
    for mname in members:
856
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
857
                                          monitor=monitor)
858

    
859
    # The owner dict contains the set of locks each thread owns. For
860
    # performance each thread can access its own key without a global lock on
861
    # this structure. It is paramount though that *no* other type of access is
862
    # done to this structure (eg. no looping over its keys). *_owner helper
863
    # function are defined to guarantee access is correct, but in general never
864
    # do anything different than __owners[threading.currentThread()], or there
865
    # will be trouble.
866
    self.__owners = {}
867

    
868
  def _GetLockName(self, mname):
869
    """Returns the name for a member lock.
870

871
    """
872
    return "%s/%s" % (self.name, mname)
873

    
874
  def _is_owned(self):
875
    """Is the current thread a current level owner?"""
876
    return threading.currentThread() in self.__owners
877

    
878
  def _add_owned(self, name=None):
879
    """Note the current thread owns the given lock"""
880
    if name is None:
881
      if not self._is_owned():
882
        self.__owners[threading.currentThread()] = set()
883
    else:
884
      if self._is_owned():
885
        self.__owners[threading.currentThread()].add(name)
886
      else:
887
        self.__owners[threading.currentThread()] = set([name])
888

    
889
  def _del_owned(self, name=None):
890
    """Note the current thread owns the given lock"""
891

    
892
    assert not (name is None and self.__lock._is_owned()), \
893
           "Cannot hold internal lock when deleting owner status"
894

    
895
    if name is not None:
896
      self.__owners[threading.currentThread()].remove(name)
897

    
898
    # Only remove the key if we don't hold the set-lock as well
899
    if (not self.__lock._is_owned() and
900
        not self.__owners[threading.currentThread()]):
901
      del self.__owners[threading.currentThread()]
902

    
903
  def _list_owned(self):
904
    """Get the set of resource names owned by the current thread"""
905
    if self._is_owned():
906
      return self.__owners[threading.currentThread()].copy()
907
    else:
908
      return set()
909

    
910
  def _release_and_delete_owned(self):
911
    """Release and delete all resources owned by the current thread"""
912
    for lname in self._list_owned():
913
      lock = self.__lockdict[lname]
914
      if lock._is_owned():
915
        lock.release()
916
      self._del_owned(name=lname)
917

    
918
  def __names(self):
919
    """Return the current set of names.
920

921
    Only call this function while holding __lock and don't iterate on the
922
    result after releasing the lock.
923

924
    """
925
    return self.__lockdict.keys()
926

    
927
  def _names(self):
928
    """Return a copy of the current set of elements.
929

930
    Used only for debugging purposes.
931

932
    """
933
    # If we don't already own the set-level lock acquired
934
    # we'll get it and note we need to release it later.
935
    release_lock = False
936
    if not self.__lock._is_owned():
937
      release_lock = True
938
      self.__lock.acquire(shared=1)
939
    try:
940
      result = self.__names()
941
    finally:
942
      if release_lock:
943
        self.__lock.release()
944
    return set(result)
945

    
946
  def acquire(self, names, timeout=None, shared=0, priority=None,
947
              test_notify=None):
948
    """Acquire a set of resource locks.
949

950
    @type names: list of strings (or string)
951
    @param names: the names of the locks which shall be acquired
952
        (special lock names, or instance/node names)
953
    @type shared: integer (0/1) used as a boolean
954
    @param shared: whether to acquire in shared mode; by default an
955
        exclusive lock will be acquired
956
    @type timeout: float or None
957
    @param timeout: Maximum time to acquire all locks
958
    @type priority: integer
959
    @param priority: Priority for acquiring locks
960
    @type test_notify: callable or None
961
    @param test_notify: Special callback function for unittesting
962

963
    @return: Set of all locks successfully acquired or None in case of timeout
964

965
    @raise errors.LockError: when any lock we try to acquire has
966
        been deleted before we succeed. In this case none of the
967
        locks requested will be acquired.
968

969
    """
970
    assert timeout is None or timeout >= 0.0
971

    
972
    # Check we don't already own locks at this level
973
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
974
                                  " (lockset %s)" % self.name)
975

    
976
    if priority is None:
977
      priority = _DEFAULT_PRIORITY
978

    
979
    # We need to keep track of how long we spent waiting for a lock. The
980
    # timeout passed to this function is over all lock acquires.
981
    running_timeout = utils.RunningTimeout(timeout, False)
982

    
983
    try:
984
      if names is not None:
985
        # Support passing in a single resource to acquire rather than many
986
        if isinstance(names, basestring):
987
          names = [names]
988

    
989
        return self.__acquire_inner(names, False, shared, priority,
990
                                    running_timeout.Remaining, test_notify)
991

    
992
      else:
993
        # If no names are given acquire the whole set by not letting new names
994
        # being added before we release, and getting the current list of names.
995
        # Some of them may then be deleted later, but we'll cope with this.
996
        #
997
        # We'd like to acquire this lock in a shared way, as it's nice if
998
        # everybody else can use the instances at the same time. If we are
999
        # acquiring them exclusively though they won't be able to do this
1000
        # anyway, though, so we'll get the list lock exclusively as well in
1001
        # order to be able to do add() on the set while owning it.
1002
        if not self.__lock.acquire(shared=shared, priority=priority,
1003
                                   timeout=running_timeout.Remaining()):
1004
          raise _AcquireTimeout()
1005
        try:
1006
          # note we own the set-lock
1007
          self._add_owned()
1008

    
1009
          return self.__acquire_inner(self.__names(), True, shared, priority,
1010
                                      running_timeout.Remaining, test_notify)
1011
        except:
1012
          # We shouldn't have problems adding the lock to the owners list, but
1013
          # if we did we'll try to release this lock and re-raise exception.
1014
          # Of course something is going to be really wrong, after this.
1015
          self.__lock.release()
1016
          self._del_owned()
1017
          raise
1018

    
1019
    except _AcquireTimeout:
1020
      return None
1021

    
1022
  def __acquire_inner(self, names, want_all, shared, priority,
1023
                      timeout_fn, test_notify):
1024
    """Inner logic for acquiring a number of locks.
1025

1026
    @param names: Names of the locks to be acquired
1027
    @param want_all: Whether all locks in the set should be acquired
1028
    @param shared: Whether to acquire in shared mode
1029
    @param timeout_fn: Function returning remaining timeout
1030
    @param priority: Priority for acquiring locks
1031
    @param test_notify: Special callback function for unittesting
1032

1033
    """
1034
    acquire_list = []
1035

    
1036
    # First we look the locks up on __lockdict. We have no way of being sure
1037
    # they will still be there after, but this makes it a lot faster should
1038
    # just one of them be the already wrong. Using a sorted sequence to prevent
1039
    # deadlocks.
1040
    for lname in sorted(utils.UniqueSequence(names)):
1041
      try:
1042
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
1043
      except KeyError:
1044
        if want_all:
1045
          # We are acquiring all the set, it doesn't matter if this particular
1046
          # element is not there anymore.
1047
          continue
1048

    
1049
        raise errors.LockError("Non-existing lock %s in set %s (it may have"
1050
                               " been removed)" % (lname, self.name))
1051

    
1052
      acquire_list.append((lname, lock))
1053

    
1054
    # This will hold the locknames we effectively acquired.
1055
    acquired = set()
1056

    
1057
    try:
1058
      # Now acquire_list contains a sorted list of resources and locks we
1059
      # want.  In order to get them we loop on this (private) list and
1060
      # acquire() them.  We gave no real guarantee they will still exist till
1061
      # this is done but .acquire() itself is safe and will alert us if the
1062
      # lock gets deleted.
1063
      for (lname, lock) in acquire_list:
1064
        if __debug__ and callable(test_notify):
1065
          test_notify_fn = lambda: test_notify(lname)
1066
        else:
1067
          test_notify_fn = None
1068

    
1069
        timeout = timeout_fn()
1070

    
1071
        try:
1072
          # raises LockError if the lock was deleted
1073
          acq_success = lock.acquire(shared=shared, timeout=timeout,
1074
                                     priority=priority,
1075
                                     test_notify=test_notify_fn)
1076
        except errors.LockError:
1077
          if want_all:
1078
            # We are acquiring all the set, it doesn't matter if this
1079
            # particular element is not there anymore.
1080
            continue
1081

    
1082
          raise errors.LockError("Non-existing lock %s in set %s (it may"
1083
                                 " have been removed)" % (lname, self.name))
1084

    
1085
        if not acq_success:
1086
          # Couldn't get lock or timeout occurred
1087
          if timeout is None:
1088
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
1089
            # blocking.
1090
            raise errors.LockError("Failed to get lock %s (set %s)" %
1091
                                   (lname, self.name))
1092

    
1093
          raise _AcquireTimeout()
1094

    
1095
        try:
1096
          # now the lock cannot be deleted, we have it!
1097
          self._add_owned(name=lname)
1098
          acquired.add(lname)
1099

    
1100
        except:
1101
          # We shouldn't have problems adding the lock to the owners list, but
1102
          # if we did we'll try to release this lock and re-raise exception.
1103
          # Of course something is going to be really wrong after this.
1104
          if lock._is_owned():
1105
            lock.release()
1106
          raise
1107

    
1108
    except:
1109
      # Release all owned locks
1110
      self._release_and_delete_owned()
1111
      raise
1112

    
1113
    return acquired
1114

    
1115
  def release(self, names=None):
1116
    """Release a set of resource locks, at the same level.
1117

1118
    You must have acquired the locks, either in shared or in exclusive mode,
1119
    before releasing them.
1120

1121
    @type names: list of strings, or None
1122
    @param names: the names of the locks which shall be released
1123
        (defaults to all the locks acquired at that level).
1124

1125
    """
1126
    assert self._is_owned(), ("release() on lock set %s while not owner" %
1127
                              self.name)
1128

    
1129
    # Support passing in a single resource to release rather than many
1130
    if isinstance(names, basestring):
1131
      names = [names]
1132

    
1133
    if names is None:
1134
      names = self._list_owned()
1135
    else:
1136
      names = set(names)
1137
      assert self._list_owned().issuperset(names), (
1138
               "release() on unheld resources %s (set %s)" %
1139
               (names.difference(self._list_owned()), self.name))
1140

    
1141
    # First of all let's release the "all elements" lock, if set.
1142
    # After this 'add' can work again
1143
    if self.__lock._is_owned():
1144
      self.__lock.release()
1145
      self._del_owned()
1146

    
1147
    for lockname in names:
1148
      # If we are sure the lock doesn't leave __lockdict without being
1149
      # exclusively held we can do this...
1150
      self.__lockdict[lockname].release()
1151
      self._del_owned(name=lockname)
1152

    
1153
  def add(self, names, acquired=0, shared=0):
1154
    """Add a new set of elements to the set
1155

1156
    @type names: list of strings
1157
    @param names: names of the new elements to add
1158
    @type acquired: integer (0/1) used as a boolean
1159
    @param acquired: pre-acquire the new resource?
1160
    @type shared: integer (0/1) used as a boolean
1161
    @param shared: is the pre-acquisition shared?
1162

1163
    """
1164
    # Check we don't already own locks at this level
1165
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1166
      ("Cannot add locks if the set %s is only partially owned, or shared" %
1167
       self.name)
1168

    
1169
    # Support passing in a single resource to add rather than many
1170
    if isinstance(names, basestring):
1171
      names = [names]
1172

    
1173
    # If we don't already own the set-level lock acquired in an exclusive way
1174
    # we'll get it and note we need to release it later.
1175
    release_lock = False
1176
    if not self.__lock._is_owned():
1177
      release_lock = True
1178
      self.__lock.acquire()
1179

    
1180
    try:
1181
      invalid_names = set(self.__names()).intersection(names)
1182
      if invalid_names:
1183
        # This must be an explicit raise, not an assert, because assert is
1184
        # turned off when using optimization, and this can happen because of
1185
        # concurrency even if the user doesn't want it.
1186
        raise errors.LockError("duplicate add(%s) on lockset %s" %
1187
                               (invalid_names, self.name))
1188

    
1189
      for lockname in names:
1190
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1191

    
1192
        if acquired:
1193
          # No need for priority or timeout here as this lock has just been
1194
          # created
1195
          lock.acquire(shared=shared)
1196
          # now the lock cannot be deleted, we have it!
1197
          try:
1198
            self._add_owned(name=lockname)
1199
          except:
1200
            # We shouldn't have problems adding the lock to the owners list,
1201
            # but if we did we'll try to release this lock and re-raise
1202
            # exception.  Of course something is going to be really wrong,
1203
            # after this.  On the other hand the lock hasn't been added to the
1204
            # __lockdict yet so no other threads should be pending on it. This
1205
            # release is just a safety measure.
1206
            lock.release()
1207
            raise
1208

    
1209
        self.__lockdict[lockname] = lock
1210

    
1211
    finally:
1212
      # Only release __lock if we were not holding it previously.
1213
      if release_lock:
1214
        self.__lock.release()
1215

    
1216
    return True
1217

    
1218
  def remove(self, names):
1219
    """Remove elements from the lock set.
1220

1221
    You can either not hold anything in the lockset or already hold a superset
1222
    of the elements you want to delete, exclusively.
1223

1224
    @type names: list of strings
1225
    @param names: names of the resource to remove.
1226

1227
    @return: a list of locks which we removed; the list is always
1228
        equal to the names list if we were holding all the locks
1229
        exclusively
1230

1231
    """
1232
    # Support passing in a single resource to remove rather than many
1233
    if isinstance(names, basestring):
1234
      names = [names]
1235

    
1236
    # If we own any subset of this lock it must be a superset of what we want
1237
    # to delete. The ownership must also be exclusive, but that will be checked
1238
    # by the lock itself.
1239
    assert not self._is_owned() or self._list_owned().issuperset(names), (
1240
      "remove() on acquired lockset %s while not owning all elements" %
1241
      self.name)
1242

    
1243
    removed = []
1244

    
1245
    for lname in names:
1246
      # Calling delete() acquires the lock exclusively if we don't already own
1247
      # it, and causes all pending and subsequent lock acquires to fail. It's
1248
      # fine to call it out of order because delete() also implies release(),
1249
      # and the assertion above guarantees that if we either already hold
1250
      # everything we want to delete, or we hold none.
1251
      try:
1252
        self.__lockdict[lname].delete()
1253
        removed.append(lname)
1254
      except (KeyError, errors.LockError):
1255
        # This cannot happen if we were already holding it, verify:
1256
        assert not self._is_owned(), ("remove failed while holding lockset %s"
1257
                                      % self.name)
1258
      else:
1259
        # If no LockError was raised we are the ones who deleted the lock.
1260
        # This means we can safely remove it from lockdict, as any further or
1261
        # pending delete() or acquire() will fail (and nobody can have the lock
1262
        # since before our call to delete()).
1263
        #
1264
        # This is done in an else clause because if the exception was thrown
1265
        # it's the job of the one who actually deleted it.
1266
        del self.__lockdict[lname]
1267
        # And let's remove it from our private list if we owned it.
1268
        if self._is_owned():
1269
          self._del_owned(name=lname)
1270

    
1271
    return removed
1272

    
1273

    
1274
# Locking levels, must be acquired in increasing order.
1275
# Current rules are:
1276
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1277
#   acquired before performing any operation, either in shared or in exclusive
1278
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
1279
#   avoided.
1280
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1281
#   If you need more than one node, or more than one instance, acquire them at
1282
#   the same time.
1283
LEVEL_CLUSTER = 0
1284
LEVEL_INSTANCE = 1
1285
LEVEL_NODEGROUP = 2
1286
LEVEL_NODE = 3
1287

    
1288
LEVELS = [LEVEL_CLUSTER,
1289
          LEVEL_INSTANCE,
1290
          LEVEL_NODEGROUP,
1291
          LEVEL_NODE]
1292

    
1293
# Lock levels which are modifiable
1294
LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]
1295

    
1296
LEVEL_NAMES = {
1297
  LEVEL_CLUSTER: "cluster",
1298
  LEVEL_INSTANCE: "instance",
1299
  LEVEL_NODEGROUP: "nodegroup",
1300
  LEVEL_NODE: "node",
1301
  }
1302

    
1303
# Constant for the big ganeti lock
1304
BGL = 'BGL'
1305

    
1306

    
1307
class GanetiLockManager:
1308
  """The Ganeti Locking Library
1309

1310
  The purpose of this small library is to manage locking for ganeti clusters
1311
  in a central place, while at the same time doing dynamic checks against
1312
  possible deadlocks. It will also make it easier to transition to a different
1313
  lock type should we migrate away from python threads.
1314

1315
  """
1316
  _instance = None
1317

    
1318
  def __init__(self, nodes, nodegroups, instances):
1319
    """Constructs a new GanetiLockManager object.
1320

1321
    There should be only a GanetiLockManager object at any time, so this
1322
    function raises an error if this is not the case.
1323

1324
    @param nodes: list of node names
1325
    @param nodegroups: list of nodegroup uuids
1326
    @param instances: list of instance names
1327

1328
    """
1329
    assert self.__class__._instance is None, \
1330
           "double GanetiLockManager instance"
1331

    
1332
    self.__class__._instance = self
1333

    
1334
    self._monitor = LockMonitor()
1335

    
1336
    # The keyring contains all the locks, at their level and in the correct
1337
    # locking order.
1338
    self.__keyring = {
1339
      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
1340
      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
1341
      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor),
1342
      LEVEL_INSTANCE: LockSet(instances, "instances",
1343
                              monitor=self._monitor),
1344
      }
1345

    
1346
  def QueryLocks(self, fields):
1347
    """Queries information from all locks.
1348

1349
    See L{LockMonitor.QueryLocks}.
1350

1351
    """
1352
    return self._monitor.QueryLocks(fields)
1353

    
1354
  def OldStyleQueryLocks(self, fields):
1355
    """Queries information from all locks, returning old-style data.
1356

1357
    See L{LockMonitor.OldStyleQueryLocks}.
1358

1359
    """
1360
    return self._monitor.OldStyleQueryLocks(fields)
1361

    
1362
  def _names(self, level):
1363
    """List the lock names at the given level.
1364

1365
    This can be used for debugging/testing purposes.
1366

1367
    @param level: the level whose list of locks to get
1368

1369
    """
1370
    assert level in LEVELS, "Invalid locking level %s" % level
1371
    return self.__keyring[level]._names()
1372

    
1373
  def _is_owned(self, level):
1374
    """Check whether we are owning locks at the given level
1375

1376
    """
1377
    return self.__keyring[level]._is_owned()
1378

    
1379
  is_owned = _is_owned
1380

    
1381
  def _list_owned(self, level):
1382
    """Get the set of owned locks at the given level
1383

1384
    """
1385
    return self.__keyring[level]._list_owned()
1386

    
1387
  list_owned = _list_owned
1388

    
1389
  def _upper_owned(self, level):
1390
    """Check that we don't own any lock at a level greater than the given one.
1391

1392
    """
1393
    # This way of checking only works if LEVELS[i] = i, which we check for in
1394
    # the test cases.
1395
    return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1396

    
1397
  def _BGL_owned(self): # pylint: disable-msg=C0103
1398
    """Check if the current thread owns the BGL.
1399

1400
    Both an exclusive or a shared acquisition work.
1401

1402
    """
1403
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1404

    
1405
  @staticmethod
1406
  def _contains_BGL(level, names): # pylint: disable-msg=C0103
1407
    """Check if the level contains the BGL.
1408

1409
    Check if acting on the given level and set of names will change
1410
    the status of the Big Ganeti Lock.
1411

1412
    """
1413
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1414

    
1415
  def acquire(self, level, names, timeout=None, shared=0, priority=None):
1416
    """Acquire a set of resource locks, at the same level.
1417

1418
    @type level: member of locking.LEVELS
1419
    @param level: the level at which the locks shall be acquired
1420
    @type names: list of strings (or string)
1421
    @param names: the names of the locks which shall be acquired
1422
        (special lock names, or instance/node names)
1423
    @type shared: integer (0/1) used as a boolean
1424
    @param shared: whether to acquire in shared mode; by default
1425
        an exclusive lock will be acquired
1426
    @type timeout: float
1427
    @param timeout: Maximum time to acquire all locks
1428
    @type priority: integer
1429
    @param priority: Priority for acquiring lock
1430

1431
    """
1432
    assert level in LEVELS, "Invalid locking level %s" % level
1433

    
1434
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1435
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1436
    # so even if we've migrated we need to at least share the BGL to be
1437
    # compatible with them. Of course if we own the BGL exclusively there's no
1438
    # point in acquiring any other lock, unless perhaps we are half way through
1439
    # the migration of the current opcode.
1440
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1441
            "You must own the Big Ganeti Lock before acquiring any other")
1442

    
1443
    # Check we don't own locks at the same or upper levels.
1444
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1445
           " while owning some at a greater one")
1446

    
1447
    # Acquire the locks in the set.
1448
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
1449
                                         priority=priority)
1450

    
1451
  def release(self, level, names=None):
1452
    """Release a set of resource locks, at the same level.
1453

1454
    You must have acquired the locks, either in shared or in exclusive
1455
    mode, before releasing them.
1456

1457
    @type level: member of locking.LEVELS
1458
    @param level: the level at which the locks shall be released
1459
    @type names: list of strings, or None
1460
    @param names: the names of the locks which shall be released
1461
        (defaults to all the locks acquired at that level)
1462

1463
    """
1464
    assert level in LEVELS, "Invalid locking level %s" % level
1465
    assert (not self._contains_BGL(level, names) or
1466
            not self._upper_owned(LEVEL_CLUSTER)), (
1467
            "Cannot release the Big Ganeti Lock while holding something"
1468
            " at upper levels (%r)" %
1469
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
1470
                              for i in self.__keyring.keys()]), ))
1471

    
1472
    # Release will complain if we don't own the locks already
1473
    return self.__keyring[level].release(names)
1474

    
1475
  def add(self, level, names, acquired=0, shared=0):
1476
    """Add locks at the specified level.
1477

1478
    @type level: member of locking.LEVELS_MOD
1479
    @param level: the level at which the locks shall be added
1480
    @type names: list of strings
1481
    @param names: names of the locks to acquire
1482
    @type acquired: integer (0/1) used as a boolean
1483
    @param acquired: whether to acquire the newly added locks
1484
    @type shared: integer (0/1) used as a boolean
1485
    @param shared: whether the acquisition will be shared
1486

1487
    """
1488
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1489
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1490
           " operations")
1491
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1492
           " while owning some at a greater one")
1493
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1494

    
1495
  def remove(self, level, names):
1496
    """Remove locks from the specified level.
1497

1498
    You must either already own the locks you are trying to remove
1499
    exclusively or not own any lock at an upper level.
1500

1501
    @type level: member of locking.LEVELS_MOD
1502
    @param level: the level at which the locks shall be removed
1503
    @type names: list of strings
1504
    @param names: the names of the locks which shall be removed
1505
        (special lock names, or instance/node names)
1506

1507
    """
1508
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1509
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1510
           " operations")
1511
    # Check we either own the level or don't own anything from here
1512
    # up. LockSet.remove() will check the case in which we don't own
1513
    # all the needed resources, or we have a shared ownership.
1514
    assert self._is_owned(level) or not self._upper_owned(level), (
1515
           "Cannot remove locks at a level while not owning it or"
1516
           " owning some at a greater one")
1517
    return self.__keyring[level].remove(names)
1518

    
1519

    
1520
def _MonitorSortKey((num, item)):
1521
  """Sorting key function.
1522

1523
  Sort by name, then by incoming order.
1524

1525
  """
1526
  (name, _, _, _) = item
1527

    
1528
  return (utils.NiceSortKey(name), num)
1529

    
1530

    
1531
class LockMonitor(object):
1532
  _LOCK_ATTR = "_lock"
1533

    
1534
  def __init__(self):
1535
    """Initializes this class.
1536

1537
    """
1538
    self._lock = SharedLock("LockMonitor")
1539

    
1540
    # Counter for stable sorting
1541
    self._counter = itertools.count(0)
1542

    
1543
    # Tracked locks. Weak references are used to avoid issues with circular
1544
    # references and deletion.
1545
    self._locks = weakref.WeakKeyDictionary()
1546

    
1547
  @ssynchronized(_LOCK_ATTR)
1548
  def RegisterLock(self, lock):
1549
    """Registers a new lock.
1550

1551
    """
1552
    logging.debug("Registering lock %s", lock.name)
1553
    assert lock not in self._locks, "Duplicate lock registration"
1554

    
1555
    # There used to be a check for duplicate names here. As it turned out, when
1556
    # a lock is re-created with the same name in a very short timeframe, the
1557
    # previous instance might not yet be removed from the weakref dictionary.
1558
    # By keeping track of the order of incoming registrations, a stable sort
1559
    # ordering can still be guaranteed.
1560

    
1561
    self._locks[lock] = self._counter.next()
1562

    
1563
  @ssynchronized(_LOCK_ATTR)
1564
  def _GetLockInfo(self, requested):
1565
    """Get information from all locks while the monitor lock is held.
1566

1567
    """
1568
    return [(num, lock.GetInfo(requested)) for lock, num in self._locks.items()]
1569

    
1570
  def _Query(self, fields):
1571
    """Queries information from all locks.
1572

1573
    @type fields: list of strings
1574
    @param fields: List of fields to return
1575

1576
    """
1577
    qobj = query.Query(query.LOCK_FIELDS, fields)
1578

    
1579
    # Get all data with internal lock held and then sort by name and incoming
1580
    # order
1581
    lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
1582
                      key=_MonitorSortKey)
1583

    
1584
    # Extract lock information and build query data
1585
    return (qobj, query.LockQueryData(map(operator.itemgetter(1), lockinfo)))
1586

    
1587
  def QueryLocks(self, fields):
1588
    """Queries information from all locks.
1589

1590
    @type fields: list of strings
1591
    @param fields: List of fields to return
1592

1593
    """
1594
    (qobj, ctx) = self._Query(fields)
1595

    
1596
    # Prepare query response
1597
    return query.GetQueryResponse(qobj, ctx)
1598

    
1599
  def OldStyleQueryLocks(self, fields):
1600
    """Queries information from all locks, returning old-style data.
1601

1602
    @type fields: list of strings
1603
    @param fields: List of fields to return
1604

1605
    """
1606
    (qobj, ctx) = self._Query(fields)
1607

    
1608
    return qobj.OldStyleQuery(ctx)