Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 0376655e

History | View | Annotate | Download (52.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable-msg=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import errno
32
import weakref
33
import logging
34
import heapq
35
import operator
36
import itertools
37

    
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import compat
41
from ganeti import query
42

    
43

    
44
_EXCLUSIVE_TEXT = "exclusive"
45
_SHARED_TEXT = "shared"
46
_DELETED_TEXT = "deleted"
47

    
48
_DEFAULT_PRIORITY = 0
49

    
50

    
51
def ssynchronized(mylock, shared=0):
52
  """Shared Synchronization decorator.
53

54
  Calls the function holding the given lock, either in exclusive or shared
55
  mode. It requires the passed lock to be a SharedLock (or support its
56
  semantics).
57

58
  @type mylock: lockable object or string
59
  @param mylock: lock to acquire or class member name of the lock to acquire
60

61
  """
62
  def wrap(fn):
63
    def sync_function(*args, **kwargs):
64
      if isinstance(mylock, basestring):
65
        assert args, "cannot ssynchronize on non-class method: self not found"
66
        # args[0] is "self"
67
        lock = getattr(args[0], mylock)
68
      else:
69
        lock = mylock
70
      lock.acquire(shared=shared)
71
      try:
72
        return fn(*args, **kwargs)
73
      finally:
74
        lock.release()
75
    return sync_function
76
  return wrap
77

    
78

    
79
class _SingleNotifyPipeConditionWaiter(object):
80
  """Helper class for SingleNotifyPipeCondition
81

82
  """
83
  __slots__ = [
84
    "_fd",
85
    "_poller",
86
    ]
87

    
88
  def __init__(self, poller, fd):
89
    """Constructor for _SingleNotifyPipeConditionWaiter
90

91
    @type poller: select.poll
92
    @param poller: Poller object
93
    @type fd: int
94
    @param fd: File descriptor to wait for
95

96
    """
97
    object.__init__(self)
98
    self._poller = poller
99
    self._fd = fd
100

    
101
  def __call__(self, timeout):
102
    """Wait for something to happen on the pipe.
103

104
    @type timeout: float or None
105
    @param timeout: Timeout for waiting (can be None)
106

107
    """
108
    running_timeout = utils.RunningTimeout(timeout, True)
109

    
110
    while True:
111
      remaining_time = running_timeout.Remaining()
112

    
113
      if remaining_time is not None:
114
        if remaining_time < 0.0:
115
          break
116

    
117
        # Our calculation uses seconds, poll() wants milliseconds
118
        remaining_time *= 1000
119

    
120
      try:
121
        result = self._poller.poll(remaining_time)
122
      except EnvironmentError, err:
123
        if err.errno != errno.EINTR:
124
          raise
125
        result = None
126

    
127
      # Check whether we were notified
128
      if result and result[0][0] == self._fd:
129
        break
130

    
131

    
132
class _BaseCondition(object):
133
  """Base class containing common code for conditions.
134

135
  Some of this code is taken from python's threading module.
136

137
  """
138
  __slots__ = [
139
    "_lock",
140
    "acquire",
141
    "release",
142
    "_is_owned",
143
    "_acquire_restore",
144
    "_release_save",
145
    ]
146

    
147
  def __init__(self, lock):
148
    """Constructor for _BaseCondition.
149

150
    @type lock: threading.Lock
151
    @param lock: condition base lock
152

153
    """
154
    object.__init__(self)
155

    
156
    try:
157
      self._release_save = lock._release_save
158
    except AttributeError:
159
      self._release_save = self._base_release_save
160
    try:
161
      self._acquire_restore = lock._acquire_restore
162
    except AttributeError:
163
      self._acquire_restore = self._base_acquire_restore
164
    try:
165
      self._is_owned = lock._is_owned
166
    except AttributeError:
167
      self._is_owned = self._base_is_owned
168

    
169
    self._lock = lock
170

    
171
    # Export the lock's acquire() and release() methods
172
    self.acquire = lock.acquire
173
    self.release = lock.release
174

    
175
  def _base_is_owned(self):
176
    """Check whether lock is owned by current thread.
177

178
    """
179
    if self._lock.acquire(0):
180
      self._lock.release()
181
      return False
182
    return True
183

    
184
  def _base_release_save(self):
185
    self._lock.release()
186

    
187
  def _base_acquire_restore(self, _):
188
    self._lock.acquire()
189

    
190
  def _check_owned(self):
191
    """Raise an exception if the current thread doesn't own the lock.
192

193
    """
194
    if not self._is_owned():
195
      raise RuntimeError("cannot work with un-aquired lock")
196

    
197

    
198
class SingleNotifyPipeCondition(_BaseCondition):
199
  """Condition which can only be notified once.
200

201
  This condition class uses pipes and poll, internally, to be able to wait for
202
  notification with a timeout, without resorting to polling. It is almost
203
  compatible with Python's threading.Condition, with the following differences:
204
    - notifyAll can only be called once, and no wait can happen after that
205
    - notify is not supported, only notifyAll
206

207
  """
208

    
209
  __slots__ = [
210
    "_poller",
211
    "_read_fd",
212
    "_write_fd",
213
    "_nwaiters",
214
    "_notified",
215
    ]
216

    
217
  _waiter_class = _SingleNotifyPipeConditionWaiter
218

    
219
  def __init__(self, lock):
220
    """Constructor for SingleNotifyPipeCondition
221

222
    """
223
    _BaseCondition.__init__(self, lock)
224
    self._nwaiters = 0
225
    self._notified = False
226
    self._read_fd = None
227
    self._write_fd = None
228
    self._poller = None
229

    
230
  def _check_unnotified(self):
231
    """Throws an exception if already notified.
232

233
    """
234
    if self._notified:
235
      raise RuntimeError("cannot use already notified condition")
236

    
237
  def _Cleanup(self):
238
    """Cleanup open file descriptors, if any.
239

240
    """
241
    if self._read_fd is not None:
242
      os.close(self._read_fd)
243
      self._read_fd = None
244

    
245
    if self._write_fd is not None:
246
      os.close(self._write_fd)
247
      self._write_fd = None
248
    self._poller = None
249

    
250
  def wait(self, timeout):
251
    """Wait for a notification.
252

253
    @type timeout: float or None
254
    @param timeout: Waiting timeout (can be None)
255

256
    """
257
    self._check_owned()
258
    self._check_unnotified()
259

    
260
    self._nwaiters += 1
261
    try:
262
      if self._poller is None:
263
        (self._read_fd, self._write_fd) = os.pipe()
264
        self._poller = select.poll()
265
        self._poller.register(self._read_fd, select.POLLHUP)
266

    
267
      wait_fn = self._waiter_class(self._poller, self._read_fd)
268
      state = self._release_save()
269
      try:
270
        # Wait for notification
271
        wait_fn(timeout)
272
      finally:
273
        # Re-acquire lock
274
        self._acquire_restore(state)
275
    finally:
276
      self._nwaiters -= 1
277
      if self._nwaiters == 0:
278
        self._Cleanup()
279

    
280
  def notifyAll(self): # pylint: disable-msg=C0103
281
    """Close the writing side of the pipe to notify all waiters.
282

283
    """
284
    self._check_owned()
285
    self._check_unnotified()
286
    self._notified = True
287
    if self._write_fd is not None:
288
      os.close(self._write_fd)
289
      self._write_fd = None
290

    
291

    
292
class PipeCondition(_BaseCondition):
293
  """Group-only non-polling condition with counters.
294

295
  This condition class uses pipes and poll, internally, to be able to wait for
296
  notification with a timeout, without resorting to polling. It is almost
297
  compatible with Python's threading.Condition, but only supports notifyAll and
298
  non-recursive locks. As an additional features it's able to report whether
299
  there are any waiting threads.
300

301
  """
302
  __slots__ = [
303
    "_waiters",
304
    "_single_condition",
305
    ]
306

    
307
  _single_condition_class = SingleNotifyPipeCondition
308

    
309
  def __init__(self, lock):
310
    """Initializes this class.
311

312
    """
313
    _BaseCondition.__init__(self, lock)
314
    self._waiters = set()
315
    self._single_condition = self._single_condition_class(self._lock)
316

    
317
  def wait(self, timeout):
318
    """Wait for a notification.
319

320
    @type timeout: float or None
321
    @param timeout: Waiting timeout (can be None)
322

323
    """
324
    self._check_owned()
325

    
326
    # Keep local reference to the pipe. It could be replaced by another thread
327
    # notifying while we're waiting.
328
    cond = self._single_condition
329

    
330
    self._waiters.add(threading.currentThread())
331
    try:
332
      cond.wait(timeout)
333
    finally:
334
      self._check_owned()
335
      self._waiters.remove(threading.currentThread())
336

    
337
  def notifyAll(self): # pylint: disable-msg=C0103
338
    """Notify all currently waiting threads.
339

340
    """
341
    self._check_owned()
342
    self._single_condition.notifyAll()
343
    self._single_condition = self._single_condition_class(self._lock)
344

    
345
  def get_waiting(self):
346
    """Returns a list of all waiting threads.
347

348
    """
349
    self._check_owned()
350

    
351
    return self._waiters
352

    
353
  def has_waiting(self):
354
    """Returns whether there are active waiters.
355

356
    """
357
    self._check_owned()
358

    
359
    return bool(self._waiters)
360

    
361

    
362
class _PipeConditionWithMode(PipeCondition):
363
  __slots__ = [
364
    "shared",
365
    ]
366

    
367
  def __init__(self, lock, shared):
368
    """Initializes this class.
369

370
    """
371
    self.shared = shared
372
    PipeCondition.__init__(self, lock)
373

    
374

    
375
class SharedLock(object):
376
  """Implements a shared lock.
377

378
  Multiple threads can acquire the lock in a shared way by calling
379
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
380
  threads can call C{acquire(shared=0)}.
381

382
  Notes on data structures: C{__pending} contains a priority queue (heapq) of
383
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
384
  ...]}. Each per-priority queue contains a normal in-order list of conditions
385
  to be notified when the lock can be acquired. Shared locks are grouped
386
  together by priority and the condition for them is stored in
387
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
388
  references for the per-priority queues indexed by priority for faster access.
389

390
  @type name: string
391
  @ivar name: the name of the lock
392

393
  """
394
  __slots__ = [
395
    "__weakref__",
396
    "__deleted",
397
    "__exc",
398
    "__lock",
399
    "__pending",
400
    "__pending_by_prio",
401
    "__pending_shared",
402
    "__shr",
403
    "name",
404
    ]
405

    
406
  __condition_class = _PipeConditionWithMode
407

    
408
  def __init__(self, name, monitor=None):
409
    """Construct a new SharedLock.
410

411
    @param name: the name of the lock
412
    @type monitor: L{LockMonitor}
413
    @param monitor: Lock monitor with which to register
414

415
    """
416
    object.__init__(self)
417

    
418
    self.name = name
419

    
420
    # Internal lock
421
    self.__lock = threading.Lock()
422

    
423
    # Queue containing waiting acquires
424
    self.__pending = []
425
    self.__pending_by_prio = {}
426
    self.__pending_shared = {}
427

    
428
    # Current lock holders
429
    self.__shr = set()
430
    self.__exc = None
431

    
432
    # is this lock in the deleted state?
433
    self.__deleted = False
434

    
435
    # Register with lock monitor
436
    if monitor:
437
      monitor.RegisterLock(self)
438

    
439
  def GetInfo(self, requested):
440
    """Retrieves information for querying locks.
441

442
    @type requested: set
443
    @param requested: Requested information, see C{query.LQ_*}
444

445
    """
446
    self.__lock.acquire()
447
    try:
448
      # Note: to avoid unintentional race conditions, no references to
449
      # modifiable objects should be returned unless they were created in this
450
      # function.
451
      mode = None
452
      owner_names = None
453

    
454
      if query.LQ_MODE in requested:
455
        if self.__deleted:
456
          mode = _DELETED_TEXT
457
          assert not (self.__exc or self.__shr)
458
        elif self.__exc:
459
          mode = _EXCLUSIVE_TEXT
460
        elif self.__shr:
461
          mode = _SHARED_TEXT
462

    
463
      # Current owner(s) are wanted
464
      if query.LQ_OWNER in requested:
465
        if self.__exc:
466
          owner = [self.__exc]
467
        else:
468
          owner = self.__shr
469

    
470
        if owner:
471
          assert not self.__deleted
472
          owner_names = [i.getName() for i in owner]
473

    
474
      # Pending acquires are wanted
475
      if query.LQ_PENDING in requested:
476
        pending = []
477

    
478
        # Sorting instead of copying and using heaq functions for simplicity
479
        for (_, prioqueue) in sorted(self.__pending):
480
          for cond in prioqueue:
481
            if cond.shared:
482
              pendmode = _SHARED_TEXT
483
            else:
484
              pendmode = _EXCLUSIVE_TEXT
485

    
486
            # List of names will be sorted in L{query._GetLockPending}
487
            pending.append((pendmode, [i.getName()
488
                                       for i in cond.get_waiting()]))
489
      else:
490
        pending = None
491

    
492
      return (self.name, mode, owner_names, pending)
493
    finally:
494
      self.__lock.release()
495

    
496
  def __check_deleted(self):
497
    """Raises an exception if the lock has been deleted.
498

499
    """
500
    if self.__deleted:
501
      raise errors.LockError("Deleted lock %s" % self.name)
502

    
503
  def __is_sharer(self):
504
    """Is the current thread sharing the lock at this time?
505

506
    """
507
    return threading.currentThread() in self.__shr
508

    
509
  def __is_exclusive(self):
510
    """Is the current thread holding the lock exclusively at this time?
511

512
    """
513
    return threading.currentThread() == self.__exc
514

    
515
  def __is_owned(self, shared=-1):
516
    """Is the current thread somehow owning the lock at this time?
517

518
    This is a private version of the function, which presumes you're holding
519
    the internal lock.
520

521
    """
522
    if shared < 0:
523
      return self.__is_sharer() or self.__is_exclusive()
524
    elif shared:
525
      return self.__is_sharer()
526
    else:
527
      return self.__is_exclusive()
528

    
529
  def _is_owned(self, shared=-1):
530
    """Is the current thread somehow owning the lock at this time?
531

532
    @param shared:
533
        - < 0: check for any type of ownership (default)
534
        - 0: check for exclusive ownership
535
        - > 0: check for shared ownership
536

537
    """
538
    self.__lock.acquire()
539
    try:
540
      return self.__is_owned(shared=shared)
541
    finally:
542
      self.__lock.release()
543

    
544
  def _count_pending(self):
545
    """Returns the number of pending acquires.
546

547
    @rtype: int
548

549
    """
550
    self.__lock.acquire()
551
    try:
552
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
553
    finally:
554
      self.__lock.release()
555

    
556
  def _check_empty(self):
557
    """Checks whether there are any pending acquires.
558

559
    @rtype: bool
560

561
    """
562
    self.__lock.acquire()
563
    try:
564
      # Order is important: __find_first_pending_queue modifies __pending
565
      (_, prioqueue) = self.__find_first_pending_queue()
566

    
567
      return not (prioqueue or
568
                  self.__pending or
569
                  self.__pending_by_prio or
570
                  self.__pending_shared)
571
    finally:
572
      self.__lock.release()
573

    
574
  def __do_acquire(self, shared):
575
    """Actually acquire the lock.
576

577
    """
578
    if shared:
579
      self.__shr.add(threading.currentThread())
580
    else:
581
      self.__exc = threading.currentThread()
582

    
583
  def __can_acquire(self, shared):
584
    """Determine whether lock can be acquired.
585

586
    """
587
    if shared:
588
      return self.__exc is None
589
    else:
590
      return len(self.__shr) == 0 and self.__exc is None
591

    
592
  def __find_first_pending_queue(self):
593
    """Tries to find the topmost queued entry with pending acquires.
594

595
    Removes empty entries while going through the list.
596

597
    """
598
    while self.__pending:
599
      (priority, prioqueue) = self.__pending[0]
600

    
601
      if prioqueue:
602
        return (priority, prioqueue)
603

    
604
      # Remove empty queue
605
      heapq.heappop(self.__pending)
606
      del self.__pending_by_prio[priority]
607
      assert priority not in self.__pending_shared
608

    
609
    return (None, None)
610

    
611
  def __is_on_top(self, cond):
612
    """Checks whether the passed condition is on top of the queue.
613

614
    The caller must make sure the queue isn't empty.
615

616
    """
617
    (_, prioqueue) = self.__find_first_pending_queue()
618

    
619
    return cond == prioqueue[0]
620

    
621
  def __acquire_unlocked(self, shared, timeout, priority):
622
    """Acquire a shared lock.
623

624
    @param shared: whether to acquire in shared mode; by default an
625
        exclusive lock will be acquired
626
    @param timeout: maximum waiting time before giving up
627
    @type priority: integer
628
    @param priority: Priority for acquiring lock
629

630
    """
631
    self.__check_deleted()
632

    
633
    # We cannot acquire the lock if we already have it
634
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
635
                                   " %s" % self.name)
