Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 44b4eddc

History | View | Annotate | Download (53.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable-msg=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import errno
32
import weakref
33
import logging
34
import heapq
35
import operator
36
import itertools
37

    
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import compat
41
from ganeti import query
42

    
43

    
44
_EXCLUSIVE_TEXT = "exclusive"
45
_SHARED_TEXT = "shared"
46
_DELETED_TEXT = "deleted"
47

    
48
_DEFAULT_PRIORITY = 0
49

    
50

    
51
def ssynchronized(mylock, shared=0):
  """Decorator running a function while holding a shared lock.

  The decorated function is executed with the lock held, either exclusively
  (the default) or in shared mode, and the lock is released again when the
  function returns or raises. The lock has to be a SharedLock or provide the
  same C{acquire(shared=...)}/C{release()} interface.

  @type mylock: lockable object or string
  @param mylock: lock to acquire, or the name of the attribute holding the
      lock on the object passed as first positional argument ("self")
  @type shared: integer (0/1) used as a boolean
  @param shared: passed unchanged to the lock's C{acquire} method

  """
  def wrap(fn):
    def sync_function(*args, **kwargs):
      if not isinstance(mylock, basestring):
        held = mylock
      else:
        assert args, "cannot ssynchronize on non-class method: self not found"
        # The lock is looked up by name on the instance, i.e. on args[0]
        held = getattr(args[0], mylock)

      held.acquire(shared=shared)
      try:
        return fn(*args, **kwargs)
      finally:
        held.release()

    return sync_function

  return wrap
77

    
78

    
79
class _SingleNotifyPipeConditionWaiter(object):
80
  """Helper class for SingleNotifyPipeCondition
81

82
  """
83
  __slots__ = [
84
    "_fd",
85
    "_poller",
86
    ]
87

    
88
  def __init__(self, poller, fd):
89
    """Constructor for _SingleNotifyPipeConditionWaiter
90

91
    @type poller: select.poll
92
    @param poller: Poller object
93
    @type fd: int
94
    @param fd: File descriptor to wait for
95

96
    """
97
    object.__init__(self)
98
    self._poller = poller
99
    self._fd = fd
100

    
101
  def __call__(self, timeout):
102
    """Wait for something to happen on the pipe.
103

104
    @type timeout: float or None
105
    @param timeout: Timeout for waiting (can be None)
106

107
    """
108
    running_timeout = utils.RunningTimeout(timeout, True)
109

    
110
    while True:
111
      remaining_time = running_timeout.Remaining()
112

    
113
      if remaining_time is not None:
114
        if remaining_time < 0.0:
115
          break
116

    
117
        # Our calculation uses seconds, poll() wants milliseconds
118
        remaining_time *= 1000
119

    
120
      try:
121
        result = self._poller.poll(remaining_time)
122
      except EnvironmentError, err:
123
        if err.errno != errno.EINTR:
124
          raise
125
        result = None
126

    
127
      # Check whether we were notified
128
      if result and result[0][0] == self._fd:
129
        break
130

    
131

    
132
class _BaseCondition(object):
133
  """Base class containing common code for conditions.
134

135
  Some of this code is taken from python's threading module.
136

137
  """
138
  __slots__ = [
139
    "_lock",
140
    "acquire",
141
    "release",
142
    "_is_owned",
143
    "_acquire_restore",
144
    "_release_save",
145
    ]
146

    
147
  def __init__(self, lock):
148
    """Constructor for _BaseCondition.
149

150
    @type lock: threading.Lock
151
    @param lock: condition base lock
152

153
    """
154
    object.__init__(self)
155

    
156
    try:
157
      self._release_save = lock._release_save
158
    except AttributeError:
159
      self._release_save = self._base_release_save
160
    try:
161
      self._acquire_restore = lock._acquire_restore
162
    except AttributeError:
163
      self._acquire_restore = self._base_acquire_restore
164
    try:
165
      self._is_owned = lock._is_owned
166
    except AttributeError:
167
      self._is_owned = self._base_is_owned
168

    
169
    self._lock = lock
170

    
171
    # Export the lock's acquire() and release() methods
172
    self.acquire = lock.acquire
173
    self.release = lock.release
174

    
175
  def _base_is_owned(self):
176
    """Check whether lock is owned by current thread.
177

178
    """
179
    if self._lock.acquire(0):
180
      self._lock.release()
181
      return False
182
    return True
183

    
184
  def _base_release_save(self):
185
    self._lock.release()
186

    
187
  def _base_acquire_restore(self, _):
188
    self._lock.acquire()
189

    
190
  def _check_owned(self):
191
    """Raise an exception if the current thread doesn't own the lock.
192

193
    """
194
    if not self._is_owned():
195
      raise RuntimeError("cannot work with un-aquired lock")
196

    
197

    
198
class SingleNotifyPipeCondition(_BaseCondition):
  """Condition which can be notified exactly once.

  Waiting is implemented using a pipe and poll, which allows waiting for a
  notification with a timeout without resorting to polling. The class is
  almost compatible with Python's threading.Condition, except for:
    - notifyAll can only be called once, and no wait can happen after that
    - notify is not supported, only notifyAll

  """

  __slots__ = [
    "_poller",
    "_read_fd",
    "_write_fd",
    "_nwaiters",
    "_notified",
    ]

  _waiter_class = _SingleNotifyPipeConditionWaiter

  def __init__(self, lock):
    """Initializes the condition; the pipe is only created when needed.

    """
    _BaseCondition.__init__(self, lock)
    self._notified = False
    self._nwaiters = 0
    self._poller = None
    self._read_fd = None
    self._write_fd = None

  def _check_unnotified(self):
    """Make sure the condition hasn't been notified yet.

    @raise RuntimeError: if notifyAll was already called

    """
    if self._notified:
      raise RuntimeError("cannot use already notified condition")

  def _Cleanup(self):
    """Close whichever pipe ends are still open and forget the poller.

    """
    for attr in ("_read_fd", "_write_fd"):
      fd = getattr(self, attr)
      if fd is not None:
        os.close(fd)
        setattr(self, attr, None)

    self._poller = None

  def wait(self, timeout):
    """Wait for a notification.

    The lock must be held when calling; it is released while waiting and
    re-acquired before returning.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
      # The first waiter sets up the pipe and the poller
      if self._poller is None:
        (self._read_fd, self._write_fd) = os.pipe()
        self._poller = select.poll()
        self._poller.register(self._read_fd, select.POLLHUP)

      waiter = self._waiter_class(self._poller, self._read_fd)
      saved = self._release_save()
      try:
        # Lock is released here, wait for the notification
        waiter(timeout)
      finally:
        # Whatever happened, the lock has to be held again on return
        self._acquire_restore(saved)
    finally:
      self._nwaiters -= 1
      # The last waiter to leave closes the file descriptors
      if not self._nwaiters:
        self._Cleanup()

  def notifyAll(self): # pylint: disable-msg=C0103
    """Notify all waiters by closing the writing side of the pipe.

    @raise RuntimeError: if the lock isn't owned or the condition was
        notified before

    """
    self._check_owned()
    self._check_unnotified()
    self._notified = True

    write_fd = self._write_fd
    if write_fd is not None:
      os.close(write_fd)
      self._write_fd = None
290

    
291

    
292
class PipeCondition(_BaseCondition):
  """Group-only non-polling condition with counters.

  Every notification is delivered through a single-notify condition which is
  replaced by a fresh one right after being notified. The class is almost
  compatible with Python's threading.Condition, but only supports notifyAll
  and non-recursive locks. In addition it keeps track of the threads
  currently waiting.

  """
  __slots__ = [
    "_waiters",
    "_single_condition",
    ]

  _single_condition_class = SingleNotifyPipeCondition

  def __init__(self, lock):
    """Initializes the condition with an empty set of waiters.

    """
    _BaseCondition.__init__(self, lock)
    self._waiters = set()
    self._single_condition = self._single_condition_class(self._lock)

  def wait(self, timeout):
    """Wait for a notification.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # notifyAll replaces self._single_condition, hence the condition to wait
    # on must be remembered before waiting
    current = self._single_condition
    me = threading.currentThread()

    self._waiters.add(me)
    try:
      current.wait(timeout)
    finally:
      self._check_owned()
      self._waiters.remove(me)

  def notifyAll(self): # pylint: disable-msg=C0103
    """Notify all currently waiting threads.

    Threads starting to wait afterwards use a new single-notify condition.

    """
    self._check_owned()

    notified = self._single_condition
    notified.notifyAll()
    self._single_condition = self._single_condition_class(self._lock)

  def get_waiting(self):
    """Returns the waiting threads.

    @rtype: set
    @return: the internal set of waiting threads (not a copy)

    """
    self._check_owned()

    return self._waiters

  def has_waiting(self):
    """Returns whether there are active waiters.

    @rtype: bool

    """
    self._check_owned()

    if self._waiters:
      return True
    return False
360

    
361

    
362
class _PipeConditionWithMode(PipeCondition):
  """PipeCondition which additionally remembers an acquire mode.

  @ivar shared: the C{shared} value given to the constructor

  """
  __slots__ = [
    "shared",
    ]

  def __init__(self, lock, shared):
    """Initializes the condition and stores the mode.

    @param lock: condition base lock
    @param shared: mode to remember for this condition

    """
    PipeCondition.__init__(self, lock)
    self.shared = shared
373

    
374

    
375
class SharedLock(object):
  """Implements a shared lock.

  Multiple threads can acquire the lock in a shared way by calling
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
  threads can call C{acquire(shared=0)}.

  Notes on data structures: C{__pending} contains a priority queue (heapq) of
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
  ...]}. Each per-priority queue contains a normal in-order list of conditions
  to be notified when the lock can be acquired. Shared locks are grouped
  together by priority and the condition for them is stored in
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
  references for the per-priority queues indexed by priority for faster access.

  @type name: string
  @ivar name: the name of the lock

  """
  __slots__ = [
    "__weakref__",
    "__deleted",
    "__exc",
    "__lock",
    "__pending",
    "__pending_by_prio",
    "__pending_shared",
    "__shr",
    "name",
    ]

  __condition_class = _PipeConditionWithMode

  def __init__(self, name, monitor=None):
    """Construct a new SharedLock.

    @param name: the name of the lock
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register

    """
    object.__init__(self)

    self.name = name

    # Internal lock
    self.__lock = threading.Lock()

    # Queue containing waiting acquires
    self.__pending = []
    self.__pending_by_prio = {}
    self.__pending_shared = {}

    # Current lock holders
    self.__shr = set()
    self.__exc = None

    # is this lock in the deleted state?
    self.__deleted = False

    # Register with lock monitor
    if monitor:
      logging.debug("Adding lock %s to monitor", name)
      monitor.RegisterLock(self)

  def GetLockInfo(self, requested):
    """Retrieves information for querying locks.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}
    @rtype: list
    @return: list with a single tuple C{(name, mode, owner_names, pending)};
        fields which weren't requested or don't apply are None

    """
    self.__lock.acquire()
    try:
      # Note: to avoid unintentional race conditions, no references to
      # modifiable objects should be returned unless they were created in this
      # function.
      mode = None
      owner_names = None

      if query.LQ_MODE in requested:
        if self.__deleted:
          mode = _DELETED_TEXT
          assert not (self.__exc or self.__shr)
        elif self.__exc:
          mode = _EXCLUSIVE_TEXT
        elif self.__shr:
          mode = _SHARED_TEXT

      # Current owner(s) are wanted
      if query.LQ_OWNER in requested:
        if self.__exc:
          owner = [self.__exc]
        else:
          owner = self.__shr

        if owner:
          assert not self.__deleted
          owner_names = [i.getName() for i in owner]

      # Pending acquires are wanted
      if query.LQ_PENDING in requested:
        pending = []

        # Sorting instead of copying and using heapq functions for simplicity
        for (_, prioqueue) in sorted(self.__pending):
          for cond in prioqueue:
            if cond.shared:
              pendmode = _SHARED_TEXT
            else:
              pendmode = _EXCLUSIVE_TEXT

            # List of names will be sorted in L{query._GetLockPending}
            pending.append((pendmode, [i.getName()
                                       for i in cond.get_waiting()]))
      else:
        pending = None

      return [(self.name, mode, owner_names, pending)]
    finally:
      self.__lock.release()

  def __check_deleted(self):
    """Raises an exception if the lock has been deleted.

    @raise errors.LockError: if the lock is in the deleted state

    """
    if self.__deleted:
      raise errors.LockError("Deleted lock %s" % self.name)

  def __is_sharer(self):
    """Is the current thread sharing the lock at this time?

    """
    return threading.currentThread() in self.__shr

  def __is_exclusive(self):
    """Is the current thread holding the lock exclusively at this time?

    """
    return threading.currentThread() == self.__exc

  def __is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    This is a private version of the function, which presumes you're holding
    the internal lock.

    @param shared: see L{_is_owned}

    """
    if shared < 0:
      return self.__is_sharer() or self.__is_exclusive()
    elif shared:
      return self.__is_sharer()
    else:
      return self.__is_exclusive()

  def _is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership
    @rtype: bool

    """
    self.__lock.acquire()
    try:
      return self.__is_owned(shared=shared)
    finally:
      self.__lock.release()

  is_owned = _is_owned

  def _count_pending(self):
    """Returns the number of pending acquires.

    Counts the conditions in all per-priority queues.

    @rtype: int

    """
    self.__lock.acquire()
    try:
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
    finally:
      self.__lock.release()

  def _check_empty(self):
    """Checks whether there are any pending acquires.

    @rtype: bool
    @return: True if none of the pending-acquire structures has entries

    """
    self.__lock.acquire()
    try:
      # Order is important: __find_first_pending_queue modifies __pending
      (_, prioqueue) = self.__find_first_pending_queue()

      return not (prioqueue or
                  self.__pending or
                  self.__pending_by_prio or
                  self.__pending_shared)
    finally:
      self.__lock.release()

  def __do_acquire(self, shared):
    """Actually acquire the lock.

    Records the current thread as sharer or as exclusive owner; whether this
    is allowed must have been checked by the caller.

    """
    if shared:
      self.__shr.add(threading.currentThread())
    else:
      self.__exc = threading.currentThread()

  def __can_acquire(self, shared):
    """Determine whether lock can be acquired.

    A shared acquire only requires that there's no exclusive owner, an
    exclusive one additionally requires that there are no sharers.

    """
    if shared:
      return self.__exc is None
    else:
      return len(self.__shr) == 0 and self.__exc is None

  def __find_first_pending_queue(self):
    """Tries to find the topmost queued entry with pending acquires.

    Removes empty entries while going through the list.

    @return: tuple C{(priority, prioqueue)} of the first non-empty queue, or
        C{(None, None)} if there's none

    """
    while self.__pending:
      (priority, prioqueue) = self.__pending[0]

      if prioqueue:
        return (priority, prioqueue)

      # Remove empty queue
      heapq.heappop(self.__pending)
      del self.__pending_by_prio[priority]
      assert priority not in self.__pending_shared

    return (None, None)

  def __is_on_top(self, cond):
    """Checks whether the passed condition is on top of the queue.

    The caller must make sure the queue isn't empty.

    """
    (_, prioqueue) = self.__find_first_pending_queue()

    return cond == prioqueue[0]

  def __acquire_unlocked(self, shared, timeout, priority):
    """Acquire a shared lock.

    The internal lock must be held by the caller.

    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: True if the lock was acquired, False if it wasn't (only
        possible with a timeout)
    @raise errors.LockError: if the lock is or gets deleted

    """
    self.__check_deleted()

    # We cannot acquire the lock if we already have it
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
                                   " %s" % self.name)

    # Remove empty entries from queue
    self.__find_first_pending_queue()

    # Check whether someone else holds the lock or there are pending acquires.
    if not self.__pending and self.__can_acquire(shared):
      # Apparently not, can acquire lock directly.
      self.__do_acquire(shared)
      return True

    prioqueue = self.__pending_by_prio.get(priority, None)

    if shared:
      # Try to re-use condition for shared acquire
      wait_condition = self.__pending_shared.get(priority, None)
      assert (wait_condition is None or
              (wait_condition.shared and wait_condition in prioqueue))
    else:
      wait_condition = None

    if wait_condition is None:
      if prioqueue is None:
        assert priority not in self.__pending_by_prio

        prioqueue = []
        heapq.heappush(self.__pending, (priority, prioqueue))
        self.__pending_by_prio[priority] = prioqueue

      wait_condition = self.__condition_class(self.__lock, shared)
      prioqueue.append(wait_condition)

      if shared:
        # Keep reference for further shared acquires on same priority. This is
        # better than trying to find it in the list of pending acquires.
        assert priority not in self.__pending_shared
        self.__pending_shared[priority] = wait_condition

    try:
      # Wait until we become the topmost acquire in the queue or the timeout
      # expires.
      # TODO: Decrease timeout with spurious notifications
      while not (self.__is_on_top(wait_condition) and
                 self.__can_acquire(shared)):
        # Wait for notification
        wait_condition.wait(timeout)
        self.__check_deleted()

        # A lot of code assumes blocking acquires always succeed. Loop
        # internally for that case.
        if timeout is not None:
          break

      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
        self.__do_acquire(shared)
        return True
    finally:
      # Remove condition from queue if there are no more waiters
      if not wait_condition.has_waiting():
        prioqueue.remove(wait_condition)
        if wait_condition.shared:
          # Remove from list of shared acquires if it wasn't while releasing
          # (e.g. on lock deletion)
          self.__pending_shared.pop(priority, None)

    return False

  def acquire(self, shared=0, timeout=None, priority=None,
              test_notify=None):
    """Acquire a shared lock.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @rtype: bool
    @return: True if the lock was acquired, False otherwise
    @raise errors.LockError: if the lock is or gets deleted

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      # We already got the lock, notify now
      if __debug__ and callable(test_notify):
        test_notify()

      return self.__acquire_unlocked(shared, timeout, priority)
    finally:
      self.__lock.release()

  def downgrade(self):
    """Changes the lock mode from exclusive to shared.

    Pending acquires in shared mode on the same priority will go ahead.
    Nothing is changed if the lock is already held in shared mode.

    @rtype: bool
    @return: always True

    """
    self.__lock.acquire()
    try:
      assert self.__is_owned(), "Lock must be owned"

      if self.__is_exclusive():
        # Switch ownership from exclusive to shared
        self.__exc = None
        self.__do_acquire(1)

        # Important: pending shared acquires should only jump ahead if there
        # was a transition from exclusive to shared, otherwise an owner of a
        # shared lock can keep calling this function to push incoming shared
        # acquires
        (priority, prioqueue) = self.__find_first_pending_queue()
        if prioqueue:
          # Is there a pending shared acquire on this priority?
          cond = self.__pending_shared.pop(priority, None)
          if cond:
            assert cond.shared
            assert cond in prioqueue

            # Ensure shared acquire is on top of queue
            if len(prioqueue) > 1:
              prioqueue.remove(cond)
              prioqueue.insert(0, cond)

            # Notify
            cond.notifyAll()

      assert not self.__is_exclusive()
      assert self.__is_sharer()

      return True
    finally:
      self.__lock.release()

  def release(self):
    """Release a Shared Lock.

    You must have acquired the lock, either in shared or in exclusive mode,
    before calling this function.

    """
    self.__lock.acquire()
    try:
      assert self.__is_exclusive() or self.__is_sharer(), \
        "Cannot release non-owned lock"

      # Autodetect release type
      if self.__is_exclusive():
        self.__exc = None
      else:
        self.__shr.remove(threading.currentThread())

      # Notify topmost condition in queue
      (priority, prioqueue) = self.__find_first_pending_queue()
      if prioqueue:
        cond = prioqueue[0]
        cond.notifyAll()
        if cond.shared:
          # Prevent further shared acquires from sneaking in while waiters are
          # notified
          self.__pending_shared.pop(priority, None)

    finally:
      self.__lock.release()

  def delete(self, timeout=None, priority=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: True if the lock was marked as deleted, False if it couldn't be
        acquired (only possible with a timeout)
    @raise errors.LockError: if the lock is already deleted

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

      self.__check_deleted()

      # The caller is allowed to hold the lock exclusively already.
      acquired = self.__is_exclusive()

      if not acquired:
        acquired = self.__acquire_unlocked(0, timeout, priority)

      if acquired:
        # Ownership can only be verified after a successful acquire; if the
        # acquire timed out the lock isn't held and False has to be returned
        # instead of failing the assertion
        assert self.__is_exclusive() and not self.__is_sharer(), \
          "Lock wasn't acquired in exclusive mode"

        self.__deleted = True
        self.__exc = None

        assert not (self.__exc or self.__shr), "Found owner during deletion"

        # Notify all acquires. They'll throw an error.
        for (_, prioqueue) in self.__pending:
          for cond in prioqueue:
            cond.notifyAll()

        assert self.__deleted

      return acquired
    finally:
      self.__lock.release()

  def _release_save(self):
    """Releases the lock and returns the mode it was held in.

    @return: whether the current thread was holding the lock in shared mode;
        to be passed to L{_acquire_restore}

    """
    shared = self.__is_sharer()
    self.release()
    return shared

  def _acquire_restore(self, shared):
    """Re-acquires the lock in the mode saved by L{_release_save}.

    """
    self.acquire(shared=shared)
861

    
862

    
863
# Passing None as the value to acquire means "acquire the full LockSet".
# This constant gives that special value a descriptive name.
ALL_SET = None
866

    
867

    
868
class _AcquireTimeout(Exception):
869
  """Internal exception to abort an acquire on a timeout.
