Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 83f2d5f6

History | View | Annotate | Download (49.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable-msg=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import errno
32
import weakref
33
import logging
34
import heapq
35
import operator
36
import itertools
37

    
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import compat
41
from ganeti import query
42

    
43

    
44
_EXCLUSIVE_TEXT = "exclusive"
45
_SHARED_TEXT = "shared"
46
_DELETED_TEXT = "deleted"
47

    
48
_DEFAULT_PRIORITY = 0
49

    
50

    
51
def ssynchronized(mylock, shared=0):
52
  """Shared Synchronization decorator.
53

54
  Calls the function holding the given lock, either in exclusive or shared
55
  mode. It requires the passed lock to be a SharedLock (or support its
56
  semantics).
57

58
  @type mylock: lockable object or string
59
  @param mylock: lock to acquire or class member name of the lock to acquire
60

61
  """
62
  def wrap(fn):
63
    def sync_function(*args, **kwargs):
64
      if isinstance(mylock, basestring):
65
        assert args, "cannot ssynchronize on non-class method: self not found"
66
        # args[0] is "self"
67
        lock = getattr(args[0], mylock)
68
      else:
69
        lock = mylock
70
      lock.acquire(shared=shared)
71
      try:
72
        return fn(*args, **kwargs)
73
      finally:
74
        lock.release()
75
    return sync_function
76
  return wrap
77

    
78

    
79
class _SingleNotifyPipeConditionWaiter(object):
80
  """Helper class for SingleNotifyPipeCondition
81

82
  """
83
  __slots__ = [
84
    "_fd",
85
    "_poller",
86
    ]
87

    
88
  def __init__(self, poller, fd):
89
    """Constructor for _SingleNotifyPipeConditionWaiter
90

91
    @type poller: select.poll
92
    @param poller: Poller object
93
    @type fd: int
94
    @param fd: File descriptor to wait for
95

96
    """
97
    object.__init__(self)
98
    self._poller = poller
99
    self._fd = fd
100

    
101
  def __call__(self, timeout):
102
    """Wait for something to happen on the pipe.
103

104
    @type timeout: float or None
105
    @param timeout: Timeout for waiting (can be None)
106

107
    """
108
    running_timeout = utils.RunningTimeout(timeout, True)
109

    
110
    while True:
111
      remaining_time = running_timeout.Remaining()
112

    
113
      if remaining_time is not None:
114
        if remaining_time < 0.0:
115
          break
116

    
117
        # Our calculation uses seconds, poll() wants milliseconds
118
        remaining_time *= 1000
119

    
120
      try:
121
        result = self._poller.poll(remaining_time)
122
      except EnvironmentError, err:
123
        if err.errno != errno.EINTR:
124
          raise
125
        result = None
126

    
127
      # Check whether we were notified
128
      if result and result[0][0] == self._fd:
129
        break
130

    
131

    
132
class _BaseCondition(object):
133
  """Base class containing common code for conditions.
134

135
  Some of this code is taken from python's threading module.
136

137
  """
138
  __slots__ = [
139
    "_lock",
140
    "acquire",
141
    "release",
142
    "_is_owned",
143
    "_acquire_restore",
144
    "_release_save",
145
    ]
146

    
147
  def __init__(self, lock):
148
    """Constructor for _BaseCondition.
149

150
    @type lock: threading.Lock
151
    @param lock: condition base lock
152

153
    """
154
    object.__init__(self)
155

    
156
    try:
157
      self._release_save = lock._release_save
158
    except AttributeError:
159
      self._release_save = self._base_release_save
160
    try:
161
      self._acquire_restore = lock._acquire_restore
162
    except AttributeError:
163
      self._acquire_restore = self._base_acquire_restore
164
    try:
165
      self._is_owned = lock._is_owned
166
    except AttributeError:
167
      self._is_owned = self._base_is_owned
168

    
169
    self._lock = lock
170

    
171
    # Export the lock's acquire() and release() methods
172
    self.acquire = lock.acquire
173
    self.release = lock.release
174

    
175
  def _base_is_owned(self):
176
    """Check whether lock is owned by current thread.
177

178
    """
179
    if self._lock.acquire(0):
180
      self._lock.release()
181
      return False
182
    return True
183

    
184
  def _base_release_save(self):
185
    self._lock.release()
186

    
187
  def _base_acquire_restore(self, _):
188
    self._lock.acquire()
189

    
190
  def _check_owned(self):
191
    """Raise an exception if the current thread doesn't own the lock.
192

193
    """
194
    if not self._is_owned():
195
      raise RuntimeError("cannot work with un-aquired lock")
196

    
197

    
198
class SingleNotifyPipeCondition(_BaseCondition):
199
  """Condition which can only be notified once.
200

201
  This condition class uses pipes and poll, internally, to be able to wait for
202
  notification with a timeout, without resorting to polling. It is almost
203
  compatible with Python's threading.Condition, with the following differences:
204
    - notifyAll can only be called once, and no wait can happen after that
205
    - notify is not supported, only notifyAll
206

207
  """
208

    
209
  __slots__ = [
210
    "_poller",
211
    "_read_fd",
212
    "_write_fd",
213
    "_nwaiters",
214
    "_notified",
215
    ]
216

    
217
  _waiter_class = _SingleNotifyPipeConditionWaiter
218

    
219
  def __init__(self, lock):
220
    """Constructor for SingleNotifyPipeCondition
221

222
    """
223
    _BaseCondition.__init__(self, lock)
224
    self._nwaiters = 0
225
    self._notified = False
226
    self._read_fd = None
227
    self._write_fd = None
228
    self._poller = None
229

    
230
  def _check_unnotified(self):
231
    """Throws an exception if already notified.
232

233
    """
234
    if self._notified:
235
      raise RuntimeError("cannot use already notified condition")
236

    
237
  def _Cleanup(self):
238
    """Cleanup open file descriptors, if any.
239

240
    """
241
    if self._read_fd is not None:
242
      os.close(self._read_fd)
243
      self._read_fd = None
244

    
245
    if self._write_fd is not None:
246
      os.close(self._write_fd)
247
      self._write_fd = None
248
    self._poller = None
249

    
250
  def wait(self, timeout):
251
    """Wait for a notification.
252

253
    @type timeout: float or None
254
    @param timeout: Waiting timeout (can be None)
255

256
    """
257
    self._check_owned()
258
    self._check_unnotified()
259

    
260
    self._nwaiters += 1
261
    try:
262
      if self._poller is None:
263
        (self._read_fd, self._write_fd) = os.pipe()
264
        self._poller = select.poll()
265
        self._poller.register(self._read_fd, select.POLLHUP)
266

    
267
      wait_fn = self._waiter_class(self._poller, self._read_fd)
268
      state = self._release_save()
269
      try:
270
        # Wait for notification
271
        wait_fn(timeout)
272
      finally:
273
        # Re-acquire lock
274
        self._acquire_restore(state)
275
    finally:
276
      self._nwaiters -= 1
277
      if self._nwaiters == 0:
278
        self._Cleanup()
279

    
280
  def notifyAll(self): # pylint: disable-msg=C0103
281
    """Close the writing side of the pipe to notify all waiters.
282

283
    """
284
    self._check_owned()
285
    self._check_unnotified()
286
    self._notified = True
287
    if self._write_fd is not None:
288
      os.close(self._write_fd)
289
      self._write_fd = None
290

    
291

    
292
class PipeCondition(_BaseCondition):
293
  """Group-only non-polling condition with counters.
294

295
  This condition class uses pipes and poll, internally, to be able to wait for
296
  notification with a timeout, without resorting to polling. It is almost
297
  compatible with Python's threading.Condition, but only supports notifyAll and
298
  non-recursive locks. As an additional features it's able to report whether
299
  there are any waiting threads.
300

301
  """
302
  __slots__ = [
303
    "_waiters",
304
    "_single_condition",
305
    ]
306

    
307
  _single_condition_class = SingleNotifyPipeCondition
308

    
309
  def __init__(self, lock):
310
    """Initializes this class.
311

312
    """
313
    _BaseCondition.__init__(self, lock)
314
    self._waiters = set()
315
    self._single_condition = self._single_condition_class(self._lock)
316

    
317
  def wait(self, timeout):
318
    """Wait for a notification.
319

320
    @type timeout: float or None
321
    @param timeout: Waiting timeout (can be None)
322

323
    """
324
    self._check_owned()
325

    
326
    # Keep local reference to the pipe. It could be replaced by another thread
327
    # notifying while we're waiting.
328
    cond = self._single_condition
329

    
330
    self._waiters.add(threading.currentThread())
331
    try:
332
      cond.wait(timeout)
333
    finally:
334
      self._check_owned()
335
      self._waiters.remove(threading.currentThread())
336

    
337
  def notifyAll(self): # pylint: disable-msg=C0103
338
    """Notify all currently waiting threads.
339

340
    """
341
    self._check_owned()
342
    self._single_condition.notifyAll()
343
    self._single_condition = self._single_condition_class(self._lock)
344

    
345
  def get_waiting(self):
346
    """Returns a list of all waiting threads.
347

348
    """
349
    self._check_owned()
350

    
351
    return self._waiters
352

    
353
  def has_waiting(self):
354
    """Returns whether there are active waiters.
355

356
    """
357
    self._check_owned()
358

    
359
    return bool(self._waiters)
360

    
361

    
362
class _PipeConditionWithMode(PipeCondition):
363
  __slots__ = [
364
    "shared",
365
    ]
366

    
367
  def __init__(self, lock, shared):
368
    """Initializes this class.
369

370
    """
371
    self.shared = shared
372
    PipeCondition.__init__(self, lock)
373

    
374

    
375
class SharedLock(object):
376
  """Implements a shared lock.
377

378
  Multiple threads can acquire the lock in a shared way by calling
379
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
380
  threads can call C{acquire(shared=0)}.
381

382
  Notes on data structures: C{__pending} contains a priority queue (heapq) of
383
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
384
  ...]}. Each per-priority queue contains a normal in-order list of conditions
385
  to be notified when the lock can be acquired. Shared locks are grouped
386
  together by priority and the condition for them is stored in
387
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
388
  references for the per-priority queues indexed by priority for faster access.
389

390
  @type name: string
391
  @ivar name: the name of the lock
392

393
  """
394
  __slots__ = [
395
    "__weakref__",
396
    "__deleted",
397
    "__exc",
398
    "__lock",
399
    "__pending",
400
    "__pending_by_prio",
401
    "__pending_shared",
402
    "__shr",
403
    "name",
404
    ]
405

    
406
  __condition_class = _PipeConditionWithMode
407

    
408
  def __init__(self, name, monitor=None):
409
    """Construct a new SharedLock.
410

411
    @param name: the name of the lock
412
    @type monitor: L{LockMonitor}
413
    @param monitor: Lock monitor with which to register
414

415
    """
416
    object.__init__(self)
417

    
418
    self.name = name
419

    
420
    # Internal lock
421
    self.__lock = threading.Lock()
422

    
423
    # Queue containing waiting acquires
424
    self.__pending = []
425
    self.__pending_by_prio = {}
426
    self.__pending_shared = {}
427

    
428
    # Current lock holders
429
    self.__shr = set()
430
    self.__exc = None
431

    
432
    # is this lock in the deleted state?
433
    self.__deleted = False
434

    
435
    # Register with lock monitor
436
    if monitor:
437
      monitor.RegisterLock(self)
438

    
439
  def GetInfo(self, requested):
440
    """Retrieves information for querying locks.
441

442
    @type requested: set
443
    @param requested: Requested information, see C{query.LQ_*}
444

445
    """
446
    self.__lock.acquire()
447
    try:
448
      # Note: to avoid unintentional race conditions, no references to
449
      # modifiable objects should be returned unless they were created in this
450
      # function.
451
      mode = None
452
      owner_names = None
453

    
454
      if query.LQ_MODE in requested:
455
        if self.__deleted:
456
          mode = _DELETED_TEXT
457
          assert not (self.__exc or self.__shr)
458
        elif self.__exc:
459
          mode = _EXCLUSIVE_TEXT
460
        elif self.__shr:
461
          mode = _SHARED_TEXT
462

    
463
      # Current owner(s) are wanted
464
      if query.LQ_OWNER in requested:
465
        if self.__exc:
466
          owner = [self.__exc]
467
        else:
468
          owner = self.__shr
469

    
470
        if owner:
471
          assert not self.__deleted
472
          owner_names = [i.getName() for i in owner]
473

    
474
      # Pending acquires are wanted
475
      if query.LQ_PENDING in requested:
476
        pending = []
477

    
478
        # Sorting instead of copying and using heaq functions for simplicity
479
        for (_, prioqueue) in sorted(self.__pending):
480
          for cond in prioqueue:
481
            if cond.shared:
482
              pendmode = _SHARED_TEXT
483
            else:
484
              pendmode = _EXCLUSIVE_TEXT
485

    
486
            # List of names will be sorted in L{query._GetLockPending}
487
            pending.append((pendmode, [i.getName()
488
                                       for i in cond.get_waiting()]))
489
      else:
490
        pending = None
491

    
492
      return (self.name, mode, owner_names, pending)
493
    finally:
494
      self.__lock.release()
495

    
496
  def __check_deleted(self):
497
    """Raises an exception if the lock has been deleted.
498

499
    """
500
    if self.__deleted:
501
      raise errors.LockError("Deleted lock %s" % self.name)
502

    
503
  def __is_sharer(self):
504
    """Is the current thread sharing the lock at this time?
505

506
    """
507
    return threading.currentThread() in self.__shr
508

    
509
  def __is_exclusive(self):
510
    """Is the current thread holding the lock exclusively at this time?
511

512
    """
513
    return threading.currentThread() == self.__exc
514

    
515
  def __is_owned(self, shared=-1):
516
    """Is the current thread somehow owning the lock at this time?
517

518
    This is a private version of the function, which presumes you're holding
519
    the internal lock.
520

521
    """
522
    if shared < 0:
523
      return self.__is_sharer() or self.__is_exclusive()
524
    elif shared:
525
      return self.__is_sharer()
526
    else:
527
      return self.__is_exclusive()
528

    
529
  def _is_owned(self, shared=-1):
530
    """Is the current thread somehow owning the lock at this time?
531

532
    @param shared:
533
        - < 0: check for any type of ownership (default)
534
        - 0: check for exclusive ownership
535
        - > 0: check for shared ownership
536

537
    """
538
    self.__lock.acquire()
539
    try:
540
      return self.__is_owned(shared=shared)
541
    finally:
542
      self.__lock.release()
543

    
544
  def _count_pending(self):
545
    """Returns the number of pending acquires.
546

547
    @rtype: int
548

549
    """
550
    self.__lock.acquire()
551
    try:
552
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
553
    finally:
554
      self.__lock.release()
555

    
556
  def _check_empty(self):
557
    """Checks whether there are any pending acquires.
558

559
    @rtype: bool
560

561
    """
562
    self.__lock.acquire()
563
    try:
564
      # Order is important: __find_first_pending_queue modifies __pending
565
      (_, prioqueue) = self.__find_first_pending_queue()
566

    
567
      return not (prioqueue or
568
                  self.__pending or
569
                  self.__pending_by_prio or
570
                  self.__pending_shared)
571
    finally:
572
      self.__lock.release()
573

    
574
  def __do_acquire(self, shared):
575
    """Actually acquire the lock.
576

577
    """
578
    if shared:
579
      self.__shr.add(threading.currentThread())
580
    else:
581
      self.__exc = threading.currentThread()
582

    
583
  def __can_acquire(self, shared):
584
    """Determine whether lock can be acquired.
585

586
    """
587
    if shared:
588
      return self.__exc is None
589
    else:
590
      return len(self.__shr) == 0 and self.__exc is None
591

    
592
  def __find_first_pending_queue(self):
593
    """Tries to find the topmost queued entry with pending acquires.
594

595
    Removes empty entries while going through the list.
596

597
    """
598
    while self.__pending:
599
      (priority, prioqueue) = self.__pending[0]
600

    
601
      if prioqueue:
602
        return (priority, prioqueue)
603

    
604
      # Remove empty queue
605
      heapq.heappop(self.__pending)
606
      del self.__pending_by_prio[priority]
607
      assert priority not in self.__pending_shared
608

    
609
    return (None, None)
610

    
611
  def __is_on_top(self, cond):
612
    """Checks whether the passed condition is on top of the queue.
613

614
    The caller must make sure the queue isn't empty.
615

616
    """
617
    (_, prioqueue) = self.__find_first_pending_queue()
618

    
619
    return cond == prioqueue[0]
620

    
621
  def __acquire_unlocked(self, shared, timeout, priority):
622
    """Acquire a shared lock.
623

624
    @param shared: whether to acquire in shared mode; by default an
625
        exclusive lock will be acquired
626
    @param timeout: maximum waiting time before giving up
627
    @type priority: integer
628
    @param priority: Priority for acquiring lock
629

630
    """
631
    self.__check_deleted()
632

    
633
    # We cannot acquire the lock if we already have it
634
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
635
                                   " %s" % self.name)
