Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 8d7d8b57

History | View | Annotate | Download (54.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import errno
32
import weakref
33
import logging
34
import heapq
35
import itertools
36
import time
37

    
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import compat
41
from ganeti import query
42

    
43

    
44
_EXCLUSIVE_TEXT = "exclusive"
45
_SHARED_TEXT = "shared"
46
_DELETED_TEXT = "deleted"
47

    
48
_DEFAULT_PRIORITY = 0
49

    
50

    
51
def ssynchronized(mylock, shared=0):
52
  """Shared Synchronization decorator.
53

54
  Calls the function holding the given lock, either in exclusive or shared
55
  mode. It requires the passed lock to be a SharedLock (or support its
56
  semantics).
57

58
  @type mylock: lockable object or string
59
  @param mylock: lock to acquire or class member name of the lock to acquire
60

61
  """
62
  def wrap(fn):
63
    def sync_function(*args, **kwargs):
64
      if isinstance(mylock, basestring):
65
        assert args, "cannot ssynchronize on non-class method: self not found"
66
        # args[0] is "self"
67
        lock = getattr(args[0], mylock)
68
      else:
69
        lock = mylock
70
      lock.acquire(shared=shared)
71
      try:
72
        return fn(*args, **kwargs)
73
      finally:
74
        lock.release()
75
    return sync_function
76
  return wrap
77

    
78

    
79
class _SingleNotifyPipeConditionWaiter(object):
80
  """Helper class for SingleNotifyPipeCondition
81

82
  """
83
  __slots__ = [
84
    "_fd",
85
    "_poller",
86
    ]
87

    
88
  def __init__(self, poller, fd):
89
    """Constructor for _SingleNotifyPipeConditionWaiter
90

91
    @type poller: select.poll
92
    @param poller: Poller object
93
    @type fd: int
94
    @param fd: File descriptor to wait for
95

96
    """
97
    object.__init__(self)
98
    self._poller = poller
99
    self._fd = fd
100

    
101
  def __call__(self, timeout):
102
    """Wait for something to happen on the pipe.
103

104
    @type timeout: float or None
105
    @param timeout: Timeout for waiting (can be None)
106

107
    """
108
    running_timeout = utils.RunningTimeout(timeout, True)
109

    
110
    while True:
111
      remaining_time = running_timeout.Remaining()
112

    
113
      if remaining_time is not None:
114
        if remaining_time < 0.0:
115
          break
116

    
117
        # Our calculation uses seconds, poll() wants milliseconds
118
        remaining_time *= 1000
119

    
120
      try:
121
        result = self._poller.poll(remaining_time)
122
      except EnvironmentError, err:
123
        if err.errno != errno.EINTR:
124
          raise
125
        result = None
126

    
127
      # Check whether we were notified
128
      if result and result[0][0] == self._fd:
129
        break
130

    
131

    
132
class _BaseCondition(object):
133
  """Base class containing common code for conditions.
134

135
  Some of this code is taken from python's threading module.
136

137
  """
138
  __slots__ = [
139
    "_lock",
140
    "acquire",
141
    "release",
142
    "_is_owned",
143
    "_acquire_restore",
144
    "_release_save",
145
    ]
146

    
147
  def __init__(self, lock):
148
    """Constructor for _BaseCondition.
149

150
    @type lock: threading.Lock
151
    @param lock: condition base lock
152

153
    """
154
    object.__init__(self)
155

    
156
    try:
157
      self._release_save = lock._release_save
158
    except AttributeError:
159
      self._release_save = self._base_release_save
160
    try:
161
      self._acquire_restore = lock._acquire_restore
162
    except AttributeError:
163
      self._acquire_restore = self._base_acquire_restore
164
    try:
165
      self._is_owned = lock._is_owned
166
    except AttributeError:
167
      self._is_owned = self._base_is_owned
168

    
169
    self._lock = lock
170

    
171
    # Export the lock's acquire() and release() methods
172
    self.acquire = lock.acquire
173
    self.release = lock.release
174

    
175
  def _base_is_owned(self):
176
    """Check whether lock is owned by current thread.
177

178
    """
179
    if self._lock.acquire(0):
180
      self._lock.release()
181
      return False
182
    return True
183

    
184
  def _base_release_save(self):
185
    self._lock.release()
186

    
187
  def _base_acquire_restore(self, _):
188
    self._lock.acquire()
189

    
190
  def _check_owned(self):
191
    """Raise an exception if the current thread doesn't own the lock.
192

193
    """
194
    if not self._is_owned():
195
      raise RuntimeError("cannot work with un-aquired lock")
196

    
197

    
198
class SingleNotifyPipeCondition(_BaseCondition):
199
  """Condition which can only be notified once.
200

201
  This condition class uses pipes and poll, internally, to be able to wait for
202
  notification with a timeout, without resorting to polling. It is almost
203
  compatible with Python's threading.Condition, with the following differences:
204
    - notifyAll can only be called once, and no wait can happen after that
205
    - notify is not supported, only notifyAll
206

207
  """
208

    
209
  __slots__ = [
210
    "_poller",
211
    "_read_fd",
212
    "_write_fd",
213
    "_nwaiters",
214
    "_notified",
215
    ]
216

    
217
  _waiter_class = _SingleNotifyPipeConditionWaiter
218

    
219
  def __init__(self, lock):
220
    """Constructor for SingleNotifyPipeCondition
221

222
    """
223
    _BaseCondition.__init__(self, lock)
224
    self._nwaiters = 0
225
    self._notified = False
226
    self._read_fd = None
227
    self._write_fd = None
228
    self._poller = None
229

    
230
  def _check_unnotified(self):
231
    """Throws an exception if already notified.
232

233
    """
234
    if self._notified:
235
      raise RuntimeError("cannot use already notified condition")
236

    
237
  def _Cleanup(self):
238
    """Cleanup open file descriptors, if any.
239

240
    """
241
    if self._read_fd is not None:
242
      os.close(self._read_fd)
243
      self._read_fd = None
244

    
245
    if self._write_fd is not None:
246
      os.close(self._write_fd)
247
      self._write_fd = None
248
    self._poller = None
249

    
250
  def wait(self, timeout):
251
    """Wait for a notification.
252

253
    @type timeout: float or None
254
    @param timeout: Waiting timeout (can be None)
255

256
    """
257
    self._check_owned()
258
    self._check_unnotified()
259

    
260
    self._nwaiters += 1
261
    try:
262
      if self._poller is None:
263
        (self._read_fd, self._write_fd) = os.pipe()
264
        self._poller = select.poll()
265
        self._poller.register(self._read_fd, select.POLLHUP)
266

    
267
      wait_fn = self._waiter_class(self._poller, self._read_fd)
268
      state = self._release_save()
269
      try:
270
        # Wait for notification
271
        wait_fn(timeout)
272
      finally:
273
        # Re-acquire lock
274
        self._acquire_restore(state)
275
    finally:
276
      self._nwaiters -= 1
277
      if self._nwaiters == 0:
278
        self._Cleanup()
279

    
280
  def notifyAll(self): # pylint: disable=C0103
281
    """Close the writing side of the pipe to notify all waiters.
282

283
    """
284
    self._check_owned()
285
    self._check_unnotified()
286
    self._notified = True
287
    if self._write_fd is not None:
288
      os.close(self._write_fd)
289
      self._write_fd = None
290

    
291

    
292
class PipeCondition(_BaseCondition):
293
  """Group-only non-polling condition with counters.
294

295
  This condition class uses pipes and poll, internally, to be able to wait for
296
  notification with a timeout, without resorting to polling. It is almost
297
  compatible with Python's threading.Condition, but only supports notifyAll and
298
  non-recursive locks. As an additional features it's able to report whether
299
  there are any waiting threads.
300

301
  """
302
  __slots__ = [
303
    "_waiters",
304
    "_single_condition",
305
    ]
306

    
307
  _single_condition_class = SingleNotifyPipeCondition
308

    
309
  def __init__(self, lock):
310
    """Initializes this class.
311

312
    """
313
    _BaseCondition.__init__(self, lock)
314
    self._waiters = set()
315
    self._single_condition = self._single_condition_class(self._lock)
316

    
317
  def wait(self, timeout):
318
    """Wait for a notification.
319

320
    @type timeout: float or None
321
    @param timeout: Waiting timeout (can be None)
322

323
    """
324
    self._check_owned()
325

    
326
    # Keep local reference to the pipe. It could be replaced by another thread
327
    # notifying while we're waiting.
328
    cond = self._single_condition
329

    
330
    self._waiters.add(threading.currentThread())
331
    try:
332
      cond.wait(timeout)
333
    finally:
334
      self._check_owned()
335
      self._waiters.remove(threading.currentThread())
336

    
337
  def notifyAll(self): # pylint: disable=C0103
338
    """Notify all currently waiting threads.
339

340
    """
341
    self._check_owned()
342
    self._single_condition.notifyAll()
343
    self._single_condition = self._single_condition_class(self._lock)
344

    
345
  def get_waiting(self):
346
    """Returns a list of all waiting threads.
347

348
    """
349
    self._check_owned()
350

    
351
    return self._waiters
352

    
353
  def has_waiting(self):
354
    """Returns whether there are active waiters.
355

356
    """
357
    self._check_owned()
358

    
359
    return bool(self._waiters)
360

    
361
  def __repr__(self):
362
    return ("<%s.%s waiters=%s at %#x>" %
363
            (self.__class__.__module__, self.__class__.__name__,
364
             self._waiters, id(self)))
365

    
366

    
367
class _PipeConditionWithMode(PipeCondition):
368
  __slots__ = [
369
    "shared",
370
    ]
371

    
372
  def __init__(self, lock, shared):
373
    """Initializes this class.
374

375
    """
376
    self.shared = shared
377
    PipeCondition.__init__(self, lock)
378

    
379

    
380
class SharedLock(object):
381
  """Implements a shared lock.
382

383
  Multiple threads can acquire the lock in a shared way by calling
384
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
385
  threads can call C{acquire(shared=0)}.
386

387
  Notes on data structures: C{__pending} contains a priority queue (heapq) of
388
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
389
  ...]}. Each per-priority queue contains a normal in-order list of conditions
390
  to be notified when the lock can be acquired. Shared locks are grouped
391
  together by priority and the condition for them is stored in
392
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
393
  references for the per-priority queues indexed by priority for faster access.
394

395
  @type name: string
396
  @ivar name: the name of the lock
397

398
  """
399
  __slots__ = [
400
    "__weakref__",
401
    "__deleted",
402
    "__exc",
403
    "__lock",
404
    "__pending",
405
    "__pending_by_prio",
406
    "__pending_shared",
407
    "__shr",
408
    "__time_fn",
409
    "name",
410
    ]
411

    
412
  __condition_class = _PipeConditionWithMode
413

    
414
  def __init__(self, name, monitor=None, _time_fn=time.time):
415
    """Construct a new SharedLock.
416

417
    @param name: the name of the lock
418
    @type monitor: L{LockMonitor}
419
    @param monitor: Lock monitor with which to register
420

421
    """
422
    object.__init__(self)
423

    
424
    self.name = name
425

    
426
    # Used for unittesting
427
    self.__time_fn = _time_fn
428

    
429
    # Internal lock
430
    self.__lock = threading.Lock()
431

    
432
    # Queue containing waiting acquires
433
    self.__pending = []
434
    self.__pending_by_prio = {}
435
    self.__pending_shared = {}
436

    
437
    # Current lock holders
438
    self.__shr = set()
439
    self.__exc = None
440

    
441
    # is this lock in the deleted state?
442
    self.__deleted = False
443

    
444
    # Register with lock monitor
445
    if monitor:
446
      logging.debug("Adding lock %s to monitor", name)
447
      monitor.RegisterLock(self)
448

    
449
  def __repr__(self):
450
    return ("<%s.%s name=%s at %#x>" %
451
            (self.__class__.__module__, self.__class__.__name__,
452
             self.name, id(self)))
453

    
454
  def GetLockInfo(self, requested):
455
    """Retrieves information for querying locks.
456

457
    @type requested: set
458
    @param requested: Requested information, see C{query.LQ_*}
459

460
    """
461
    self.__lock.acquire()
462
    try:
463
      # Note: to avoid unintentional race conditions, no references to
464
      # modifiable objects should be returned unless they were created in this
465
      # function.
466
      mode = None
467
      owner_names = None
468

    
469
      if query.LQ_MODE in requested:
470
        if self.__deleted:
471
          mode = _DELETED_TEXT
472
          assert not (self.__exc or self.__shr)
473
        elif self.__exc:
474
          mode = _EXCLUSIVE_TEXT
475
        elif self.__shr:
476
          mode = _SHARED_TEXT
477

    
478
      # Current owner(s) are wanted
479
      if query.LQ_OWNER in requested:
480
        if self.__exc:
481
          owner = [self.__exc]
482
        else:
483
          owner = self.__shr
484

    
485
        if owner:
486
          assert not self.__deleted
487
          owner_names = [i.getName() for i in owner]
488

    
489
      # Pending acquires are wanted
490
      if query.LQ_PENDING in requested:
491
        pending = []
492

    
493
        # Sorting instead of copying and using heaq functions for simplicity
494
        for (_, prioqueue) in sorted(self.__pending):
495
          for cond in prioqueue:
496
            if cond.shared:
497
              pendmode = _SHARED_TEXT
498
            else:
499
              pendmode = _EXCLUSIVE_TEXT
500

    
501
            # List of names will be sorted in L{query._GetLockPending}
502
            pending.append((pendmode, [i.getName()
503
                                       for i in cond.get_waiting()]))
504
      else:
505
        pending = None
506

    
507
      return [(self.name, mode, owner_names, pending)]
508
    finally:
509
      self.__lock.release()
510

    
511
  def __check_deleted(self):
512
    """Raises an exception if the lock has been deleted.
513

514
    """
515
    if self.__deleted:
516
      raise errors.LockError("Deleted lock %s" % self.name)
517

    
518
  def __is_sharer(self):
519
    """Is the current thread sharing the lock at this time?
520

521
    """
522
    return threading.currentThread() in self.__shr
523

    
524
  def __is_exclusive(self):
525
    """Is the current thread holding the lock exclusively at this time?
526

527
    """
528
    return threading.currentThread() == self.__exc
529

    
530
  def __is_owned(self, shared=-1):
531
    """Is the current thread somehow owning the lock at this time?
532

533
    This is a private version of the function, which presumes you're holding
534
    the internal lock.
535

536
    """
537
    if shared < 0:
538
      return self.__is_sharer() or self.__is_exclusive()
539
    elif shared:
540
      return self.__is_sharer()
541
    else:
542
      return self.__is_exclusive()
543

    
544
  def _is_owned(self, shared=-1):
545
    """Is the current thread somehow owning the lock at this time?
546

547
    @param shared:
548
        - < 0: check for any type of ownership (default)
549
        - 0: check for exclusive ownership
550
        - > 0: check for shared ownership
551

552
    """
553
    self.__lock.acquire()
554
    try:
555
      return self.__is_owned(shared=shared)
556
    finally:
557
      self.__lock.release()
558

    
559
  is_owned = _is_owned
560

    
561
  def _count_pending(self):
562
    """Returns the number of pending acquires.
563

564
    @rtype: int
565

566
    """
567
    self.__lock.acquire()
568
    try:
569
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
570
    finally:
571
      self.__lock.release()
572

    
573
  def _check_empty(self):
574
    """Checks whether there are any pending acquires.
575

576
    @rtype: bool
577

578
    """
579
    self.__lock.acquire()
580
    try:
581
      # Order is important: __find_first_pending_queue modifies __pending
582
      (_, prioqueue) = self.__find_first_pending_queue()
583

    
584
      return not (prioqueue or
585
                  self.__pending or
586
                  self.__pending_by_prio or
587
                  self.__pending_shared)
588
    finally:
589
      self.__lock.release()
590

    
591
  def __do_acquire(self, shared):
592
    """Actually acquire the lock.
593

594
    """
595
    if shared:
596
      self.__shr.add(threading.currentThread())
597
    else:
598
      self.__exc = threading.currentThread()
599

    
600
  def __can_acquire(self, shared):
601
    """Determine whether lock can be acquired.
602

603
    """
604
    if shared:
605
      return self.__exc is None
606
    else:
607
      return len(self.__shr) == 0 and self.__exc is None
608

    
609
  def __find_first_pending_queue(self):
610
    """Tries to find the topmost queued entry with pending acquires.
611

612
    Removes empty entries while going through the list.
613

614
    """
615
    while self.__pending:
616
      (priority, prioqueue) = self.__pending[0]
617

    
618
      if prioqueue:
619
        return (priority, prioqueue)
620

    
621
      # Remove empty queue
622
      heapq.heappop(self.__pending)
623
      del self.__pending_by_prio[priority]
624
      assert priority not in self.__pending_shared
625

    
626
    return (None, None)
627

    
628
  def __is_on_top(self, cond):
629
    """Checks whether the passed condition is on top of the queue.
630

631
    The caller must make sure the queue isn't empty.
632

633
    """
634
    (_, prioqueue) = self.__find_first_pending_queue()
635

    
636
    return cond == prioqueue[0]
637

    
638
  def __acquire_unlocked(self, shared, timeout, priority):
639
    """Acquire a shared lock.
640

641
    @param shared: whether to acquire in shared mode; by default an
642
        exclusive lock will be acquired
643
    @param timeout: maximum waiting time before giving up
644
    @type priority: integer
645
    @param priority: Priority for acquiring lock
646

647
    """
648
    self.__check_deleted()
649

    
650
    # We cannot acquire the lock if we already have it
651
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
652
                                   " %s" % self.name)
