Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ ff699aa9

History | View | Annotate | Download (48.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable-msg=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import errno
32
import weakref
33
import logging
34
import heapq
35
import operator
36

    
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import compat
40
from ganeti import query
41

    
42

    
43
_EXCLUSIVE_TEXT = "exclusive"
44
_SHARED_TEXT = "shared"
45
_DELETED_TEXT = "deleted"
46

    
47
_DEFAULT_PRIORITY = 0
48

    
49

    
50
def ssynchronized(mylock, shared=0):
51
  """Shared Synchronization decorator.
52

53
  Calls the function holding the given lock, either in exclusive or shared
54
  mode. It requires the passed lock to be a SharedLock (or support its
55
  semantics).
56

57
  @type mylock: lockable object or string
58
  @param mylock: lock to acquire or class member name of the lock to acquire
59

60
  """
61
  def wrap(fn):
62
    def sync_function(*args, **kwargs):
63
      if isinstance(mylock, basestring):
64
        assert args, "cannot ssynchronize on non-class method: self not found"
65
        # args[0] is "self"
66
        lock = getattr(args[0], mylock)
67
      else:
68
        lock = mylock
69
      lock.acquire(shared=shared)
70
      try:
71
        return fn(*args, **kwargs)
72
      finally:
73
        lock.release()
74
    return sync_function
75
  return wrap
76

    
77

    
78
class _SingleNotifyPipeConditionWaiter(object):
79
  """Helper class for SingleNotifyPipeCondition
80

81
  """
82
  __slots__ = [
83
    "_fd",
84
    "_poller",
85
    ]
86

    
87
  def __init__(self, poller, fd):
88
    """Constructor for _SingleNotifyPipeConditionWaiter
89

90
    @type poller: select.poll
91
    @param poller: Poller object
92
    @type fd: int
93
    @param fd: File descriptor to wait for
94

95
    """
96
    object.__init__(self)
97
    self._poller = poller
98
    self._fd = fd
99

    
100
  def __call__(self, timeout):
101
    """Wait for something to happen on the pipe.
102

103
    @type timeout: float or None
104
    @param timeout: Timeout for waiting (can be None)
105

106
    """
107
    running_timeout = utils.RunningTimeout(timeout, True)
108

    
109
    while True:
110
      remaining_time = running_timeout.Remaining()
111

    
112
      if remaining_time is not None:
113
        if remaining_time < 0.0:
114
          break
115

    
116
        # Our calculation uses seconds, poll() wants milliseconds
117
        remaining_time *= 1000
118

    
119
      try:
120
        result = self._poller.poll(remaining_time)
121
      except EnvironmentError, err:
122
        if err.errno != errno.EINTR:
123
          raise
124
        result = None
125

    
126
      # Check whether we were notified
127
      if result and result[0][0] == self._fd:
128
        break
129

    
130

    
131
class _BaseCondition(object):
132
  """Base class containing common code for conditions.
133

134
  Some of this code is taken from python's threading module.
135

136
  """
137
  __slots__ = [
138
    "_lock",
139
    "acquire",
140
    "release",
141
    "_is_owned",
142
    "_acquire_restore",
143
    "_release_save",
144
    ]
145

    
146
  def __init__(self, lock):
147
    """Constructor for _BaseCondition.
148

149
    @type lock: threading.Lock
150
    @param lock: condition base lock
151

152
    """
153
    object.__init__(self)
154

    
155
    try:
156
      self._release_save = lock._release_save
157
    except AttributeError:
158
      self._release_save = self._base_release_save
159
    try:
160
      self._acquire_restore = lock._acquire_restore
161
    except AttributeError:
162
      self._acquire_restore = self._base_acquire_restore
163
    try:
164
      self._is_owned = lock._is_owned
165
    except AttributeError:
166
      self._is_owned = self._base_is_owned
167

    
168
    self._lock = lock
169

    
170
    # Export the lock's acquire() and release() methods
171
    self.acquire = lock.acquire
172
    self.release = lock.release
173

    
174
  def _base_is_owned(self):
175
    """Check whether lock is owned by current thread.
176

177
    """
178
    if self._lock.acquire(0):
179
      self._lock.release()
180
      return False
181
    return True
182

    
183
  def _base_release_save(self):
184
    self._lock.release()
185

    
186
  def _base_acquire_restore(self, _):
187
    self._lock.acquire()
188

    
189
  def _check_owned(self):
190
    """Raise an exception if the current thread doesn't own the lock.
191

192
    """
193
    if not self._is_owned():
194
      raise RuntimeError("cannot work with un-aquired lock")
195

    
196

    
197
class SingleNotifyPipeCondition(_BaseCondition):
198
  """Condition which can only be notified once.
199

200
  This condition class uses pipes and poll, internally, to be able to wait for
201
  notification with a timeout, without resorting to polling. It is almost
202
  compatible with Python's threading.Condition, with the following differences:
203
    - notifyAll can only be called once, and no wait can happen after that
204
    - notify is not supported, only notifyAll
205

206
  """
207

    
208
  __slots__ = [
209
    "_poller",
210
    "_read_fd",
211
    "_write_fd",
212
    "_nwaiters",
213
    "_notified",
214
    ]
215

    
216
  _waiter_class = _SingleNotifyPipeConditionWaiter
217

    
218
  def __init__(self, lock):
219
    """Constructor for SingleNotifyPipeCondition
220

221
    """
222
    _BaseCondition.__init__(self, lock)
223
    self._nwaiters = 0
224
    self._notified = False
225
    self._read_fd = None
226
    self._write_fd = None
227
    self._poller = None
228

    
229
  def _check_unnotified(self):
230
    """Throws an exception if already notified.
231

232
    """
233
    if self._notified:
234
      raise RuntimeError("cannot use already notified condition")
235

    
236
  def _Cleanup(self):
237
    """Cleanup open file descriptors, if any.
238

239
    """
240
    if self._read_fd is not None:
241
      os.close(self._read_fd)
242
      self._read_fd = None
243

    
244
    if self._write_fd is not None:
245
      os.close(self._write_fd)
246
      self._write_fd = None
247
    self._poller = None
248

    
249
  def wait(self, timeout=None):
250
    """Wait for a notification.
251

252
    @type timeout: float or None
253
    @param timeout: Waiting timeout (can be None)
254

255
    """
256
    self._check_owned()
257
    self._check_unnotified()
258

    
259
    self._nwaiters += 1
260
    try:
261
      if self._poller is None:
262
        (self._read_fd, self._write_fd) = os.pipe()
263
        self._poller = select.poll()
264
        self._poller.register(self._read_fd, select.POLLHUP)
265

    
266
      wait_fn = self._waiter_class(self._poller, self._read_fd)
267
      state = self._release_save()
268
      try:
269
        # Wait for notification
270
        wait_fn(timeout)
271
      finally:
272
        # Re-acquire lock
273
        self._acquire_restore(state)
274
    finally:
275
      self._nwaiters -= 1
276
      if self._nwaiters == 0:
277
        self._Cleanup()
278

    
279
  def notifyAll(self): # pylint: disable-msg=C0103
280
    """Close the writing side of the pipe to notify all waiters.
281

282
    """
283
    self._check_owned()
284
    self._check_unnotified()
285
    self._notified = True
286
    if self._write_fd is not None:
287
      os.close(self._write_fd)
288
      self._write_fd = None
289

    
290

    
291
class PipeCondition(_BaseCondition):
292
  """Group-only non-polling condition with counters.
293

294
  This condition class uses pipes and poll, internally, to be able to wait for
295
  notification with a timeout, without resorting to polling. It is almost
296
  compatible with Python's threading.Condition, but only supports notifyAll and
297
  non-recursive locks. As an additional features it's able to report whether
298
  there are any waiting threads.
299

300
  """
301
  __slots__ = [
302
    "_waiters",
303
    "_single_condition",
304
    ]
305

    
306
  _single_condition_class = SingleNotifyPipeCondition
307

    
308
  def __init__(self, lock):
309
    """Initializes this class.
310

311
    """
312
    _BaseCondition.__init__(self, lock)
313
    self._waiters = set()
314
    self._single_condition = self._single_condition_class(self._lock)
315

    
316
  def wait(self, timeout=None):
317
    """Wait for a notification.
318

319
    @type timeout: float or None
320
    @param timeout: Waiting timeout (can be None)
321

322
    """
323
    self._check_owned()
324

    
325
    # Keep local reference to the pipe. It could be replaced by another thread
326
    # notifying while we're waiting.
327
    cond = self._single_condition
328

    
329
    self._waiters.add(threading.currentThread())
330
    try:
331
      cond.wait(timeout)
332
    finally:
333
      self._check_owned()
334
      self._waiters.remove(threading.currentThread())
335

    
336
  def notifyAll(self): # pylint: disable-msg=C0103
337
    """Notify all currently waiting threads.
338

339
    """
340
    self._check_owned()
341
    self._single_condition.notifyAll()
342
    self._single_condition = self._single_condition_class(self._lock)
343

    
344
  def get_waiting(self):
345
    """Returns a list of all waiting threads.
346

347
    """
348
    self._check_owned()
349

    
350
    return self._waiters
351

    
352
  def has_waiting(self):
353
    """Returns whether there are active waiters.
354

355
    """
356
    self._check_owned()
357

    
358
    return bool(self._waiters)
359

    
360

    
361
class _PipeConditionWithMode(PipeCondition):
362
  __slots__ = [
363
    "shared",
364
    ]
365

    
366
  def __init__(self, lock, shared):
367
    """Initializes this class.
368

369
    """
370
    self.shared = shared
371
    PipeCondition.__init__(self, lock)
372

    
373

    
374
class SharedLock(object):
375
  """Implements a shared lock.
376

377
  Multiple threads can acquire the lock in a shared way by calling
378
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
379
  threads can call C{acquire(shared=0)}.
380

381
  Notes on data structures: C{__pending} contains a priority queue (heapq) of
382
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
383
  ...]}. Each per-priority queue contains a normal in-order list of conditions
384
  to be notified when the lock can be acquired. Shared locks are grouped
385
  together by priority and the condition for them is stored in
386
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
387
  references for the per-priority queues indexed by priority for faster access.
388

389
  @type name: string
390
  @ivar name: the name of the lock
391

392
  """
393
  __slots__ = [
394
    "__weakref__",
395
    "__deleted",
396
    "__exc",
397
    "__lock",
398
    "__pending",
399
    "__pending_by_prio",
400
    "__pending_shared",
401
    "__shr",
402
    "name",
403
    ]
404

    
405
  __condition_class = _PipeConditionWithMode
406

    
407
  def __init__(self, name, monitor=None):
408
    """Construct a new SharedLock.
409

410
    @param name: the name of the lock
411
    @type monitor: L{LockMonitor}
412
    @param monitor: Lock monitor with which to register
413

414
    """
415
    object.__init__(self)
416

    
417
    self.name = name
418

    
419
    # Internal lock
420
    self.__lock = threading.Lock()
421

    
422
    # Queue containing waiting acquires
423
    self.__pending = []
424
    self.__pending_by_prio = {}
425
    self.__pending_shared = {}
426

    
427
    # Current lock holders
428
    self.__shr = set()
429
    self.__exc = None
430

    
431
    # is this lock in the deleted state?
432
    self.__deleted = False
433

    
434
    # Register with lock monitor
435
    if monitor:
436
      monitor.RegisterLock(self)
437

    
438
  def GetInfo(self, requested):
439
    """Retrieves information for querying locks.
440

441
    @type requested: set
442
    @param requested: Requested information, see C{query.LQ_*}
443

444
    """
445
    self.__lock.acquire()
446
    try:
447
      # Note: to avoid unintentional race conditions, no references to
448
      # modifiable objects should be returned unless they were created in this
449
      # function.
450
      mode = None
451
      owner_names = None
452

    
453
      if query.LQ_MODE in requested:
454
        if self.__deleted:
455
          mode = _DELETED_TEXT
456
          assert not (self.__exc or self.__shr)
457
        elif self.__exc:
458
          mode = _EXCLUSIVE_TEXT
459
        elif self.__shr:
460
          mode = _SHARED_TEXT
461

    
462
      # Current owner(s) are wanted
463
      if query.LQ_OWNER in requested:
464
        if self.__exc:
465
          owner = [self.__exc]
466
        else:
467
          owner = self.__shr
468

    
469
        if owner:
470
          assert not self.__deleted
471
          owner_names = [i.getName() for i in owner]
472

    
473
      # Pending acquires are wanted
474
      if query.LQ_PENDING in requested:
475
        pending = []
476

    
477
        # Sorting instead of copying and using heaq functions for simplicity
478
        for (_, prioqueue) in sorted(self.__pending):
479
          for cond in prioqueue:
480
            if cond.shared:
481
              pendmode = _SHARED_TEXT
482
            else:
483
              pendmode = _EXCLUSIVE_TEXT
484

    
485
            # List of names will be sorted in L{query._GetLockPending}
486
            pending.append((pendmode, [i.getName()
487
                                       for i in cond.get_waiting()]))
488
      else:
489
        pending = None
490

    
491
      return (self.name, mode, owner_names, pending)
492
    finally:
493
      self.__lock.release()
494

    
495
  def __check_deleted(self):
496
    """Raises an exception if the lock has been deleted.
497

498
    """
499
    if self.__deleted:
500
      raise errors.LockError("Deleted lock %s" % self.name)
501

    
502
  def __is_sharer(self):
503
    """Is the current thread sharing the lock at this time?
504

505
    """
506
    return threading.currentThread() in self.__shr
507

    
508
  def __is_exclusive(self):
509
    """Is the current thread holding the lock exclusively at this time?
510

511
    """
512
    return threading.currentThread() == self.__exc
513

    
514
  def __is_owned(self, shared=-1):
515
    """Is the current thread somehow owning the lock at this time?
516

517
    This is a private version of the function, which presumes you're holding
518
    the internal lock.
519

520
    """
521
    if shared < 0:
522
      return self.__is_sharer() or self.__is_exclusive()
523
    elif shared:
524
      return self.__is_sharer()
525
    else:
526
      return self.__is_exclusive()
527

    
528
  def _is_owned(self, shared=-1):
529
    """Is the current thread somehow owning the lock at this time?
530

531
    @param shared:
532
        - < 0: check for any type of ownership (default)
533
        - 0: check for exclusive ownership
534
        - > 0: check for shared ownership
535

536
    """
537
    self.__lock.acquire()
538
    try:
539
      return self.__is_owned(shared=shared)
540
    finally:
541
      self.__lock.release()
542

    
543
  def _count_pending(self):
544
    """Returns the number of pending acquires.
545

546
    @rtype: int
547

548
    """
549
    self.__lock.acquire()
550
    try:
551
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
552
    finally:
553
      self.__lock.release()
554

    
555
  def _check_empty(self):
556
    """Checks whether there are any pending acquires.
557

558
    @rtype: bool
559

560
    """
561
    self.__lock.acquire()
562
    try:
563
      # Order is important: __find_first_pending_queue modifies __pending
564
      return not (self.__find_first_pending_queue() or
565
                  self.__pending or
566
                  self.__pending_by_prio or
567
                  self.__pending_shared)
568
    finally:
569
      self.__lock.release()
570

    
571
  def __do_acquire(self, shared):
572
    """Actually acquire the lock.
573

574
    """
575
    if shared:
576
      self.__shr.add(threading.currentThread())
577
    else:
578
      self.__exc = threading.currentThread()
579

    
580
  def __can_acquire(self, shared):
581
    """Determine whether lock can be acquired.
582

583
    """
584
    if shared:
585
      return self.__exc is None
586
    else:
587
      return len(self.__shr) == 0 and self.__exc is None
588

    
589
  def __find_first_pending_queue(self):
590
    """Tries to find the topmost queued entry with pending acquires.
591

592
    Removes empty entries while going through the list.
593

594
    """
595
    while self.__pending:
596
      (priority, prioqueue) = self.__pending[0]
597

    
598
      if not prioqueue:
599
        heapq.heappop(self.__pending)
600
        del self.__pending_by_prio[priority]
601
        assert priority not in self.__pending_shared
602
        continue
603

    
604
      if prioqueue:
605
        return prioqueue
606

    
607
    return None
608

    
609
  def __is_on_top(self, cond):
610
    """Checks whether the passed condition is on top of the queue.
611

612
    The caller must make sure the queue isn't empty.
613

614
    """
615
    return cond == self.__find_first_pending_queue()[0]
616

    
617
  def __acquire_unlocked(self, shared, timeout, priority):
618
    """Acquire a shared lock.
619

620
    @param shared: whether to acquire in shared mode; by default an
621
        exclusive lock will be acquired
622
    @param timeout: maximum waiting time before giving up
623
    @type priority: integer
624
    @param priority: Priority for acquiring lock
625

626
    """
627
    self.__check_deleted()
628

    
629
    # We cannot acquire the lock if we already have it
630
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
631
                                   " %s" % self.name)
