Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 7c74bbe0

History | View | Annotate | Download (53.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import errno
32
import weakref
33
import logging
34
import heapq
35
import itertools
36

    
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import compat
40
from ganeti import query
41

    
42

    
43
# Textual names of the lock modes, as reported by SharedLock.GetLockInfo
_EXCLUSIVE_TEXT = "exclusive"
_SHARED_TEXT = "shared"
_DELETED_TEXT = "deleted"

# Priority used by acquire()/delete() when the caller passes None
_DEFAULT_PRIORITY = 0
48

    
49

    
50
def ssynchronized(mylock, shared=0):
  """Decorator factory running a function while a shared lock is held.

  The decorated function is called with the lock acquired, in exclusive mode
  by default or in shared mode when C{shared} is true. The lock is always
  released afterwards, even if the function raises. The lock must be a
  L{SharedLock} or support its semantics (C{acquire(shared=...)} and
  C{release()}).

  @type mylock: lockable object or string
  @param mylock: lock to acquire, or name of the attribute holding the lock
      on the first positional argument (i.e. C{self}) of the decorated method
  @param shared: value passed as C{shared} to the lock's C{acquire}
  @return: the actual decorator

  """
  def _ResolveLock(posargs):
    """Returns the lock object to be used for a single call.

    """
    if not isinstance(mylock, basestring):
      return mylock

    assert posargs, "cannot ssynchronize on non-class method: self not found"
    # posargs[0] is "self", the lock is looked up on it by name
    return getattr(posargs[0], mylock)

  def wrap(fn):
    def sync_function(*args, **kwargs):
      held = _ResolveLock(args)
      held.acquire(shared=shared)
      try:
        return fn(*args, **kwargs)
      finally:
        held.release()
    return sync_function
  return wrap
76

    
77

    
78
class _SingleNotifyPipeConditionWaiter(object):
79
  """Helper class for SingleNotifyPipeCondition
80

81
  """
82
  __slots__ = [
83
    "_fd",
84
    "_poller",
85
    ]
86

    
87
  def __init__(self, poller, fd):
88
    """Constructor for _SingleNotifyPipeConditionWaiter
89

90
    @type poller: select.poll
91
    @param poller: Poller object
92
    @type fd: int
93
    @param fd: File descriptor to wait for
94

95
    """
96
    object.__init__(self)
97
    self._poller = poller
98
    self._fd = fd
99

    
100
  def __call__(self, timeout):
101
    """Wait for something to happen on the pipe.
102

103
    @type timeout: float or None
104
    @param timeout: Timeout for waiting (can be None)
105

106
    """
107
    running_timeout = utils.RunningTimeout(timeout, True)
108

    
109
    while True:
110
      remaining_time = running_timeout.Remaining()
111

    
112
      if remaining_time is not None:
113
        if remaining_time < 0.0:
114
          break
115

    
116
        # Our calculation uses seconds, poll() wants milliseconds
117
        remaining_time *= 1000
118

    
119
      try:
120
        result = self._poller.poll(remaining_time)
121
      except EnvironmentError, err:
122
        if err.errno != errno.EINTR:
123
          raise
124
        result = None
125

    
126
      # Check whether we were notified
127
      if result and result[0][0] == self._fd:
128
        break
129

    
130

    
131
class _BaseCondition(object):
  """Common functionality shared by the condition classes.

  Some of this code is taken from python's threading module.

  The lock's C{acquire} and C{release} methods are exported directly on the
  condition. If the lock provides C{_release_save}, C{_acquire_restore} or
  C{_is_owned} they are used, otherwise generic fallbacks defined in this
  class take their place.

  """
  __slots__ = [
    "_lock",
    "acquire",
    "release",
    "_is_owned",
    "_acquire_restore",
    "_release_save",
    ]

  def __init__(self, lock):
    """Initializes the condition with its underlying lock.

    @type lock: threading.Lock
    @param lock: condition base lock

    """
    object.__init__(self)

    # Prefer the lock's own implementation of each hook, use ours otherwise
    hooks = [
      ("_release_save", self._base_release_save),
      ("_acquire_restore", self._base_acquire_restore),
      ("_is_owned", self._base_is_owned),
      ]
    for (hook_name, fallback) in hooks:
      setattr(self, hook_name, getattr(lock, hook_name, fallback))

    self._lock = lock

    # Export the lock's acquire() and release() methods
    self.acquire = lock.acquire
    self.release = lock.release

  def _base_is_owned(self):
    """Fallback check for whether the lock is currently held.

    Tries a non-blocking acquire; if that succeeds the lock was free, so it
    is released again and C{False} is returned.

    """
    got_it = self._lock.acquire(0)
    if not got_it:
      return True

    self._lock.release()
    return False

  def _base_release_save(self):
    """Fallback for releasing the lock before waiting; saves no state.

    """
    self._lock.release()

  def _base_acquire_restore(self, _):
    """Fallback for re-acquiring the lock after waiting; state is ignored.

    """
    self._lock.acquire()

  def _check_owned(self):
    """Makes sure the lock is owned.

    @raise RuntimeError: if the ownership check fails

    """
    if self._is_owned():
      return

    raise RuntimeError("cannot work with un-aquired lock")
195

    
196

    
197
class SingleNotifyPipeCondition(_BaseCondition):
  """Condition which can only be notified once.

  Pipes and poll are used internally, so waiting with a timeout is possible
  without resorting to polling. The class is almost compatible with Python's
  threading.Condition, with the following differences:
    - notifyAll can only be called once, and no wait can happen after that
    - notify is not supported, only notifyAll

  """

  __slots__ = [
    "_poller",
    "_read_fd",
    "_write_fd",
    "_nwaiters",
    "_notified",
    ]

  _waiter_class = _SingleNotifyPipeConditionWaiter

  def __init__(self, lock):
    """Initializes the condition; the pipe is only created on first wait.

    """
    _BaseCondition.__init__(self, lock)
    self._poller = None
    self._read_fd = None
    self._write_fd = None
    self._notified = False
    self._nwaiters = 0

  def _check_unnotified(self):
    """Makes sure the condition hasn't been notified yet.

    @raise RuntimeError: if notifyAll was already called

    """
    if not self._notified:
      return

    raise RuntimeError("cannot use already notified condition")

  def _Cleanup(self):
    """Closes whichever pipe ends are still open and drops the poller.

    """
    for fd_attr in ("_read_fd", "_write_fd"):
      fd = getattr(self, fd_attr)
      if fd is not None:
        os.close(fd)
        setattr(self, fd_attr, None)

    self._poller = None

  def wait(self, timeout):
    """Waits for the notification.

    The lock must be held by the caller; it is released while waiting and
    re-acquired before returning.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
      if self._poller is None:
        # No pipe yet, create it together with the poller
        (self._read_fd, self._write_fd) = os.pipe()
        self._poller = select.poll()
        self._poller.register(self._read_fd, select.POLLHUP)

      waiter = self._waiter_class(self._poller, self._read_fd)
      saved_state = self._release_save()
      try:
        # Lock is released here, wait for notification
        waiter(timeout)
      finally:
        # Get the lock back, whatever happened
        self._acquire_restore(saved_state)
    finally:
      self._nwaiters -= 1
      if not self._nwaiters:
        # Last waiter is gone, nobody needs the descriptors anymore
        self._Cleanup()

  def notifyAll(self): # pylint: disable=C0103
    """Notifies all waiters by closing the writing side of the pipe.

    """
    self._check_owned()
    self._check_unnotified()
    self._notified = True

    write_fd = self._write_fd
    if write_fd is not None:
      os.close(write_fd)
      self._write_fd = None
289

    
290

    
291
class PipeCondition(_BaseCondition):
  """Group-only non-polling condition with counters.

  Pipes and poll are used internally, so waiting with a timeout is possible
  without resorting to polling. The class is almost compatible with Python's
  threading.Condition, but only supports notifyAll and non-recursive locks.
  As an additional feature it is able to report whether there are any
  waiting threads.

  """
  __slots__ = [
    "_waiters",
    "_single_condition",
    ]

  _single_condition_class = SingleNotifyPipeCondition

  def __init__(self, lock):
    """Sets up an empty set of waiters and the first single-use condition.

    """
    _BaseCondition.__init__(self, lock)
    self._waiters = set()
    self._single_condition = self._single_condition_class(self._lock)

  def wait(self, timeout):
    """Waits for a notification.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # A notifying thread replaces self._single_condition while we wait, hence
    # the one we wait on must be remembered locally.
    active_cond = self._single_condition
    me = threading.currentThread()

    self._waiters.add(me)
    try:
      active_cond.wait(timeout)
    finally:
      self._check_owned()
      self._waiters.remove(me)

  def notifyAll(self): # pylint: disable=C0103
    """Wakes up all threads currently waiting.

    The single-use condition is notified and then replaced by a fresh one
    for later waiters.

    """
    self._check_owned()

    used_cond = self._single_condition
    used_cond.notifyAll()
    self._single_condition = self._single_condition_class(self._lock)

  def get_waiting(self):
    """Returns the waiting threads.

    The internal set is returned as-is, not a copy.

    """
    self._check_owned()

    return self._waiters

  def has_waiting(self):
    """Tells whether at least one thread is waiting.

    """
    self._check_owned()

    return len(self._waiters) > 0
359

    
360

    
361
class _PipeConditionWithMode(PipeCondition):
  """PipeCondition additionally remembering an acquire mode.

  @ivar shared: the mode value given to the constructor

  """
  __slots__ = [
    "shared",
    ]

  def __init__(self, lock, shared):
    """Stores the mode and initializes the underlying condition.

    """
    self.shared = shared
    super(_PipeConditionWithMode, self).__init__(lock)
372

    
373

    
374
class SharedLock(object):
  """Implements a shared lock.

  Multiple threads can acquire the lock in a shared way by calling
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
  threads can call C{acquire(shared=0)}.

  Notes on data structures: C{__pending} contains a priority queue (heapq) of
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
  ...]}. Each per-priority queue contains a normal in-order list of conditions
  to be notified when the lock can be acquired. Shared locks are grouped
  together by priority and the condition for them is stored in
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
  references for the per-priority queues indexed by priority for faster access.

  @type name: string
  @ivar name: the name of the lock

  """
  __slots__ = [
    "__weakref__",
    "__deleted",
    "__exc",
    "__lock",
    "__pending",
    "__pending_by_prio",
    "__pending_shared",
    "__shr",
    "name",
    ]

  __condition_class = _PipeConditionWithMode

  def __init__(self, name, monitor=None):
    """Construct a new SharedLock.

    @param name: the name of the lock
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register

    """
    object.__init__(self)

    self.name = name

    # Internal lock
    self.__lock = threading.Lock()

    # Queue containing waiting acquires
    self.__pending = []
    self.__pending_by_prio = {}
    self.__pending_shared = {}

    # Current lock holders
    self.__shr = set()
    self.__exc = None

    # is this lock in the deleted state?
    self.__deleted = False

    # Register with lock monitor
    if monitor:
      logging.debug("Adding lock %s to monitor", name)
      monitor.RegisterLock(self)

  def GetLockInfo(self, requested):
    """Retrieves information for querying locks.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}
    @return: list with a single tuple C{(name, mode, owner_names, pending)};
        fields which were not requested (or don't apply) are C{None}

    """
    self.__lock.acquire()
    try:
      # Note: to avoid unintentional race conditions, no references to
      # modifiable objects should be returned unless they were created in this
      # function.
      mode = None
      owner_names = None

      if query.LQ_MODE in requested:
        if self.__deleted:
          mode = _DELETED_TEXT
          assert not (self.__exc or self.__shr)
        elif self.__exc:
          mode = _EXCLUSIVE_TEXT
        elif self.__shr:
          mode = _SHARED_TEXT

      # Current owner(s) are wanted
      if query.LQ_OWNER in requested:
        if self.__exc:
          owner = [self.__exc]
        else:
          owner = self.__shr

        if owner:
          assert not self.__deleted
          owner_names = [i.getName() for i in owner]

      # Pending acquires are wanted
      if query.LQ_PENDING in requested:
        pending = []

        # Sorting instead of copying and using heaq functions for simplicity
        for (_, prioqueue) in sorted(self.__pending):
          for cond in prioqueue:
            if cond.shared:
              pendmode = _SHARED_TEXT
            else:
              pendmode = _EXCLUSIVE_TEXT

            # List of names will be sorted in L{query._GetLockPending}
            pending.append((pendmode, [i.getName()
                                       for i in cond.get_waiting()]))
      else:
        pending = None

      return [(self.name, mode, owner_names, pending)]
    finally:
      self.__lock.release()

  def __check_deleted(self):
    """Raises an exception if the lock has been deleted.

    @raise errors.LockError: if the lock is in the deleted state

    """
    if self.__deleted:
      raise errors.LockError("Deleted lock %s" % self.name)

  def __is_sharer(self):
    """Is the current thread sharing the lock at this time?

    """
    return threading.currentThread() in self.__shr

  def __is_exclusive(self):
    """Is the current thread holding the lock exclusively at this time?

    """
    return threading.currentThread() == self.__exc

  def __is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    This is a private version of the function, which presumes you're holding
    the internal lock.

    @param shared: see L{_is_owned}

    """
    if shared < 0:
      return self.__is_sharer() or self.__is_exclusive()
    elif shared:
      return self.__is_sharer()
    else:
      return self.__is_exclusive()

  def _is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership

    """
    self.__lock.acquire()
    try:
      return self.__is_owned(shared=shared)
    finally:
      self.__lock.release()

  is_owned = _is_owned

  def _count_pending(self):
    """Returns the number of pending acquires.

    @rtype: int

    """
    self.__lock.acquire()
    try:
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
    finally:
      self.__lock.release()

  def _check_empty(self):
    """Checks whether there are any pending acquires.

    @rtype: bool
    @return: True if all internal pending-acquire structures are empty

    """
    self.__lock.acquire()
    try:
      # Order is important: __find_first_pending_queue modifies __pending
      (_, prioqueue) = self.__find_first_pending_queue()

      return not (prioqueue or
                  self.__pending or
                  self.__pending_by_prio or
                  self.__pending_shared)
    finally:
      self.__lock.release()

  def __do_acquire(self, shared):
    """Actually acquire the lock.

    Records the current thread as sharer or as exclusive owner; no checks
    are done here.

    """
    if shared:
      self.__shr.add(threading.currentThread())
    else:
      self.__exc = threading.currentThread()

  def __can_acquire(self, shared):
    """Determine whether lock can be acquired.

    A shared acquire only needs the lock not to be held exclusively, an
    exclusive acquire needs it not to be held at all.

    """
    if shared:
      return self.__exc is None
    else:
      return len(self.__shr) == 0 and self.__exc is None

  def __find_first_pending_queue(self):
    """Tries to find the topmost queued entry with pending acquires.

    Removes empty entries while going through the list.

    @return: tuple of C{(priority, prioqueue)} for the first non-empty queue,
        or C{(None, None)} if there is none

    """
    while self.__pending:
      (priority, prioqueue) = self.__pending[0]

      if prioqueue:
        return (priority, prioqueue)

      # Remove empty queue
      heapq.heappop(self.__pending)
      del self.__pending_by_prio[priority]
      assert priority not in self.__pending_shared

    return (None, None)

  def __is_on_top(self, cond):
    """Checks whether the passed condition is on top of the queue.

    The caller must make sure the queue isn't empty.

    """
    (_, prioqueue) = self.__find_first_pending_queue()

    return cond == prioqueue[0]

  def __acquire_unlocked(self, shared, timeout, priority):
    """Acquire a shared lock.

    The caller must hold the internal lock.

    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: True if the lock was acquired, False otherwise (timeout)
    @raise errors.LockError: if the lock is or gets deleted

    """
    self.__check_deleted()

    # We cannot acquire the lock if we already have it
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
                                   " %s" % self.name)

    # Remove empty entries from queue
    self.__find_first_pending_queue()

    # Check whether someone else holds the lock or there are pending acquires.
    if not self.__pending and self.__can_acquire(shared):
      # Apparently not, can acquire lock directly.
      self.__do_acquire(shared)
      return True

    prioqueue = self.__pending_by_prio.get(priority, None)

    if shared:
      # Try to re-use condition for shared acquire
      wait_condition = self.__pending_shared.get(priority, None)
      assert (wait_condition is None or
              (wait_condition.shared and wait_condition in prioqueue))
    else:
      wait_condition = None

    if wait_condition is None:
      if prioqueue is None:
        assert priority not in self.__pending_by_prio

        prioqueue = []
        heapq.heappush(self.__pending, (priority, prioqueue))
        self.__pending_by_prio[priority] = prioqueue

      wait_condition = self.__condition_class(self.__lock, shared)
      prioqueue.append(wait_condition)

      if shared:
        # Keep reference for further shared acquires on same priority. This is
        # better than trying to find it in the list of pending acquires.
        assert priority not in self.__pending_shared
        self.__pending_shared[priority] = wait_condition

    try:
      # Wait until we become the topmost acquire in the queue or the timeout
      # expires.
      # TODO: Decrease timeout with spurious notifications
      while not (self.__is_on_top(wait_condition) and
                 self.__can_acquire(shared)):
        # Wait for notification
        wait_condition.wait(timeout)
        self.__check_deleted()

        # A lot of code assumes blocking acquires always succeed. Loop
        # internally for that case.
        if timeout is not None:
          break

      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
        self.__do_acquire(shared)
        return True
    finally:
      # Remove condition from queue if there are no more waiters
      if not wait_condition.has_waiting():
        prioqueue.remove(wait_condition)
        if wait_condition.shared:
          # Remove from list of shared acquires if it wasn't while releasing
          # (e.g. on lock deletion)
          self.__pending_shared.pop(priority, None)

    return False

  def acquire(self, shared=0, timeout=None, priority=None,
              test_notify=None):
    """Acquire a shared lock.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @rtype: bool
    @return: True if the lock was acquired, False otherwise (timeout)
    @raise errors.LockError: if the lock is or gets deleted

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      # We already got the lock, notify now
      if __debug__ and callable(test_notify):
        test_notify()

      return self.__acquire_unlocked(shared, timeout, priority)
    finally:
      self.__lock.release()

  def downgrade(self):
    """Changes the lock mode from exclusive to shared.

    Pending acquires in shared mode on the same priority will go ahead.

    @return: always True

    """
    self.__lock.acquire()
    try:
      assert self.__is_owned(), "Lock must be owned"

      if self.__is_exclusive():
        # Do nothing if the lock is already acquired in shared mode
        self.__exc = None
        self.__do_acquire(1)

        # Important: pending shared acquires should only jump ahead if there
        # was a transition from exclusive to shared, otherwise an owner of a
        # shared lock can keep calling this function to push incoming shared
        # acquires
        (priority, prioqueue) = self.__find_first_pending_queue()
        if prioqueue:
          # Is there a pending shared acquire on this priority?
          cond = self.__pending_shared.pop(priority, None)
          if cond:
            assert cond.shared
            assert cond in prioqueue

            # Ensure shared acquire is on top of queue
            if len(prioqueue) > 1:
              prioqueue.remove(cond)
              prioqueue.insert(0, cond)

            # Notify
            cond.notifyAll()

      assert not self.__is_exclusive()
      assert self.__is_sharer()

      return True
    finally:
      self.__lock.release()

  def release(self):
    """Release a Shared Lock.

    You must have acquired the lock, either in shared or in exclusive mode,
    before calling this function.

    """
    self.__lock.acquire()
    try:
      assert self.__is_exclusive() or self.__is_sharer(), \
        "Cannot release non-owned lock"

      # Autodetect release type
      if self.__is_exclusive():
        self.__exc = None
      else:
        self.__shr.remove(threading.currentThread())

      # Notify topmost condition in queue
      (priority, prioqueue) = self.__find_first_pending_queue()
      if prioqueue:
        cond = prioqueue[0]
        cond.notifyAll()
        if cond.shared:
          # Prevent further shared acquires from sneaking in while waiters are
          # notified
          self.__pending_shared.pop(priority, None)

    finally:
      self.__lock.release()

  def delete(self, timeout=None, priority=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: True if the lock was deleted, False if it couldn't be acquired
        within the timeout
    @raise errors.LockError: if the lock is already deleted

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

      self.__check_deleted()

      # The caller is allowed to hold the lock exclusively already.
      acquired = self.__is_exclusive()

      if not acquired:
        acquired = self.__acquire_unlocked(0, timeout, priority)

      if acquired:
        # Ownership can only be verified after a successful acquire; if the
        # acquire timed out nobody holds the lock on our behalf and False
        # must be returned instead of failing the assertion.
        assert self.__is_exclusive() and not self.__is_sharer(), \
          "Lock wasn't acquired in exclusive mode"

        self.__deleted = True
        self.__exc = None

        assert not (self.__exc or self.__shr), "Found owner during deletion"

        # Notify all acquires. They'll throw an error.
        for (_, prioqueue) in self.__pending:
          for cond in prioqueue:
            cond.notifyAll()

        assert self.__deleted

      return acquired
    finally:
      self.__lock.release()

  def _release_save(self):
    """Releases the lock, returning the mode in which it was held.

    @return: whether the current thread was holding the lock in shared mode

    """
    shared = self.__is_sharer()
    self.release()
    return shared

  def _acquire_restore(self, shared):
    """Re-acquires the lock in the mode returned by L{_release_save}.

    """
    self.acquire(shared=shared)
860

    
861

    
862
# Passing None as the value to acquire means "acquire the full LockSet";
# this constant gives that special value a descriptive name.
ALL_SET = None
865

    
866

    
867
class _AcquireTimeout(Exception):
  """Exception used internally to abort an acquire when a timeout occurs.

  """
871

    
872

    
873
class LockSet:
874
  """Implements a set of locks.