653

    
654
    # Remove empty entries from queue
655
    self.__find_first_pending_queue()
656

    
657
    # Check whether someone else holds the lock or there are pending acquires.
658
    if not self.__pending and self.__can_acquire(shared):
659
      # Apparently not, can acquire lock directly.
660
      self.__do_acquire(shared)
661
      return True
662

    
663
    prioqueue = self.__pending_by_prio.get(priority, None)
664

    
665
    if shared:
666
      # Try to re-use condition for shared acquire
667
      wait_condition = self.__pending_shared.get(priority, None)
668
      assert (wait_condition is None or
669
              (wait_condition.shared and wait_condition in prioqueue))
670
    else:
671
      wait_condition = None
672

    
673
    if wait_condition is None:
674
      if prioqueue is None:
675
        assert priority not in self.__pending_by_prio
676

    
677
        prioqueue = []
678
        heapq.heappush(self.__pending, (priority, prioqueue))
679
        self.__pending_by_prio[priority] = prioqueue
680

    
681
      wait_condition = self.__condition_class(self.__lock, shared)
682
      prioqueue.append(wait_condition)
683

    
684
      if shared:
685
        # Keep reference for further shared acquires on same priority. This is
686
        # better than trying to find it in the list of pending acquires.
687
        assert priority not in self.__pending_shared