632

    
633
    # Remove empty entries from queue
634
    self.__find_first_pending_queue()
635

    
636
    # Check whether someone else holds the lock or there are pending acquires.
637
    if not self.__pending and self.__can_acquire(shared):
638
      # Apparently not, can acquire lock directly.
639
      self.__do_acquire(shared)
640
      return True
641

    
642
    prioqueue = self.__pending_by_prio.get(priority, None)
643

    
644
    if shared:
645
      # Try to re-use condition for shared acquire
646
      wait_condition = self.__pending_shared.get(priority, None)
647
      assert (wait_condition is None or
648
              (wait_condition.shared and wait_condition in prioqueue))
649
    else:
650
      wait_condition = None
651

    
652
    if wait_condition is None:
653
      if prioqueue is None:
654
        assert priority not in self.__pending_by_prio
655

    
656
        prioqueue = []
657
        heapq.heappush(self.__pending, (priority, prioqueue))
658
        self.__pending_by_prio[priority] = prioqueue
659

    
660
      wait_condition = self.__condition_class(self.__lock, shared)
661
      prioqueue.append(wait_condition)
662

    
663
      if shared:
664
        # Keep reference for further shared acquires on same priority. This is
665
        # better than trying to find it in the list of pending acquires.
666
        assert priority not in self.__pending_shared