875

876
  This abstraction implements a set of shared locks for the same resource type,
877
  distinguished by name. The user can lock a subset of the resources and the
878
  LockSet will take care of acquiring the locks always in the same order, thus
879
  preventing deadlock.
880

881
  All the locks needed in the same set must be acquired together, though.
882

883
  @type name: string
884
  @ivar name: the name of the lockset
885

886
  """
887
  def __init__(self, members, name, monitor=None):
    """Initializes a lock set with one lock per initial member.

    @type members: list of strings
    @param members: initial members of the set
    @type name: string
    @param name: name of the lockset; used as prefix of member lock names
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register member locks

    """
    assert members is not None, "members parameter is not a list"
    self.name = name

    # Kept so that locks added later use the same monitor
    self.__monitor = monitor

    # Set-level lock, used internally to guarantee coherency
    self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)

    # Maps member name -> lock. The order-of-locking is implied by the
    # alphabetical order of the names.
    self.__lockdict = lockdict = {}
    for member in members:
      lockdict[member] = SharedLock(self._GetLockName(member),
                                    monitor=monitor)

    # Maps each thread to the set of lock names it owns. For performance every
    # thread accesses only its own key, without taking a global lock on this
    # structure. It is paramount that *no* other kind of access is made (e.g.
    # no looping over the keys). The *_owned helpers exist to guarantee
    # correct access; in general never do anything other than
    # __owners[threading.currentThread()], or there will be trouble.
    self.__owners = {}
921

    
922
  def _GetLockName(self, mname):
923
    """Returns the name for a member lock.