636

    
637
    # Remove empty entries from queue
638
    self.__find_first_pending_queue()
639

    
640
    # Check whether someone else holds the lock or there are pending acquires.
641
    if not self.__pending and self.__can_acquire(shared):
642
      # Apparently not, can acquire lock directly.
643
      self.__do_acquire(shared)
644
      return True
645

    
646
    prioqueue = self.__pending_by_prio.get(priority, None)
647

    
648
    if shared:
649
      # Try to re-use condition for shared acquire
650
      wait_condition = self.__pending_shared.get(priority, None)
651
      assert (wait_condition is None or
652
              (wait_condition.shared and wait_condition in prioqueue))
653
    else:
654
      wait_condition = None
655

    
656
    if wait_condition is None:
657
      if prioqueue is None:
658
        assert priority not in self.__pending_by_prio
659

    
660
        prioqueue = []
661
        heapq.heappush(self.__pending, (priority, prioqueue))
662
        self.__pending_by_prio[priority] = prioqueue
663

    
664
      wait_condition = self.__condition_class(self.__lock, shared)
665
      prioqueue.append(wait_condition)
666

    
667
      if shared:
668
        # Keep reference for further shared acquires on same priority. This is
669
        # better than trying to find it in the list of pending acquires.
670
        assert priority not in self.__pending_shared