667
        self.__pending_shared[priority] = wait_condition
668

    
669
    try:
670
      # Wait until we become the topmost acquire in the queue or the timeout
671
      # expires.
672
      # TODO: Decrease timeout with spurious notifications
673
      while not (self.__is_on_top(wait_condition) and
674
                 self.__can_acquire(shared)):
675
        # Wait for notification
676
        wait_condition.wait(timeout)
677
        self.__check_deleted()
678

    
679
        # A lot of code assumes blocking acquires always succeed. Loop
680
        # internally for that case.
681
        if timeout is not None:
682
          break
683

    
684
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
685
        self.__do_acquire(shared)
686
        return True
687
    finally:
688
      # Remove condition from queue if there are no more waiters
689
      if not wait_condition.has_waiting():
690
        prioqueue.remove(wait_condition)
691
        if wait_condition.shared:
692
          del self.__pending_shared[priority]
693

    
694
    return False
695

    
696
  def acquire(self, shared=0, timeout=None, priority=None,
697
              test_notify=None):
698
    """Acquire a shared lock.
699

700
    @type shared: integer (0/1) used as a boolean
701
    @param shared: whether to acquire in shared mode; by default an
702
        exclusive lock will be acquired
703
    @type timeout: float
704
    @param timeout: maximum waiting time before giving up
705
    @type priority: integer
706
    @param priority: Priority for acquiring lock
707
    @type test_notify: callable or None
708
    @param test_notify: Special callback function for unittesting
709

710
    """