636

    
637
    # Remove empty entries from queue
638
    self.__find_first_pending_queue()
639

    
640
    # Check whether someone else holds the lock or there are pending acquires.
641
    if not self.__pending and self.__can_acquire(shared):
642
      # Apparently not, can acquire lock directly.
643
      self.__do_acquire(shared)
644
      return True
645

    
646
    prioqueue = self.__pending_by_prio.get(priority, None)
647

    
648
    if shared:
649
      # Try to re-use condition for shared acquire
650
      wait_condition = self.__pending_shared.get(priority, None)
651
      assert (wait_condition is None or
652
              (wait_condition.shared and wait_condition in prioqueue))
653
    else:
654
      wait_condition = None
655

    
656
    if wait_condition is None:
657
      if prioqueue is None:
658
        assert priority not in self.__pending_by_prio
659

    
660
        prioqueue = []
661
        heapq.heappush(self.__pending, (priority, prioqueue))
662
        self.__pending_by_prio[priority] = prioqueue
663

    
664
      wait_condition = self.__condition_class(self.__lock, shared)
665
      prioqueue.append(wait_condition)
666

    
667
      if shared:
668
        # Keep reference for further shared acquires on same priority. This is
669
        # better than trying to find it in the list of pending acquires.
670
        assert priority not in self.__pending_shared