870

871
  """
872

    
873

    
874
class LockSet:
  """Implements a set of locks.

  This abstraction implements a set of shared locks for the same resource type,
  distinguished by name. The user can lock a subset of the resources and the
  LockSet will take care of acquiring the locks always in the same order, thus
  preventing deadlock.

  All the locks needed in the same set must be acquired together, though.

  @type name: string
  @ivar name: the name of the lockset

  """
  def __init__(self, members, name, monitor=None):
    """Constructs a new LockSet.

    @type members: list of strings
    @param members: initial members of the set
    @type name: string
    @param name: name of the lockset, used as prefix for member lock names
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register member locks

    """
    assert members is not None, "members parameter is not a list"
    self.name = name

    # Lock monitor
    self.__monitor = monitor

    # Used internally to guarantee coherency
    self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)

    # The lockdict indexes the relationship name -> lock
    # The order-of-locking is implied by the alphabetical order of names
    self.__lockdict = {}

    for mname in members:
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
                                          monitor=monitor)

    # The owner dict contains the set of locks each thread owns. For
    # performance each thread can access its own key without a global lock on
    # this structure. It is paramount though that *no* other type of access is
    # done to this structure (eg. no looping over its keys). *_owner helper
    # function are defined to guarantee access is correct, but in general never
    # do anything different than __owners[threading.currentThread()], or there
    # will be trouble.
    self.__owners = {}

  def _GetLockName(self, mname):
    """Returns the name for a member lock.

    @param mname: name of the member within this set
    @return: string of the form C{"<lockset name>/<member name>"}

    """
    return "%s/%s" % (self.name, mname)

  def _get_lock(self):
    """Returns the lockset-internal lock.

    """
    return self.__lock

  def _get_lockdict(self):
    """Returns the lockset-internal lock dictionary.

    Accessing this structure is only safe in single-thread usage or when the
    lockset-internal lock is held.

    """
    return self.__lockdict

  def _is_owned(self):
    """Is the current thread a current level owner?"""
    return threading.currentThread() in self.__owners

  def _add_owned(self, name=None):
    """Note the current thread owns the given lock.

    @param name: name of the lock now owned; with C{None} only an (empty)
        ownership entry for the current thread is created, if missing

    """
    if name is None:
      if not self._is_owned():
        self.__owners[threading.currentThread()] = set()
    else:
      if self._is_owned():
        self.__owners[threading.currentThread()].add(name)
      else:
        self.__owners[threading.currentThread()] = set([name])

  def _del_owned(self, name=None):
    """Note the current thread no longer owns the given lock.

    @param name: name of the lock to forget; with C{None} nothing is removed
        from the owned set and only the clean-up of an empty ownership entry
        is attempted (the internal lock must not be held in that case)

    """
    assert not (name is None and self.__lock._is_owned()), \
           "Cannot hold internal lock when deleting owner status"

    if name is not None:
      self.__owners[threading.currentThread()].remove(name)

    # Only remove the key if we don't hold the set-lock as well
    if (not self.__lock._is_owned() and
        not self.__owners[threading.currentThread()]):
      del self.__owners[threading.currentThread()]

  def _list_owned(self):
    """Get the set of resource names owned by the current thread.

    @return: a copy, so callers may modify ownership while iterating over it

    """
    if self._is_owned():
      return self.__owners[threading.currentThread()].copy()
    else:
      return set()

  def _release_and_delete_owned(self):
    """Release and delete all resources owned by the current thread"""
    for lname in self._list_owned():
      lock = self.__lockdict[lname]
      if lock._is_owned():
        lock.release()
      self._del_owned(name=lname)

  def __names(self):
    """Return the current set of names.

    Only call this function while holding __lock and don't iterate on the
    result after releasing the lock.

    """
    return self.__lockdict.keys()

  def _names(self):
    """Return a copy of the current set of elements.

    Used only for debugging purposes.

    @return: a new C{set} with the names present when the call was made

    """
    # If we don't already own the set-level lock acquired
    # we'll get it and note we need to release it later.
    release_lock = False
    if not self.__lock._is_owned():
      release_lock = True
      self.__lock.acquire(shared=1)
    try:
      # Take the snapshot while the lock is still held: the result of
      # __names() must not be iterated once the lock has been released.
      result = set(self.__names())
    finally:
      if release_lock:
        self.__lock.release()
    return result

  def acquire(self, names, timeout=None, shared=0, priority=None,
              test_notify=None):
    """Acquire a set of resource locks.

    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names); C{None} acquires the
        whole set
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring locks
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting

    @return: Set of all locks successfully acquired or None in case of timeout

    @raise errors.LockError: when any lock we try to acquire has
        been deleted before we succeed. In this case none of the
        locks requested will be acquired.

    """
    assert timeout is None or timeout >= 0.0

    # Check we don't already own locks at this level
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
                                  " (lockset %s)" % self.name)

    if priority is None:
      priority = _DEFAULT_PRIORITY

    # We need to keep track of how long we spent waiting for a lock. The
    # timeout passed to this function is over all lock acquires.
    running_timeout = utils.RunningTimeout(timeout, False)

    try:
      if names is not None:
        # Support passing in a single resource to acquire rather than many
        if isinstance(names, basestring):
          names = [names]

        return self.__acquire_inner(names, False, shared, priority,
                                    running_timeout.Remaining, test_notify)

      else:
        # If no names are given acquire the whole set by not letting new names
        # being added before we release, and getting the current list of names.
        # Some of them may then be deleted later, but we'll cope with this.
        #
        # We'd like to acquire this lock in a shared way, as it's nice if
        # everybody else can use the instances at the same time. If we are
        # acquiring them exclusively though they won't be able to do this
        # anyway, though, so we'll get the list lock exclusively as well in
        # order to be able to do add() on the set while owning it.
        if not self.__lock.acquire(shared=shared, priority=priority,
                                   timeout=running_timeout.Remaining()):
          raise _AcquireTimeout()
        try:
          # note we own the set-lock
          self._add_owned()

          return self.__acquire_inner(self.__names(), True, shared, priority,
                                      running_timeout.Remaining, test_notify)
        except:
          # Whatever went wrong (a timeout or error while acquiring members,
          # or a problem with the owners list), give the set-lock back before
          # re-raising. The release has to come first: _del_owned() refuses
          # to run without a name while the internal lock is still held.
          self.__lock.release()
          self._del_owned()
          raise

    except _AcquireTimeout:
      return None

  def __acquire_inner(self, names, want_all, shared, priority,
                      timeout_fn, test_notify):
    """Inner logic for acquiring a number of locks.

    @param names: Names of the locks to be acquired
    @param want_all: Whether all locks in the set should be acquired
    @param shared: Whether to acquire in shared mode
    @param timeout_fn: Function returning remaining timeout
    @param priority: Priority for acquiring locks
    @param test_notify: Special callback function for unittesting

    @return: set with the names of the locks acquired
    @raise errors.LockError: if a requested lock does not exist or was
        deleted (only when C{want_all} is false), or if a blocking acquire
        reported failure
    @raise _AcquireTimeout: if a lock could not be acquired in time

    """
    acquire_list = []

    # First we look the locks up on __lockdict. We have no way of being sure
    # they will still be there after, but this makes it a lot faster should
    # just one of them be the already wrong. Using a sorted sequence to prevent
    # deadlocks.
    for lname in sorted(utils.UniqueSequence(names)):
      try:
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
      except KeyError:
        if want_all:
          # We are acquiring all the set, it doesn't matter if this particular
          # element is not there anymore.
          continue

        raise errors.LockError("Non-existing lock %s in set %s (it may have"
                               " been removed)" % (lname, self.name))

      acquire_list.append((lname, lock))

    # This will hold the locknames we effectively acquired.
    acquired = set()

    try:
      # Now acquire_list contains a sorted list of resources and locks we
      # want.  In order to get them we loop on this (private) list and
      # acquire() them.  We gave no real guarantee they will still exist till
      # this is done but .acquire() itself is safe and will alert us if the
      # lock gets deleted.
      for (lname, lock) in acquire_list:
        if __debug__ and callable(test_notify):
          # Bind the current name as a default argument so the callback
          # doesn't depend on the loop variable's later values
          test_notify_fn = lambda lname=lname: test_notify(lname)
        else:
          test_notify_fn = None

        timeout = timeout_fn()

        try:
          # raises LockError if the lock was deleted
          acq_success = lock.acquire(shared=shared, timeout=timeout,
                                     priority=priority,
                                     test_notify=test_notify_fn)
        except errors.LockError:
          if want_all:
            # We are acquiring all the set, it doesn't matter if this
            # particular element is not there anymore.
            continue

          raise errors.LockError("Non-existing lock %s in set %s (it may"
                                 " have been removed)" % (lname, self.name))

        if not acq_success:
          # Couldn't get lock or timeout occurred
          if timeout is None:
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
            # blocking.
            raise errors.LockError("Failed to get lock %s (set %s)" %
                                   (lname, self.name))

          raise _AcquireTimeout()

        try:
          # now the lock cannot be deleted, we have it!
          self._add_owned(name=lname)
          acquired.add(lname)

        except:
          # We shouldn't have problems adding the lock to the owners list, but
          # if we did we'll try to release this lock and re-raise exception.
          # Of course something is going to be really wrong after this.
          if lock._is_owned():
            lock.release()
          raise

    except:
      # Release all owned locks
      self._release_and_delete_owned()
      raise

    return acquired

  def downgrade(self, names=None):
    """Downgrade a set of resource locks from exclusive to shared mode.

    The locks must have been acquired in exclusive mode.

    @type names: list of strings (or string), or None
    @param names: the names of the locks which shall be downgraded
        (defaults to all the locks owned by the current thread in this set)
    @return: always C{True}

    """
    assert self._is_owned(), ("downgrade on lockset %s while not owning any"
                              " lock" % self.name)

    # Support passing in a single resource to downgrade rather than many
    if isinstance(names, basestring):
      names = [names]

    owned = self._list_owned()

    if names is None:
      names = owned
    else:
      names = set(names)
      assert owned.issuperset(names), \
        ("downgrade() on unheld resources %s (set %s)" %
         (names.difference(owned), self.name))

    for lockname in names:
      self.__lockdict[lockname].downgrade()

    # Do we own the lockset in exclusive mode?
    if self.__lock._is_owned(shared=0):
      # Have all locks been downgraded?
      if not compat.any(lock._is_owned(shared=0)
                        for lock in self.__lockdict.values()):
        self.__lock.downgrade()
        assert self.__lock._is_owned(shared=1)

    return True

  def release(self, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive mode,
    before releasing them.

    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level).

    """
    assert self._is_owned(), ("release() on lock set %s while not owner" %
                              self.name)

    # Support passing in a single resource to release rather than many
    if isinstance(names, basestring):
      names = [names]

    if names is None:
      names = self._list_owned()
    else:
      names = set(names)
      assert self._list_owned().issuperset(names), (
               "release() on unheld resources %s (set %s)" %
               (names.difference(self._list_owned()), self.name))

    # First of all let's release the "all elements" lock, if set.
    # After this 'add' can work again
    if self.__lock._is_owned():
      self.__lock.release()
      self._del_owned()

    for lockname in names:
      # If we are sure the lock doesn't leave __lockdict without being
      # exclusively held we can do this...
      self.__lockdict[lockname].release()
      self._del_owned(name=lockname)

  def add(self, names, acquired=0, shared=0):
    """Add a new set of elements to the set

    @type names: list of strings (or string)
    @param names: names of the new elements to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: pre-acquire the new resource?
    @type shared: integer (0/1) used as a boolean
    @param shared: is the pre-acquisition shared?

    @return: always C{True}
    @raise errors.LockError: if any of the names is already in the set

    """
    # Check we don't already own locks at this level
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
      ("Cannot add locks if the set %s is only partially owned, or shared" %
       self.name)

    # Support passing in a single resource to add rather than many
    if isinstance(names, basestring):
      names = [names]

    # If we don't already own the set-level lock acquired in an exclusive way
    # we'll get it and note we need to release it later.
    release_lock = False
    if not self.__lock._is_owned():
      release_lock = True
      self.__lock.acquire()

    try:
      invalid_names = set(self.__names()).intersection(names)
      if invalid_names:
        # This must be an explicit raise, not an assert, because assert is
        # turned off when using optimization, and this can happen because of
        # concurrency even if the user doesn't want it.
        raise errors.LockError("duplicate add(%s) on lockset %s" %
                               (invalid_names, self.name))

      for lockname in names:
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)

        if acquired:
          # No need for priority or timeout here as this lock has just been
          # created
          lock.acquire(shared=shared)
          # now the lock cannot be deleted, we have it!
          try:
            self._add_owned(name=lockname)
          except:
            # We shouldn't have problems adding the lock to the owners list,
            # but if we did we'll try to release this lock and re-raise
            # exception.  Of course something is going to be really wrong,
            # after this.  On the other hand the lock hasn't been added to the
            # __lockdict yet so no other threads should be pending on it. This
            # release is just a safety measure.
            lock.release()
            raise

        self.__lockdict[lockname] = lock

    finally:
      # Only release __lock if we were not holding it previously.
      if release_lock:
        self.__lock.release()

    return True

  def remove(self, names):
    """Remove elements from the lock set.

    You can either not hold anything in the lockset or already hold a superset
    of the elements you want to delete, exclusively.

    @type names: list of strings (or string)
    @param names: names of the resource to remove.

    @return: a list of locks which we removed; the list is always
        equal to the names list if we were holding all the locks
        exclusively

    """
    # Support passing in a single resource to remove rather than many
    if isinstance(names, basestring):
      names = [names]

    # If we own any subset of this lock it must be a superset of what we want
    # to delete. The ownership must also be exclusive, but that will be checked
    # by the lock itself.
    assert not self._is_owned() or self._list_owned().issuperset(names), (
      "remove() on acquired lockset %s while not owning all elements" %
      self.name)

    removed = []

    for lname in names:
      # Calling delete() acquires the lock exclusively if we don't already own
      # it, and causes all pending and subsequent lock acquires to fail. It's
      # fine to call it out of order because delete() also implies release(),
      # and the assertion above guarantees that if we either already hold
      # everything we want to delete, or we hold none.
      try:
        self.__lockdict[lname].delete()
        removed.append(lname)
      except (KeyError, errors.LockError):
        # This cannot happen if we were already holding it, verify:
        assert not self._is_owned(), ("remove failed while holding lockset %s"
                                      % self.name)
      else:
        # If no LockError was raised we are the ones who deleted the lock.
        # This means we can safely remove it from lockdict, as any further or
        # pending delete() or acquire() will fail (and nobody can have the lock
        # since before our call to delete()).
        #
        # This is done in an else clause because if the exception was thrown
        # it's the job of the one who actually deleted it.
        del self.__lockdict[lname]
        # And let's remove it from our private list if we owned it.
        if self._is_owned():
          self._del_owned(name=lname)

    return removed