711
    if priority is None:
712
      priority = _DEFAULT_PRIORITY
713

    
714
    self.__lock.acquire()
715
    try:
716
      # We already got the lock, notify now
717
      if __debug__ and callable(test_notify):
718
        test_notify()
719

    
720
      return self.__acquire_unlocked(shared, timeout, priority)
721
    finally:
722
      self.__lock.release()
723

    
724
  def release(self):
725
    """Release a Shared Lock.
726

727
    You must have acquired the lock, either in shared or in exclusive mode,
728
    before calling this function.
729

730
    """
731
    self.__lock.acquire()
732
    try:
733
      assert self.__is_exclusive() or self.__is_sharer(), \
734
        "Cannot release non-owned lock"
735

    
736
      # Autodetect release type
737
      if self.__is_exclusive():
738
        self.__exc = None
739
      else:
740
        self.__shr.remove(threading.currentThread())
741

    
742
      # Notify topmost condition in queue
743
      prioqueue = self.__find_first_pending_queue()
744
      if prioqueue:
745
        prioqueue[0].notifyAll()
746

    
747
    finally:
748
      self.__lock.release()
749

    
750
  def delete(self, timeout=None, priority=None):
751
    """Delete a Shared Lock.
752

753
    This operation will declare the lock for removal. First the lock will be
754
    acquired in exclusive mode if you don't already own it, then the lock
755
    will be put in a state where any future and pending acquire() fail.
756

757
    @type timeout: float
758
    @param timeout: maximum waiting time before giving up
759
    @type priority: integer
760
    @param priority: Priority for acquiring lock
761

762
    """
763
    if priority is None:
764
      priority = _DEFAULT_PRIORITY
765

    
766
    self.__lock.acquire()
767
    try:
768
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
769

    
770
      self.__check_deleted()
771

    
772
      # The caller is allowed to hold the lock exclusively already.
773
      acquired = self.__is_exclusive()
774

    
775
      if not acquired:
776
        acquired = self.__acquire_unlocked(0, timeout, priority)
777

    
778
        assert self.__is_exclusive() and not self.__is_sharer(), \
779
          "Lock wasn't acquired in exclusive mode"
780

    
781
      if acquired:
782
        self.__deleted = True
783
        self.__exc = None
784

    
785
        assert not (self.__exc or self.__shr), "Found owner during deletion"
786

    
787
        # Notify all acquires. They'll throw an error.
788
        for (_, prioqueue) in self.__pending:
789
          for cond in prioqueue:
790
            cond.notifyAll()
791

    
792
        assert self.__deleted
793

    
794
      return acquired
795
    finally:
796
      self.__lock.release()
797

    
798
  def _release_save(self):
799
    shared = self.__is_sharer()
800
    self.release()
801
    return shared
802

    
803
  def _acquire_restore(self, shared):
804
    self.acquire(shared=shared)
805

    
806

    
807
# Whenever we want to acquire a full LockSet we pass None as the value
808
# to acquire.  Hide this behind this nicely named constant.
809
ALL_SET = None
810

    
811

    
812
class _AcquireTimeout(Exception):
813
  """Internal exception to abort an acquire on a timeout.
814

815
  """
816

    
817

    
818
class LockSet:
819
  """Implements a set of locks.
820

821
  This abstraction implements a set of shared locks for the same resource type,
822
  distinguished by name. The user can lock a subset of the resources and the
823
  LockSet will take care of acquiring the locks always in the same order, thus
824
  preventing deadlock.
825

826
  All the locks needed in the same set must be acquired together, though.
827

828
  @type name: string
829
  @ivar name: the name of the lockset
830

831
  """
832
  def __init__(self, members, name, monitor=None):
833
    """Constructs a new LockSet.
834

835
    @type members: list of strings
836
    @param members: initial members of the set
837
    @type monitor: L{LockMonitor}
838
    @param monitor: Lock monitor with which to register member locks
839

840
    """
841
    assert members is not None, "members parameter is not a list"
842
    self.name = name
843

    
844
    # Lock monitor
845
    self.__monitor = monitor
846

    
847
    # Used internally to guarantee coherency.
848
    self.__lock = SharedLock(name)
849

    
850
    # The lockdict indexes the relationship name -> lock
851
    # The order-of-locking is implied by the alphabetical order of names
852
    self.__lockdict = {}
853

    
854
    for mname in members:
855
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
856
                                          monitor=monitor)
857

    
858
    # The owner dict contains the set of locks each thread owns. For
859
    # performance each thread can access its own key without a global lock on
860
    # this structure. It is paramount though that *no* other type of access is
861
    # done to this structure (eg. no looping over its keys). *_owner helper
862
    # function are defined to guarantee access is correct, but in general never
863
    # do anything different than __owners[threading.currentThread()], or there
864
    # will be trouble.
865
    self.__owners = {}
866

    
867
  def _GetLockName(self, mname):
868
    """Returns the name for a member lock.
869

870
    """
871
    return "%s/%s" % (self.name, mname)
872

    
873
  def _is_owned(self):
874
    """Is the current thread a current level owner?"""
875
    return threading.currentThread() in self.__owners
876

    
877
  def _add_owned(self, name=None):
878
    """Note the current thread owns the given lock"""
879
    if name is None:
880
      if not self._is_owned():
881
        self.__owners[threading.currentThread()] = set()