671
        self.__pending_shared[priority] = wait_condition
672

    
673
    try:
674
      # Wait until we become the topmost acquire in the queue or the timeout
675
      # expires.
676
      # TODO: Decrease timeout with spurious notifications
677
      while not (self.__is_on_top(wait_condition) and
678
                 self.__can_acquire(shared)):
679
        # Wait for notification
680
        wait_condition.wait(timeout)
681
        self.__check_deleted()
682

    
683
        # A lot of code assumes blocking acquires always succeed. Loop
684
        # internally for that case.
685
        if timeout is not None:
686
          break
687

    
688
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
689
        self.__do_acquire(shared)
690
        return True
691
    finally:
692
      # Remove condition from queue if there are no more waiters
693
      if not wait_condition.has_waiting():
694
        prioqueue.remove(wait_condition)
695
        if wait_condition.shared:
696
          # Remove from list of shared acquires if it wasn't while releasing
697
          # (e.g. on lock deletion)
698
          self.__pending_shared.pop(priority, None)
699

    
700
    return False
701

    
702
  def acquire(self, shared=0, timeout=None, priority=None,
703
              test_notify=None):
704
    """Acquire a shared lock.
705

706
    @type shared: integer (0/1) used as a boolean
707
    @param shared: whether to acquire in shared mode; by default an
708
        exclusive lock will be acquired
709
    @type timeout: float
710
    @param timeout: maximum waiting time before giving up
711
    @type priority: integer
712
    @param priority: Priority for acquiring lock
713
    @type test_notify: callable or None
714
    @param test_notify: Special callback function for unittesting
715

716
    """