924

925
    """
926
    return "%s/%s" % (self.name, mname)
927

    
928
  def _get_lock(self):
929
    """Returns the lockset-internal lock.
930

931
    """
932
    return self.__lock
933

    
934
  def _get_lockdict(self):
935
    """Returns the lockset-internal lock dictionary.
936

937
    Accessing this structure is only safe in single-thread usage or when the
938
    lockset-internal lock is held.
939

940
    """
941
    return self.__lockdict
942

    
943
  def _is_owned(self):
944
    """Is the current thread a current level owner?"""
945
    return threading.currentThread() in self.__owners
946

    
947
  def _add_owned(self, name=None):
948
    """Note the current thread owns the given lock"""
949
    if name is None:
950
      if not self._is_owned():
951
        self.__owners[threading.currentThread()] = set()
952
    else:
953
      if self._is_owned():
954
        self.__owners[threading.currentThread()].add(name)
955
      else:
956
        self.__owners[threading.currentThread()] = set([name])
957

    
958
  def _del_owned(self, name=None):
959
    """Note the current thread owns the given lock"""
960

    
961
    assert not (name is None and self.__lock._is_owned()), \
962
           "Cannot hold internal lock when deleting owner status"
963

    
964
    if name is not None:
965
      self.__owners[threading.currentThread()].remove(name)
966

    
967
    # Only remove the key if we don't hold the set-lock as well
968
    if (not self.__lock._is_owned() and
969
        not self.__owners[threading.currentThread()]):
970
      del self.__owners[threading.currentThread()]
971

    
972
  def _list_owned(self):
973
    """Get the set of resource names owned by the current thread"""
974
    if self._is_owned():
975
      return self.__owners[threading.currentThread()].copy()
976
    else:
977
      return set()
978

    
979
  def _release_and_delete_owned(self):
980
    """Release and delete all resources owned by the current thread"""
981
    for lname in self._list_owned():
982
      lock = self.__lockdict[lname]
983
      if lock._is_owned():
984
        lock.release()
985
      self._del_owned(name=lname)
986

    
987
  def __names(self):
988
    """Return the current set of names.