882
    else:
883
      if self._is_owned():
884
        self.__owners[threading.currentThread()].add(name)
885
      else:
886
        self.__owners[threading.currentThread()] = set([name])
887

    
888
  def _del_owned(self, name=None):
889
    """Note the current thread owns the given lock"""
890

    
891
    assert not (name is None and self.__lock._is_owned()), \
892
           "Cannot hold internal lock when deleting owner status"
893

    
894
    if name is not None:
895
      self.__owners[threading.currentThread()].remove(name)
896

    
897
    # Only remove the key if we don't hold the set-lock as well
898
    if (not self.__lock._is_owned() and
899
        not self.__owners[threading.currentThread()]):
900
      del self.__owners[threading.currentThread()]
901

    
902
  def _list_owned(self):
903
    """Get the set of resource names owned by the current thread"""
904
    if self._is_owned():
905
      return self.__owners[threading.currentThread()].copy()
906
    else:
907
      return set()
908

    
909
  def _release_and_delete_owned(self):
910
    """Release and delete all resources owned by the current thread"""
911
    for lname in self._list_owned():
912
      lock = self.__lockdict[lname]
913
      if lock._is_owned():
914
        lock.release()
915
      self._del_owned(name=lname)
916

    
917
  def __names(self):
918
    """Return the current set of names.
919

920
    Only call this function while holding __lock and don't iterate on the
921
    result after releasing the lock.
922

923
    """
924
    return self.__lockdict.keys()
925

    
926
  def _names(self):
927
    """Return a copy of the current set of elements.
928

929
    Used only for debugging purposes.
930

931
    """
932
    # If we don't already own the set-level lock acquired
933
    # we'll get it and note we need to release it later.
934
    release_lock = False
935
    if not self.__lock._is_owned():
936
      release_lock = True
937
      self.__lock.acquire(shared=1)
938
    try:
939
      result = self.__names()
940
    finally:
941
      if release_lock:
942
        self.__lock.release()
943
    return set(result)
944

    
945
  def acquire(self, names, timeout=None, shared=0, priority=None,
946
              test_notify=None):
947
    """Acquire a set of resource locks.
948

949
    @type names: list of strings (or string)
950
    @param names: the names of the locks which shall be acquired
951
        (special lock names, or instance/node names)
952
    @type shared: integer (0/1) used as a boolean
953
    @param shared: whether to acquire in shared mode; by default an
954
        exclusive lock will be acquired
955
    @type timeout: float or None
956
    @param timeout: Maximum time to acquire all locks
957
    @type priority: integer
958
    @param priority: Priority for acquiring locks
959
    @type test_notify: callable or None
960
    @param test_notify: Special callback function for unittesting
961

962
    @return: Set of all locks successfully acquired or None in case of timeout
963

964
    @raise errors.LockError: when any lock we try to acquire has
965
        been deleted before we succeed. In this case none of the
966
        locks requested will be acquired.
967

968
    """
969
    assert timeout is None or timeout >= 0.0
970

    
971
    # Check we don't already own locks at this level
972
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
973
                                  " (lockset %s)" % self.name)
974

    
975
    if priority is None:
976
      priority = _DEFAULT_PRIORITY
977

    
978
    # We need to keep track of how long we spent waiting for a lock. The
979
    # timeout passed to this function is over all lock acquires.
980
    running_timeout = utils.RunningTimeout(timeout, False)
981

    
982
    try:
983
      if names is not None:
984
        # Support passing in a single resource to acquire rather than many
985
        if isinstance(names, basestring):
986
          names = [names]
987

    
988
        return self.__acquire_inner(names, False, shared, priority,
989
                                    running_timeout.Remaining, test_notify)
990

    
991
      else:
992
        # If no names are given acquire the whole set by not letting new names
993
        # being added before we release, and getting the current list of names.
994
        # Some of them may then be deleted later, but we'll cope with this.
995
        #
996
        # We'd like to acquire this lock in a shared way, as it's nice if
997
        # everybody else can use the instances at the same time. If we are
998
        # acquiring them exclusively though they won't be able to do this
999
        # anyway, though, so we'll get the list lock exclusively as well in
1000
        # order to be able to do add() on the set while owning it.
1001
        if not self.__lock.acquire(shared=shared, priority=priority,
1002
                                   timeout=running_timeout.Remaining()):
1003
          raise _AcquireTimeout()
1004
        try:
1005
          # note we own the set-lock
1006
          self._add_owned()
1007

    
1008
          return self.__acquire_inner(self.__names(), True, shared, priority,
1009
                                      running_timeout.Remaining, test_notify)
1010
        except:
1011
          # We shouldn't have problems adding the lock to the owners list, but
1012
          # if we did we'll try to release this lock and re-raise exception.
1013
          # Of course something is going to be really wrong, after this.
1014
          self.__lock.release()
1015
          self._del_owned()
1016
          raise
1017

    
1018
    except _AcquireTimeout:
1019
      return None
1020

    
1021
  def __acquire_inner(self, names, want_all, shared, priority,
1022
                      timeout_fn, test_notify):
1023
    """Inner logic for acquiring a number of locks.
1024

1025
    @param names: Names of the locks to be acquired
1026
    @param want_all: Whether all locks in the set should be acquired
1027
    @param shared: Whether to acquire in shared mode
1028
    @param timeout_fn: Function returning remaining timeout
1029
    @param priority: Priority for acquiring locks
1030
    @param test_notify: Special callback function for unittesting
1031

1032
    """
1033
    acquire_list = []
1034

    
1035
    # First we look the locks up on __lockdict. We have no way of being sure
1036
    # they will still be there after, but this makes it a lot faster should
1037
    # just one of them be the already wrong. Using a sorted sequence to prevent
1038
    # deadlocks.
1039
    for lname in sorted(utils.UniqueSequence(names)):
1040
      try:
1041
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
1042
      except KeyError:
1043
        if want_all:
1044
          # We are acquiring all the set, it doesn't matter if this particular
1045
          # element is not there anymore.
1046
          continue
1047

    
1048
        raise errors.LockError("Non-existing lock %s in set %s (it may have"
1049
                               " been removed)" % (lname, self.name))
1050

    
1051
      acquire_list.append((lname, lock))
1052

    
1053
    # This will hold the locknames we effectively acquired.
1054
    acquired = set()