717
    if priority is None:
718
      priority = _DEFAULT_PRIORITY
719

    
720
    self.__lock.acquire()
721
    try:
722
      # We already got the lock, notify now
723
      if __debug__ and callable(test_notify):
724
        test_notify()
725

    
726
      return self.__acquire_unlocked(shared, timeout, priority)
727
    finally:
728
      self.__lock.release()
729

    
730
  def release(self):
731
    """Release a Shared Lock.
732

733
    You must have acquired the lock, either in shared or in exclusive mode,
734
    before calling this function.
735

736
    """
737
    self.__lock.acquire()
738
    try:
739
      assert self.__is_exclusive() or self.__is_sharer(), \
740
        "Cannot release non-owned lock"
741

    
742
      # Autodetect release type
743
      if self.__is_exclusive():
744
        self.__exc = None
745
      else:
746
        self.__shr.remove(threading.currentThread())
747

    
748
      # Notify topmost condition in queue
749
      (priority, prioqueue) = self.__find_first_pending_queue()
750
      if prioqueue:
751
        cond = prioqueue[0]
752
        cond.notifyAll()
753
        if cond.shared:
754
          # Prevent further shared acquires from sneaking in while waiters are
755
          # notified
756
          self.__pending_shared.pop(priority, None)
757

    
758
    finally:
759
      self.__lock.release()
760

    
761
  def delete(self, timeout=None, priority=None):
762
    """Delete a Shared Lock.
763

764
    This operation will declare the lock for removal. First the lock will be
765
    acquired in exclusive mode if you don't already own it, then the lock
766
    will be put in a state where any future and pending acquire() fail.
767

768
    @type timeout: float
769
    @param timeout: maximum waiting time before giving up
770
    @type priority: integer
771
    @param priority: Priority for acquiring lock
772

773
    """
774
    if priority is None:
775
      priority = _DEFAULT_PRIORITY
776

    
777
    self.__lock.acquire()
778
    try:
779
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
780

    
781
      self.__check_deleted()
782

    
783
      # The caller is allowed to hold the lock exclusively already.
784
      acquired = self.__is_exclusive()
785

    
786
      if not acquired:
787
        acquired = self.__acquire_unlocked(0, timeout, priority)
788

    
789
        assert self.__is_exclusive() and not self.__is_sharer(), \
790
          "Lock wasn't acquired in exclusive mode"
791

    
792
      if acquired:
793
        self.__deleted = True
794
        self.__exc = None
795

    
796
        assert not (self.__exc or self.__shr), "Found owner during deletion"
797

    
798
        # Notify all acquires. They'll throw an error.
799
        for (_, prioqueue) in self.__pending:
800
          for cond in prioqueue:
801
            cond.notifyAll()
802

    
803
        assert self.__deleted
804

    
805
      return acquired
806
    finally:
807
      self.__lock.release()
808

    
809
  def _release_save(self):
810
    shared = self.__is_sharer()
811
    self.release()
812
    return shared
813

    
814
  def _acquire_restore(self, shared):
815
    self.acquire(shared=shared)
816

    
817

    
818
# Whenever we want to acquire a full LockSet we pass None as the value
819
# to acquire.  Hide this behind this nicely named constant.
820
ALL_SET = None
821

    
822

    
823
class _AcquireTimeout(Exception):
824
  """Internal exception to abort an acquire on a timeout.
825

826
  """
827

    
828

    
829
class LockSet:
830
  """Implements a set of locks.
831

832
  This abstraction implements a set of shared locks for the same resource type,
833
  distinguished by name. The user can lock a subset of the resources and the
834
  LockSet will take care of acquiring the locks always in the same order, thus
835
  preventing deadlock.
836

837
  All the locks needed in the same set must be acquired together, though.
838

839
  @type name: string
840
  @ivar name: the name of the lockset
841

842
  """
843
  def __init__(self, members, name, monitor=None):
844
    """Constructs a new LockSet.
845

846
    @type members: list of strings
847
    @param members: initial members of the set
848
    @type monitor: L{LockMonitor}
849
    @param monitor: Lock monitor with which to register member locks
850

851
    """
852
    assert members is not None, "members parameter is not a list"
853
    self.name = name
854

    
855
    # Lock monitor
856
    self.__monitor = monitor
857

    
858
    # Used internally to guarantee coherency.
859
    self.__lock = SharedLock(name)
860

    
861
    # The lockdict indexes the relationship name -> lock
862
    # The order-of-locking is implied by the alphabetical order of names
863
    self.__lockdict = {}
864

    
865
    for mname in members:
866
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
867
                                          monitor=monitor)
868

    
869
    # The owner dict contains the set of locks each thread owns. For
870
    # performance each thread can access its own key without a global lock on
871
    # this structure. It is paramount though that *no* other type of access is
872
    # done to this structure (eg. no looping over its keys). *_owner helper
873
    # function are defined to guarantee access is correct, but in general never
874
    # do anything different than __owners[threading.currentThread()], or there
875
    # will be trouble.
876
    self.__owners = {}
877

    
878
  def _GetLockName(self, mname):
879
    """Returns the name for a member lock.
880

881
    """
882
    return "%s/%s" % (self.name, mname)
883

    
884
  def _is_owned(self):
885
    """Is the current thread a current level owner?"""
886
    return threading.currentThread() in self.__owners
887

    
888
  def _add_owned(self, name=None):
889
    """Note the current thread owns the given lock"""
890
    if name is None:
891
      if not self._is_owned():
892
        self.__owners[threading.currentThread()] = set()
893
    else:
894
      if self._is_owned():
895
        self.__owners[threading.currentThread()].add(name)
896
      else:
897
        self.__owners[threading.currentThread()] = set([name])
898

    
899
  def _del_owned(self, name=None):
900
    """Note the current thread owns the given lock"""
901

    
902
    assert not (name is None and self.__lock._is_owned()), \
903
           "Cannot hold internal lock when deleting owner status"
904

    
905
    if name is not None:
906
      self.__owners[threading.currentThread()].remove(name)
907

    
908
    # Only remove the key if we don't hold the set-lock as well
909
    if (not self.__lock._is_owned() and
910
        not self.__owners[threading.currentThread()]):
911
      del self.__owners[threading.currentThread()]
912

    
913
  def _list_owned(self):
914
    """Get the set of resource names owned by the current thread"""
915
    if self._is_owned():
916
      return self.__owners[threading.currentThread()].copy()
917
    else:
918
      return set()
919

    
920
  def _release_and_delete_owned(self):
921
    """Release and delete all resources owned by the current thread"""
922
    for lname in self._list_owned():
923
      lock = self.__lockdict[lname]
924
      if lock._is_owned():
925
        lock.release()
926
      self._del_owned(name=lname)
927

    
928
  def __names(self):
929
    """Return the current set of names.
930

931
    Only call this function while holding __lock and don't iterate on the
932
    result after releasing the lock.
933

934
    """
935
    return self.__lockdict.keys()
936

    
937
  def _names(self):
938
    """Return a copy of the current set of elements.
939

940
    Used only for debugging purposes.
941

942
    """
943
    # If we don't already own the set-level lock acquired
944
    # we'll get it and note we need to release it later.
945
    release_lock = False
946
    if not self.__lock._is_owned():
947
      release_lock = True
948
      self.__lock.acquire(shared=1)
949
    try:
950
      result = self.__names()
951
    finally:
952
      if release_lock:
953
        self.__lock.release()
954
    return set(result)
955

    
956
  def acquire(self, names, timeout=None, shared=0, priority=None,
957
              test_notify=None):
958
    """Acquire a set of resource locks.
959

960
    @type names: list of strings (or string)
961
    @param names: the names of the locks which shall be acquired
962
        (special lock names, or instance/node names)
963
    @type shared: integer (0/1) used as a boolean
964
    @param shared: whether to acquire in shared mode; by default an
965
        exclusive lock will be acquired
966
    @type timeout: float or None
967
    @param timeout: Maximum time to acquire all locks
968
    @type priority: integer
969
    @param priority: Priority for acquiring locks
970
    @type test_notify: callable or None
971
    @param test_notify: Special callback function for unittesting
972

973
    @return: Set of all locks successfully acquired or None in case of timeout
974

975
    @raise errors.LockError: when any lock we try to acquire has
976
        been deleted before we succeed. In this case none of the
977
        locks requested will be acquired.
978

979
    """
980
    assert timeout is None or timeout >= 0.0
981

    
982
    # Check we don't already own locks at this level
983
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
984
                                  " (lockset %s)" % self.name)
985

    
986
    if priority is None:
987
      priority = _DEFAULT_PRIORITY
988

    
989
    # We need to keep track of how long we spent waiting for a lock. The
990
    # timeout passed to this function is over all lock acquires.
991
    running_timeout = utils.RunningTimeout(timeout, False)
992

    
993
    try:
994
      if names is not None:
995
        # Support passing in a single resource to acquire rather than many
996
        if isinstance(names, basestring):
997
          names = [names]
998

    
999
        return self.__acquire_inner(names, False, shared, priority,
1000
                                    running_timeout.Remaining, test_notify)
1001

    
1002
      else:
1003
        # If no names are given acquire the whole set by not letting new names
1004
        # being added before we release, and getting the current list of names.
1005
        # Some of them may then be deleted later, but we'll cope with this.
1006
        #
1007
        # We'd like to acquire this lock in a shared way, as it's nice if
1008
        # everybody else can use the instances at the same time. If we are
1009
        # acquiring them exclusively though they won't be able to do this
1010
        # anyway, though, so we'll get the list lock exclusively as well in
1011
        # order to be able to do add() on the set while owning it.
1012
        if not self.__lock.acquire(shared=shared, priority=priority,
1013
                                   timeout=running_timeout.Remaining()):
1014
          raise _AcquireTimeout()
1015
        try:
1016
          # note we own the set-lock
1017
          self._add_owned()
1018

    
1019
          return self.__acquire_inner(self.__names(), True, shared, priority,
1020
                                      running_timeout.Remaining, test_notify)
1021
        except:
1022
          # We shouldn't have problems adding the lock to the owners list, but
1023
          # if we did we'll try to release this lock and re-raise exception.
1024
          # Of course something is going to be really wrong, after this.
1025
          self.__lock.release()
1026
          self._del_owned()
1027
          raise
1028

    
1029
    except _AcquireTimeout:
1030
      return None
1031

    
1032
  def __acquire_inner(self, names, want_all, shared, priority,
1033
                      timeout_fn, test_notify):
1034
    """Inner logic for acquiring a number of locks.
1035

1036
    @param names: Names of the locks to be acquired
1037
    @param want_all: Whether all locks in the set should be acquired
1038
    @param shared: Whether to acquire in shared mode
1039
    @param timeout_fn: Function returning remaining timeout
1040
    @param priority: Priority for acquiring locks
1041
    @param test_notify: Special callback function for unittesting
1042

1043
    """
1044
    acquire_list = []
1045

    
1046
    # First we look the locks up on __lockdict. We have no way of being sure
1047
    # they will still be there after, but this makes it a lot faster should
1048
    # just one of them be the already wrong. Using a sorted sequence to prevent
1049
    # deadlocks.
1050
    for lname in sorted(utils.UniqueSequence(names)):
1051
      try:
1052
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
1053
      except KeyError:
1054
        if want_all:
1055
          # We are acquiring all the set, it doesn't matter if this particular
1056
          # element is not there anymore.
1057
          continue
1058

    
1059
        raise errors.LockError("Non-existing lock %s in set %s (it may have"
1060
                               " been removed)" % (lname, self.name))
1061

    
1062
      acquire_list.append((lname, lock))
1063

    
1064
    # This will hold the locknames we effectively acquired.
1065
    acquired = set()
1066

    
1067
    try:
1068
      # Now acquire_list contains a sorted list of resources and locks we
1069
      # want.  In order to get them we loop on this (private) list and
1070
      # acquire() them.  We gave no real guarantee they will still exist till
1071
      # this is done but .acquire() itself is safe and will alert us if the