688
        self.__pending_shared[priority] = wait_condition
689

    
690
    wait_start = self.__time_fn()
691
    acquired = False
692

    
693
    try:
694
      # Wait until we become the topmost acquire in the queue or the timeout
695
      # expires.
696
      while True:
697
        if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
698
          self.__do_acquire(shared)
699
          acquired = True
700
          break
701

    
702
        # A lot of code assumes blocking acquires always succeed, therefore we
703
        # can never return False for a blocking acquire
704
        if (timeout is not None and
705
            utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
706
          break
707

    
708
        # Wait for notification
709
        wait_condition.wait(timeout)
710
        self.__check_deleted()
711
    finally:
712
      # Remove condition from queue if there are no more waiters
713
      if not wait_condition.has_waiting():
714
        prioqueue.remove(wait_condition)
715
        if wait_condition.shared:
716
          # Remove from list of shared acquires if it wasn't while releasing
717
          # (e.g. on lock deletion)
718
          self.__pending_shared.pop(priority, None)
719

    
720
    return acquired
721

    
722
  def acquire(self, shared=0, timeout=None, priority=None,
723
              test_notify=None):
724
    """Acquire a shared lock.
725

726
    @type shared: integer (0/1) used as a boolean
727
    @param shared: whether to acquire in shared mode; by default an
728
        exclusive lock will be acquired
729
    @type timeout: float
730
    @param timeout: maximum waiting time before giving up
731
    @type priority: integer
732
    @param priority: Priority for acquiring lock
733
    @type test_notify: callable or None
734
    @param test_notify: Special callback function for unittesting
735

736
    """
737
    if priority is None:
738
      priority = _DEFAULT_PRIORITY
739

    
740
    self.__lock.acquire()
741
    try:
742
      # We already got the lock, notify now
743
      if __debug__ and callable(test_notify):
744
        test_notify()
745

    
746
      return self.__acquire_unlocked(shared, timeout, priority)
747
    finally:
748
      self.__lock.release()
749

    
750
  def downgrade(self):
751
    """Changes the lock mode from exclusive to shared.
752

753
    Pending acquires in shared mode on the same priority will go ahead.
754

755
    """
756
    self.__lock.acquire()
757
    try:
758
      assert self.__is_owned(), "Lock must be owned"
759

    
760
      if self.__is_exclusive():
761
        # Do nothing if the lock is already acquired in shared mode
762
        self.__exc = None
763
        self.__do_acquire(1)
764

    
765
        # Important: pending shared acquires should only jump ahead if there
766
        # was a transition from exclusive to shared, otherwise an owner of a
767
        # shared lock can keep calling this function to push incoming shared
768
        # acquires
769
        (priority, prioqueue) = self.__find_first_pending_queue()
770
        if prioqueue:
771
          # Is there a pending shared acquire on this priority?
772
          cond = self.__pending_shared.pop(priority, None)
773
          if cond:
774
            assert cond.shared
775
            assert cond in prioqueue
776

    
777
            # Ensure shared acquire is on top of queue
778
            if len(prioqueue) > 1:
779
              prioqueue.remove(cond)
780
              prioqueue.insert(0, cond)
781

    
782
            # Notify
783
            cond.notifyAll()
784

    
785
      assert not self.__is_exclusive()
786
      assert self.__is_sharer()
787

    
788
      return True
789
    finally:
790
      self.__lock.release()
791

    
792
  def release(self):
793
    """Release a Shared Lock.
794

795
    You must have acquired the lock, either in shared or in exclusive mode,
796
    before calling this function.
797

798
    """
799
    self.__lock.acquire()
800
    try:
801
      assert self.__is_exclusive() or self.__is_sharer(), \
802
        "Cannot release non-owned lock"
803

    
804
      # Autodetect release type
805
      if self.__is_exclusive():
806
        self.__exc = None
807
      else:
808
        self.__shr.remove(threading.currentThread())
809

    
810
      # Notify topmost condition in queue
811
      self.__notify_topmost()
812
    finally:
813
      self.__lock.release()
814

    
815
  def __notify_topmost(self):
816
    """Notifies topmost condition in queue of pending acquires.
817

818
    """
819
    (priority, prioqueue) = self.__find_first_pending_queue()
820
    if prioqueue:
821
      cond = prioqueue[0]
822
      cond.notifyAll()
823
      if cond.shared:
824
        # Prevent further shared acquires from sneaking in while waiters are
825
        # notified
826
        self.__pending_shared.pop(priority, None)
827

    
828
  def _notify_topmost(self):
829
    """Exported version of L{__notify_topmost}.
830

831
    """
832
    self.__lock.acquire()
833
    try:
834
      return self.__notify_topmost()
835
    finally:
836
      self.__lock.release()
837

    
838
  def delete(self, timeout=None, priority=None):
839
    """Delete a Shared Lock.
840

841
    This operation will declare the lock for removal. First the lock will be
842
    acquired in exclusive mode if you don't already own it, then the lock
843
    will be put in a state where any future and pending acquire() fail.
844

845
    @type timeout: float
846
    @param timeout: maximum waiting time before giving up
847
    @type priority: integer
848
    @param priority: Priority for acquiring lock
849

850
    """
851
    if priority is None:
852
      priority = _DEFAULT_PRIORITY
853

    
854
    self.__lock.acquire()
855
    try:
856
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
857

    
858
      self.__check_deleted()
859

    
860
      # The caller is allowed to hold the lock exclusively already.
861
      acquired = self.__is_exclusive()
862

    
863
      if not acquired:
864
        acquired = self.__acquire_unlocked(0, timeout, priority)
865

    
866
      if acquired:
867
        assert self.__is_exclusive() and not self.__is_sharer(), \
868
          "Lock wasn't acquired in exclusive mode"
869

    
870
        self.__deleted = True
871
        self.__exc = None
872

    
873
        assert not (self.__exc or self.__shr), "Found owner during deletion"
874

    
875
        # Notify all acquires. They'll throw an error.
876
        for (_, prioqueue) in self.__pending:
877
          for cond in prioqueue:
878
            cond.notifyAll()
879

    
880
        assert self.__deleted
881

    
882
      return acquired
883
    finally:
884
      self.__lock.release()
885

    
886
  def _release_save(self):
887
    shared = self.__is_sharer()
888
    self.release()
889
    return shared
890

    
891
  def _acquire_restore(self, shared):
892
    self.acquire(shared=shared)
893

    
894

    
895
# Whenever we want to acquire a full LockSet we pass None as the value
896
# to acquire.  Hide this behind this nicely named constant.
897
ALL_SET = None
898

    
899

    
900
class _AcquireTimeout(Exception):
901
  """Internal exception to abort an acquire on a timeout.
902

903
  """
904

    
905

    
906
class LockSet:
907
  """Implements a set of locks.
908

909
  This abstraction implements a set of shared locks for the same resource type,
910
  distinguished by name. The user can lock a subset of the resources and the
911
  LockSet will take care of acquiring the locks always in the same order, thus
912
  preventing deadlock.
913

914
  All the locks needed in the same set must be acquired together, though.
915

916
  @type name: string
917
  @ivar name: the name of the lockset
918

919
  """
920
  def __init__(self, members, name, monitor=None):
921
    """Constructs a new LockSet.
922

923
    @type members: list of strings
924
    @param members: initial members of the set
925
    @type monitor: L{LockMonitor}
926
    @param monitor: Lock monitor with which to register member locks
927

928
    """
929
    assert members is not None, "members parameter is not a list"
930
    self.name = name
931

    
932
    # Lock monitor
933
    self.__monitor = monitor
934

    
935
    # Used internally to guarantee coherency
936
    self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)