989

990
    Only call this function while holding __lock and don't iterate on the
991
    result after releasing the lock.
992

993
    """
994
    return self.__lockdict.keys()
995

    
996
  def _names(self):
997
    """Return a copy of the current set of elements.
998

999
    Used only for debugging purposes.
1000

1001
    """
1002
    # If we don't already own the set-level lock acquired
1003
    # we'll get it and note we need to release it later.
1004
    release_lock = False
1005
    if not self.__lock._is_owned():
1006
      release_lock = True
1007
      self.__lock.acquire(shared=1)
1008
    try:
1009
      result = self.__names()
1010
    finally:
1011
      if release_lock:
1012
        self.__lock.release()
1013
    return set(result)
1014

    
1015
  def acquire(self, names, timeout=None, shared=0, priority=None,
              test_notify=None):
    """Acquire a set of resource locks.

    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names); C{None} requests all
        locks of the set
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring locks
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting

    @return: Set of all locks successfully acquired or None in case of timeout

    @raise errors.LockError: when any lock we try to acquire has
        been deleted before we succeed. In this case none of the
        locks requested will be acquired.

    """
    assert timeout is None or timeout >= 0.0

    # A thread may not acquire from the same lockset twice
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
                                  " (lockset %s)" % self.name)

    if priority is None:
      priority = _DEFAULT_PRIORITY

    # The timeout covers all lock acquires together, hence the time already
    # spent waiting has to be tracked.
    budget = utils.RunningTimeout(timeout, False)

    try:
      if names is None:
        # Whole-set acquire: hold the set-level lock so that no new names can
        # be added before we release, then work on the current list of names.
        # Some of them may get deleted meanwhile, which is coped with.
        #
        # The set-level lock is taken in the same mode as the members: when
        # acquiring exclusively nobody else could use the members anyway, and
        # owning the set-level lock exclusively permits add() on the set.
        if not self.__lock.acquire(shared=shared, priority=priority,
                                   timeout=budget.Remaining()):
          raise _AcquireTimeout()
        try:
          # note we own the set-lock
          self._add_owned()

          return self.__acquire_inner(self.__names(), True, shared, priority,
                                      budget.Remaining, test_notify)
        except:
          # Whatever went wrong (timeouts included), give the set-level lock
          # back, drop the owner entry and let the exception propagate.
          self.__lock.release()
          self._del_owned()
          raise

      # Support passing in a single resource to acquire rather than many
      if isinstance(names, basestring):
        names = [names]

      return self.__acquire_inner(names, False, shared, priority,
                                  budget.Remaining, test_notify)

    except _AcquireTimeout:
      return None
1090

    
1091
  def __acquire_inner(self, names, want_all, shared, priority,
                      timeout_fn, test_notify):
    """Inner logic for acquiring a number of locks.

    @param names: Names of the locks to be acquired
    @param want_all: Whether all locks in the set should be acquired; if so,
      names which disappeared in the meantime are silently skipped
    @param shared: Whether to acquire in shared mode
    @param priority: Priority for acquiring locks
    @param timeout_fn: Function returning remaining timeout
    @param test_notify: Special callback function for unittesting
    @return: set of the names which were acquired
    @raise errors.LockError: if a requested lock does not exist (anymore) and
      not all locks were wanted, or if a blocking acquire failed
    @raise _AcquireTimeout: if a lock could not be acquired in time

    """
    # Look the locks up in __lockdict first. There is no guarantee they will
    # still be there afterwards, but this fails a lot faster should one of
    # them already be wrong. The names are sorted to prevent deadlocks.
    wanted = []
    for lname in sorted(utils.UniqueSequence(names)):
      try:
        lock = self.__lockdict[lname]
      except KeyError:
        if not want_all:
          raise errors.LockError("Non-existing lock %s in set %s (it may have"
                                 " been removed)" % (lname, self.name))
        # Acquiring the whole set: a vanished element doesn't matter
        continue

      wanted.append((lname, lock))

    notify_enabled = __debug__ and callable(test_notify)

    # Names of the locks effectively acquired
    acquired = set()

    try:
      # "wanted" is a private, sorted list of (name, lock) pairs. The locks
      # may disappear before we get to them, but .acquire() itself is safe
      # and reports a deleted lock.
      for (lname, lock) in wanted:
        if notify_enabled:
          notify_fn = lambda: test_notify(lname)
        else:
          notify_fn = None

        timeout = timeout_fn()

        try:
          # raises LockError if the lock was deleted
          got_it = lock.acquire(shared=shared, timeout=timeout,
                                priority=priority,
                                test_notify=notify_fn)
        except errors.LockError:
          if not want_all:
            raise errors.LockError("Non-existing lock %s in set %s (it may"
                                   " have been removed)" % (lname, self.name))
          # Acquiring the whole set: a vanished element doesn't matter
          continue

        if not got_it:
          if timeout is not None:
            raise _AcquireTimeout()

          # Not expected to happen, as an acquire without timeout blocks
          raise errors.LockError("Failed to get lock %s (set %s)" %
                                 (lname, self.name))

        try:
          # now the lock cannot be deleted, we have it!
          self._add_owned(name=lname)
          acquired.add(lname)

        except:
          # Recording the ownership should never fail; if it does, release
          # this lock (if held) and re-raise.
          if lock._is_owned():
            lock.release()
          raise

    except:
      # Undo everything acquired so far
      self._release_and_delete_owned()
      raise

    return acquired
1183

    
1184
  def downgrade(self, names=None):
    """Downgrade a set of resource locks from exclusive to shared mode.

    The locks must have been acquired in exclusive mode.

    @type names: list of strings (or string), or None
    @param names: names of the locks to downgrade; defaults to all locks
      owned by the current thread in this set
    @return: always C{True}

    """
    assert self._is_owned(), ("downgrade on lockset %s while not owning any"
                              " lock" % self.name)

    # Support passing in a single resource to downgrade rather than many
    if isinstance(names, basestring):
      names = [names]

    held = self._list_owned()

    if names is None:
      targets = held
    else:
      targets = set(names)
      assert held.issuperset(targets), \
        ("downgrade() on unheld resources %s (set %s)" %
         (targets.difference(held), self.name))

    for lockname in targets:
      self.__lockdict[lockname].downgrade()

    # If the lockset itself is owned exclusively and no member lock is owned
    # exclusively anymore, the set-level lock is downgraded as well.
    if (self.__lock._is_owned(shared=0) and
        not compat.any(lock._is_owned(shared=0)
                       for lock in self.__lockdict.values())):
      self.__lock.downgrade()
      assert self.__lock._is_owned(shared=1)

    return True
1219

    
1220
  def release(self, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive mode,
    before releasing them.

    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level).

    """
    assert self._is_owned(), ("release() on lock set %s while not owner" %
                              self.name)

    # Support passing in a single resource to release rather than many
    if isinstance(names, basestring):
      names = [names]

    if names is None:
      to_release = self._list_owned()
    else:
      to_release = set(names)
      assert self._list_owned().issuperset(to_release), (
               "release() on unheld resources %s (set %s)" %
               (to_release.difference(self._list_owned()), self.name))

    # The set-level ("all elements") lock goes first, if we hold it; after
    # this add() can work again.
    if self.__lock._is_owned():
      self.__lock.release()
      self._del_owned()

    for lockname in to_release:
      # Fine as long as a lock doesn't leave __lockdict without being
      # exclusively held
      self.__lockdict[lockname].release()
      self._del_owned(name=lockname)