671
        self.__pending_shared[priority] = wait_condition
672

    
673
    try:
674
      # Wait until we become the topmost acquire in the queue or the timeout
675
      # expires.
676
      # TODO: Decrease timeout with spurious notifications
677
      while not (self.__is_on_top(wait_condition) and
678
                 self.__can_acquire(shared)):
679
        # Wait for notification
680
        wait_condition.wait(timeout)
681
        self.__check_deleted()
682

    
683
        # A lot of code assumes blocking acquires always succeed. Loop
684
        # internally for that case.
685
        if timeout is not None:
686
          break
687

    
688
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
689
        self.__do_acquire(shared)
690
        return True
691
    finally:
692
      # Remove condition from queue if there are no more waiters
693
      if not wait_condition.has_waiting():
694
        prioqueue.remove(wait_condition)
695
        if wait_condition.shared:
696
          # Remove from list of shared acquires if it wasn't while releasing
697
          # (e.g. on lock deletion)
698
          self.__pending_shared.pop(priority, None)
699

    
700
    return False
701

    
702
  def acquire(self, shared=0, timeout=None, priority=None,
703
              test_notify=None):
704
    """Acquire a shared lock.
705

706
    @type shared: integer (0/1) used as a boolean
707
    @param shared: whether to acquire in shared mode; by default an
708
        exclusive lock will be acquired
709
    @type timeout: float
710
    @param timeout: maximum waiting time before giving up
711
    @type priority: integer
712
    @param priority: Priority for acquiring lock
713
    @type test_notify: callable or None
714
    @param test_notify: Special callback function for unittesting
715

716
    """
717
    if priority is None:
718
      priority = _DEFAULT_PRIORITY
719

    
720
    self.__lock.acquire()
721
    try:
722
      # We already got the lock, notify now
723
      if __debug__ and callable(test_notify):
724
        test_notify()
725

    
726
      return self.__acquire_unlocked(shared, timeout, priority)
727
    finally:
728
      self.__lock.release()
729

    
730
  def downgrade(self):
731
    """Changes the lock mode from exclusive to shared.
732

733
    Pending acquires in shared mode on the same priority will go ahead.
734

735
    """
736
    self.__lock.acquire()