1055

    
1056
    try:
1057
      # Now acquire_list contains a sorted list of resources and locks we
1058
      # want.  In order to get them we loop on this (private) list and
1059
      # acquire() them.  We gave no real guarantee they will still exist till
1060
      # this is done but .acquire() itself is safe and will alert us if the
1061
      # lock gets deleted.
1062
      for (lname, lock) in acquire_list:
1063
        if __debug__ and callable(test_notify):
1064
          test_notify_fn = lambda: test_notify(lname)
1065
        else:
1066
          test_notify_fn = None
1067

    
1068
        timeout = timeout_fn()
1069

    
1070
        try:
1071
          # raises LockError if the lock was deleted
1072
          acq_success = lock.acquire(shared=shared, timeout=timeout,
1073
                                     priority=priority,
1074
                                     test_notify=test_notify_fn)
1075
        except errors.LockError:
1076
          if want_all:
1077
            # We are acquiring all the set, it doesn't matter if this
1078
            # particular element is not there anymore.
1079
            continue
1080

    
1081
          raise errors.LockError("Non-existing lock %s in set %s (it may"
1082
                                 " have been removed)" % (lname, self.name))
1083

    
1084
        if not acq_success:
1085
          # Couldn't get lock or timeout occurred
1086
          if timeout is None:
1087
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
1088
            # blocking.
1089
            raise errors.LockError("Failed to get lock %s (set %s)" %
1090
                                   (lname, self.name))
1091

    
1092
          raise _AcquireTimeout()
1093

    
1094
        try:
1095
          # now the lock cannot be deleted, we have it!
1096
          self._add_owned(name=lname)
1097
          acquired.add(lname)
1098

    
1099
        except:
1100
          # We shouldn't have problems adding the lock to the owners list, but
1101
          # if we did we'll try to release this lock and re-raise exception.
1102
          # Of course something is going to be really wrong after this.
1103
          if lock._is_owned():
1104
            lock.release()
1105
          raise
1106

    
1107
    except:
1108
      # Release all owned locks
1109
      self._release_and_delete_owned()
1110
      raise
1111

    
1112
    return acquired
1113

    
1114
  def release(self, names=None):
1115
    """Release a set of resource locks, at the same level.
1116

1117
    You must have acquired the locks, either in shared or in exclusive mode,
1118
    before releasing them.
1119

1120
    @type names: list of strings, or None
1121
    @param names: the names of the locks which shall be released
1122
        (defaults to all the locks acquired at that level).
1123

1124
    """
1125
    assert self._is_owned(), ("release() on lock set %s while not owner" %
1126
                              self.name)
1127

    
1128
    # Support passing in a single resource to release rather than many
1129
    if isinstance(names, basestring):
1130
      names = [names]
1131

    
1132
    if names is None:
1133
      names = self._list_owned()
1134
    else:
1135
      names = set(names)
1136
      assert self._list_owned().issuperset(names), (
1137
               "release() on unheld resources %s (set %s)" %
1138
               (names.difference(self._list_owned()), self.name))
1139

    
1140
    # First of all let's release the "all elements" lock, if set.
1141
    # After this 'add' can work again
1142
    if self.__lock._is_owned():
1143
      self.__lock.release()
1144
      self._del_owned()
1145

    
1146
    for lockname in names:
1147
      # If we are sure the lock doesn't leave __lockdict without being
1148
      # exclusively held we can do this...
1149
      self.__lockdict[lockname].release()
1150
      self._del_owned(name=lockname)
1151

    
1152
  def add(self, names, acquired=0, shared=0):
1153
    """Add a new set of elements to the set
1154

1155
    @type names: list of strings
1156
    @param names: names of the new elements to add
1157
    @type acquired: integer (0/1) used as a boolean
1158
    @param acquired: pre-acquire the new resource?
1159
    @type shared: integer (0/1) used as a boolean
1160
    @param shared: is the pre-acquisition shared?
1161

1162
    """
1163
    # Check we don't already own locks at this level
1164
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1165
      ("Cannot add locks if the set %s is only partially owned, or shared" %
1166
       self.name)
1167

    
1168
    # Support passing in a single resource to add rather than many
1169
    if isinstance(names, basestring):
1170
      names = [names]
1171

    
1172
    # If we don't already own the set-level lock acquired in an exclusive way
1173
    # we'll get it and note we need to release it later.
1174
    release_lock = False
1175
    if not self.__lock._is_owned():
1176
      release_lock = True
1177
      self.__lock.acquire()
1178

    
1179
    try:
1180
      invalid_names = set(self.__names()).intersection(names)
1181
      if invalid_names:
1182
        # This must be an explicit raise, not an assert, because assert is
1183
        # turned off when using optimization, and this can happen because of
1184
        # concurrency even if the user doesn't want it.
1185
        raise errors.LockError("duplicate add(%s) on lockset %s" %
1186
                               (invalid_names, self.name))
1187

    
1188
      for lockname in names:
1189
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1190

    
1191
        if acquired:
1192
          # No need for priority or timeout here as this lock has just been
1193
          # created
1194
          lock.acquire(shared=shared)
1195
          # now the lock cannot be deleted, we have it!
1196
          try:
1197
            self._add_owned(name=lockname)
1198
          except:
1199
            # We shouldn't have problems adding the lock to the owners list,
1200
            # but if we did we'll try to release this lock and re-raise
1201
            # exception.  Of course something is going to be really wrong,
1202
            # after this.  On the other hand the lock hasn't been added to the
1203
            # __lockdict yet so no other threads should be pending on it. This
1204
            # release is just a safety measure.
1205
            lock.release()
1206
            raise
1207

    
1208
        self.__lockdict[lockname] = lock
1209

    
1210
    finally:
1211
      # Only release __lock if we were not holding it previously.
1212
      if release_lock:
1213
        self.__lock.release()
1214

    
1215
    return True
1216

    
1217
  def remove(self, names):
1218
    """Remove elements from the lock set.
1219

1220
    You can either not hold anything in the lockset or already hold a superset
1221
    of the elements you want to delete, exclusively.
1222

1223
    @type names: list of strings
1224
    @param names: names of the resource to remove.
1225

1226
    @return: a list of locks which we removed; the list is always
1227
        equal to the names list if we were holding all the locks
1228
        exclusively
1229

1230
    """
