Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ fdfe88b1

History | View | Annotate | Download (52.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable-msg=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import errno
32
import weakref
33
import logging
34
import heapq
35
import operator
36
import itertools
37

    
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import compat
41
from ganeti import query
42

    
43

    
44
_EXCLUSIVE_TEXT = "exclusive"
45
_SHARED_TEXT = "shared"
46
_DELETED_TEXT = "deleted"
47

    
48
_DEFAULT_PRIORITY = 0
49

    
50

    
51
def ssynchronized(mylock, shared=0):
  """Decorator running the wrapped function while holding a lock.

  The lock is acquired via C{acquire(shared=shared)} before the call and
  released via C{release()} afterwards, even if the wrapped function raises.
  The lock therefore has to be a SharedLock or an object with the same
  C{acquire}/C{release} semantics.

  @type mylock: lockable object or string
  @param mylock: the lock itself, or the name of the attribute on the first
      positional argument ("self") under which the lock can be found
  @param shared: passed unchanged as the C{shared} argument of the lock's
      C{acquire} method (0, the default, requests exclusive mode)

  """
  def _pick_lock(call_args):
    """Returns the lock object to be used for a single invocation.

    """
    if not isinstance(mylock, basestring):
      return mylock

    # A name was given: look it up on "self", the first positional argument
    assert call_args, "cannot ssynchronize on non-class method: self not found"
    return getattr(call_args[0], mylock)

  def wrap(fn):
    def sync_function(*args, **kwargs):
      held = _pick_lock(args)
      held.acquire(shared=shared)
      try:
        result = fn(*args, **kwargs)
      finally:
        held.release()
      return result
    return sync_function
  return wrap
77

    
78

    
79
class _SingleNotifyPipeConditionWaiter(object):
80
  """Helper class for SingleNotifyPipeCondition
81

82
  """
83
  __slots__ = [
84
    "_fd",
85
    "_poller",
86
    ]
87

    
88
  def __init__(self, poller, fd):
89
    """Constructor for _SingleNotifyPipeConditionWaiter
90

91
    @type poller: select.poll
92
    @param poller: Poller object
93
    @type fd: int
94
    @param fd: File descriptor to wait for
95

96
    """
97
    object.__init__(self)
98
    self._poller = poller
99
    self._fd = fd
100

    
101
  def __call__(self, timeout):
102
    """Wait for something to happen on the pipe.
103

104
    @type timeout: float or None
105
    @param timeout: Timeout for waiting (can be None)
106

107
    """
108
    running_timeout = utils.RunningTimeout(timeout, True)
109

    
110
    while True:
111
      remaining_time = running_timeout.Remaining()
112

    
113
      if remaining_time is not None:
114
        if remaining_time < 0.0:
115
          break
116

    
117
        # Our calculation uses seconds, poll() wants milliseconds
118
        remaining_time *= 1000
119

    
120
      try:
121
        result = self._poller.poll(remaining_time)
122
      except EnvironmentError, err:
123
        if err.errno != errno.EINTR:
124
          raise
125
        result = None
126

    
127
      # Check whether we were notified
128
      if result and result[0][0] == self._fd:
129
        break
130

    
131

    
132
class _BaseCondition(object):
  """Common functionality shared by the condition classes.

  Parts of this code were taken from python's threading module.

  The condition wraps a lock and exposes the lock's C{acquire} and C{release}
  directly. For saving/restoring the lock state and for the ownership check
  the lock's own C{_release_save}, C{_acquire_restore} and C{_is_owned} are
  used if the lock provides them, otherwise simple fallbacks defined here.

  """
  __slots__ = [
    "_lock",
    "acquire",
    "release",
    "_is_owned",
    "_acquire_restore",
    "_release_save",
    ]

  def __init__(self, lock):
    """Constructor for _BaseCondition.

    @type lock: threading.Lock
    @param lock: condition base lock

    """
    object.__init__(self)

    self._lock = lock

    # Use the lock's own helpers where available, our fallbacks otherwise
    self._release_save = getattr(lock, "_release_save",
                                 self._base_release_save)
    self._acquire_restore = getattr(lock, "_acquire_restore",
                                    self._base_acquire_restore)
    self._is_owned = getattr(lock, "_is_owned", self._base_is_owned)

    # Make the lock's acquire() and release() callable on the condition
    self.acquire = lock.acquire
    self.release = lock.release

  def _base_is_owned(self):
    """Fallback ownership check for locks without C{_is_owned}.

    Tries a non-blocking acquire: if that succeeds nobody held the lock (it
    is released again right away), otherwise the lock is taken to be owned.

    """
    got_it = self._lock.acquire(0)
    if got_it:
      self._lock.release()
    return not got_it

  def _base_release_save(self):
    """Fallback for C{_release_save}: releases the lock, saves no state.

    """
    self._lock.release()

  def _base_acquire_restore(self, _):
    """Fallback for C{_acquire_restore}: acquires the lock, ignores state.

    """
    self._lock.acquire()

  def _check_owned(self):
    """Makes sure the lock is owned, as determined by C{_is_owned}.

    @raise RuntimeError: if the ownership check fails

    """
    if self._is_owned():
      return

    raise RuntimeError("cannot work with un-aquired lock")
196

    
197

    
198
class SingleNotifyPipeCondition(_BaseCondition):
  """Condition which can only be notified once.

  Waiting is implemented with a pipe and poll, so a timeout can be honoured
  without busy-polling. The class is almost compatible with Python's
  threading.Condition, with the following differences:
    - notifyAll can only be called once, and no wait can happen after that
    - notify is not supported, only notifyAll

  The pipe is created by the first waiter and torn down again by the last
  waiter leaving; notification is done by closing the pipe's write end.

  """

  __slots__ = [
    "_poller",
    "_read_fd",
    "_write_fd",
    "_nwaiters",
    "_notified",
    ]

  _waiter_class = _SingleNotifyPipeConditionWaiter

  def __init__(self, lock):
    """Constructor for SingleNotifyPipeCondition

    """
    _BaseCondition.__init__(self, lock)
    # Pipe and poller are created lazily by the first call to wait()
    self._poller = None
    self._read_fd = None
    self._write_fd = None
    self._notified = False
    self._nwaiters = 0

  def _check_unnotified(self):
    """Throws an exception if already notified.

    @raise RuntimeError: if notifyAll was called on this condition before

    """
    if not self._notified:
      return

    raise RuntimeError("cannot use already notified condition")

  def _Cleanup(self):
    """Cleanup open file descriptors, if any.

    Closes whichever pipe ends are still open and forgets the poller.

    """
    for slot in ("_read_fd", "_write_fd"):
      fd = getattr(self, slot)
      if fd is not None:
        os.close(fd)
        setattr(self, slot, None)

    self._poller = None

  def wait(self, timeout):
    """Wait for a notification.

    The lock must be held by the caller; it is given up while waiting and
    taken again before this function returns. Nothing is returned, so the
    return value doesn't tell a notification from an expired timeout.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)
    @raise RuntimeError: if the lock isn't owned or the condition was
        already notified

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
      if self._poller is None:
        # First waiter: set up the pipe and watch its read end for hang-up
        (self._read_fd, self._write_fd) = os.pipe()
        self._poller = select.poll()
        self._poller.register(self._read_fd, select.POLLHUP)

      waiter = self._waiter_class(self._poller, self._read_fd)
      saved_state = self._release_save()
      try:
        # Block until notified or timed out, without holding the lock
        waiter(timeout)
      finally:
        # Take the lock again, whatever happened while waiting
        self._acquire_restore(saved_state)
    finally:
      self._nwaiters -= 1
      if not self._nwaiters:
        # Last waiter gone, nobody needs the descriptors anymore
        self._Cleanup()

  def notifyAll(self): # pylint: disable-msg=C0103
    """Close the writing side of the pipe to notify all waiters.

    If nobody has waited yet there is no pipe and the condition is only
    marked as notified.

    @raise RuntimeError: if the lock isn't owned or the condition was
        already notified

    """
    self._check_owned()
    self._check_unnotified()
    self._notified = True

    write_fd = self._write_fd
    if write_fd is not None:
      os.close(write_fd)
      self._write_fd = None
290

    
291

    
292
class PipeCondition(_BaseCondition):
  """Group-only non-polling condition with counters.

  Built on top of single-use pipe conditions: every notifyAll consumes the
  current one and installs a fresh one for subsequent waiters. The class is
  almost compatible with Python's threading.Condition, but only supports
  notifyAll and non-recursive locks. As an additional feature it keeps track
  of the threads currently waiting.

  """
  __slots__ = [
    "_waiters",
    "_single_condition",
    ]

  _single_condition_class = SingleNotifyPipeCondition

  def __init__(self, lock):
    """Initializes this class.

    """
    _BaseCondition.__init__(self, lock)
    self._single_condition = self._single_condition_class(self._lock)
    self._waiters = set()

  def wait(self, timeout):
    """Wait for a notification.

    The calling thread is listed as waiter for the duration of the call.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # Hold on to the condition we're going to wait on; a notifying thread
    # replaces self._single_condition while we're waiting.
    current = self._single_condition

    me = threading.currentThread()
    self._waiters.add(me)
    try:
      current.wait(timeout)
    finally:
      self._check_owned()
      self._waiters.remove(me)

  def notifyAll(self): # pylint: disable-msg=C0103
    """Notify all currently waiting threads.

    """
    self._check_owned()

    used_up = self._single_condition
    used_up.notifyAll()

    # The old condition can't be used again, later waiters get a new one
    self._single_condition = self._single_condition_class(self._lock)

  def get_waiting(self):
    """Returns the set of all waiting threads.

    Note that the internal set itself is returned, not a copy.

    """
    self._check_owned()

    return self._waiters

  def has_waiting(self):
    """Returns whether there are active waiters.

    @rtype: bool

    """
    self._check_owned()

    return len(self._waiters) > 0
360

    
361

    
362
class _PipeConditionWithMode(PipeCondition):
  """Pipe condition which additionally remembers a C{shared} flag.

  @ivar shared: the value passed to the constructor; SharedLock stores the
      mode of the queued acquire(s) in it

  """
  __slots__ = ["shared"]

  def __init__(self, lock, shared):
    """Stores the mode and initializes the underlying pipe condition.

    @param lock: condition base lock
    @param shared: mode flag to remember

    """
    self.shared = shared
    PipeCondition.__init__(self, lock)
373

    
374

    
375
class SharedLock(object):
  """Implements a shared lock.

  Multiple threads can acquire the lock in a shared way by calling
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
  threads can call C{acquire(shared=0)}.

  Notes on data structures: C{__pending} contains a priority queue (heapq) of
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
  ...]}. Each per-priority queue contains a normal in-order list of conditions
  to be notified when the lock can be acquired. Shared locks are grouped
  together by priority and the condition for them is stored in
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
  references for the per-priority queues indexed by priority for faster access.

  All state is protected by the internal C{__lock}; methods whose name ends in
  C{_unlocked} or which are documented as private helpers expect the caller to
  hold it already.

  @type name: string
  @ivar name: the name of the lock

  """
  __slots__ = [
    "__weakref__",
    "__deleted",
    "__exc",
    "__lock",
    "__pending",
    "__pending_by_prio",
    "__pending_shared",
    "__shr",
    "name",
    ]

  __condition_class = _PipeConditionWithMode

  def __init__(self, name, monitor=None):
    """Construct a new SharedLock.

    @param name: the name of the lock
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register

    """
    object.__init__(self)

    self.name = name

    # Internal lock
    self.__lock = threading.Lock()

    # Queue containing waiting acquires
    self.__pending = []
    self.__pending_by_prio = {}
    self.__pending_shared = {}

    # Current lock holders
    self.__shr = set()
    self.__exc = None

    # is this lock in the deleted state?
    self.__deleted = False

    # Register with lock monitor
    if monitor:
      monitor.RegisterLock(self)

  def GetInfo(self, requested):
    """Retrieves information for querying locks.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}
    @rtype: tuple
    @return: C{(name, mode, owner_names, pending)}; C{mode}, C{owner_names}
        and C{pending} are None unless the corresponding information was
        requested (C{mode} and C{owner_names} are also None if the lock has
        no mode/owner to report)

    """
    self.__lock.acquire()
    try:
      # Note: to avoid unintentional race conditions, no references to
      # modifiable objects should be returned unless they were created in this
      # function.
      mode = None
      owner_names = None

      if query.LQ_MODE in requested:
        if self.__deleted:
          mode = _DELETED_TEXT
          assert not (self.__exc or self.__shr)
        elif self.__exc:
          mode = _EXCLUSIVE_TEXT
        elif self.__shr:
          mode = _SHARED_TEXT

      # Current owner(s) are wanted
      if query.LQ_OWNER in requested:
        if self.__exc:
          owner = [self.__exc]
        else:
          owner = self.__shr

        if owner:
          assert not self.__deleted
          owner_names = [i.getName() for i in owner]

      # Pending acquires are wanted
      if query.LQ_PENDING in requested:
        pending = []

        # Sorting instead of copying and using heaq functions for simplicity
        for (_, prioqueue) in sorted(self.__pending):
          for cond in prioqueue:
            if cond.shared:
              pendmode = _SHARED_TEXT
            else:
              pendmode = _EXCLUSIVE_TEXT

            # List of names will be sorted in L{query._GetLockPending}
            pending.append((pendmode, [i.getName()
                                       for i in cond.get_waiting()]))
      else:
        pending = None

      return (self.name, mode, owner_names, pending)
    finally:
      self.__lock.release()

  def __check_deleted(self):
    """Raises an exception if the lock has been deleted.

    @raise errors.LockError: if the lock is in the deleted state

    """
    if self.__deleted:
      raise errors.LockError("Deleted lock %s" % self.name)

  def __is_sharer(self):
    """Is the current thread sharing the lock at this time?

    """
    return threading.currentThread() in self.__shr

  def __is_exclusive(self):
    """Is the current thread holding the lock exclusively at this time?

    """
    return threading.currentThread() == self.__exc

  def __is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    This is a private version of the function, which presumes you're holding
    the internal lock.

    @param shared: see L{_is_owned}

    """
    if shared < 0:
      return self.__is_sharer() or self.__is_exclusive()
    elif shared:
      return self.__is_sharer()
    else:
      return self.__is_exclusive()

  def _is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership

    """
    self.__lock.acquire()
    try:
      return self.__is_owned(shared=shared)
    finally:
      self.__lock.release()

  is_owned = _is_owned

  def _count_pending(self):
    """Returns the number of pending acquires.

    Counts the conditions queued over all priorities; shared acquires grouped
    on one condition count once.

    @rtype: int

    """
    self.__lock.acquire()
    try:
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
    finally:
      self.__lock.release()

  def _check_empty(self):
    """Checks whether there are any pending acquires.

    @rtype: bool
    @return: True if all queue-related data structures are empty

    """
    self.__lock.acquire()
    try:
      # Order is important: __find_first_pending_queue modifies __pending
      (_, prioqueue) = self.__find_first_pending_queue()

      return not (prioqueue or
                  self.__pending or
                  self.__pending_by_prio or
                  self.__pending_shared)
    finally:
      self.__lock.release()

  def __do_acquire(self, shared):
    """Actually acquire the lock.

    Only records the current thread as owner; the caller must have checked
    that acquiring is possible.

    """
    if shared:
      self.__shr.add(threading.currentThread())
    else:
      self.__exc = threading.currentThread()

  def __can_acquire(self, shared):
    """Determine whether lock can be acquired.

    A shared acquire only requires that there's no exclusive owner, an
    exclusive acquire requires that there's no owner at all.

    """
    if shared:
      return self.__exc is None
    else:
      return len(self.__shr) == 0 and self.__exc is None

  def __find_first_pending_queue(self):
    """Tries to find the topmost queued entry with pending acquires.

    Removes empty entries while going through the list.

    @rtype: tuple
    @return: C{(priority, prioqueue)} of the first non-empty queue or
        C{(None, None)} if there is none

    """
    while self.__pending:
      (priority, prioqueue) = self.__pending[0]

      if prioqueue:
        return (priority, prioqueue)

      # Remove empty queue
      heapq.heappop(self.__pending)
      del self.__pending_by_prio[priority]
      assert priority not in self.__pending_shared

    return (None, None)

  def __is_on_top(self, cond):
    """Checks whether the passed condition is on top of the queue.

    The caller must make sure the queue isn't empty.

    """
    (_, prioqueue) = self.__find_first_pending_queue()

    return cond == prioqueue[0]

  def __acquire_unlocked(self, shared, timeout, priority):
    """Acquire a shared lock.

    The caller must hold the internal lock.

    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: True if the lock was acquired, False otherwise (only possible
        if a timeout was given)
    @raise errors.LockError: if the lock is or gets deleted

    """
    self.__check_deleted()

    # We cannot acquire the lock if we already have it
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
                                   " %s" % self.name)

    # Remove empty entries from queue
    self.__find_first_pending_queue()

    # Check whether someone else holds the lock or there are pending acquires.
    if not self.__pending and self.__can_acquire(shared):
      # Apparently not, can acquire lock directly.
      self.__do_acquire(shared)
      return True

    prioqueue = self.__pending_by_prio.get(priority, None)

    if shared:
      # Try to re-use condition for shared acquire
      wait_condition = self.__pending_shared.get(priority, None)
      assert (wait_condition is None or
              (wait_condition.shared and wait_condition in prioqueue))
    else:
      wait_condition = None

    if wait_condition is None:
      if prioqueue is None:
        assert priority not in self.__pending_by_prio

        prioqueue = []
        heapq.heappush(self.__pending, (priority, prioqueue))
        self.__pending_by_prio[priority] = prioqueue

      wait_condition = self.__condition_class(self.__lock, shared)
      prioqueue.append(wait_condition)

      if shared:
        # Keep reference for further shared acquires on same priority. This is
        # better than trying to find it in the list of pending acquires.
        assert priority not in self.__pending_shared
        self.__pending_shared[priority] = wait_condition

    try:
      # Wait until we become the topmost acquire in the queue or the timeout
      # expires.
      # TODO: Decrease timeout with spurious notifications
      while not (self.__is_on_top(wait_condition) and
                 self.__can_acquire(shared)):
        # Wait for notification
        wait_condition.wait(timeout)
        self.__check_deleted()

        # A lot of code assumes blocking acquires always succeed. Loop
        # internally for that case.
        if timeout is not None:
          break

      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
        self.__do_acquire(shared)
        return True
    finally:
      # Remove condition from queue if there are no more waiters
      if not wait_condition.has_waiting():
        prioqueue.remove(wait_condition)
        if wait_condition.shared:
          # Remove from list of shared acquires if it wasn't while releasing
          # (e.g. on lock deletion)
          self.__pending_shared.pop(priority, None)

    return False

  def acquire(self, shared=0, timeout=None, priority=None,
              test_notify=None):
    """Acquire a shared lock.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @rtype: bool
    @return: True if the lock was acquired, False if the timeout expired
    @raise errors.LockError: if the lock is or gets deleted

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      # We already got the lock, notify now
      if __debug__ and callable(test_notify):
        test_notify()

      return self.__acquire_unlocked(shared, timeout, priority)
    finally:
      self.__lock.release()

  def downgrade(self):
    """Changes the lock mode from exclusive to shared.

    Pending acquires in shared mode on the same priority will go ahead.

    @rtype: bool
    @return: always True

    """
    self.__lock.acquire()
    try:
      assert self.__is_owned(), "Lock must be owned"

      # Nothing to do if the lock is already held in shared mode
      if self.__is_exclusive():
        self.__exc = None
        self.__do_acquire(1)

        # Important: pending shared acquires should only jump ahead if there
        # was a transition from exclusive to shared, otherwise an owner of a
        # shared lock can keep calling this function to push incoming shared
        # acquires
        (priority, prioqueue) = self.__find_first_pending_queue()
        if prioqueue:
          # Is there a pending shared acquire on this priority?
          cond = self.__pending_shared.pop(priority, None)
          if cond:
            assert cond.shared
            assert cond in prioqueue

            # Ensure shared acquire is on top of queue
            if len(prioqueue) > 1:
              prioqueue.remove(cond)
              prioqueue.insert(0, cond)

            # Notify
            cond.notifyAll()

      assert not self.__is_exclusive()
      assert self.__is_sharer()

      return True
    finally:
      self.__lock.release()

  def release(self):
    """Release a Shared Lock.

    You must have acquired the lock, either in shared or in exclusive mode,
    before calling this function.

    """
    self.__lock.acquire()
    try:
      assert self.__is_exclusive() or self.__is_sharer(), \
        "Cannot release non-owned lock"

      # Autodetect release type
      if self.__is_exclusive():
        self.__exc = None
      else:
        self.__shr.remove(threading.currentThread())

      # Notify topmost condition in queue
      (priority, prioqueue) = self.__find_first_pending_queue()
      if prioqueue:
        cond = prioqueue[0]
        cond.notifyAll()
        if cond.shared:
          # Prevent further shared acquires from sneaking in while waiters are
          # notified
          self.__pending_shared.pop(priority, None)

    finally:
      self.__lock.release()

  def delete(self, timeout=None, priority=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: True if the lock was deleted, False if it couldn't be acquired
        before the timeout expired
    @raise errors.LockError: if the lock is already deleted

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

      self.__check_deleted()

      # The caller is allowed to hold the lock exclusively already.
      acquired = self.__is_exclusive()

      if not acquired:
        acquired = self.__acquire_unlocked(0, timeout, priority)

      if acquired:
        # Ownership can only be verified after a successful acquire; with a
        # timeout the acquire may legitimately fail, in which case False has
        # to be returned instead of tripping over this assertion.
        assert self.__is_exclusive() and not self.__is_sharer(), \
          "Lock wasn't acquired in exclusive mode"

        self.__deleted = True
        self.__exc = None

        assert not (self.__exc or self.__shr), "Found owner during deletion"

        # Notify all acquires. They'll throw an error.
        for (_, prioqueue) in self.__pending:
          for cond in prioqueue:
            cond.notifyAll()

        assert self.__deleted

      return acquired
    finally:
      self.__lock.release()

  def _release_save(self):
    """Releases the lock and returns the mode it was held in.

    Together with L{_acquire_restore} this lets a condition built on top of
    this lock give it up while waiting and take it again afterwards.

    @return: whether the current thread held the lock in shared mode

    """
    shared = self.__is_sharer()
    self.release()
    return shared

  def _acquire_restore(self, shared):
    """Acquires the lock again in the mode saved by L{_release_save}.

    """
    self.acquire(shared=shared)
860

    
861

    
862
# Whenever we want to acquire a full LockSet we pass None as the value
863
# to acquire.  Hide this behind this nicely named constant.
864
ALL_SET = None
865

    
866

    
867
class _AcquireTimeout(Exception):
  """Internal exception to abort an acquire on a timeout.

  Carries no data of its own. NOTE(review): the code raising and catching it
  is not part of the section reviewed here -- presumably it never leaves this
  module, confirm against its users.

  """
871

    
872

    
873
class LockSet:
  """Implements a set of locks.

  A LockSet bundles the shared locks protecting resources of one type, each
  of them identified by its name. Callers may lock any subset of the names;
  the member locks are always taken in sorted name order, which prevents
  deadlocks between threads working on the same set.

  All the locks needed in the same set must be acquired together, though.

  @type name: string
  @ivar name: the name of the lockset

  """
  def __init__(self, members, name, monitor=None):
    """Constructs a new LockSet.

    @type members: list of strings
    @param members: initial members of the set
    @type name: string
    @param name: name of the lockset, used as prefix for member lock names
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register member locks

    """
    assert members is not None, "members parameter is not a list"
    self.name = name

    # Monitor handed to every lock created by this set
    self.__monitor = monitor

    # Set-level lock, used internally to guarantee coherency
    self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)

    # Maps lock name -> lock object. The order of locking is implied by the
    # sorted order of the names.
    self.__lockdict = dict((mname, SharedLock(self._GetLockName(mname),
                                              monitor=monitor))
                           for mname in members)

    # Maps each thread to the set of lock names it owns. For performance a
    # thread accesses its own key without any global lock, so it is paramount
    # that *no* other kind of access happens (e.g. no looping over the keys).
    # Use the *_owned helpers; never do anything other than
    # __owners[threading.currentThread()], or there will be trouble.
    self.__owners = {}

  def _GetLockName(self, mname):
    """Builds the full name of a member lock.

    @param mname: name of the member within this set
    @return: string of the form C{<set name>/<member name>}

    """
    return "%s/%s" % (self.name, mname)

  def _get_lock(self):
    """Returns the lockset-internal (set-level) lock.

    """
    return self.__lock

  def _get_lockdict(self):
    """Returns the lockset-internal dictionary mapping names to locks.

    Accessing this structure is only safe in single-thread usage or when the
    lockset-internal lock is held.

    """
    return self.__lockdict

  def _is_owned(self):
    """Tells whether the current thread has an entry in the owners dict."""
    return threading.currentThread() in self.__owners

  def _add_owned(self, name=None):
    """Notes that the current thread owns the given lock.

    @param name: name of the newly owned lock; with C{None} only an (empty)
        ownership entry is created for the thread, if it has none yet

    """
    me = threading.currentThread()

    if not self._is_owned():
      # First entry for this thread
      if name is None:
        self.__owners[me] = set()
      else:
        self.__owners[me] = set([name])
    elif name is not None:
      self.__owners[me].add(name)

  def _del_owned(self, name=None):
    """Notes that the current thread no longer owns the given lock.

    @param name: name of the lock to forget; with C{None} only the thread's
        ownership entry is dropped, provided it is empty

    """
    assert name is not None or not self.__lock._is_owned(), \
           "Cannot hold internal lock when deleting owner status"

    me = threading.currentThread()

    if name is not None:
      self.__owners[me].remove(name)

    # The thread's key has to stay while the set-lock is still held
    if self.__lock._is_owned():
      return

    if not self.__owners[me]:
      del self.__owners[me]

  def _list_owned(self):
    """Returns a copy of the set of names owned by the current thread."""
    if not self._is_owned():
      return set()

    return self.__owners[threading.currentThread()].copy()

  def _release_and_delete_owned(self):
    """Releases every lock owned by the current thread and forgets it."""
    for owned_name in self._list_owned():
      member = self.__lockdict[owned_name]
      if member._is_owned():
        member.release()
      self._del_owned(name=owned_name)

  def __names(self):
    """Return the current set of names.

    Only call this function while holding __lock and don't iterate on the
    result after releasing the lock.

    """
    return self.__lockdict.keys()

  def _names(self):
    """Return a copy of the current set of elements.

    Used only for debugging purposes.

    """
    # Take the set-level lock (shared) unless this thread holds it already,
    # and remember whether it has to be given back afterwards.
    must_release = not self.__lock._is_owned()
    if must_release:
      self.__lock.acquire(shared=1)

    try:
      current = self.__names()
    finally:
      if must_release:
        self.__lock.release()

    return set(current)

  def acquire(self, names, timeout=None, shared=0, priority=None,
              test_notify=None):
    """Acquire a set of resource locks.

    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names); C{None} acquires
        the whole set
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring locks
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting

    @return: Set of all locks successfully acquired or None in case of timeout

    @raise errors.LockError: when any lock we try to acquire has
        been deleted before we succeed. In this case none of the
        locks requested will be acquired.

    """
    assert timeout is None or timeout >= 0.0

    # Check we don't already own locks at this level
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
                                  " (lockset %s)" % self.name)

    if priority is None:
      priority = _DEFAULT_PRIORITY

    # The timeout passed in covers all lock acquisitions together, hence the
    # time already spent waiting has to be tracked.
    remaining_fn = utils.RunningTimeout(timeout, False).Remaining

    try:
      if names is None:
        # Acquiring the whole set: hold the set-level lock so no new names
        # can be added before we release, then work on the current list of
        # names. Some of them may get deleted meanwhile, which is coped with.
        #
        # The set-level lock is taken in the same mode as the members. A
        # shared mode lets everybody else keep using the set; an exclusive
        # mode wouldn't allow that anyway and makes it possible to add() to
        # the set while owning it.
        if not self.__lock.acquire(shared=shared, priority=priority,
                                   timeout=remaining_fn()):
          raise _AcquireTimeout()

        try:
          # note we own the set-lock
          self._add_owned()

          return self.__acquire_inner(self.__names(), True, shared, priority,
                                      remaining_fn, test_notify)
        except:
          # Whatever went wrong (a timeout included), give the set-level
          # lock back before passing the exception on.
          self.__lock.release()
          self._del_owned()
          raise

      # Support passing in a single resource to acquire rather than many
      if isinstance(names, basestring):
        names = [names]

      return self.__acquire_inner(names, False, shared, priority,
                                  remaining_fn, test_notify)

    except _AcquireTimeout:
      return None

  def __acquire_inner(self, names, want_all, shared, priority,
                      timeout_fn, test_notify):
    """Inner logic for acquiring a number of locks.

    @param names: Names of the locks to be acquired
    @param want_all: Whether all locks in the set should be acquired
    @param shared: Whether to acquire in shared mode
    @param timeout_fn: Function returning remaining timeout
    @param priority: Priority for acquiring locks
    @param test_notify: Special callback function for unittesting

    @return: set with the names of the locks acquired
    @raise errors.LockError: if a requested lock doesn't exist (anymore),
        unless the whole set is being acquired
    @raise _AcquireTimeout: if a lock couldn't be acquired in time

    """
    # Look all the locks up front. There's no guarantee they'll still exist
    # later on, but this fails fast should one of the names be wrong already.
    # The names are walked in sorted order to prevent deadlocks.
    wanted = []
    for lname in sorted(utils.UniqueSequence(names)):
      try:
        wanted.append((lname, self.__lockdict[lname]))
      except KeyError:
        # When acquiring the whole set a vanished element doesn't matter
        if not want_all:
          raise errors.LockError("Non-existing lock %s in set %s (it may have"
                                 " been removed)" % (lname, self.name))

    # This will hold the locknames we effectively acquired.
    acquired = set()

    try:
      # "wanted" is a private, sorted list of (name, lock) pairs. The locks
      # may get deleted while we walk it, but .acquire() itself is safe and
      # raises if that happens.
      for (lname, lock) in wanted:
        if __debug__ and callable(test_notify):
          test_notify_fn = lambda: test_notify(lname)
        else:
          test_notify_fn = None

        timeout = timeout_fn()

        try:
          # raises LockError if the lock was deleted
          got_it = lock.acquire(shared=shared, timeout=timeout,
                                priority=priority,
                                test_notify=test_notify_fn)
        except errors.LockError:
          if want_all:
            # When acquiring the whole set a vanished element doesn't matter
            continue

          raise errors.LockError("Non-existing lock %s in set %s (it may"
                                 " have been removed)" % (lname, self.name))

        if not got_it:
          # Couldn't get lock or timeout occurred
          if timeout is None:
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
            # blocking.
            raise errors.LockError("Failed to get lock %s (set %s)" %
                                   (lname, self.name))

          raise _AcquireTimeout()

        try:
          # now the lock cannot be deleted, we have it!
          self._add_owned(name=lname)
          acquired.add(lname)

        except:
          # Recording the ownership shouldn't fail; if it does anyway, give
          # the lock back and re-raise. Something is really wrong by then.
          if lock._is_owned():
            lock.release()
          raise

    except:
      # Release all owned locks
      self._release_and_delete_owned()
      raise

    return acquired

  def downgrade(self, names=None):
    """Downgrade a set of resource locks from exclusive to shared mode.

    The locks must have been acquired in exclusive mode.

    @type names: list of strings (or string), or None
    @param names: the names of the locks which shall be downgraded
        (defaults to all the locks owned in this set)
    @return: always C{True}

    """
    assert self._is_owned(), ("downgrade on lockset %s while not owning any"
                              " lock" % self.name)

    # Support passing in a single resource to downgrade rather than many
    if isinstance(names, basestring):
      names = [names]

    owned = self._list_owned()

    if names is None:
      names = owned
    else:
      names = set(names)
      assert owned.issuperset(names), \
        ("downgrade() on unheld resources %s (set %s)" %
         (names.difference(owned), self.name))

    for lname in names:
      self.__lockdict[lname].downgrade()

    # If the lockset itself is owned exclusively and no member lock is held
    # exclusively anymore, the set-level lock is downgraded as well.
    if (self.__lock._is_owned(shared=0) and
        not compat.any(member._is_owned(shared=0)
                       for member in self.__lockdict.values())):
      self.__lock.downgrade()
      assert self.__lock._is_owned(shared=1)

    return True

  def release(self, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive mode,
    before releasing them.

    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level).

    """
    assert self._is_owned(), ("release() on lock set %s while not owner" %
                              self.name)

    # Support passing in a single resource to release rather than many
    if isinstance(names, basestring):
      names = [names]

    if names is None:
      names = self._list_owned()
    else:
      names = set(names)
      assert self._list_owned().issuperset(names), (
               "release() on unheld resources %s (set %s)" %
               (names.difference(self._list_owned()), self.name))

    # The "all elements" lock goes first, if held; after this 'add' can work
    # again.
    if self.__lock._is_owned():
      self.__lock.release()
      self._del_owned()

    for lname in names:
      # This relies on a lock never leaving __lockdict without being held
      # exclusively.
      self.__lockdict[lname].release()
      self._del_owned(name=lname)

  def add(self, names, acquired=0, shared=0):
    """Add a new set of elements to the set

    @type names: list of strings
    @param names: names of the new elements to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: pre-acquire the new resource?
    @type shared: integer (0/1) used as a boolean
    @param shared: is the pre-acquisition shared?

    @return: always C{True}
    @raise errors.LockError: if any of the names is in the set already

    """
    # Check we don't already own locks at this level
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
      ("Cannot add locks if the set %s is only partially owned, or shared" %
       self.name)

    # Support passing in a single resource to add rather than many
    if isinstance(names, basestring):
      names = [names]

    # Take the set-level lock exclusively unless this thread holds it
    # already, and remember whether it has to be given back afterwards.
    must_release = not self.__lock._is_owned()
    if must_release:
      self.__lock.acquire()

    try:
      duplicates = set(self.__names()).intersection(names)
      if duplicates:
        # This must be an explicit raise, not an assert, because assert is
        # turned off when using optimization, and this can happen because of
        # concurrency even if the user doesn't want it.
        raise errors.LockError("duplicate add(%s) on lockset %s" %
                               (duplicates, self.name))

      for lname in names:
        new_lock = SharedLock(self._GetLockName(lname),
                              monitor=self.__monitor)

        if acquired:
          # No need for priority or timeout here as this lock has just been
          # created
          new_lock.acquire(shared=shared)
          # now the lock cannot be deleted, we have it!
          try:
            self._add_owned(name=lname)
          except:
            # Recording the ownership shouldn't fail; if it does anyway,
            # give the lock back and re-raise. The lock isn't in __lockdict
            # yet, so no other thread should be pending on it and the
            # release is just a safety measure.
            new_lock.release()
            raise

        self.__lockdict[lname] = new_lock

    finally:
      # Only release __lock if we were not holding it previously.
      if must_release:
        self.__lock.release()

    return True

  def remove(self, names):
    """Remove elements from the lock set.

    You can either not hold anything in the lockset or already hold a superset
    of the elements you want to delete, exclusively.

    @type names: list of strings
    @param names: names of the resource to remove.

    @return: a list of locks which we removed; the list is always
        equal to the names list if we were holding all the locks
        exclusively

    """
    # Support passing in a single resource to remove rather than many
    if isinstance(names, basestring):
      names = [names]

    # If we own any subset of this lock it must be a superset of what we want
    # to delete. The ownership must also be exclusive, but that will be checked
    # by the lock itself.
    assert not self._is_owned() or self._list_owned().issuperset(names), (
      "remove() on acquired lockset %s while not owning all elements" %
      self.name)

    removed = []

    for lname in names:
      # delete() acquires the lock exclusively unless we own it already and
      # makes all pending and subsequent acquires fail. Calling it out of
      # order is fine, as delete() also implies release() and the assertion
      # above guarantees we hold either everything to be deleted or nothing.
      try:
        self.__lockdict[lname].delete()
      except (KeyError, errors.LockError):
        # This cannot happen if we were already holding it, verify:
        assert not self._is_owned(), ("remove failed while holding lockset %s"
                                      % self.name)
      else:
        # No error means we are the ones who deleted the lock, so it's safe
        # to drop it from lockdict: any further or pending delete() or
        # acquire() will fail, and nobody can have held the lock since
        # before our call to delete(). Had an exception been raised, cleaning
        # up would be the job of whoever actually deleted the lock.
        removed.append(lname)
        del self.__lockdict[lname]
        # And let's remove it from our private list if we owned it.
        if self._is_owned():
          self._del_owned(name=lname)

    return removed