737
    try:
738
      assert self.__is_owned(), "Lock must be owned"
739

    
740
      if self.__is_exclusive():
741
        # Do nothing if the lock is already acquired in shared mode
742
        self.__exc = None
743
        self.__do_acquire(1)
744

    
745
        # Important: pending shared acquires should only jump ahead if there
746
        # was a transition from exclusive to shared, otherwise an owner of a
747
        # shared lock can keep calling this function to push incoming shared
748
        # acquires
749
        (priority, prioqueue) = self.__find_first_pending_queue()
750
        if prioqueue:
751
          # Is there a pending shared acquire on this priority?
752
          cond = self.__pending_shared.pop(priority, None)
753
          if cond:
754
            assert cond.shared
755
            assert cond in prioqueue
756

    
757
            # Ensure shared acquire is on top of queue
758
            if len(prioqueue) > 1:
759
              prioqueue.remove(cond)
760
              prioqueue.insert(0, cond)
761

    
762
            # Notify
763
            cond.notifyAll()
764

    
765
      assert not self.__is_exclusive()
766
      assert self.__is_sharer()
767

    
768
      return True
769
    finally:
770
      self.__lock.release()
771

    
772
  def release(self):
773
    """Release a Shared Lock.
774

775
    You must have acquired the lock, either in shared or in exclusive mode,
776
    before calling this function.
777

778
    """
779
    self.__lock.acquire()
780
    try:
781
      assert self.__is_exclusive() or self.__is_sharer(), \
782
        "Cannot release non-owned lock"
783

    
784
      # Autodetect release type
785
      if self.__is_exclusive():
786
        self.__exc = None
787
      else:
788
        self.__shr.remove(threading.currentThread())
789

    
790
      # Notify topmost condition in queue
791
      (priority, prioqueue) = self.__find_first_pending_queue()
792
      if prioqueue:
793
        cond = prioqueue[0]
794
        cond.notifyAll()
795
        if cond.shared:
796
          # Prevent further shared acquires from sneaking in while waiters are
797
          # notified
798
          self.__pending_shared.pop(priority, None)
799

    
800
    finally:
801
      self.__lock.release()
802

    
803
  def delete(self, timeout=None, priority=None):
804
    """Delete a Shared Lock.
805

806
    This operation will declare the lock for removal. First the lock will be
807
    acquired in exclusive mode if you don't already own it, then the lock
808
    will be put in a state where any future and pending acquire() fail.
809

810
    @type timeout: float
811
    @param timeout: maximum waiting time before giving up
812
    @type priority: integer
813
    @param priority: Priority for acquiring lock
814

815
    """
816
    if priority is None:
817
      priority = _DEFAULT_PRIORITY
818

    
819
    self.__lock.acquire()
820
    try:
821
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
822

    
823
      self.__check_deleted()
824

    
825
      # The caller is allowed to hold the lock exclusively already.
826
      acquired = self.__is_exclusive()
827

    
828
      if not acquired:
829
        acquired = self.__acquire_unlocked(0, timeout, priority)
830

    
831
        assert self.__is_exclusive() and not self.__is_sharer(), \
832
          "Lock wasn't acquired in exclusive mode"
833

    
834
      if acquired:
835
        self.__deleted = True
836
        self.__exc = None
837

    
838
        assert not (self.__exc or self.__shr), "Found owner during deletion"
839

    
840
        # Notify all acquires. They'll throw an error.
841
        for (_, prioqueue) in self.__pending:
842
          for cond in prioqueue:
843
            cond.notifyAll()
844

    
845
        assert self.__deleted
846

    
847
      return acquired
848
    finally:
849
      self.__lock.release()
850

    
851
  def _release_save(self):
852
    shared = self.__is_sharer()
853
    self.release()
854
    return shared
855

    
856
  def _acquire_restore(self, shared):
857
    self.acquire(shared=shared)
858

    
859

    
860
# Whenever we want to acquire a full LockSet we pass None as the value
861
# to acquire.  Hide this behind this nicely named constant.
862
ALL_SET = None
863

    
864

    
865
class _AcquireTimeout(Exception):
866
  """Internal exception to abort an acquire on a timeout.
867

868
  """
869

    
870

    
871
class LockSet:
872
  """Implements a set of locks.
873

874
  This abstraction implements a set of shared locks for the same resource type,
875
  distinguished by name. The user can lock a subset of the resources and the
876
  LockSet will take care of acquiring the locks always in the same order, thus
877
  preventing deadlock.
878

879
  All the locks needed in the same set must be acquired together, though.
880

881
  @type name: string
882
  @ivar name: the name of the lockset
883

884
  """
885
  def __init__(self, members, name, monitor=None):
886
    """Constructs a new LockSet.
887

888
    @type members: list of strings
889
    @param members: initial members of the set
890
    @type monitor: L{LockMonitor}
891
    @param monitor: Lock monitor with which to register member locks
892

893
    """
894
    assert members is not None, "members parameter is not a list"
895
    self.name = name
896

    
897
    # Lock monitor
898
    self.__monitor = monitor
899

    
900
    # Used internally to guarantee coherency
901
    self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)
902

    
903
    # The lockdict indexes the relationship name -> lock
904
    # The order-of-locking is implied by the alphabetical order of names
905
    self.__lockdict = {}
906

    
907
    for mname in members:
908
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
909
                                          monitor=monitor)
910

    
911
    # The owner dict contains the set of locks each thread owns. For
912
    # performance each thread can access its own key without a global lock on
913
    # this structure. It is paramount though that *no* other type of access is
914
    # done to this structure (eg. no looping over its keys). *_owner helper
915
    # function are defined to guarantee access is correct, but in general never
916
    # do anything different than __owners[threading.currentThread()], or there
917
    # will be trouble.
918
    self.__owners = {}
919

    
920
  def _GetLockName(self, mname):
921
    """Returns the name for a member lock.
922

923
    """
924
    return "%s/%s" % (self.name, mname)
925

    
926
  def _get_lock(self):
927
    """Returns the lockset-internal lock.
928

929
    """
930
    return self.__lock
931

    
932
  def _get_lockdict(self):
933
    """Returns the lockset-internal lock dictionary.
934

935
    Accessing this structure is only safe in single-thread usage or when the
936
    lockset-internal lock is held.
937

938
    """
939
    return self.__lockdict
940

    
941
  def _is_owned(self):
942
    """Is the current thread a current level owner?"""
943
    return threading.currentThread() in self.__owners
944

    
945
  def _add_owned(self, name=None):
946
    """Note the current thread owns the given lock"""
947
    if name is None:
948
      if not self._is_owned():
949
        self.__owners[threading.currentThread()] = set()
950
    else:
951
      if self._is_owned():
952
        self.__owners[threading.currentThread()].add(name)
953
      else:
954
        self.__owners[threading.currentThread()] = set([name])
955

    
956
  def _del_owned(self, name=None):
957
    """Note the current thread owns the given lock"""