937

    
938
    # The lockdict indexes the relationship name -> lock
939
    # The order-of-locking is implied by the alphabetical order of names
940
    self.__lockdict = {}
941

    
942
    for mname in members:
943
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
944
                                          monitor=monitor)
945

    
946
    # The owner dict contains the set of locks each thread owns. For
947
    # performance each thread can access its own key without a global lock on
948
    # this structure. It is paramount though that *no* other type of access is
949
    # done to this structure (eg. no looping over its keys). *_owner helper
950
    # function are defined to guarantee access is correct, but in general never
951
    # do anything different than __owners[threading.currentThread()], or there
952
    # will be trouble.
953
    self.__owners = {}
954

    
955
  def _GetLockName(self, mname):
956
    """Returns the name for a member lock.
957

958
    """
959
    return "%s/%s" % (self.name, mname)
960

    
961
  def _get_lock(self):
962
    """Returns the lockset-internal lock.
963

964
    """
965
    return self.__lock
966

    
967
  def _get_lockdict(self):
968
    """Returns the lockset-internal lock dictionary.
969

970
    Accessing this structure is only safe in single-thread usage or when the
971
    lockset-internal lock is held.
972

973
    """
974
    return self.__lockdict
975

    
976
  def _is_owned(self):
977
    """Is the current thread a current level owner?"""
978
    return threading.currentThread() in self.__owners
979

    
980
  def _add_owned(self, name=None):
981
    """Note the current thread owns the given lock"""
982
    if name is None:
983
      if not self._is_owned():
984
        self.__owners[threading.currentThread()] = set()
985
    else:
986
      if self._is_owned():
987
        self.__owners[threading.currentThread()].add(name)
988
      else:
989
        self.__owners[threading.currentThread()] = set([name])
990

    
991
  def _del_owned(self, name=None):
992
    """Note the current thread owns the given lock"""