1257

    
1258
  def add(self, names, acquired=0, shared=0):
    """Add a new set of elements to the set

    @type names: list of strings
    @param names: names of the new elements to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: pre-acquire the new resource?
    @type shared: integer (0/1) used as a boolean
    @param shared: is the pre-acquisition shared?
    @return: always C{True}
    @raise errors.LockError: if any of the names is already in the set

    """
    # Either nothing is owned at this level, or the whole set exclusively
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
      ("Cannot add locks if the set %s is only partially owned, or shared" %
       self.name)

    # Support passing in a single resource to add rather than many
    if isinstance(names, basestring):
      names = [names]

    # Take the set-level lock exclusively unless we already own it, and
    # remember whether it has to be released at the end.
    must_release = not self.__lock._is_owned()
    if must_release:
      self.__lock.acquire()

    try:
      duplicates = set(self.__names()).intersection(names)
      if duplicates:
        # Deliberately an explicit raise and not an assert: asserts are
        # turned off when using optimization, and due to concurrency this
        # can happen even if the user doesn't want it.
        raise errors.LockError("duplicate add(%s) on lockset %s" %
                               (duplicates, self.name))

      for lockname in names:
        new_lock = SharedLock(self._GetLockName(lockname),
                              monitor=self.__monitor)

        if acquired:
          # The lock was just created, so neither priority nor timeout is
          # needed here
          new_lock.acquire(shared=shared)
          # now the lock cannot be deleted, we have it!
          try:
            self._add_owned(name=lockname)
          except:
            # Recording the ownership should never fail. If it does, release
            # the lock and re-raise. The lock is not in __lockdict yet, so no
            # other thread should be pending on it; the release is just a
            # safety measure.
            new_lock.release()
            raise

        self.__lockdict[lockname] = new_lock

    finally:
      # Only release __lock if we were not holding it previously.
      if must_release:
        self.__lock.release()

    return True