958

    
959
    assert not (name is None and self.__lock._is_owned()), \
960
           "Cannot hold internal lock when deleting owner status"
961

    
962
    if name is not None:
963
      self.__owners[threading.currentThread()].remove(name)
964

    
965
    # Only remove the key if we don't hold the set-lock as well
966
    if (not self.__lock._is_owned() and
967
        not self.__owners[threading.currentThread()]):
968
      del self.__owners[threading.currentThread()]
969

    
970
  def _list_owned(self):
971
    """Get the set of resource names owned by the current thread"""
972
    if self._is_owned():
973
      return self.__owners[threading.currentThread()].copy()
974
    else:
975
      return set()
976

    
977
  def _release_and_delete_owned(self):
978
    """Release and delete all resources owned by the current thread"""
979
    for lname in self._list_owned():
980
      lock = self.__lockdict[lname]
981
      if lock._is_owned():
982
        lock.release()
983
      self._del_owned(name=lname)
984

    
985
  def __names(self):
986
    """Return the current set of names.
987

988
    Only call this function while holding __lock and don't iterate on the
989
    result after releasing the lock.
990

991
    """
992
    return self.__lockdict.keys()
993

    
994
  def _names(self):
995
    """Return a copy of the current set of elements.
996

997
    Used only for debugging purposes.
998

999
    """
1000
    # If we don't already own the set-level lock acquired
1001
    # we'll get it and note we need to release it later.
1002
    release_lock = False
1003
    if not self.__lock._is_owned():
1004
      release_lock = True
1005
      self.__lock.acquire(shared=1)
1006
    try:
1007
      result = self.__names()
1008
    finally:
1009
      if release_lock:
1010
        self.__lock.release()
1011
    return set(result)
1012

    
1013
  def acquire(self, names, timeout=None, shared=0, priority=None,
1014
              test_notify=None):
1015
    """Acquire a set of resource locks.
1016

1017
    @type names: list of strings (or string)
1018
    @param names: the names of the locks which shall be acquired
1019
        (special lock names, or instance/node names)
1020
    @type shared: integer (0/1) used as a boolean
1021
    @param shared: whether to acquire in shared mode; by default an
1022
        exclusive lock will be acquired
1023
    @type timeout: float or None
1024
    @param timeout: Maximum time to acquire all locks
1025
    @type priority: integer
1026
    @param priority: Priority for acquiring locks
1027
    @type test_notify: callable or None
1028
    @param test_notify: Special callback function for unittesting
1029

1030
    @return: Set of all locks successfully acquired or None in case of timeout
1031

1032
    @raise errors.LockError: when any lock we try to acquire has
1033
        been deleted before we succeed. In this case none of the
1034
        locks requested will be acquired.
1035

1036
    """
1037
    assert timeout is None or timeout >= 0.0
1038

    
1039
    # Check we don't already own locks at this level
1040
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
1041
                                  " (lockset %s)" % self.name)
1042

    
1043
    if priority is None:
1044
      priority = _DEFAULT_PRIORITY
1045

    
1046
    # We need to keep track of how long we spent waiting for a lock. The
1047
    # timeout passed to this function is over all lock acquires.
1048
    running_timeout = utils.RunningTimeout(timeout, False)
1049

    
1050
    try:
1051
      if names is not None:
1052
        # Support passing in a single resource to acquire rather than many
1053
        if isinstance(names, basestring):
1054
          names = [names]
1055

    
1056
        return self.__acquire_inner(names, False, shared, priority,
1057
                                    running_timeout.Remaining, test_notify)
1058

    
1059
      else:
1060
        # If no names are given acquire the whole set by not letting new names
1061
        # being added before we release, and getting the current list of names.
1062
        # Some of them may then be deleted later, but we'll cope with this.
1063
        #
1064
        # We'd like to acquire this lock in a shared way, as it's nice if
1065
        # everybody else can use the instances at the same time. If we are
1066
        # acquiring them exclusively though they won't be able to do this
1067
        # anyway, though, so we'll get the list lock exclusively as well in
1068
        # order to be able to do add() on the set while owning it.
1069
        if not self.__lock.acquire(shared=shared, priority=priority,
1070
                                   timeout=running_timeout.Remaining()):
1071
          raise _AcquireTimeout()
1072
        try:
1073
          # note we own the set-lock
1074
          self._add_owned()
1075

    
1076
          return self.__acquire_inner(self.__names(), True, shared, priority,
1077
                                      running_timeout.Remaining, test_notify)
1078
        except:
1079
          # We shouldn't have problems adding the lock to the owners list, but
1080
          # if we did we'll try to release this lock and re-raise exception.
1081
          # Of course something is going to be really wrong, after this.
1082
          self.__lock.release()
1083
          self._del_owned()
1084
          raise
1085

    
1086
    except _AcquireTimeout:
1087
      return None
1088

    
1089
  def __acquire_inner(self, names, want_all, shared, priority,
1090
                      timeout_fn, test_notify):
1091
    """Inner logic for acquiring a number of locks.
1092

1093
    @param names: Names of the locks to be acquired
1094
    @param want_all: Whether all locks in the set should be acquired
1095
    @param shared: Whether to acquire in shared mode
1096
    @param timeout_fn: Function returning remaining timeout
1097
    @param priority: Priority for acquiring locks
1098
    @param test_notify: Special callback function for unittesting
1099

1100
    """
1101
    acquire_list = []
1102

    
1103
    # First we look the locks up on __lockdict. We have no way of being sure
1104
    # they will still be there after, but this makes it a lot faster should
1105
    # just one of them be the already wrong. Using a sorted sequence to prevent
1106
    # deadlocks.
1107
    for lname in sorted(utils.UniqueSequence(names)):
1108
      try:
1109
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
1110
      except KeyError:
1111
        if want_all:
1112
          # We are acquiring all the set, it doesn't matter if this particular
1113
          # element is not there anymore.
1114
          continue
1115

    
1116
        raise errors.LockError("Non-existing lock %s in set %s (it may have"
1117
                               " been removed)" % (lname, self.name))
1118

    
1119
      acquire_list.append((lname, lock))
1120

    
1121
    # This will hold the locknames we effectively acquired.
1122
    acquired = set()
1123

    
1124
    try:
1125
      # Now acquire_list contains a sorted list of resources and locks we
1126
      # want.  In order to get them we loop on this (private) list and
1127
      # acquire() them.  We gave no real guarantee they will still exist till
1128
      # this is done but .acquire() itself is safe and will alert us if the
1129
      # lock gets deleted.
1130
      for (lname, lock) in acquire_list:
1131
        if __debug__ and callable(test_notify):
1132
          test_notify_fn = lambda: test_notify(lname)
1133
        else:
1134
          test_notify_fn = None
1135

    
1136
        timeout = timeout_fn()
1137

    
1138
        try:
1139
          # raises LockError if the lock was deleted
1140
          acq_success = lock.acquire(shared=shared, timeout=timeout,
1141
                                     priority=priority,
1142
                                     test_notify=test_notify_fn)