993

    
994
    assert not (name is None and self.__lock._is_owned()), \
995
           "Cannot hold internal lock when deleting owner status"
996

    
997
    if name is not None:
998
      self.__owners[threading.currentThread()].remove(name)
999

    
1000
    # Only remove the key if we don't hold the set-lock as well
1001
    if (not self.__lock._is_owned() and
1002
        not self.__owners[threading.currentThread()]):
1003
      del self.__owners[threading.currentThread()]
1004

    
1005
  def _list_owned(self):
1006
    """Get the set of resource names owned by the current thread"""
1007
    if self._is_owned():
1008
      return self.__owners[threading.currentThread()].copy()
1009
    else:
1010
      return set()
1011

    
1012
  def _release_and_delete_owned(self):
1013
    """Release and delete all resources owned by the current thread"""
1014
    for lname in self._list_owned():
1015
      lock = self.__lockdict[lname]
1016
      if lock._is_owned():
1017
        lock.release()
1018
      self._del_owned(name=lname)
1019

    
1020
  def __names(self):
1021
    """Return the current set of names.
1022

1023
    Only call this function while holding __lock and don't iterate on the
1024
    result after releasing the lock.
1025

1026
    """
1027
    return self.__lockdict.keys()
1028

    
1029
  def _names(self):
1030
    """Return a copy of the current set of elements.
1031

1032
    Used only for debugging purposes.
1033

1034
    """
1035
    # If we don't already own the set-level lock acquired
1036
    # we'll get it and note we need to release it later.
1037
    release_lock = False
1038
    if not self.__lock._is_owned():
1039
      release_lock = True
1040
      self.__lock.acquire(shared=1)
1041
    try:
1042
      result = self.__names()
1043
    finally:
1044
      if release_lock:
1045
        self.__lock.release()
1046
    return set(result)
1047

    
1048
  def acquire(self, names, timeout=None, shared=0, priority=None,
1049
              test_notify=None):
1050
    """Acquire a set of resource locks.
1051

1052
    @type names: list of strings (or string)
1053
    @param names: the names of the locks which shall be acquired
1054
        (special lock names, or instance/node names)
1055
    @type shared: integer (0/1) used as a boolean
1056
    @param shared: whether to acquire in shared mode; by default an
1057
        exclusive lock will be acquired
1058
    @type timeout: float or None
1059
    @param timeout: Maximum time to acquire all locks
1060
    @type priority: integer
1061
    @param priority: Priority for acquiring locks
1062
    @type test_notify: callable or None
1063
    @param test_notify: Special callback function for unittesting
1064

1065
    @return: Set of all locks successfully acquired or None in case of timeout
1066

1067
    @raise errors.LockError: when any lock we try to acquire has
1068
        been deleted before we succeed. In this case none of the
1069
        locks requested will be acquired.
1070

1071
    """
1072
    assert timeout is None or timeout >= 0.0
1073

    
1074
    # Check we don't already own locks at this level
1075
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
1076
                                  " (lockset %s)" % self.name)
1077

    
1078
    if priority is None:
1079
      priority = _DEFAULT_PRIORITY
1080

    
1081
    # We need to keep track of how long we spent waiting for a lock. The
1082
    # timeout passed to this function is over all lock acquires.
1083
    running_timeout = utils.RunningTimeout(timeout, False)
1084

    
1085
    try:
1086
      if names is not None:
1087
        # Support passing in a single resource to acquire rather than many
1088
        if isinstance(names, basestring):
1089
          names = [names]
1090

    
1091
        return self.__acquire_inner(names, False, shared, priority,
1092
                                    running_timeout.Remaining, test_notify)
1093

    
1094
      else:
1095
        # If no names are given acquire the whole set by not letting new names
1096
        # being added before we release, and getting the current list of names.
1097
        # Some of them may then be deleted later, but we'll cope with this.
1098
        #
1099
        # We'd like to acquire this lock in a shared way, as it's nice if
1100
        # everybody else can use the instances at the same time. If we are
1101
        # acquiring them exclusively though they won't be able to do this
1102
        # anyway, though, so we'll get the list lock exclusively as well in
1103
        # order to be able to do add() on the set while owning it.
1104
        if not self.__lock.acquire(shared=shared, priority=priority,
1105
                                   timeout=running_timeout.Remaining()):
1106
          raise _AcquireTimeout()
1107
        try:
1108
          # note we own the set-lock
1109
          self._add_owned()
1110

    
1111
          return self.__acquire_inner(self.__names(), True, shared, priority,
1112
                                      running_timeout.Remaining, test_notify)
1113
        except:
1114
          # We shouldn't have problems adding the lock to the owners list, but
1115
          # if we did we'll try to release this lock and re-raise exception.
1116
          # Of course something is going to be really wrong, after this.
1117
          self.__lock.release()
1118
          self._del_owned()
1119
          raise
1120

    
1121
    except _AcquireTimeout:
1122
      return None
1123

    
1124
  def __acquire_inner(self, names, want_all, shared, priority,
1125
                      timeout_fn, test_notify):
1126
    """Inner logic for acquiring a number of locks.
1127

1128
    @param names: Names of the locks to be acquired
1129
    @param want_all: Whether all locks in the set should be acquired
1130
    @param shared: Whether to acquire in shared mode
1131
    @param timeout_fn: Function returning remaining timeout
1132
    @param priority: Priority for acquiring locks
1133
    @param test_notify: Special callback function for unittesting
1134

1135
    """
1136
    acquire_list = []
1137

    
1138
    # First we look the locks up on __lockdict. We have no way of being sure
1139
    # they will still be there after, but this makes it a lot faster should
1140
    # just one of them be the already wrong. Using a sorted sequence to prevent
1141
    # deadlocks.
1142
    for lname in sorted(utils.UniqueSequence(names)):
1143
      try:
1144
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
1145
      except KeyError:
1146
        if want_all:
1147
          # We are acquiring all the set, it doesn't matter if this particular
1148
          # element is not there anymore.
1149
          continue
1150

    
1151
        raise errors.LockError("Non-existing lock %s in set %s (it may have"
1152
                               " been removed)" % (lname, self.name))
1153

    
1154
      acquire_list.append((lname, lock))
1155

    
1156
    # This will hold the locknames we effectively acquired.
1157
    acquired = set()
1158

    
1159
    try:
1160
      # Now acquire_list contains a sorted list of resources and locks we
1161
      # want.  In order to get them we loop on this (private) list and
1162
      # acquire() them.  We gave no real guarantee they will still exist till
1163
      # this is done but .acquire() itself is safe and will alert us if the
1164
      # lock gets deleted.
1165
      for (lname, lock) in acquire_list:
1166
        if __debug__ and callable(test_notify):
1167
          test_notify_fn = lambda: test_notify(lname)
1168
        else:
1169
          test_notify_fn = None
1170

    
1171
        timeout = timeout_fn()
1172

    
1173
        try:
1174
          # raises LockError if the lock was deleted
1175
          acq_success = lock.acquire(shared=shared, timeout=timeout,
1176
                                     priority=priority,
1177
                                     test_notify=test_notify_fn)
1178
        except errors.LockError:
1179
          if want_all:
1180
            # We are acquiring all the set, it doesn't matter if this
1181
            # particular element is not there anymore.
1182
            continue
1183

    
1184
          raise errors.LockError("Non-existing lock %s in set %s (it may"
1185
                                 " have been removed)" % (lname, self.name))
1186

    
1187
        if not acq_success:
1188
          # Couldn't get lock or timeout occurred
1189
          if timeout is None:
1190
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
1191
            # blocking.
1192
            raise errors.LockError("Failed to get lock %s (set %s)" %
1193
                                   (lname, self.name))
1194

    
1195
          raise _AcquireTimeout()
1196

    
1197
        try:
1198
          # now the lock cannot be deleted, we have it!
1199
          self._add_owned(name=lname)
1200
          acquired.add(lname)
1201

    
1202
        except:
1203
          # We shouldn't have problems adding the lock to the owners list, but
1204
          # if we did we'll try to release this lock and re-raise exception.
1205
          # Of course something is going to be really wrong after this.
1206
          if lock._is_owned():
1207
            lock.release()
1208
          raise
1209

    
1210
    except:
1211
      # Release all owned locks
1212
      self._release_and_delete_owned()
1213
      raise
1214

    
1215
    return acquired
1216

    
1217
  def downgrade(self, names=None):
1218
    """Downgrade a set of resource locks from exclusive to shared mode.
1219

1220
    The locks must have been acquired in exclusive mode.
1221

1222
    """
1223
    assert self._is_owned(), ("downgrade on lockset %s while not owning any"
1224
                              " lock" % self.name)
1225

    
1226
    # Support passing in a single resource to downgrade rather than many
1227
    if isinstance(names, basestring):
1228
      names = [names]
1229

    
1230
    owned = self._list_owned()
1231

    
1232
    if names is None:
1233
      names = owned
1234
    else:
1235
      names = set(names)
1236
      assert owned.issuperset(names), \
1237
        ("downgrade() on unheld resources %s (set %s)" %
1238
         (names.difference(owned), self.name))
1239

    
1240
    for lockname in names:
1241
      self.__lockdict[lockname].downgrade()
1242

    
1243
    # Do we own the lockset in exclusive mode?
1244
    if self.__lock._is_owned(shared=0):
1245
      # Have all locks been downgraded?
1246
      if not compat.any(lock._is_owned(shared=0)
1247
                        for lock in self.__lockdict.values()):
1248
        self.__lock.downgrade()
1249
        assert self.__lock._is_owned(shared=1)
1250

    
1251
    return True
1252

    
1253
  def release(self, names=None):
1254
    """Release a set of resource locks, at the same level.
1255

1256
    You must have acquired the locks, either in shared or in exclusive mode,
1257
    before releasing them.
1258

1259
    @type names: list of strings, or None
1260
    @param names: the names of the locks which shall be released
1261
        (defaults to all the locks acquired at that level).
1262

1263
    """
1264
    assert self._is_owned(), ("release() on lock set %s while not owner" %
1265
                              self.name)
1266

    
1267
    # Support passing in a single resource to release rather than many
1268
    if isinstance(names, basestring):
1269
      names = [names]
1270

    
1271
    if names is None:
1272
      names = self._list_owned()
1273
    else:
1274
      names = set(names)
1275
      assert self._list_owned().issuperset(names), (
1276
               "release() on unheld resources %s (set %s)" %
1277
               (names.difference(self._list_owned()), self.name))
1278

    
1279
    # First of all let's release the "all elements" lock, if set.
1280
    # After this 'add' can work again
1281
    if self.__lock._is_owned():
1282
      self.__lock.release()
1283
      self._del_owned()
1284

    
1285
    for lockname in names:
1286
      # If we are sure the lock doesn't leave __lockdict without being
1287
      # exclusively held we can do this...
1288
      self.__lockdict[lockname].release()
1289
      self._del_owned(name=lockname)
1290

    
1291
  def add(self, names, acquired=0, shared=0):
1292
    """Add a new set of elements to the set
1293

1294
    @type names: list of strings
1295
    @param names: names of the new elements to add
1296
    @type acquired: integer (0/1) used as a boolean
1297
    @param acquired: pre-acquire the new resource?
1298
    @type shared: integer (0/1) used as a boolean
1299
    @param shared: is the pre-acquisition shared?
1300

1301
    """
1302
    # Check we don't already own locks at this level
1303
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1304
      ("Cannot add locks if the set %s is only partially owned, or shared" %
1305
       self.name)
1306

    
1307
    # Support passing in a single resource to add rather than many
1308
    if isinstance(names, basestring):
1309
      names = [names]
1310

    
1311
    # If we don't already own the set-level lock acquired in an exclusive way
1312
    # we'll get it and note we need to release it later.
1313
    release_lock = False
1314
    if not self.__lock._is_owned():
1315
      release_lock = True
1316
      self.__lock.acquire()
1317

    
1318
    try:
1319
      invalid_names = set(self.__names()).intersection(names)
1320
      if invalid_names:
1321
        # This must be an explicit raise, not an assert, because assert is
1322
        # turned off when using optimization, and this can happen because of
1323
        # concurrency even if the user doesn't want it.
1324
        raise errors.LockError("duplicate add(%s) on lockset %s" %
1325
                               (invalid_names, self.name))
1326

    
1327
      for lockname in names:
1328
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1329

    
1330
        if acquired:
1331
          # No need for priority or timeout here as this lock has just been
1332
          # created
1333
          lock.acquire(shared=shared)
1334
          # now the lock cannot be deleted, we have it!
1335
          try:
1336
            self._add_owned(name=lockname)
1337
          except:
1338
            # We shouldn't have problems adding the lock to the owners list,
1339
            # but if we did we'll try to release this lock and re-raise
1340
            # exception.  Of course something is going to be really wrong,
1341
            # after this.  On the other hand the lock hasn't been added to the
1342
            # __lockdict yet so no other threads should be pending on it. This
1343
            # release is just a safety measure.
1344
            lock.release()
1345
            raise
1346

    
1347
        self.__lockdict[lockname] = lock
1348

    
1349
    finally:
1350
      # Only release __lock if we were not holding it previously.
1351
      if release_lock:
1352
        self.__lock.release()
1353

    
1354
    return True
1355

    
1356
  def remove(self, names):
1357
    """Remove elements from the lock set.
1358

1359
    You can either not hold anything in the lockset or already hold a superset
1360
    of the elements you want to delete, exclusively.
1361

1362
    @type names: list of strings
1363
    @param names: names of the resource to remove.
1364

1365
    @return: a list of locks which we removed; the list is always
1366
        equal to the names list if we were holding all the locks
1367
        exclusively
1368

1369
    """
1370
    # Support passing in a single resource to remove rather than many
1371
    if isinstance(names, basestring):
1372
      names = [names]
1373

    
1374
    # If we own any subset of this lock it must be a superset of what we want
1375
    # to delete. The ownership must also be exclusive, but that will be checked
1376
    # by the lock itself.
1377
    assert not self._is_owned() or self._list_owned().issuperset(names), (
1378
      "remove() on acquired lockset %s while not owning all elements" %
1379
      self.name)
1380

    
1381
    removed = []