1072
      # lock gets deleted.
1073
      for (lname, lock) in acquire_list:
1074
        if __debug__ and callable(test_notify):
1075
          test_notify_fn = lambda: test_notify(lname)
1076
        else:
1077
          test_notify_fn = None
1078

    
1079
        timeout = timeout_fn()
1080

    
1081
        try:
1082
          # raises LockError if the lock was deleted
1083
          acq_success = lock.acquire(shared=shared, timeout=timeout,
1084
                                     priority=priority,
1085
                                     test_notify=test_notify_fn)
1086
        except errors.LockError:
1087
          if want_all:
1088
            # We are acquiring all the set, it doesn't matter if this
1089
            # particular element is not there anymore.
1090
            continue
1091

    
1092
          raise errors.LockError("Non-existing lock %s in set %s (it may"
1093
                                 " have been removed)" % (lname, self.name))
1094

    
1095
        if not acq_success:
1096
          # Couldn't get lock or timeout occurred
1097
          if timeout is None:
1098
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
1099
            # blocking.
1100
            raise errors.LockError("Failed to get lock %s (set %s)" %
1101
                                   (lname, self.name))
1102

    
1103
          raise _AcquireTimeout()
1104

    
1105
        try:
1106
          # now the lock cannot be deleted, we have it!
1107
          self._add_owned(name=lname)
1108
          acquired.add(lname)
1109

    
1110
        except:
1111
          # We shouldn't have problems adding the lock to the owners list, but
1112
          # if we did we'll try to release this lock and re-raise exception.
1113
          # Of course something is going to be really wrong after this.
1114
          if lock._is_owned():
1115
            lock.release()
1116
          raise
1117

    
1118
    except:
1119
      # Release all owned locks
1120
      self._release_and_delete_owned()
1121
      raise
1122

    
1123
    return acquired
1124

    
1125
  def release(self, names=None):
1126
    """Release a set of resource locks, at the same level.
1127

1128
    You must have acquired the locks, either in shared or in exclusive mode,
1129
    before releasing them.
1130

1131
    @type names: list of strings, or None
1132
    @param names: the names of the locks which shall be released
1133
        (defaults to all the locks acquired at that level).
1134

1135
    """
1136
    assert self._is_owned(), ("release() on lock set %s while not owner" %
1137
                              self.name)
1138

    
1139
    # Support passing in a single resource to release rather than many
1140
    if isinstance(names, basestring):
1141
      names = [names]
1142

    
1143
    if names is None:
1144
      names = self._list_owned()
1145
    else:
1146
      names = set(names)
1147
      assert self._list_owned().issuperset(names), (
1148
               "release() on unheld resources %s (set %s)" %
1149
               (names.difference(self._list_owned()), self.name))
1150

    
1151
    # First of all let's release the "all elements" lock, if set.
1152
    # After this 'add' can work again
1153
    if self.__lock._is_owned():
1154
      self.__lock.release()
1155
      self._del_owned()
1156

    
1157
    for lockname in names:
1158
      # If we are sure the lock doesn't leave __lockdict without being
1159
      # exclusively held we can do this...
1160
      self.__lockdict[lockname].release()
1161
      self._del_owned(name=lockname)
1162

    
1163
  def add(self, names, acquired=0, shared=0):
1164
    """Add a new set of elements to the set
1165

1166
    @type names: list of strings
1167
    @param names: names of the new elements to add
1168
    @type acquired: integer (0/1) used as a boolean
1169
    @param acquired: pre-acquire the new resource?
1170
    @type shared: integer (0/1) used as a boolean
1171
    @param shared: is the pre-acquisition shared?
1172

1173
    """
1174
    # Check we don't already own locks at this level
1175
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1176
      ("Cannot add locks if the set %s is only partially owned, or shared" %
1177
       self.name)
1178

    
1179
    # Support passing in a single resource to add rather than many
1180
    if isinstance(names, basestring):
1181
      names = [names]
1182

    
1183
    # If we don't already own the set-level lock acquired in an exclusive way
1184
    # we'll get it and note we need to release it later.
1185
    release_lock = False
1186
    if not self.__lock._is_owned():
1187
      release_lock = True
1188
      self.__lock.acquire()
1189

    
1190
    try:
1191
      invalid_names = set(self.__names()).intersection(names)
1192
      if invalid_names:
1193
        # This must be an explicit raise, not an assert, because assert is
1194
        # turned off when using optimization, and this can happen because of
1195
        # concurrency even if the user doesn't want it.
1196
        raise errors.LockError("duplicate add(%s) on lockset %s" %
1197
                               (invalid_names, self.name))
1198

    
1199
      for lockname in names:
1200
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1201

    
1202
        if acquired:
1203
          # No need for priority or timeout here as this lock has just been
1204
          # created
1205
          lock.acquire(shared=shared)
1206
          # now the lock cannot be deleted, we have it!
1207
          try:
1208
            self._add_owned(name=lockname)
1209
          except:
1210
            # We shouldn't have problems adding the lock to the owners list,
1211
            # but if we did we'll try to release this lock and re-raise
1212
            # exception.  Of course something is going to be really wrong,
1213
            # after this.  On the other hand the lock hasn't been added to the
1214
            # __lockdict yet so no other threads should be pending on it. This
1215
            # release is just a safety measure.
1216
            lock.release()
1217
            raise
1218

    
1219
        self.__lockdict[lockname] = lock
1220

    
1221
    finally:
1222
      # Only release __lock if we were not holding it previously.
1223
      if release_lock:
1224
        self.__lock.release()
1225

    
1226
    return True
1227

    
1228
  def remove(self, names):
1229
    """Remove elements from the lock set.
1230

1231
    You can either not hold anything in the lockset or already hold a superset
1232
    of the elements you want to delete, exclusively.
1233

1234
    @type names: list of strings
1235
    @param names: names of the resource to remove.
1236

1237
    @return: a list of locks which we removed; the list is always
1238
        equal to the names list if we were holding all the locks
1239
        exclusively
1240

1241
    """
1242
    # Support passing in a single resource to remove rather than many
1243
    if isinstance(names, basestring):
1244
      names = [names]
1245

    
1246
    # If we own any subset of this lock it must be a superset of what we want
1247
    # to delete. The ownership must also be exclusive, but that will be checked
1248
    # by the lock itself.
1249
    assert not self._is_owned() or self._list_owned().issuperset(names), (
1250
      "remove() on acquired lockset %s while not owning all elements" %
1251
      self.name)