1143
        except errors.LockError:
1144
          if want_all:
1145
            # We are acquiring all the set, it doesn't matter if this
1146
            # particular element is not there anymore.
1147
            continue
1148

    
1149
          raise errors.LockError("Non-existing lock %s in set %s (it may"
1150
                                 " have been removed)" % (lname, self.name))
1151

    
1152
        if not acq_success:
1153
          # Couldn't get lock or timeout occurred
1154
          if timeout is None:
1155
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
1156
            # blocking.
1157
            raise errors.LockError("Failed to get lock %s (set %s)" %
1158
                                   (lname, self.name))
1159

    
1160
          raise _AcquireTimeout()
1161

    
1162
        try:
1163
          # now the lock cannot be deleted, we have it!
1164
          self._add_owned(name=lname)
1165
          acquired.add(lname)
1166

    
1167
        except:
1168
          # We shouldn't have problems adding the lock to the owners list, but
1169
          # if we did we'll try to release this lock and re-raise exception.
1170
          # Of course something is going to be really wrong after this.
1171
          if lock._is_owned():
1172
            lock.release()
1173
          raise
1174

    
1175
    except:
1176
      # Release all owned locks
1177
      self._release_and_delete_owned()
1178
      raise
1179

    
1180
    return acquired
1181

    
1182
  def downgrade(self, names=None):
1183
    """Downgrade a set of resource locks from exclusive to shared mode.
1184

1185
    The locks must have been acquired in exclusive mode.
1186

1187
    """
1188
    assert self._is_owned(), ("downgrade on lockset %s while not owning any"
1189
                              " lock" % self.name)
1190

    
1191
    # Support passing in a single resource to downgrade rather than many
1192
    if isinstance(names, basestring):
1193
      names = [names]
1194

    
1195
    owned = self._list_owned()
1196

    
1197
    if names is None:
1198
      names = owned
1199
    else:
1200
      names = set(names)
1201
      assert owned.issuperset(names), \
1202
        ("downgrade() on unheld resources %s (set %s)" %
1203
         (names.difference(owned), self.name))
1204

    
1205
    for lockname in names:
1206
      self.__lockdict[lockname].downgrade()
1207

    
1208
    # Do we own the lockset in exclusive mode?
1209
    if self.__lock._is_owned(shared=0):
1210
      # Have all locks been downgraded?
1211
      if not compat.any(lock._is_owned(shared=0)
1212
                        for lock in self.__lockdict.values()):
1213
        self.__lock.downgrade()
1214
        assert self.__lock._is_owned(shared=1)
1215

    
1216
    return True
1217

    
1218
  def release(self, names=None):
1219
    """Release a set of resource locks, at the same level.
1220

1221
    You must have acquired the locks, either in shared or in exclusive mode,
1222
    before releasing them.
1223

1224
    @type names: list of strings, or None
1225
    @param names: the names of the locks which shall be released
1226
        (defaults to all the locks acquired at that level).
1227

1228
    """
1229
    assert self._is_owned(), ("release() on lock set %s while not owner" %
1230
                              self.name)
1231

    
1232
    # Support passing in a single resource to release rather than many
1233
    if isinstance(names, basestring):
1234
      names = [names]
1235

    
1236
    if names is None:
1237
      names = self._list_owned()
1238
    else:
1239
      names = set(names)
1240
      assert self._list_owned().issuperset(names), (
1241
               "release() on unheld resources %s (set %s)" %
1242
               (names.difference(self._list_owned()), self.name))
1243

    
1244
    # First of all let's release the "all elements" lock, if set.
1245
    # After this 'add' can work again
1246
    if self.__lock._is_owned():
1247
      self.__lock.release()
1248
      self._del_owned()
1249

    
1250
    for lockname in names:
1251
      # If we are sure the lock doesn't leave __lockdict without being
1252
      # exclusively held we can do this...
1253
      self.__lockdict[lockname].release()
1254
      self._del_owned(name=lockname)
1255

    
1256
  def add(self, names, acquired=0, shared=0):
1257
    """Add a new set of elements to the set
1258

1259
    @type names: list of strings
1260
    @param names: names of the new elements to add
1261
    @type acquired: integer (0/1) used as a boolean
1262
    @param acquired: pre-acquire the new resource?
1263
    @type shared: integer (0/1) used as a boolean
1264
    @param shared: is the pre-acquisition shared?
1265

1266
    """
1267
    # Check we don't already own locks at this level
1268
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1269
      ("Cannot add locks if the set %s is only partially owned, or shared" %
1270
       self.name)
1271

    
1272
    # Support passing in a single resource to add rather than many
1273
    if isinstance(names, basestring):
1274
      names = [names]
1275

    
1276
    # If we don't already own the set-level lock acquired in an exclusive way
1277
    # we'll get it and note we need to release it later.
1278
    release_lock = False
1279
    if not self.__lock._is_owned():
1280
      release_lock = True
1281
      self.__lock.acquire()
1282

    
1283
    try:
1284
      invalid_names = set(self.__names()).intersection(names)
1285
      if invalid_names:
1286
        # This must be an explicit raise, not an assert, because assert is
1287
        # turned off when using optimization, and this can happen because of
1288
        # concurrency even if the user doesn't want it.
1289
        raise errors.LockError("duplicate add(%s) on lockset %s" %
1290
                               (invalid_names, self.name))
1291

    
1292
      for lockname in names:
1293
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1294

    
1295
        if acquired:
1296
          # No need for priority or timeout here as this lock has just been
1297
          # created
1298
          lock.acquire(shared=shared)
1299
          # now the lock cannot be deleted, we have it!
1300
          try:
1301
            self._add_owned(name=lockname)
1302
          except:
1303
            # We shouldn't have problems adding the lock to the owners list,
1304
            # but if we did we'll try to release this lock and re-raise
1305
            # exception.  Of course something is going to be really wrong,
1306
            # after this.  On the other hand the lock hasn't been added to the
1307
            # __lockdict yet so no other threads should be pending on it. This
1308
            # release is just a safety measure.
1309
            lock.release()
1310
            raise
1311

    
1312
        self.__lockdict[lockname] = lock
1313

    
1314
    finally:
1315
      # Only release __lock if we were not holding it previously.
1316
      if release_lock:
1317
        self.__lock.release()
1318

    
1319
    return True
1320

    
1321
  def remove(self, names):
1322
    """Remove elements from the lock set.
1323

1324
    You can either not hold anything in the lockset or already hold a superset
1325
    of the elements you want to delete, exclusively.
1326

1327
    @type names: list of strings
1328
    @param names: names of the resource to remove.
1329

1330
    @return: a list of locks which we removed; the list is always
1331
        equal to the names list if we were holding all the locks
1332
        exclusively
1333

1334
    """
1335
    # Support passing in a single resource to remove rather than many
1336
    if isinstance(names, basestring):
1337
      names = [names]