1382

    
1383
    for lname in names:
1384
      # Calling delete() acquires the lock exclusively if we don't already own
1385
      # it, and causes all pending and subsequent lock acquires to fail. It's
1386
      # fine to call it out of order because delete() also implies release(),
1387
      # and the assertion above guarantees that if we either already hold
1388
      # everything we want to delete, or we hold none.
1389
      try:
1390
        self.__lockdict[lname].delete()
1391
        removed.append(lname)
1392
      except (KeyError, errors.LockError):
1393
        # This cannot happen if we were already holding it, verify:
1394
        assert not self._is_owned(), ("remove failed while holding lockset %s"
1395
                                      % self.name)
1396
      else:
1397
        # If no LockError was raised we are the ones who deleted the lock.
1398
        # This means we can safely remove it from lockdict, as any further or
1399
        # pending delete() or acquire() will fail (and nobody can have the lock
1400
        # since before our call to delete()).
1401
        #
1402
        # This is done in an else clause because if the exception was thrown
1403
        # it's the job of the one who actually deleted it.
1404
        del self.__lockdict[lname]
1405
        # And let's remove it from our private list if we owned it.
1406
        if self._is_owned():
1407
          self._del_owned(name=lname)
1408

    
1409
    return removed
1410

    
1411

    
1412
# Locking levels, must be acquired in increasing order.
1413
# Current rules are:
1414
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1415
#   acquired before performing any operation, either in shared or in exclusive
1416
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
1417
#   avoided.
1418
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1419
#   If you need more than one node, or more than one instance, acquire them at
1420
#   the same time.
1421
LEVEL_CLUSTER = 0
1422
LEVEL_INSTANCE = 1
1423
LEVEL_NODEGROUP = 2
1424
LEVEL_NODE = 3
1425

    
1426
LEVELS = [LEVEL_CLUSTER,
1427
          LEVEL_INSTANCE,
1428
          LEVEL_NODEGROUP,
1429
          LEVEL_NODE]
1430

    
1431
# Lock levels which are modifiable
1432
LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]
1433

    
1434
LEVEL_NAMES = {
1435
  LEVEL_CLUSTER: "cluster",
1436
  LEVEL_INSTANCE: "instance",
1437
  LEVEL_NODEGROUP: "nodegroup",
1438
  LEVEL_NODE: "node",
1439
  }
1440

    
1441
# Constant for the big ganeti lock
1442
BGL = 'BGL'
1443

    
1444

    
1445
class GanetiLockManager:
1446
  """The Ganeti Locking Library
1447

1448
  The purpose of this small library is to manage locking for ganeti clusters
1449
  in a central place, while at the same time doing dynamic checks against
1450
  possible deadlocks. It will also make it easier to transition to a different
1451
  lock type should we migrate away from python threads.
1452

1453
  """
1454
  _instance = None
1455

    
1456
  def __init__(self, nodes, nodegroups, instances):
1457
    """Constructs a new GanetiLockManager object.
1458

1459
    There should be only a GanetiLockManager object at any time, so this
1460
    function raises an error if this is not the case.
1461

1462
    @param nodes: list of node names
1463
    @param nodegroups: list of nodegroup uuids
1464
    @param instances: list of instance names
1465

1466
    """
1467
    assert self.__class__._instance is None, \
1468
           "double GanetiLockManager instance"
1469

    
1470
    self.__class__._instance = self
1471

    
1472
    self._monitor = LockMonitor()
1473

    
1474
    # The keyring contains all the locks, at their level and in the correct
1475
    # locking order.
1476
    self.__keyring = {
1477
      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
1478
      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
1479
      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor),
1480
      LEVEL_INSTANCE: LockSet(instances, "instances",
1481
                              monitor=self._monitor),
1482
      }
1483

    
1484
  def AddToLockMonitor(self, provider):
1485
    """Registers a new lock with the monitor.
1486

1487
    See L{LockMonitor.RegisterLock}.
1488

1489
    """
1490
    return self._monitor.RegisterLock(provider)
1491

    
1492
  def QueryLocks(self, fields):
1493
    """Queries information from all locks.
1494

1495
    See L{LockMonitor.QueryLocks}.
1496

1497
    """
1498
    return self._monitor.QueryLocks(fields)
1499

    
1500
  def OldStyleQueryLocks(self, fields):
1501
    """Queries information from all locks, returning old-style data.
1502

1503
    See L{LockMonitor.OldStyleQueryLocks}.
1504

1505
    """
1506
    return self._monitor.OldStyleQueryLocks(fields)
1507

    
1508
  def _names(self, level):
1509
    """List the lock names at the given level.
1510

1511
    This can be used for debugging/testing purposes.
1512

1513
    @param level: the level whose list of locks to get
1514

1515
    """
1516
    assert level in LEVELS, "Invalid locking level %s" % level
1517
    return self.__keyring[level]._names()
1518

    
1519
  def _is_owned(self, level):
1520
    """Check whether we are owning locks at the given level
1521

1522
    """
1523
    return self.__keyring[level]._is_owned()
1524

    
1525
  is_owned = _is_owned
1526

    
1527
  def _list_owned(self, level):
1528
    """Get the set of owned locks at the given level
1529

1530
    """
1531
    return self.__keyring[level]._list_owned()
1532

    
1533
  list_owned = _list_owned
1534

    
1535
  def _upper_owned(self, level):
1536
    """Check that we don't own any lock at a level greater than the given one.
1537

1538
    """
1539
    # This way of checking only works if LEVELS[i] = i, which we check for in
1540
    # the test cases.
1541
    return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1542

    
1543
  def _BGL_owned(self): # pylint: disable=C0103
1544
    """Check if the current thread owns the BGL.
1545

1546
    Both an exclusive or a shared acquisition work.
1547

1548
    """
1549
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1550

    
1551
  @staticmethod
1552
  def _contains_BGL(level, names): # pylint: disable=C0103
1553
    """Check if the level contains the BGL.
1554

1555
    Check if acting on the given level and set of names will change
1556
    the status of the Big Ganeti Lock.
1557

1558
    """
1559
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1560

    
1561
  def acquire(self, level, names, timeout=None, shared=0, priority=None):
1562
    """Acquire a set of resource locks, at the same level.
1563

1564
    @type level: member of locking.LEVELS
1565
    @param level: the level at which the locks shall be acquired
1566
    @type names: list of strings (or string)
1567
    @param names: the names of the locks which shall be acquired
1568
        (special lock names, or instance/node names)
1569
    @type shared: integer (0/1) used as a boolean
1570
    @param shared: whether to acquire in shared mode; by default
1571
        an exclusive lock will be acquired
1572
    @type timeout: float
1573
    @param timeout: Maximum time to acquire all locks
1574
    @type priority: integer
1575
    @param priority: Priority for acquiring lock
1576

1577
    """
1578
    assert level in LEVELS, "Invalid locking level %s" % level
1579

    
1580
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1581
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1582
    # so even if we've migrated we need to at least share the BGL to be
1583
    # compatible with them. Of course if we own the BGL exclusively there's no
1584
    # point in acquiring any other lock, unless perhaps we are half way through
1585
    # the migration of the current opcode.