1231
    # Support passing in a single resource to remove rather than many
1232
    if isinstance(names, basestring):
1233
      names = [names]
1234

    
1235
    # If we own any subset of this lock it must be a superset of what we want
1236
    # to delete. The ownership must also be exclusive, but that will be checked
1237
    # by the lock itself.
1238
    assert not self._is_owned() or self._list_owned().issuperset(names), (
1239
      "remove() on acquired lockset %s while not owning all elements" %
1240
      self.name)
1241

    
1242
    removed = []
1243

    
1244
    for lname in names:
1245
      # Calling delete() acquires the lock exclusively if we don't already own
1246
      # it, and causes all pending and subsequent lock acquires to fail. It's
1247
      # fine to call it out of order because delete() also implies release(),
1248
      # and the assertion above guarantees that if we either already hold
1249
      # everything we want to delete, or we hold none.
1250
      try:
1251
        self.__lockdict[lname].delete()
1252
        removed.append(lname)
1253
      except (KeyError, errors.LockError):
1254
        # This cannot happen if we were already holding it, verify:
1255
        assert not self._is_owned(), ("remove failed while holding lockset %s"
1256
                                      % self.name)
1257
      else:
1258
        # If no LockError was raised we are the ones who deleted the lock.
1259
        # This means we can safely remove it from lockdict, as any further or
1260
        # pending delete() or acquire() will fail (and nobody can have the lock
1261
        # since before our call to delete()).
1262
        #
1263
        # This is done in an else clause because if the exception was thrown
1264
        # it's the job of the one who actually deleted it.
1265
        del self.__lockdict[lname]
1266
        # And let's remove it from our private list if we owned it.
1267
        if self._is_owned():
1268
          self._del_owned(name=lname)
1269

    
1270
    return removed
1271

    
1272

    
1273
# Locking levels, must be acquired in increasing order.
1274
# Current rules are:
1275
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1276
#   acquired before performing any operation, either in shared or in exclusive
1277
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
1278
#   avoided.
1279
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1280
#   If you need more than one node, or more than one instance, acquire them at
1281
#   the same time.
1282
LEVEL_CLUSTER = 0
1283
LEVEL_INSTANCE = 1
1284
LEVEL_NODEGROUP = 2
1285
LEVEL_NODE = 3
1286

    
1287
LEVELS = [LEVEL_CLUSTER,
1288
          LEVEL_INSTANCE,
1289
          LEVEL_NODEGROUP,
1290
          LEVEL_NODE]
1291

    
1292
# Lock levels which are modifiable
1293
LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]
1294

    
1295
LEVEL_NAMES = {
1296
  LEVEL_CLUSTER: "cluster",
1297
  LEVEL_INSTANCE: "instance",
1298
  LEVEL_NODEGROUP: "nodegroup",
1299
  LEVEL_NODE: "node",
1300
  }
1301

    
1302
# Constant for the big ganeti lock
1303
BGL = 'BGL'
1304

    
1305

    
1306
class GanetiLockManager:
1307
  """The Ganeti Locking Library
1308

1309
  The purpose of this small library is to manage locking for ganeti clusters
1310
  in a central place, while at the same time doing dynamic checks against
1311
  possible deadlocks. It will also make it easier to transition to a different
1312
  lock type should we migrate away from python threads.
1313

1314
  """
1315
  _instance = None
1316

    
1317
  def __init__(self, nodes, nodegroups, instances):
1318
    """Constructs a new GanetiLockManager object.
1319

1320
    There should be only a GanetiLockManager object at any time, so this
1321
    function raises an error if this is not the case.
1322

1323
    @param nodes: list of node names
1324
    @param nodegroups: list of nodegroup uuids
1325
    @param instances: list of instance names
1326

1327
    """
1328
    assert self.__class__._instance is None, \
1329
           "double GanetiLockManager instance"
1330

    
1331
    self.__class__._instance = self
1332

    
1333
    self._monitor = LockMonitor()
1334

    
1335
    # The keyring contains all the locks, at their level and in the correct
1336
    # locking order.
1337
    self.__keyring = {
1338
      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
1339
      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
1340
      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor),
1341
      LEVEL_INSTANCE: LockSet(instances, "instances",
1342
                              monitor=self._monitor),
1343
      }
1344

    
1345
  def QueryLocks(self, fields):
1346
    """Queries information from all locks.
1347

1348
    See L{LockMonitor.QueryLocks}.
1349

1350
    """
1351
    return self._monitor.QueryLocks(fields)
1352

    
1353
  def OldStyleQueryLocks(self, fields):
1354
    """Queries information from all locks, returning old-style data.
1355

1356
    See L{LockMonitor.OldStyleQueryLocks}.
1357

1358
    """
1359
    return self._monitor.OldStyleQueryLocks(fields)
1360

    
1361
  def _names(self, level):
1362
    """List the lock names at the given level.
1363

1364
    This can be used for debugging/testing purposes.
1365

1366
    @param level: the level whose list of locks to get
1367

1368
    """
1369
    assert level in LEVELS, "Invalid locking level %s" % level
1370
    return self.__keyring[level]._names()
1371

    
1372
  def _is_owned(self, level):
1373
    """Check whether we are owning locks at the given level
1374

1375
    """
1376
    return self.__keyring[level]._is_owned()
1377

    
1378
  is_owned = _is_owned
1379

    
1380
  def _list_owned(self, level):
1381
    """Get the set of owned locks at the given level
1382

1383
    """
1384
    return self.__keyring[level]._list_owned()
1385

    
1386
  def _upper_owned(self, level):
1387
    """Check that we don't own any lock at a level greater than the given one.
1388

1389
    """
1390
    # This way of checking only works if LEVELS[i] = i, which we check for in
1391
    # the test cases.
1392
    return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1393

    
1394
  def _BGL_owned(self): # pylint: disable-msg=C0103
1395
    """Check if the current thread owns the BGL.
1396

1397
    Both an exclusive or a shared acquisition work.
1398

1399
    """
1400
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1401

    
1402
  @staticmethod
1403
  def _contains_BGL(level, names): # pylint: disable-msg=C0103
1404
    """Check if the level contains the BGL.
1405

1406
    Check if acting on the given level and set of names will change
1407
    the status of the Big Ganeti Lock.
1408

1409
    """
1410
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1411

    
1412
  def acquire(self, level, names, timeout=None, shared=0, priority=None):