1338

    
1339
    # If we own any subset of this lock it must be a superset of what we want
1340
    # to delete. The ownership must also be exclusive, but that will be checked
1341
    # by the lock itself.
1342
    assert not self._is_owned() or self._list_owned().issuperset(names), (
1343
      "remove() on acquired lockset %s while not owning all elements" %
1344
      self.name)
1345

    
1346
    removed = []
1347

    
1348
    for lname in names:
1349
      # Calling delete() acquires the lock exclusively if we don't already own
1350
      # it, and causes all pending and subsequent lock acquires to fail. It's
1351
      # fine to call it out of order because delete() also implies release(),
1352
      # and the assertion above guarantees that if we either already hold
1353
      # everything we want to delete, or we hold none.
1354
      try:
1355
        self.__lockdict[lname].delete()
1356
        removed.append(lname)
1357
      except (KeyError, errors.LockError):
1358
        # This cannot happen if we were already holding it, verify:
1359
        assert not self._is_owned(), ("remove failed while holding lockset %s"
1360
                                      % self.name)
1361
      else:
1362
        # If no LockError was raised we are the ones who deleted the lock.
1363
        # This means we can safely remove it from lockdict, as any further or
1364
        # pending delete() or acquire() will fail (and nobody can have the lock
1365
        # since before our call to delete()).
1366
        #
1367
        # This is done in an else clause because if the exception was thrown
1368
        # it's the job of the one who actually deleted it.
1369
        del self.__lockdict[lname]
1370
        # And let's remove it from our private list if we owned it.
1371
        if self._is_owned():
1372
          self._del_owned(name=lname)
1373

    
1374
    return removed
1375

    
1376

    
1377
# Locking levels, must be acquired in increasing order.
1378
# Current rules are:
1379
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1380
#   acquired before performing any operation, either in shared or in exclusive
1381
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
1382
#   avoided.
1383
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1384
#   If you need more than one node, or more than one instance, acquire them at
1385
#   the same time.
1386
LEVEL_CLUSTER = 0
1387
LEVEL_INSTANCE = 1
1388
LEVEL_NODEGROUP = 2
1389
LEVEL_NODE = 3
1390

    
1391
LEVELS = [LEVEL_CLUSTER,
1392
          LEVEL_INSTANCE,
1393
          LEVEL_NODEGROUP,
1394
          LEVEL_NODE]
1395

    
1396
# Lock levels which are modifiable
1397
LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]
1398

    
1399
LEVEL_NAMES = {
1400
  LEVEL_CLUSTER: "cluster",
1401
  LEVEL_INSTANCE: "instance",
1402
  LEVEL_NODEGROUP: "nodegroup",
1403
  LEVEL_NODE: "node",
1404
  }
1405

    
1406
# Constant for the big ganeti lock
1407
BGL = 'BGL'
1408

    
1409

    
1410
class GanetiLockManager:
1411
  """The Ganeti Locking Library
1412

1413
  The purpose of this small library is to manage locking for ganeti clusters
1414
  in a central place, while at the same time doing dynamic checks against
1415
  possible deadlocks. It will also make it easier to transition to a different
1416
  lock type should we migrate away from python threads.
1417

1418
  """
1419
  _instance = None
1420

    
1421
  def __init__(self, nodes, nodegroups, instances):
1422
    """Constructs a new GanetiLockManager object.
1423

1424
    There should be only a GanetiLockManager object at any time, so this
1425
    function raises an error if this is not the case.
1426

1427
    @param nodes: list of node names
1428
    @param nodegroups: list of nodegroup uuids
1429
    @param instances: list of instance names
1430

1431
    """
1432
    assert self.__class__._instance is None, \
1433
           "double GanetiLockManager instance"
1434

    
1435
    self.__class__._instance = self
1436

    
1437
    self._monitor = LockMonitor()
1438

    
1439
    # The keyring contains all the locks, at their level and in the correct
1440
    # locking order.
1441
    self.__keyring = {
1442
      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
1443
      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
1444
      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor),
1445
      LEVEL_INSTANCE: LockSet(instances, "instances",
1446
                              monitor=self._monitor),
1447
      }
1448

    
1449
  def QueryLocks(self, fields):
1450
    """Queries information from all locks.
1451

1452
    See L{LockMonitor.QueryLocks}.
1453

1454
    """
1455
    return self._monitor.QueryLocks(fields)
1456

    
1457
  def OldStyleQueryLocks(self, fields):
1458
    """Queries information from all locks, returning old-style data.
1459

1460
    See L{LockMonitor.OldStyleQueryLocks}.
1461

1462
    """
1463
    return self._monitor.OldStyleQueryLocks(fields)
1464

    
1465
  def _names(self, level):
1466
    """List the lock names at the given level.
1467

1468
    This can be used for debugging/testing purposes.
1469

1470
    @param level: the level whose list of locks to get
1471

1472
    """
1473
    assert level in LEVELS, "Invalid locking level %s" % level
1474
    return self.__keyring[level]._names()
1475

    
1476
  def _is_owned(self, level):
1477
    """Check whether we are owning locks at the given level
1478

1479
    """
1480
    return self.__keyring[level]._is_owned()
1481

    
1482
  is_owned = _is_owned
1483

    
1484
  def _list_owned(self, level):
1485
    """Get the set of owned locks at the given level
1486

1487
    """
1488
    return self.__keyring[level]._list_owned()
1489

    
1490
  list_owned = _list_owned
1491

    
1492
  def _upper_owned(self, level):
1493
    """Check that we don't own any lock at a level greater than the given one.
1494

1495
    """
1496
    # This way of checking only works if LEVELS[i] = i, which we check for in
1497
    # the test cases.
1498
    return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1499

    
1500
  def _BGL_owned(self): # pylint: disable-msg=C0103
1501
    """Check if the current thread owns the BGL.
1502

1503
    Both an exclusive or a shared acquisition work.
1504

1505
    """
1506
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1507

    
1508
  @staticmethod
1509
  def _contains_BGL(level, names): # pylint: disable-msg=C0103
1510
    """Check if the level contains the BGL.
1511

1512
    Check if acting on the given level and set of names will change
1513
    the status of the Big Ganeti Lock.
1514

1515
    """
1516
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1517

    
1518
  def acquire(self, level, names, timeout=None, shared=0, priority=None):
1519
    """Acquire a set of resource locks, at the same level.
1520

1521
    @type level: member of locking.LEVELS
1522
    @param level: the level at which the locks shall be acquired
1523
    @type names: list of strings (or string)
1524
    @param names: the names of the locks which shall be acquired
1525
        (special lock names, or instance/node names)
1526
    @type shared: integer (0/1) used as a boolean
1527
    @param shared: whether to acquire in shared mode; by default
1528
        an exclusive lock will be acquired
1529
    @type timeout: float
1530
    @param timeout: Maximum time to acquire all locks
1531
    @type priority: integer
1532
    @param priority: Priority for acquiring lock
1533

1534
    """