1586
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1587
            "You must own the Big Ganeti Lock before acquiring any other")
1588

    
1589
    # Check we don't own locks at the same or upper levels.
1590
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1591
           " while owning some at a greater one")
1592

    
1593
    # Acquire the locks in the set.
1594
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
1595
                                         priority=priority)
1596

    
1597
  def downgrade(self, level, names=None):
1598
    """Downgrade a set of resource locks from exclusive to shared mode.
1599

1600
    You must have acquired the locks in exclusive mode.
1601

1602
    @type level: member of locking.LEVELS
1603
    @param level: the level at which the locks shall be downgraded
1604
    @type names: list of strings, or None
1605
    @param names: the names of the locks which shall be downgraded
1606
        (defaults to all the locks acquired at the level)
1607

1608
    """
1609
    assert level in LEVELS, "Invalid locking level %s" % level
1610

    
1611
    return self.__keyring[level].downgrade(names=names)
1612

    
1613
  def release(self, level, names=None):
1614
    """Release a set of resource locks, at the same level.
1615

1616
    You must have acquired the locks, either in shared or in exclusive
1617
    mode, before releasing them.
1618

1619
    @type level: member of locking.LEVELS
1620
    @param level: the level at which the locks shall be released
1621
    @type names: list of strings, or None
1622
    @param names: the names of the locks which shall be released
1623
        (defaults to all the locks acquired at that level)
1624

1625
    """
1626
    assert level in LEVELS, "Invalid locking level %s" % level
1627
    assert (not self._contains_BGL(level, names) or
1628
            not self._upper_owned(LEVEL_CLUSTER)), (
1629
            "Cannot release the Big Ganeti Lock while holding something"
1630
            " at upper levels (%r)" %
1631
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
1632
                              for i in self.__keyring.keys()]), ))
1633

    
1634
    # Release will complain if we don't own the locks already
1635
    return self.__keyring[level].release(names)
1636

    
1637
  def add(self, level, names, acquired=0, shared=0):
1638
    """Add locks at the specified level.
1639

1640
    @type level: member of locking.LEVELS_MOD
1641
    @param level: the level at which the locks shall be added
1642
    @type names: list of strings
1643
    @param names: names of the locks to acquire
1644
    @type acquired: integer (0/1) used as a boolean
1645
    @param acquired: whether to acquire the newly added locks
1646
    @type shared: integer (0/1) used as a boolean
1647
    @param shared: whether the acquisition will be shared
1648

1649
    """
1650
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1651
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1652
           " operations")
1653
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1654
           " while owning some at a greater one")
1655
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1656

    
1657
  def remove(self, level, names):
1658
    """Remove locks from the specified level.
1659

1660
    You must either already own the locks you are trying to remove
1661
    exclusively or not own any lock at an upper level.
1662

1663
    @type level: member of locking.LEVELS_MOD
1664
    @param level: the level at which the locks shall be removed
1665
    @type names: list of strings
1666
    @param names: the names of the locks which shall be removed
1667
        (special lock names, or instance/node names)
1668

1669
    """
1670
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1671
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1672
           " operations")
1673
    # Check we either own the level or don't own anything from here
1674
    # up. LockSet.remove() will check the case in which we don't own
1675
    # all the needed resources, or we have a shared ownership.
1676
    assert self._is_owned(level) or not self._upper_owned(level), (
1677
           "Cannot remove locks at a level while not owning it or"
1678
           " owning some at a greater one")
1679
    return self.__keyring[level].remove(names)
1680

    
1681

    
1682
def _MonitorSortKey((item, idx, num)):
1683
  """Sorting key function.
1684

1685
  Sort by name, registration order and then order of information. This provides
1686
  a stable sort order over different providers, even if they return the same
1687
  name.
1688

1689
  """
1690
  (name, _, _, _) = item
1691

    
1692
  return (utils.NiceSortKey(name), num, idx)
1693

    
1694

    
1695
class LockMonitor(object):
1696
  _LOCK_ATTR = "_lock"
1697

    
1698
  def __init__(self):
1699
    """Initializes this class.
1700

1701
    """
1702
    self._lock = SharedLock("LockMonitor")
1703

    
1704
    # Counter for stable sorting
1705
    self._counter = itertools.count(0)
1706

    
1707
    # Tracked locks. Weak references are used to avoid issues with circular
1708
    # references and deletion.
1709
    self._locks = weakref.WeakKeyDictionary()
1710

    
1711
  @ssynchronized(_LOCK_ATTR)
1712
  def RegisterLock(self, provider):
1713
    """Registers a new lock.
1714

1715
    @param provider: Object with a callable method named C{GetLockInfo}, taking
1716
      a single C{set} containing the requested information items
1717
    @note: It would be nicer to only receive the function generating the
1718
      requested information but, as it turns out, weak references to bound
1719
      methods (e.g. C{self.GetLockInfo}) are tricky; there are several
1720
      workarounds, but none of the ones I found works properly in combination
1721
      with a standard C{WeakKeyDictionary}
1722

1723
    """
1724
    assert provider not in self._locks, "Duplicate registration"
1725

    
1726
    # There used to be a check for duplicate names here. As it turned out, when
1727
    # a lock is re-created with the same name in a very short timeframe, the
1728
    # previous instance might not yet be removed from the weakref dictionary.
1729
    # By keeping track of the order of incoming registrations, a stable sort
1730
    # ordering can still be guaranteed.
1731

    
1732
    self._locks[provider] = self._counter.next()
1733

    
1734
  def _GetLockInfo(self, requested):
1735
    """Get information from all locks.
1736

1737
    """
1738
    # Must hold lock while getting consistent list of tracked items
1739
    self._lock.acquire(shared=1)
1740
    try:
1741
      items = self._locks.items()
1742
    finally:
1743
      self._lock.release()
1744

    
1745
    return [(info, idx, num)
1746
            for (provider, num) in items
1747
            for (idx, info) in enumerate(provider.GetLockInfo(requested))]
1748

    
1749
  def _Query(self, fields):
1750
    """Queries information from all locks.
1751

1752
    @type fields: list of strings
1753
    @param fields: List of fields to return
1754

1755
    """
1756
    qobj = query.Query(query.LOCK_FIELDS, fields)
1757

    
1758
    # Get all data with internal lock held and then sort by name and incoming
1759
    # order
1760
    lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
1761
                      key=_MonitorSortKey)
1762

    
1763
    # Extract lock information and build query data
1764
    return (qobj, query.LockQueryData(map(compat.fst, lockinfo)))
1765

    
1766
  def QueryLocks(self, fields):
1767
    """Queries information from all locks.
1768

1769
    @type fields: list of strings
1770
    @param fields: List of fields to return
1771

1772
    """
1773
    (qobj, ctx) = self._Query(fields)
1774

    
1775
    # Prepare query response
1776
    return query.GetQueryResponse(qobj, ctx)
1777

    
1778
  def OldStyleQueryLocks(self, fields):
1779
    """Queries information from all locks, returning old-style data.
1780

1781
    @type fields: list of strings
1782
    @param fields: List of fields to return
1783

1784
    """
1785
    (qobj, ctx) = self._Query(fields)
1786

    
1787
    return qobj.OldStyleQuery(ctx)