1378

    
1379

    
1380
# Locking levels; locks must be acquired in increasing level order.
# Current rules are:
#   - LEVEL_CLUSTER holds the Big Ganeti Lock (BGL), which must be acquired
#     before performing any operation, either in shared or in exclusive mode.
#     Acquiring the BGL in exclusive mode is discouraged and should be
#     avoided.
#   - LEVEL_INSTANCE, LEVEL_NODEGROUP and LEVEL_NODE hold the instance, node
#     group and node locks. If you need more than one lock of a level,
#     acquire them at the same time.
LEVEL_CLUSTER = 0
LEVEL_INSTANCE = 1
LEVEL_NODEGROUP = 2
LEVEL_NODE = 3

# All levels, listed in acquisition order
LEVELS = [
  LEVEL_CLUSTER,
  LEVEL_INSTANCE,
  LEVEL_NODEGROUP,
  LEVEL_NODE,
  ]

# Levels whose set of locks can be modified (locks added or removed)
LEVELS_MOD = [
  LEVEL_NODE,
  LEVEL_NODEGROUP,
  LEVEL_INSTANCE,
  ]

# Human-readable name of each level
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODEGROUP: "nodegroup",
  LEVEL_NODE: "node",
  }

# Name of the Big Ganeti Lock
BGL = 'BGL'
1411

    
1412

    
1413
class GanetiLockManager:
  """The Ganeti Locking Library

  Central place for managing the locks of a ganeti cluster. Besides handing
  out locks it performs dynamic checks against possible deadlocks, and it
  should ease a transition to a different lock type should we migrate away
  from python threads.

  """
  _instance = None

  def __init__(self, nodes, nodegroups, instances):
    """Constructs a new GanetiLockManager object.

    Only one GanetiLockManager may exist at any time; creating a second one
    trips an assertion.

    @param nodes: list of node names
    @param nodegroups: list of nodegroup uuids
    @param instances: list of instance names

    """
    cls = self.__class__
    assert cls._instance is None, \
           "double GanetiLockManager instance"

    cls._instance = self

    monitor = LockMonitor()
    self._monitor = monitor

    # The keyring maps every locking level to the lockset holding its locks.
    self.__keyring = {
      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=monitor),
      LEVEL_NODE: LockSet(nodes, "nodes", monitor=monitor),
      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=monitor),
      LEVEL_INSTANCE: LockSet(instances, "instances", monitor=monitor),
      }

  def QueryLocks(self, fields):
    """Queries information from all locks.

    Thin wrapper, see L{LockMonitor.QueryLocks}.

    """
    return self._monitor.QueryLocks(fields)

  def OldStyleQueryLocks(self, fields):
    """Queries information from all locks, returning old-style data.

    Thin wrapper, see L{LockMonitor.OldStyleQueryLocks}.

    """
    return self._monitor.OldStyleQueryLocks(fields)

  def _names(self, level):
    """List the lock names at the given level.

    Intended for debugging/testing purposes.

    @param level: the level whose list of locks to get

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    lockset = self.__keyring[level]
    return lockset._names()

  def _is_owned(self, level):
    """Check whether we are owning locks at the given level

    """
    lockset = self.__keyring[level]
    return lockset._is_owned()

  is_owned = _is_owned

  def _list_owned(self, level):
    """Get the set of owned locks at the given level

    """
    lockset = self.__keyring[level]
    return lockset._list_owned()

  list_owned = _list_owned

  def _upper_owned(self, level):
    """Check whether we own any lock at a level greater than the given one.

    @return: true if a lock is owned at some level following C{level} in
        L{LEVELS}

    """
    # Slicing by "level + 1" only works if LEVELS[i] = i, which we check for
    # in the test cases.
    return compat.any(self._is_owned(upper) for upper in LEVELS[level + 1:])

  def _BGL_owned(self): # pylint: disable-msg=C0103
    """Check if the current thread owns the BGL.

    Both an exclusive or a shared acquisition work.

    """
    cluster_owned = self.__keyring[LEVEL_CLUSTER]._list_owned()
    return BGL in cluster_owned

  @staticmethod
  def _contains_BGL(level, names): # pylint: disable-msg=C0103
    """Check if the level contains the BGL.

    Tells whether acting on the given level and set of names will change
    the status of the Big Ganeti Lock.

    """
    if level != LEVEL_CLUSTER:
      return False
    # No names means "everything at this level", which includes the BGL
    return names is None or BGL in names

  def acquire(self, level, names, timeout=None, shared=0, priority=None):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring lock

    @return: whatever L{LockSet.acquire} returns for the level's lockset

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Either this call acquires the Big Ganeti Lock or we must own it
    # already. Some "legacy" opcodes need to be sure they are run
    # non-concurrently, so even migrated opcodes have to at least share the
    # BGL to be compatible with them. Owning the BGL exclusively makes any
    # other lock pointless, unless perhaps the current opcode is half way
    # through its migration.
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
            "You must own the Big Ganeti Lock before acquiring any other")

    # Check we don't own locks at the same or upper levels.
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
           " while owning some at a greater one")

    # Acquire the locks in the set.
    lockset = self.__keyring[level]
    return lockset.acquire(names, shared=shared, timeout=timeout,
                           priority=priority)

  def downgrade(self, level, names=None):
    """Downgrade a set of resource locks from exclusive to shared mode.

    You must have acquired the locks in exclusive mode.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be downgraded
    @type names: list of strings, or None
    @param names: the names of the locks which shall be downgraded
        (defaults to all the locks acquired at the level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    lockset = self.__keyring[level]
    return lockset.downgrade(names=names)

  def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive
    mode, before releasing them.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    # The BGL may only be given up once nothing above it is held any more
    assert not (self._contains_BGL(level, names) and
                self._upper_owned(LEVEL_CLUSTER)), (
            "Cannot release the Big Ganeti Lock while holding something"
            " at upper levels (%r)" %
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
                              for i in self.__keyring.keys()]), ))

    # Release will complain if we don't own the locks already
    lockset = self.__keyring[level]
    return lockset.release(names)

  def add(self, level, names, acquired=0, shared=0):
    """Add locks at the specified level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to acquire
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
           " while owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.add(names, acquired=acquired, shared=shared)

  def remove(self, level, names):
    """Remove locks from the specified level.

    You must either already own the locks you are trying to remove
    exclusively or not own any lock at an upper level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    # Either we own the level or we own nothing from here up.
    # LockSet.remove() deals with the cases where not all needed resources
    # are owned, or where the ownership is only shared.
    assert self._is_owned(level) or not self._upper_owned(level), (
           "Cannot remove locks at a level while not owning it or"
           " owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.remove(names)
1640

    
1641

    
1642
def _MonitorSortKey(entry):
  """Sorting key function.

  Sort by name, registration order and then order of information. This provides
  a stable sort order over different providers, even if they return the same
  name.

  @type entry: tuple
  @param entry: C{(item, idx, num)} triple; C{item} is a four-element tuple
    whose first element is the lock name, C{idx} and C{num} are the two
    tie-breakers used after the name
  @rtype: tuple
  @return: C{(nice sort key of the name, num, idx)}

  """
  # Unpack in the body instead of the signature: tuple parameters were removed
  # from the language by PEP 3113 and are a syntax error on Python 3, while
  # this form works on every Python version.
  (item, idx, num) = entry
  (name, _, _, _) = item

  return (utils.NiceSortKey(name), num, idx)
1653

    
1654

    
1655
class LockMonitor(object):
  """Keeps track of lock information providers and answers queries on them.

  Providers are held through weak references, together with the order in
  which they were registered.

  """
  _LOCK_ATTR = "_lock"

  def __init__(self):
    """Initializes this class.

    """
    # Internal lock guarding the provider registry
    self._lock = SharedLock("LockMonitor")

    # Source of registration numbers, used to keep sorting stable
    self._counter = itertools.count(0)

    # Registered providers mapped to their registration number. Keys are weak
    # references to avoid issues with circular references and deletion.
    self._locks = weakref.WeakKeyDictionary()

  @ssynchronized(_LOCK_ATTR)
  def RegisterLock(self, provider):
    """Registers a new lock.

    @param provider: Object with a callable method named C{GetLockInfo}, taking
      a single C{set} containing the requested information items
    @note: Receiving only the function which generates the requested
      information would be nicer, but weak references to bound methods (e.g.
      C{self.GetLockInfo}) are tricky; of the several workarounds found, none
      works properly in combination with a standard C{WeakKeyDictionary}

    """
    assert provider not in self._locks, "Duplicate registration"

    # Duplicate names are deliberately not rejected here: when a lock is
    # re-created with the same name within a very short timeframe, the
    # previous instance might still be present in the weakref dictionary.
    # Recording the order of incoming registrations is enough to guarantee a
    # stable sort ordering nevertheless.
    registration = self._counter.next()
    self._locks[provider] = registration

  def _GetLockInfo(self, requested):
    """Get information from all locks.

    @param requested: Requested information items, passed unchanged to the
      C{GetLockInfo} method of every registered provider
    @rtype: list
    @return: List of C{(info, idx, num)} tuples; C{idx} is the position of
      C{info} in its provider's result and C{num} the provider's registration
      number

    """
    # The internal lock is only held while taking a consistent snapshot of the
    # tracked items; providers are queried after it has been released
    self._lock.acquire(shared=1)
    try:
      tracked = self._locks.items()
    finally:
      self._lock.release()

    collected = []
    for (provider, num) in tracked:
      for (idx, info) in enumerate(provider.GetLockInfo(requested)):
        collected.append((info, idx, num))

    return collected

  def _Query(self, fields):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return
    @rtype: tuple
    @return: Query object and the matching lock query data

    """
    qobj = query.Query(query.LOCK_FIELDS, fields)

    # Collect everything first, then order it by name and incoming order
    lockinfo = self._GetLockInfo(qobj.RequestedData())
    lockinfo.sort(key=_MonitorSortKey)

    # Only the information part of each entry goes into the query data
    infos = map(operator.itemgetter(0), lockinfo)

    return (qobj, query.LockQueryData(infos))

  def QueryLocks(self, fields):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return
    @return: Query response built from the query object and its data

    """
    (lock_query, lock_data) = self._Query(fields)

    # Prepare query response
    response = query.GetQueryResponse(lock_query, lock_data)
    return response

  def OldStyleQueryLocks(self, fields):
    """Queries information from all locks, returning old-style data.

    @type fields: list of strings
    @param fields: List of fields to return
    @return: Result of the query object's C{OldStyleQuery} method

    """
    (lock_query, lock_data) = self._Query(fields)

    return lock_query.OldStyleQuery(lock_data)