1322

    
1323
  def remove(self, names):
    """Remove elements from the lock set.

    You can either not hold anything in the lockset or already hold a superset
    of the elements you want to delete, exclusively.

    @type names: list of strings
    @param names: names of the resource to remove.

    @return: a list of locks which we removed; the list is always
        equal to the names list if we were holding all the locks
        exclusively

    """
    # Support passing in a single resource to remove rather than many
    if isinstance(names, basestring):
      names = [names]

    # Whatever we own in this set must be a superset of what is deleted. The
    # ownership must also be exclusive, which the lock itself checks.
    assert not self._is_owned() or self._list_owned().issuperset(names), (
      "remove() on acquired lockset %s while not owning all elements" %
      self.name)

    removed = []

    for lname in names:
      # delete() acquires the lock exclusively if we don't already own it and
      # makes all pending and subsequent acquires fail. Calling it out of
      # order is fine because delete() also implies release(), and the
      # assertion above guarantees that we either hold everything we want to
      # delete or nothing at all.
      try:
        self.__lockdict[lname].delete()
      except (KeyError, errors.LockError):
        # Cannot happen if we were already holding the lock, verify. Cleaning
        # up is the job of whoever actually deleted it.
        assert not self._is_owned(), ("remove failed while holding lockset %s"
                                      % self.name)
        continue

      # No error means we are the ones who deleted the lock. It can safely be
      # dropped from lockdict, as any further or pending delete() or
      # acquire() will fail (and nobody can have had the lock since before
      # our call to delete()).
      removed.append(lname)
      del self.__lockdict[lname]

      # And remove it from our private list if we owned it.
      if self._is_owned():
        self._del_owned(name=lname)

    return removed