1377

    
1378

    
1379
# Locking levels; locks must be acquired in increasing level order.
# Current rules are:
#   - level LEVEL_CLUSTER holds the Big Ganeti Lock (BGL), which must be
#   acquired before performing any operation, either in shared or in
#   exclusive mode. Acquiring the BGL in exclusive mode is discouraged and
#   should be avoided.
#   - levels LEVEL_INSTANCE, LEVEL_NODEGROUP and LEVEL_NODE hold the instance,
#   node group and node locks. If you need more than one lock at a level,
#   acquire them at the same time.
(LEVEL_CLUSTER,
 LEVEL_INSTANCE,
 LEVEL_NODEGROUP,
 LEVEL_NODE) = range(4)

# All levels, in locking order
LEVELS = [LEVEL_CLUSTER, LEVEL_INSTANCE, LEVEL_NODEGROUP, LEVEL_NODE]

# Lock levels which are modifiable
LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]

# Human-readable name of each level
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODEGROUP: "nodegroup",
  LEVEL_NODE: "node",
  }

# Constant for the big ganeti lock
BGL = "BGL"
1410

    
1411

    
1412
class GanetiLockManager:
  """The Ganeti Locking Library

  This small library manages the locking for ganeti clusters in one central
  place and performs dynamic checks against possible deadlocks while doing
  so. It should also make it easier to transition to a different lock type,
  should we migrate away from python threads.

  """
  # The one and only manager instance, set by the constructor
  _instance = None

  def __init__(self, nodes, nodegroups, instances):
    """Constructs a new GanetiLockManager object.

    There should be only a GanetiLockManager object at any time, so this
    function raises an error if this is not the case.

    @param nodes: list of node names
    @param nodegroups: list of nodegroup uuids
    @param instances: list of instance names

    """
    assert self.__class__._instance is None, \
           "double GanetiLockManager instance"

    self.__class__._instance = self

    monitor = LockMonitor()
    self._monitor = monitor

    # The keyring holds one lockset per locking level
    keyring = {}
    keyring[LEVEL_CLUSTER] = LockSet([BGL], "BGL", monitor=monitor)
    keyring[LEVEL_NODE] = LockSet(nodes, "nodes", monitor=monitor)
    keyring[LEVEL_NODEGROUP] = LockSet(nodegroups, "nodegroups",
                                       monitor=monitor)
    keyring[LEVEL_INSTANCE] = LockSet(instances, "instances",
                                      monitor=monitor)
    self.__keyring = keyring

  def QueryLocks(self, fields):
    """Queries information from all locks.

    See L{LockMonitor.QueryLocks}.

    """
    return self._monitor.QueryLocks(fields)

  def OldStyleQueryLocks(self, fields):
    """Queries information from all locks, returning old-style data.

    See L{LockMonitor.OldStyleQueryLocks}.

    """
    return self._monitor.OldStyleQueryLocks(fields)

  def _names(self, level):
    """List the lock names at the given level.

    This can be used for debugging/testing purposes.

    @param level: the level whose list of locks to get

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    lockset = self.__keyring[level]
    return lockset._names()

  def _is_owned(self, level):
    """Check whether we are owning locks at the given level

    """
    lockset = self.__keyring[level]
    return lockset._is_owned()

  is_owned = _is_owned

  def _list_owned(self, level):
    """Get the set of owned locks at the given level

    """
    lockset = self.__keyring[level]
    return lockset._list_owned()

  list_owned = _list_owned

  def _upper_owned(self, level):
    """Check whether we own any lock at a level greater than the given one.

    @return: true if a lock is owned at any level above C{level}

    """
    # Slicing LEVELS this way only works if LEVELS[i] = i, which is checked
    # for in the test cases.
    higher_levels = LEVELS[level + 1:]
    return compat.any((self._is_owned(lvl) for lvl in higher_levels))

  def _BGL_owned(self): # pylint: disable-msg=C0103
    """Check if the current thread owns the BGL.

    Both an exclusive or a shared acquisition work.

    """
    cluster_owned = self.__keyring[LEVEL_CLUSTER]._list_owned()
    return BGL in cluster_owned

  @staticmethod
  def _contains_BGL(level, names): # pylint: disable-msg=C0103
    """Check if the level contains the BGL.

    Check if acting on the given level and set of names will change
    the status of the Big Ganeti Lock.

    """
    if level != LEVEL_CLUSTER:
      return False

    # At cluster level "all locks" (None) includes the BGL as well
    return names is None or BGL in names

  def acquire(self, level, names, timeout=None, shared=0, priority=None):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring lock

    @return: whatever L{LockSet.acquire} returns for the level's lockset

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # We must either be acquiring the Big Ganeti Lock or own it already. Some
    # "legacy" opcodes need to be sure they run non-concurrently, so even
    # migrated code has to at least share the BGL to stay compatible with
    # them. Owning the BGL exclusively leaves no point in acquiring any other
    # lock, unless perhaps the current opcode is half way through its
    # migration.
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
            "You must own the Big Ganeti Lock before acquiring any other")

    # Check we don't own locks at upper levels (the lockset itself refuses
    # a second acquire at the same level).
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
           " while owning some at a greater one")

    # Acquire the locks in the set.
    lockset = self.__keyring[level]
    return lockset.acquire(names, shared=shared, timeout=timeout,
                           priority=priority)

  def downgrade(self, level, names=None):
    """Downgrade a set of resource locks from exclusive to shared mode.

    You must have acquired the locks in exclusive mode.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be downgraded
    @type names: list of strings, or None
    @param names: the names of the locks which shall be downgraded
        (defaults to all the locks acquired at the level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    lockset = self.__keyring[level]
    return lockset.downgrade(names=names)

  def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive
    mode, before releasing them.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    # The BGL may only go once nothing is held at the levels above it
    assert not (self._contains_BGL(level, names) and
                self._upper_owned(LEVEL_CLUSTER)), (
            "Cannot release the Big Ganeti Lock while holding something"
            " at upper levels (%r)" %
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
                              for i in self.__keyring.keys()]), ))

    # Release will complain if we don't own the locks already
    lockset = self.__keyring[level]
    return lockset.release(names)

  def add(self, level, names, acquired=0, shared=0):
    """Add locks at the specified level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
           " while owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.add(names, acquired=acquired, shared=shared)

  def remove(self, level, names):
    """Remove locks from the specified level.

    You must either already own the locks you are trying to remove
    exclusively or not own any lock at an upper level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    # We must either own the level or not own anything from here up.
    # LockSet.remove() checks the cases of not owning all the needed
    # resources or of owning them in shared mode only.
    assert self._is_owned(level) or not self._upper_owned(level), (
           "Cannot remove locks at a level while not owning it or"
           " owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.remove(names)
1639

    
1640

    
1641
def _MonitorSortKey(entry):
  """Sorting key function.

  Sort by name, then by incoming order.

  @type entry: tuple
  @param entry: pair of C{(num, item)}, where C{num} is the number used to
      break ties between equal names and C{item} is a four-element tuple
      whose first element is the lock name
  @rtype: tuple
  @return: C{(utils.NiceSortKey(name), num)}

  """
  # Unpack in the body rather than in the signature: tuple parameters are
  # Python 2-only syntax (removed by PEP 3113), while this form behaves the
  # same for the single positional argument passed by sorted(key=...).
  (num, item) = entry
  (name, _, _, _) = item

  return (utils.NiceSortKey(name), num)
1650

    
1651

    
1652
class LockMonitor(object):
  """Registry of locks whose information can be queried.

  Locks are tracked through weak references, together with a number
  recording the order in which they were registered.

  """
  _LOCK_ATTR = "_lock"

  def __init__(self):
    """Sets up the internal lock, the counter and the lock registry.

    """
    self._lock = SharedLock("LockMonitor")

    # Source of registration numbers; they keep the sort order stable
    self._counter = itertools.count(0)

    # Maps each tracked lock to its registration number. Keys are weak
    # references to avoid issues with circular references and deletion.
    self._locks = weakref.WeakKeyDictionary()

  @ssynchronized(_LOCK_ATTR)
  def RegisterLock(self, lock):
    """Starts tracking a lock.

    @param lock: lock object to track; its C{name} attribute is logged and
        it must not be registered already

    """
    logging.debug("Registering lock %s", lock.name)
    assert lock not in self._locks, "Duplicate lock registration"

    # Lock names are deliberately not checked for duplicates. Such a check
    # used to exist, but when a lock is re-created with the same name within
    # a very short timeframe, the previous instance might not yet have been
    # removed from the weakref dictionary. Recording the order of incoming
    # registrations still guarantees a stable sort ordering.
    seqno = self._counter.next()
    self._locks[lock] = seqno

  @ssynchronized(_LOCK_ATTR)
  def _GetLockInfo(self, requested):
    """Collects information from every tracked lock.

    Runs with the monitor lock held.

    @param requested: passed unchanged to each lock's C{GetInfo} method
    @rtype: list
    @return: list of C{(registration number, lock info)} tuples

    """
    collected = []

    for (lock, num) in self._locks.items():
      collected.append((num, lock.GetInfo(requested)))

    return collected

  def _Query(self, fields):
    """Builds the query object and the data it operates on.

    @type fields: list of strings
    @param fields: List of fields to return
    @rtype: tuple
    @return: C{(query object, lock query data)}

    """
    qobj = query.Query(query.LOCK_FIELDS, fields)

    # The data is gathered with the internal lock held (see _GetLockInfo);
    # ordering by name and incoming order happens afterwards
    unsorted_info = self._GetLockInfo(qobj.RequestedData())
    ordered_info = sorted(unsorted_info, key=_MonitorSortKey)

    # Drop the registration numbers, only the lock information is needed
    # for building the query data
    get_info = operator.itemgetter(1)

    return (qobj, query.LockQueryData(map(get_info, ordered_info)))

  def QueryLocks(self, fields):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return
    @return: the result of L{query.GetQueryResponse} for the collected data

    """
    (query_obj, lock_data) = self._Query(fields)

    # Prepare query response
    return query.GetQueryResponse(query_obj, lock_data)

  def OldStyleQueryLocks(self, fields):
    """Queries information from all locks, returning old-style data.

    @type fields: list of strings
    @param fields: List of fields to return
    @return: the result of the query object's C{OldStyleQuery} method

    """
    (query_obj, lock_data) = self._Query(fields)

    return query_obj.OldStyleQuery(lock_data)