1252

    
1253
    removed = []
1254

    
1255
    for lname in names:
1256
      # Calling delete() acquires the lock exclusively if we don't already own
1257
      # it, and causes all pending and subsequent lock acquires to fail. It's
1258
      # fine to call it out of order because delete() also implies release(),
1259
      # and the assertion above guarantees that if we either already hold
1260
      # everything we want to delete, or we hold none.
1261
      try:
1262
        self.__lockdict[lname].delete()
1263
        removed.append(lname)
1264
      except (KeyError, errors.LockError):
1265
        # This cannot happen if we were already holding it, verify:
1266
        assert not self._is_owned(), ("remove failed while holding lockset %s"
1267
                                      % self.name)
1268
      else:
1269
        # If no LockError was raised we are the ones who deleted the lock.
1270
        # This means we can safely remove it from lockdict, as any further or
1271
        # pending delete() or acquire() will fail (and nobody can have the lock
1272
        # since before our call to delete()).
1273
        #
1274
        # This is done in an else clause because if the exception was thrown
1275
        # it's the job of the one who actually deleted it.
1276
        del self.__lockdict[lname]
1277
        # And let's remove it from our private list if we owned it.
1278
        if self._is_owned():
1279
          self._del_owned(name=lname)
1280

    
1281
    return removed
1282

    
1283

    
1284
# Locking levels, must be acquired in increasing order.
1285
# Current rules are:
1286
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1287
#   acquired before performing any operation, either in shared or in exclusive
1288
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
1289
#   avoided.
1290
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1291
#   If you need more than one node, or more than one instance, acquire them at
1292
#   the same time.
1293
LEVEL_CLUSTER = 0
1294
LEVEL_INSTANCE = 1
1295
LEVEL_NODEGROUP = 2
1296
LEVEL_NODE = 3
1297

    
1298
LEVELS = [LEVEL_CLUSTER,
1299
          LEVEL_INSTANCE,
1300
          LEVEL_NODEGROUP,
1301
          LEVEL_NODE]
1302

    
1303
# Lock levels which are modifiable
1304
LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]
1305

    
1306
LEVEL_NAMES = {
1307
  LEVEL_CLUSTER: "cluster",
1308
  LEVEL_INSTANCE: "instance",
1309
  LEVEL_NODEGROUP: "nodegroup",
1310
  LEVEL_NODE: "node",
1311
  }
1312

    
1313
# Constant for the big ganeti lock
1314
BGL = 'BGL'
1315

    
1316

    
1317
class GanetiLockManager:
1318
  """The Ganeti Locking Library
1319

1320
  The purpose of this small library is to manage locking for ganeti clusters
1321
  in a central place, while at the same time doing dynamic checks against
1322
  possible deadlocks. It will also make it easier to transition to a different
1323
  lock type should we migrate away from python threads.
1324

1325
  """
1326
  _instance = None
1327

    
1328
  def __init__(self, nodes, nodegroups, instances):
1329
    """Constructs a new GanetiLockManager object.
1330

1331
    There should be only a GanetiLockManager object at any time, so this
1332
    function raises an error if this is not the case.
1333

1334
    @param nodes: list of node names
1335
    @param nodegroups: list of nodegroup uuids
1336
    @param instances: list of instance names
1337

1338
    """
1339
    assert self.__class__._instance is None, \
1340
           "double GanetiLockManager instance"
1341

    
1342
    self.__class__._instance = self
1343

    
1344
    self._monitor = LockMonitor()
1345

    
1346
    # The keyring contains all the locks, at their level and in the correct
1347
    # locking order.
1348
    self.__keyring = {
1349
      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
1350
      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
1351
      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor),
1352
      LEVEL_INSTANCE: LockSet(instances, "instances",
1353
                              monitor=self._monitor),
1354
      }
1355

    
1356
  def QueryLocks(self, fields):
1357
    """Queries information from all locks.
1358

1359
    See L{LockMonitor.QueryLocks}.
1360

1361
    """
1362
    return self._monitor.QueryLocks(fields)
1363

    
1364
  def OldStyleQueryLocks(self, fields):
1365
    """Queries information from all locks, returning old-style data.
1366

1367
    See L{LockMonitor.OldStyleQueryLocks}.
1368

1369
    """
1370
    return self._monitor.OldStyleQueryLocks(fields)
1371

    
1372
  def _names(self, level):
1373
    """List the lock names at the given level.
1374

1375
    This can be used for debugging/testing purposes.
1376

1377
    @param level: the level whose list of locks to get
1378

1379
    """
1380
    assert level in LEVELS, "Invalid locking level %s" % level
1381
    return self.__keyring[level]._names()
1382

    
1383
  def _is_owned(self, level):
1384
    """Check whether we are owning locks at the given level
1385

1386
    """
1387
    return self.__keyring[level]._is_owned()
1388

    
1389
  is_owned = _is_owned
1390

    
1391
  def _list_owned(self, level):
1392
    """Get the set of owned locks at the given level
1393

1394
    """
1395
    return self.__keyring[level]._list_owned()
1396

    
1397
  list_owned = _list_owned
1398

    
1399
  def _upper_owned(self, level):
1400
    """Check that we don't own any lock at a level greater than the given one.
1401

1402
    """
1403
    # This way of checking only works if LEVELS[i] = i, which we check for in
1404
    # the test cases.
1405
    return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1406

    
1407
  def _BGL_owned(self): # pylint: disable-msg=C0103
1408
    """Check if the current thread owns the BGL.
1409

1410
    Both an exclusive or a shared acquisition work.
1411

1412
    """
1413
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1414

    
1415
  @staticmethod
1416
  def _contains_BGL(level, names): # pylint: disable-msg=C0103
1417
    """Check if the level contains the BGL.
1418

1419
    Check if acting on the given level and set of names will change
1420
    the status of the Big Ganeti Lock.
1421

1422
    """
1423
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1424

    
1425
  def acquire(self, level, names, timeout=None, shared=0, priority=None):
1426
    """Acquire a set of resource locks, at the same level.
1427

1428
    @type level: member of locking.LEVELS
1429
    @param level: the level at which the locks shall be acquired
1430
    @type names: list of strings (or string)
1431
    @param names: the names of the locks which shall be acquired
1432
        (special lock names, or instance/node names)
1433
    @type shared: integer (0/1) used as a boolean
1434
    @param shared: whether to acquire in shared mode; by default
1435
        an exclusive lock will be acquired
1436
    @type timeout: float
1437
    @param timeout: Maximum time to acquire all locks
1438
    @type priority: integer
1439
    @param priority: Priority for acquiring lock
1440

1441
    """