1377

    
1378

    
1379
# Locking levels; locks must be acquired in increasing level order.
# Current rules are:
#   - level LEVEL_CLUSTER holds the Big Ganeti Lock (BGL), which must be
#   acquired before performing any operation, either in shared or in
#   exclusive mode. Acquiring the BGL in exclusive mode is discouraged and
#   should be avoided.
#   - the other levels (instance, nodegroup, node, node-res) hold the locks
#   of the respective resources. If you need more than one lock at a level,
#   acquire them at the same time.
LEVEL_CLUSTER = 0
LEVEL_INSTANCE = 1
LEVEL_NODEGROUP = 2
LEVEL_NODE = 3
LEVEL_NODE_RES = 4

# All levels, in locking order
LEVELS = [
  LEVEL_CLUSTER,
  LEVEL_INSTANCE,
  LEVEL_NODEGROUP,
  LEVEL_NODE,
  LEVEL_NODE_RES,
  ]

# Lock levels which are modifiable: every level except the cluster level
LEVELS_MOD = frozenset(level for level in LEVELS if level != LEVEL_CLUSTER)

#: Lock level names (make sure to use singular form)
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODEGROUP: "nodegroup",
  LEVEL_NODE: "node",
  LEVEL_NODE_RES: "node-res",
  }

# Name of the Big Ganeti Lock
BGL = "BGL"
1421

    
1422

    
1423
class GanetiLockManager:
1424
  """The Ganeti Locking Library
1425

1426
  The purpose of this small library is to manage locking for ganeti clusters
1427
  in a central place, while at the same time doing dynamic checks against
1428
  possible deadlocks. It will also make it easier to transition to a different
1429
  lock type should we migrate away from python threads.
1430

1431
  """
1432
  _instance = None
1433

    
1434
  def __init__(self, nodes, nodegroups, instances):
    """Constructs a new GanetiLockManager object.

    Only one GanetiLockManager object should exist at any time; an assertion
    fails if another instance was already created.

    @param nodes: list of node names
    @param nodegroups: list of nodegroup uuids
    @param instances: list of instance names

    """
    cls = self.__class__
    assert cls._instance is None, \
           "double GanetiLockManager instance"

    cls._instance = self

    self._monitor = monitor = LockMonitor()

    # (level, lockset name, initial members) of every lockset, in the order
    # in which the locksets are created
    layout = [
      (LEVEL_CLUSTER, "cluster", [BGL]),
      (LEVEL_NODE, "node", nodes),
      (LEVEL_NODE_RES, "node-res", nodes),
      (LEVEL_NODEGROUP, "nodegroup", nodegroups),
      (LEVEL_INSTANCE, "instance", instances),
      ]

    # The keyring contains all the locks, indexed by their level
    self.__keyring = dict((level, LockSet(members, name, monitor=monitor))
                          for (level, name, members) in layout)

    assert compat.all(ls.name == LEVEL_NAMES[level]
                      for (level, ls) in self.__keyring.items())
1465

    
1466
  def AddToLockMonitor(self, provider):
1467
    """Registers a new lock with the monitor.
1468

1469
    See L{LockMonitor.RegisterLock}.
1470

1471
    """
1472
    return self._monitor.RegisterLock(provider)
1473

    
1474
  def QueryLocks(self, fields):
1475
    """Queries information from all locks.
1476

1477
    See L{LockMonitor.QueryLocks}.
1478

1479
    """
1480
    return self._monitor.QueryLocks(fields)
1481

    
1482
  def OldStyleQueryLocks(self, fields):
1483
    """Queries information from all locks, returning old-style data.
1484

1485
    See L{LockMonitor.OldStyleQueryLocks}.
1486

1487
    """
1488
    return self._monitor.OldStyleQueryLocks(fields)
1489

    
1490
  def _names(self, level):
    """List the lock names at the given level.

    This can be used for debugging/testing purposes.

    @param level: the level whose list of locks to get
    @return: result of the C{_names()} call on the lockset of that level

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    lockset = self.__keyring[level]
    return lockset._names()
1500

    
1501
  def _is_owned(self, level):
1502
    """Check whether we are owning locks at the given level
1503

1504
    """
1505
    return self.__keyring[level]._is_owned()
1506

    
1507
  is_owned = _is_owned
1508

    
1509
  def _list_owned(self, level):
1510
    """Get the set of owned locks at the given level
1511

1512
    """
1513
    return self.__keyring[level]._list_owned()
1514

    
1515
  list_owned = _list_owned
1516

    
1517
  def _upper_owned(self, level):
    """Check whether we own any lock at a level greater than the given one.

    @param level: the reference level
    @return: true value if a lock is owned at any level above C{level}

    """
    # Slicing LEVELS like this only works if LEVELS[i] = i, which is checked
    # for in the test cases.
    higher_levels = LEVELS[level + 1:]
    return compat.any(self._is_owned(lvl) for lvl in higher_levels)
1524

    
1525
  def _BGL_owned(self): # pylint: disable=C0103
    """Check if the current thread owns the BGL.

    Both an exclusive or a shared acquisition work.

    @return: whether the BGL is among the locks owned at the cluster level

    """
    cluster_owned = self.__keyring[LEVEL_CLUSTER]._list_owned()
    return BGL in cluster_owned
1532

    
1533
  @staticmethod
  def _contains_BGL(level, names): # pylint: disable=C0103
    """Check if the level contains the BGL.

    Check if acting on the given level and set of names will change
    the status of the Big Ganeti Lock.

    @param level: the level acted upon
    @param names: the lock names acted upon; C{None} stands for all locks
    @return: whether the BGL is affected

    """
    if level != LEVEL_CLUSTER:
      return False

    return names is None or BGL in names
1542

    
1543
  def acquire(self, level, names, timeout=None, shared=0, priority=None):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring lock
    @return: result of the C{acquire()} call on the lockset of that level

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # We must either be acquiring the Big Ganeti Lock or already own it. Some
    # "legacy" opcodes need to be sure they run non-concurrently, so even
    # migrated code has to at least share the BGL to be compatible with them.
    # Owning the BGL exclusively makes acquiring any other lock pointless,
    # unless perhaps the current opcode is half way through its migration.
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
            "You must own the Big Ganeti Lock before acquiring any other")

    # Nothing may be owned at upper levels
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
           " while owning some at a greater one")

    # Acquire the locks in the set.
    lockset = self.__keyring[level]
    return lockset.acquire(names, shared=shared, timeout=timeout,
                           priority=priority)
1578

    
1579
  def downgrade(self, level, names=None):
    """Downgrade a set of resource locks from exclusive to shared mode.

    You must have acquired the locks in exclusive mode.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be downgraded
    @type names: list of strings, or None
    @param names: the names of the locks which shall be downgraded
        (defaults to all the locks acquired at the level)
    @return: result of the C{downgrade()} call on the lockset of that level

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    lockset = self.__keyring[level]
    return lockset.downgrade(names=names)