1413
    """Acquire a set of resource locks, at the same level.
1414

1415
    @type level: member of locking.LEVELS
1416
    @param level: the level at which the locks shall be acquired
1417
    @type names: list of strings (or string)
1418
    @param names: the names of the locks which shall be acquired
1419
        (special lock names, or instance/node names)
1420
    @type shared: integer (0/1) used as a boolean
1421
    @param shared: whether to acquire in shared mode; by default
1422
        an exclusive lock will be acquired
1423
    @type timeout: float
1424
    @param timeout: Maximum time to acquire all locks
1425
    @type priority: integer
1426
    @param priority: Priority for acquiring lock
1427

1428
    """
1429
    assert level in LEVELS, "Invalid locking level %s" % level
1430

    
1431
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1432
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1433
    # so even if we've migrated we need to at least share the BGL to be
1434
    # compatible with them. Of course if we own the BGL exclusively there's no
1435
    # point in acquiring any other lock, unless perhaps we are half way through
1436
    # the migration of the current opcode.
1437
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1438
            "You must own the Big Ganeti Lock before acquiring any other")
1439

    
1440
    # Check we don't own locks at the same or upper levels.
1441
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1442
           " while owning some at a greater one")
1443

    
1444
    # Acquire the locks in the set.
1445
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
1446
                                         priority=priority)
1447

    
1448
  def release(self, level, names=None):
1449
    """Release a set of resource locks, at the same level.
1450

1451
    You must have acquired the locks, either in shared or in exclusive
1452
    mode, before releasing them.
1453

1454
    @type level: member of locking.LEVELS
1455
    @param level: the level at which the locks shall be released
1456
    @type names: list of strings, or None
1457
    @param names: the names of the locks which shall be released
1458
        (defaults to all the locks acquired at that level)
1459

1460
    """
1461
    assert level in LEVELS, "Invalid locking level %s" % level
1462
    assert (not self._contains_BGL(level, names) or
1463
            not self._upper_owned(LEVEL_CLUSTER)), (
1464
            "Cannot release the Big Ganeti Lock while holding something"
1465
            " at upper levels (%r)" %
1466
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
1467
                              for i in self.__keyring.keys()]), ))
1468

    
1469
    # Release will complain if we don't own the locks already
1470
    return self.__keyring[level].release(names)
1471

    
1472
  def add(self, level, names, acquired=0, shared=0):
1473
    """Add locks at the specified level.
1474

1475
    @type level: member of locking.LEVELS_MOD
1476
    @param level: the level at which the locks shall be added
1477
    @type names: list of strings
1478
    @param names: names of the locks to acquire
1479
    @type acquired: integer (0/1) used as a boolean
1480
    @param acquired: whether to acquire the newly added locks
1481
    @type shared: integer (0/1) used as a boolean
1482
    @param shared: whether the acquisition will be shared
1483

1484
    """
1485
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1486
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1487
           " operations")
1488
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1489
           " while owning some at a greater one")
1490
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1491

    
1492
  def remove(self, level, names):
1493
    """Remove locks from the specified level.
1494

1495
    You must either already own the locks you are trying to remove
1496
    exclusively or not own any lock at an upper level.
1497

1498
    @type level: member of locking.LEVELS_MOD
1499
    @param level: the level at which the locks shall be removed
1500
    @type names: list of strings
1501
    @param names: the names of the locks which shall be removed
1502
        (special lock names, or instance/node names)
1503

1504
    """
1505
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1506
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1507
           " operations")
1508
    # Check we either own the level or don't own anything from here
1509
    # up. LockSet.remove() will check the case in which we don't own
1510
    # all the needed resources, or we have a shared ownership.
1511
    assert self._is_owned(level) or not self._upper_owned(level), (
1512
           "Cannot remove locks at a level while not owning it or"
1513
           " owning some at a greater one")
1514
    return self.__keyring[level].remove(names)
1515

    
1516

    
1517
class LockMonitor(object):
1518
  _LOCK_ATTR = "_lock"
1519

    
1520
  def __init__(self):
1521
    """Initializes this class.
1522

1523
    """
1524
    self._lock = SharedLock("LockMonitor")
1525

    
1526
    # Tracked locks. Weak references are used to avoid issues with circular
1527
    # references and deletion.
1528
    self._locks = weakref.WeakKeyDictionary()
1529

    
1530
  @ssynchronized(_LOCK_ATTR)
1531
  def RegisterLock(self, lock):
1532
    """Registers a new lock.
1533

1534
    """
1535
    logging.debug("Registering lock %s", lock.name)
1536
    assert lock not in self._locks, "Duplicate lock registration"
1537
    assert not compat.any(lock.name == i.name for i in self._locks.keys()), \
1538
           "Found duplicate lock name"
1539
    self._locks[lock] = None
1540

    
1541
  @ssynchronized(_LOCK_ATTR)
1542
  def _GetLockInfo(self, requested):
1543
    """Get information from all locks while the monitor lock is held.
1544

1545
    """
1546
    return [lock.GetInfo(requested) for lock in self._locks.keys()]
1547

    
1548
  def _Query(self, fields):
1549
    """Queries information from all locks.
1550

1551
    @type fields: list of strings
1552
    @param fields: List of fields to return
1553

1554
    """
1555
    qobj = query.Query(query.LOCK_FIELDS, fields)
1556

    
1557
    # Get all data and sort by name
1558
    lockinfo = utils.NiceSort(self._GetLockInfo(qobj.RequestedData()),
1559
                              key=operator.itemgetter(0))
1560

    
1561
    return (qobj, query.LockQueryData(lockinfo))
1562

    
1563
  def QueryLocks(self, fields):
1564
    """Queries information from all locks.
1565

1566
    @type fields: list of strings
1567
    @param fields: List of fields to return
1568

1569
    """
1570
    (qobj, ctx) = self._Query(fields)
1571

    
1572
    # Prepare query response
1573
    return query.GetQueryResponse(qobj, ctx)
1574

    
1575
  def OldStyleQueryLocks(self, fields):
1576
    """Queries information from all locks, returning old-style data.
1577

1578
    @type fields: list of strings
1579
    @param fields: List of fields to return
1580

1581
    """
1582
    (qobj, ctx) = self._Query(fields)
1583

    
1584
    return qobj.OldStyleQuery(ctx)