1535
    assert level in LEVELS, "Invalid locking level %s" % level
1536

    
1537
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1538
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1539
    # so even if we've migrated we need to at least share the BGL to be
1540
    # compatible with them. Of course if we own the BGL exclusively there's no
1541
    # point in acquiring any other lock, unless perhaps we are half way through
1542
    # the migration of the current opcode.
1543
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1544
            "You must own the Big Ganeti Lock before acquiring any other")
1545

    
1546
    # Check we don't own locks at the same or upper levels.
1547
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1548
           " while owning some at a greater one")
1549

    
1550
    # Acquire the locks in the set.
1551
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
1552
                                         priority=priority)
1553

    
1554
  def downgrade(self, level, names=None):
1555
    """Downgrade a set of resource locks from exclusive to shared mode.
1556

1557
    You must have acquired the locks in exclusive mode.
1558

1559
    @type level: member of locking.LEVELS
1560
    @param level: the level at which the locks shall be downgraded
1561
    @type names: list of strings, or None
1562
    @param names: the names of the locks which shall be downgraded
1563
        (defaults to all the locks acquired at the level)
1564

1565
    """
1566
    assert level in LEVELS, "Invalid locking level %s" % level
1567

    
1568
    return self.__keyring[level].downgrade(names=names)
1569

    
1570
  def release(self, level, names=None):
1571
    """Release a set of resource locks, at the same level.
1572

1573
    You must have acquired the locks, either in shared or in exclusive
1574
    mode, before releasing them.
1575

1576
    @type level: member of locking.LEVELS
1577
    @param level: the level at which the locks shall be released
1578
    @type names: list of strings, or None
1579
    @param names: the names of the locks which shall be released
1580
        (defaults to all the locks acquired at that level)
1581

1582
    """
1583
    assert level in LEVELS, "Invalid locking level %s" % level
1584
    assert (not self._contains_BGL(level, names) or
1585
            not self._upper_owned(LEVEL_CLUSTER)), (
1586
            "Cannot release the Big Ganeti Lock while holding something"
1587
            " at upper levels (%r)" %
1588
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
1589
                              for i in self.__keyring.keys()]), ))
1590

    
1591
    # Release will complain if we don't own the locks already
1592
    return self.__keyring[level].release(names)
1593

    
1594
  def add(self, level, names, acquired=0, shared=0):
1595
    """Add locks at the specified level.
1596

1597
    @type level: member of locking.LEVELS_MOD
1598
    @param level: the level at which the locks shall be added
1599
    @type names: list of strings
1600
    @param names: names of the locks to acquire
1601
    @type acquired: integer (0/1) used as a boolean
1602
    @param acquired: whether to acquire the newly added locks
1603
    @type shared: integer (0/1) used as a boolean
1604
    @param shared: whether the acquisition will be shared
1605

1606
    """
1607
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1608
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1609
           " operations")
1610
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1611
           " while owning some at a greater one")
1612
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1613

    
1614
  def remove(self, level, names):
1615
    """Remove locks from the specified level.
1616

1617
    You must either already own the locks you are trying to remove
1618
    exclusively or not own any lock at an upper level.
1619

1620
    @type level: member of locking.LEVELS_MOD
1621
    @param level: the level at which the locks shall be removed
1622
    @type names: list of strings
1623
    @param names: the names of the locks which shall be removed
1624
        (special lock names, or instance/node names)
1625

1626
    """
1627
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1628
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1629
           " operations")
1630
    # Check we either own the level or don't own anything from here
1631
    # up. LockSet.remove() will check the case in which we don't own
1632
    # all the needed resources, or we have a shared ownership.
1633
    assert self._is_owned(level) or not self._upper_owned(level), (
1634
           "Cannot remove locks at a level while not owning it or"
1635
           " owning some at a greater one")
1636
    return self.__keyring[level].remove(names)
1637

    
1638

    
1639
def _MonitorSortKey((num, item)):
1640
  """Sorting key function.
1641

1642
  Sort by name, then by incoming order.
1643

1644
  """
1645
  (name, _, _, _) = item
1646

    
1647
  return (utils.NiceSortKey(name), num)
1648

    
1649

    
1650
class LockMonitor(object):
1651
  _LOCK_ATTR = "_lock"
1652

    
1653
  def __init__(self):
1654
    """Initializes this class.
1655

1656
    """
1657
    self._lock = SharedLock("LockMonitor")
1658

    
1659
    # Counter for stable sorting
1660
    self._counter = itertools.count(0)
1661

    
1662
    # Tracked locks. Weak references are used to avoid issues with circular
1663
    # references and deletion.
1664
    self._locks = weakref.WeakKeyDictionary()
1665

    
1666
  @ssynchronized(_LOCK_ATTR)
1667
  def RegisterLock(self, lock):
1668
    """Registers a new lock.
1669

1670
    """
1671
    logging.debug("Registering lock %s", lock.name)
1672
    assert lock not in self._locks, "Duplicate lock registration"
1673

    
1674
    # There used to be a check for duplicate names here. As it turned out, when
1675
    # a lock is re-created with the same name in a very short timeframe, the
1676
    # previous instance might not yet be removed from the weakref dictionary.
1677
    # By keeping track of the order of incoming registrations, a stable sort
1678
    # ordering can still be guaranteed.
1679

    
1680
    self._locks[lock] = self._counter.next()
1681

    
1682
  @ssynchronized(_LOCK_ATTR)
1683
  def _GetLockInfo(self, requested):
1684
    """Get information from all locks while the monitor lock is held.
1685

1686
    """
1687
    return [(num, lock.GetInfo(requested)) for lock, num in self._locks.items()]
1688

    
1689
  def _Query(self, fields):
1690
    """Queries information from all locks.
1691

1692
    @type fields: list of strings
1693
    @param fields: List of fields to return
1694

1695
    """
1696
    qobj = query.Query(query.LOCK_FIELDS, fields)
1697

    
1698
    # Get all data with internal lock held and then sort by name and incoming
1699
    # order
1700
    lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
1701
                      key=_MonitorSortKey)
1702

    
1703
    # Extract lock information and build query data
1704
    return (qobj, query.LockQueryData(map(operator.itemgetter(1), lockinfo)))
1705

    
1706
  def QueryLocks(self, fields):
1707
    """Queries information from all locks.
1708

1709
    @type fields: list of strings
1710
    @param fields: List of fields to return
1711

1712
    """
1713
    (qobj, ctx) = self._Query(fields)
1714

    
1715
    # Prepare query response
1716
    return query.GetQueryResponse(qobj, ctx)
1717

    
1718
  def OldStyleQueryLocks(self, fields):
1719
    """Queries information from all locks, returning old-style data.
1720

1721
    @type fields: list of strings
1722
    @param fields: List of fields to return
1723

1724
    """
1725
    (qobj, ctx) = self._Query(fields)
1726

    
1727
    return qobj.OldStyleQuery(ctx)