1594

    
1595
  def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive
    mode, before releasing them.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)
    @return: result of the C{release()} call on the lockset of that level

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    # The BGL may not be given up while anything above it is still owned
    assert not (self._contains_BGL(level, names) and
                self._upper_owned(LEVEL_CLUSTER)), (
            "Cannot release the Big Ganeti Lock while holding something"
            " at upper levels (%r)" %
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
                              for i in self.__keyring.keys()]), ))

    # Release will complain if we don't own the locks already
    lockset = self.__keyring[level]
    return lockset.release(names)
1618

    
1619
  def add(self, level, names, acquired=0, shared=0):
    """Add locks at the specified level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to acquire
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared
    @return: result of the C{add()} call on the lockset of that level

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
           " while owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.add(names, acquired=acquired, shared=shared)
1638

    
1639
  def remove(self, level, names):
    """Remove locks from the specified level.

    You must either already own the locks you are trying to remove
    exclusively or not own any lock at an upper level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)
    @return: result of the C{remove()} call on the lockset of that level

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    # Either the level is owned, or nothing is owned from here up.
    # LockSet.remove() checks the cases in which not all the needed resources
    # are owned, or the ownership is shared.
    assert self._is_owned(level) or not self._upper_owned(level), (
           "Cannot remove locks at a level while not owning it or"
           " owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.remove(names)
1662

    
1663

    
1664
def _MonitorSortKey(entry):
  """Sorting key function.

  Sort by name, registration order and then order of information. This provides
  a stable sort order over different providers, even if they return the same
  name.

  @type entry: tuple
  @param entry: C{(item, idx, num)} triple; C{item} is a four-element tuple
    whose first element is the name
  @rtype: tuple
  @return: C{(utils.NiceSortKey(name), num, idx)}

  """
  # The triple is unpacked here instead of in the signature: tuple parameters
  # were removed from the language by PEP 3113, so the signature form is a
  # syntax error on Python 3. Unpacking in the body works on every version
  # and still rejects arguments that are not three-element sequences.
  (item, idx, num) = entry
  (name, _, _, _) = item

  return (utils.NiceSortKey(name), num, idx)
1675

    
1676

    
1677
class LockMonitor(object):
  """Registry of lock information providers which can be queried as a whole.

  Every registered provider is stored in a weak-key dictionary together with
  a number taken from a counter at registration time. Queries collect the
  information of all providers and sort it using L{_MonitorSortKey}.

  """
  _LOCK_ATTR = "_lock"

  def __init__(self):
    """Sets up the provider registry, the counter and the internal lock.

    """
    # Registered providers, each mapped to its registration number. Keys are
    # weakly referenced to avoid issues with circular references and
    # deletion.
    self._locks = weakref.WeakKeyDictionary()

    # Source of registration numbers, needed for stable sorting
    self._counter = itertools.count(0)

    self._lock = SharedLock("LockMonitor")

  @ssynchronized(_LOCK_ATTR)
  def RegisterLock(self, provider):
    """Adds a lock to the set of tracked locks.

    @param provider: Object with a callable method named C{GetLockInfo}, taking
      a single C{set} containing the requested information items
    @note: Receiving only the function which generates the requested
      information would be nicer, but weak references to bound methods (e.g.
      C{self.GetLockInfo}) are tricky; none of the known workarounds works
      properly in combination with a standard C{WeakKeyDictionary}

    """
    registry = self._locks
    assert provider not in registry, "Duplicate registration"

    # Names are intentionally not checked for duplicates. A lock re-created
    # with the same name within a very short timeframe can coexist with its
    # predecessor, which might not yet be gone from the weakref dictionary.
    # Recording the order of incoming registrations still guarantees a stable
    # sort order.
    seq = self._counter.next()
    registry[provider] = seq

  def _GetLockInfo(self, requested):
    """Collects the requested information from every tracked lock.

    @param requested: passed unchanged to C{GetLockInfo} of each provider
    @rtype: list
    @return: C{(info, idx, num)} tuples; C{idx} is the position of C{info}
      in its provider's result and C{num} the provider's registration number

    """
    monitor_lock = self._lock

    # The internal lock is only held while taking a consistent list of the
    # tracked items; the providers are queried after it has been released
    monitor_lock.acquire(shared=1)
    try:
      tracked = self._locks.items()
    finally:
      monitor_lock.release()

    collected = []
    for (provider, num) in tracked:
      for (idx, info) in enumerate(provider.GetLockInfo(requested)):
        collected.append((info, idx, num))

    return collected

  def _Query(self, fields):
    """Builds the query object and the query data for all locks.

    @type fields: list of strings
    @param fields: List of fields to return
    @rtype: tuple
    @return: C{(query object, lock query data)}

    """
    qobj = query.Query(query.LOCK_FIELDS, fields)
    wanted = qobj.RequestedData()

    # Collect the data of all locks, then order it by name and incoming order
    ordered = sorted(self._GetLockInfo(wanted), key=_MonitorSortKey)

    # Only the lock information itself is put into the query data
    data = query.LockQueryData(map(compat.fst, ordered))

    return (qobj, data)

  def QueryLocks(self, fields):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return
    @return: the query response built by C{query.GetQueryResponse}

    """
    (qobj, data) = self._Query(fields)

    # Prepare query response
    return query.GetQueryResponse(qobj, data)

  def OldStyleQueryLocks(self, fields):
    """Queries information from all locks, returning old-style data.

    @type fields: list of strings
    @param fields: List of fields to return
    @return: the result of C{OldStyleQuery} on the query object

    """
    (qobj, data) = self._Query(fields)

    return qobj.OldStyleQuery(data)