1442
    assert level in LEVELS, "Invalid locking level %s" % level
1443

    
1444
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1445
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1446
    # so even if we've migrated we need to at least share the BGL to be
1447
    # compatible with them. Of course if we own the BGL exclusively there's no
1448
    # point in acquiring any other lock, unless perhaps we are half way through
1449
    # the migration of the current opcode.
1450
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1451
            "You must own the Big Ganeti Lock before acquiring any other")
1452

    
1453
    # Check we don't own locks at the same or upper levels.
1454
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1455
           " while owning some at a greater one")
1456

    
1457
    # Acquire the locks in the set.
1458
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
1459
                                         priority=priority)
1460

    
1461
  def release(self, level, names=None):
1462
    """Release a set of resource locks, at the same level.
1463

1464
    You must have acquired the locks, either in shared or in exclusive
1465
    mode, before releasing them.
1466

1467
    @type level: member of locking.LEVELS
1468
    @param level: the level at which the locks shall be released
1469
    @type names: list of strings, or None
1470
    @param names: the names of the locks which shall be released
1471
        (defaults to all the locks acquired at that level)
1472

1473
    """
1474
    assert level in LEVELS, "Invalid locking level %s" % level
1475
    assert (not self._contains_BGL(level, names) or
1476
            not self._upper_owned(LEVEL_CLUSTER)), (
1477
            "Cannot release the Big Ganeti Lock while holding something"
1478
            " at upper levels (%r)" %
1479
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
1480
                              for i in self.__keyring.keys()]), ))
1481

    
1482
    # Release will complain if we don't own the locks already
1483
    return self.__keyring[level].release(names)
1484

    
1485
  def add(self, level, names, acquired=0, shared=0):
1486
    """Add locks at the specified level.
1487

1488
    @type level: member of locking.LEVELS_MOD
1489
    @param level: the level at which the locks shall be added
1490
    @type names: list of strings
1491
    @param names: names of the locks to acquire
1492
    @type acquired: integer (0/1) used as a boolean
1493
    @param acquired: whether to acquire the newly added locks
1494
    @type shared: integer (0/1) used as a boolean
1495
    @param shared: whether the acquisition will be shared
1496

1497
    """
1498
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1499
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1500
           " operations")
1501
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1502
           " while owning some at a greater one")
1503
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1504

    
1505
  def remove(self, level, names):
1506
    """Remove locks from the specified level.
1507

1508
    You must either already own the locks you are trying to remove
1509
    exclusively or not own any lock at an upper level.
1510

1511
    @type level: member of locking.LEVELS_MOD
1512
    @param level: the level at which the locks shall be removed
1513
    @type names: list of strings
1514
    @param names: the names of the locks which shall be removed
1515
        (special lock names, or instance/node names)
1516

1517
    """
1518
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1519
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1520
           " operations")
1521
    # Check we either own the level or don't own anything from here
1522
    # up. LockSet.remove() will check the case in which we don't own
1523
    # all the needed resources, or we have a shared ownership.
1524
    assert self._is_owned(level) or not self._upper_owned(level), (
1525
           "Cannot remove locks at a level while not owning it or"
1526
           " owning some at a greater one")
1527
    return self.__keyring[level].remove(names)
1528

    
1529

    
1530
def _MonitorSortKey((num, item)):
1531
  """Sorting key function.
1532

1533
  Sort by name, then by incoming order.
1534

1535
  """
1536
  (name, _, _, _) = item
1537

    
1538
  return (utils.NiceSortKey(name), num)
1539

    
1540

    
1541
class LockMonitor(object):
1542
  _LOCK_ATTR = "_lock"
1543

    
1544
  def __init__(self):
1545
    """Initializes this class.
1546

1547
    """
1548
    self._lock = SharedLock("LockMonitor")
1549

    
1550
    # Counter for stable sorting
1551
    self._counter = itertools.count(0)
1552

    
1553
    # Tracked locks. Weak references are used to avoid issues with circular
1554
    # references and deletion.
1555
    self._locks = weakref.WeakKeyDictionary()
1556

    
1557
  @ssynchronized(_LOCK_ATTR)
1558
  def RegisterLock(self, lock):
1559
    """Registers a new lock.
1560

1561
    """
1562
    logging.debug("Registering lock %s", lock.name)
1563
    assert lock not in self._locks, "Duplicate lock registration"
1564

    
1565
    # There used to be a check for duplicate names here. As it turned out, when
1566
    # a lock is re-created with the same name in a very short timeframe, the
1567
    # previous instance might not yet be removed from the weakref dictionary.
1568
    # By keeping track of the order of incoming registrations, a stable sort
1569
    # ordering can still be guaranteed.
1570

    
1571
    self._locks[lock] = self._counter.next()
1572

    
1573
  @ssynchronized(_LOCK_ATTR)
1574
  def _GetLockInfo(self, requested):
1575
    """Get information from all locks while the monitor lock is held.
1576

1577
    """
1578
    return [(num, lock.GetInfo(requested)) for lock, num in self._locks.items()]
1579

    
1580
  def _Query(self, fields):
1581
    """Queries information from all locks.
1582

1583
    @type fields: list of strings
1584
    @param fields: List of fields to return
1585

1586
    """
1587
    qobj = query.Query(query.LOCK_FIELDS, fields)
1588

    
1589
    # Get all data with internal lock held and then sort by name and incoming
1590
    # order
1591
    lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
1592
                      key=_MonitorSortKey)
1593

    
1594
    # Extract lock information and build query data
1595
    return (qobj, query.LockQueryData(map(operator.itemgetter(1), lockinfo)))
1596

    
1597
  def QueryLocks(self, fields):
1598
    """Queries information from all locks.
1599

1600
    @type fields: list of strings
1601
    @param fields: List of fields to return
1602

1603
    """
1604
    (qobj, ctx) = self._Query(fields)
1605

    
1606
    # Prepare query response
1607
    return query.GetQueryResponse(qobj, ctx)
1608

    
1609
  def OldStyleQueryLocks(self, fields):
1610
    """Queries information from all locks, returning old-style data.
1611

1612
    @type fields: list of strings
1613
    @param fields: List of fields to return
1614

1615
    """
1616
    (qobj, ctx) = self._Query(fields)
1617

    
1618
    return qobj.OldStyleQuery(ctx)