Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 6915fe26

History | View | Annotate | Download (53.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import errno
32
import weakref
33
import logging
34
import heapq
35
import itertools
36

    
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import compat
40
from ganeti import query
41

    
42

    
43
_EXCLUSIVE_TEXT = "exclusive"
44
_SHARED_TEXT = "shared"
45
_DELETED_TEXT = "deleted"
46

    
47
_DEFAULT_PRIORITY = 0
48

    
49

    
50
def ssynchronized(mylock, shared=0):
51
  """Shared Synchronization decorator.
52

53
  Calls the function holding the given lock, either in exclusive or shared
54
  mode. It requires the passed lock to be a SharedLock (or support its
55
  semantics).
56

57
  @type mylock: lockable object or string
58
  @param mylock: lock to acquire or class member name of the lock to acquire
59

60
  """
61
  def wrap(fn):
62
    def sync_function(*args, **kwargs):
63
      if isinstance(mylock, basestring):
64
        assert args, "cannot ssynchronize on non-class method: self not found"
65
        # args[0] is "self"
66
        lock = getattr(args[0], mylock)
67
      else:
68
        lock = mylock
69
      lock.acquire(shared=shared)
70
      try:
71
        return fn(*args, **kwargs)
72
      finally:
73
        lock.release()
74
    return sync_function
75
  return wrap
76

    
77

    
78
class _SingleNotifyPipeConditionWaiter(object):
79
  """Helper class for SingleNotifyPipeCondition
80

81
  """
82
  __slots__ = [
83
    "_fd",
84
    "_poller",
85
    ]
86

    
87
  def __init__(self, poller, fd):
88
    """Constructor for _SingleNotifyPipeConditionWaiter
89

90
    @type poller: select.poll
91
    @param poller: Poller object
92
    @type fd: int
93
    @param fd: File descriptor to wait for
94

95
    """
96
    object.__init__(self)
97
    self._poller = poller
98
    self._fd = fd
99

    
100
  def __call__(self, timeout):
101
    """Wait for something to happen on the pipe.
102

103
    @type timeout: float or None
104
    @param timeout: Timeout for waiting (can be None)
105

106
    """
107
    running_timeout = utils.RunningTimeout(timeout, True)
108

    
109
    while True:
110
      remaining_time = running_timeout.Remaining()
111

    
112
      if remaining_time is not None:
113
        if remaining_time < 0.0:
114
          break
115

    
116
        # Our calculation uses seconds, poll() wants milliseconds
117
        remaining_time *= 1000
118

    
119
      try:
120
        result = self._poller.poll(remaining_time)
121
      except EnvironmentError, err:
122
        if err.errno != errno.EINTR:
123
          raise
124
        result = None
125

    
126
      # Check whether we were notified
127
      if result and result[0][0] == self._fd:
128
        break
129

    
130

    
131
class _BaseCondition(object):
132
  """Base class containing common code for conditions.
133

134
  Some of this code is taken from python's threading module.
135

136
  """
137
  __slots__ = [
138
    "_lock",
139
    "acquire",
140
    "release",
141
    "_is_owned",
142
    "_acquire_restore",
143
    "_release_save",
144
    ]
145

    
146
  def __init__(self, lock):
147
    """Constructor for _BaseCondition.
148

149
    @type lock: threading.Lock
150
    @param lock: condition base lock
151

152
    """
153
    object.__init__(self)
154

    
155
    try:
156
      self._release_save = lock._release_save
157
    except AttributeError:
158
      self._release_save = self._base_release_save
159
    try:
160
      self._acquire_restore = lock._acquire_restore
161
    except AttributeError:
162
      self._acquire_restore = self._base_acquire_restore
163
    try:
164
      self._is_owned = lock._is_owned
165
    except AttributeError:
166
      self._is_owned = self._base_is_owned
167

    
168
    self._lock = lock
169

    
170
    # Export the lock's acquire() and release() methods
171
    self.acquire = lock.acquire
172
    self.release = lock.release
173

    
174
  def _base_is_owned(self):
175
    """Check whether lock is owned by current thread.
176

177
    """
178
    if self._lock.acquire(0):
179
      self._lock.release()
180
      return False
181
    return True
182

    
183
  def _base_release_save(self):
184
    self._lock.release()
185

    
186
  def _base_acquire_restore(self, _):
187
    self._lock.acquire()
188

    
189
  def _check_owned(self):
190
    """Raise an exception if the current thread doesn't own the lock.
191

192
    """
193
    if not self._is_owned():
194
      raise RuntimeError("cannot work with un-aquired lock")
195

    
196

    
197
class SingleNotifyPipeCondition(_BaseCondition):
198
  """Condition which can only be notified once.
199

200
  This condition class uses pipes and poll, internally, to be able to wait for
201
  notification with a timeout, without resorting to polling. It is almost
202
  compatible with Python's threading.Condition, with the following differences:
203
    - notifyAll can only be called once, and no wait can happen after that
204
    - notify is not supported, only notifyAll
205

206
  """
207

    
208
  __slots__ = [
209
    "_poller",
210
    "_read_fd",
211
    "_write_fd",
212
    "_nwaiters",
213
    "_notified",
214
    ]
215

    
216
  _waiter_class = _SingleNotifyPipeConditionWaiter
217

    
218
  def __init__(self, lock):
219
    """Constructor for SingleNotifyPipeCondition
220

221
    """
222
    _BaseCondition.__init__(self, lock)
223
    self._nwaiters = 0
224
    self._notified = False
225
    self._read_fd = None
226
    self._write_fd = None
227
    self._poller = None
228

    
229
  def _check_unnotified(self):
230
    """Throws an exception if already notified.
231

232
    """
233
    if self._notified:
234
      raise RuntimeError("cannot use already notified condition")
235

    
236
  def _Cleanup(self):
237
    """Cleanup open file descriptors, if any.
238

239
    """
240
    if self._read_fd is not None:
241
      os.close(self._read_fd)
242
      self._read_fd = None
243

    
244
    if self._write_fd is not None:
245
      os.close(self._write_fd)
246
      self._write_fd = None
247
    self._poller = None
248

    
249
  def wait(self, timeout):
250
    """Wait for a notification.
251

252
    @type timeout: float or None
253
    @param timeout: Waiting timeout (can be None)
254

255
    """
256
    self._check_owned()
257
    self._check_unnotified()
258

    
259
    self._nwaiters += 1
260
    try:
261
      if self._poller is None:
262
        (self._read_fd, self._write_fd) = os.pipe()
263
        self._poller = select.poll()
264
        self._poller.register(self._read_fd, select.POLLHUP)
265

    
266
      wait_fn = self._waiter_class(self._poller, self._read_fd)
267
      state = self._release_save()
268
      try:
269
        # Wait for notification
270
        wait_fn(timeout)
271
      finally:
272
        # Re-acquire lock
273
        self._acquire_restore(state)
274
    finally:
275
      self._nwaiters -= 1
276
      if self._nwaiters == 0:
277
        self._Cleanup()
278

    
279
  def notifyAll(self): # pylint: disable=C0103
280
    """Close the writing side of the pipe to notify all waiters.
281

282
    """
283
    self._check_owned()
284
    self._check_unnotified()
285
    self._notified = True
286
    if self._write_fd is not None:
287
      os.close(self._write_fd)
288
      self._write_fd = None
289

    
290

    
291
class PipeCondition(_BaseCondition):
292
  """Group-only non-polling condition with counters.
293

294
  This condition class uses pipes and poll, internally, to be able to wait for
295
  notification with a timeout, without resorting to polling. It is almost
296
  compatible with Python's threading.Condition, but only supports notifyAll and
297
  non-recursive locks. As an additional features it's able to report whether
298
  there are any waiting threads.
299

300
  """
301
  __slots__ = [
302
    "_waiters",
303
    "_single_condition",
304
    ]
305

    
306
  _single_condition_class = SingleNotifyPipeCondition
307

    
308
  def __init__(self, lock):
309
    """Initializes this class.
310

311
    """
312
    _BaseCondition.__init__(self, lock)
313
    self._waiters = set()
314
    self._single_condition = self._single_condition_class(self._lock)
315

    
316
  def wait(self, timeout):
317
    """Wait for a notification.
318

319
    @type timeout: float or None
320
    @param timeout: Waiting timeout (can be None)
321

322
    """
323
    self._check_owned()
324

    
325
    # Keep local reference to the pipe. It could be replaced by another thread
326
    # notifying while we're waiting.
327
    cond = self._single_condition
328

    
329
    self._waiters.add(threading.currentThread())
330
    try:
331
      cond.wait(timeout)
332
    finally:
333
      self._check_owned()
334
      self._waiters.remove(threading.currentThread())
335

    
336
  def notifyAll(self): # pylint: disable=C0103
337
    """Notify all currently waiting threads.
338

339
    """
340
    self._check_owned()
341
    self._single_condition.notifyAll()
342
    self._single_condition = self._single_condition_class(self._lock)
343

    
344
  def get_waiting(self):
345
    """Returns a list of all waiting threads.
346

347
    """
348
    self._check_owned()
349

    
350
    return self._waiters
351

    
352
  def has_waiting(self):
353
    """Returns whether there are active waiters.
354

355
    """
356
    self._check_owned()
357

    
358
    return bool(self._waiters)
359

    
360

    
361
class _PipeConditionWithMode(PipeCondition):
362
  __slots__ = [
363
    "shared",
364
    ]
365

    
366
  def __init__(self, lock, shared):
367
    """Initializes this class.
368

369
    """
370
    self.shared = shared
371
    PipeCondition.__init__(self, lock)
372

    
373

    
374
class SharedLock(object):
375
  """Implements a shared lock.
376

377
  Multiple threads can acquire the lock in a shared way by calling
378
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
379
  threads can call C{acquire(shared=0)}.
380

381
  Notes on data structures: C{__pending} contains a priority queue (heapq) of
382
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
383
  ...]}. Each per-priority queue contains a normal in-order list of conditions
384
  to be notified when the lock can be acquired. Shared locks are grouped
385
  together by priority and the condition for them is stored in
386
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
387
  references for the per-priority queues indexed by priority for faster access.
388

389
  @type name: string
390
  @ivar name: the name of the lock
391

392
  """
393
  __slots__ = [
394
    "__weakref__",
395
    "__deleted",
396
    "__exc",
397
    "__lock",
398
    "__pending",
399
    "__pending_by_prio",
400
    "__pending_shared",
401
    "__shr",
402
    "name",
403
    ]
404

    
405
  __condition_class = _PipeConditionWithMode
406

    
407
  def __init__(self, name, monitor=None):
408
    """Construct a new SharedLock.
409

410
    @param name: the name of the lock
411
    @type monitor: L{LockMonitor}
412
    @param monitor: Lock monitor with which to register
413

414
    """
415
    object.__init__(self)
416

    
417
    self.name = name
418

    
419
    # Internal lock
420
    self.__lock = threading.Lock()
421

    
422
    # Queue containing waiting acquires
423
    self.__pending = []
424
    self.__pending_by_prio = {}
425
    self.__pending_shared = {}
426

    
427
    # Current lock holders
428
    self.__shr = set()
429
    self.__exc = None
430

    
431
    # is this lock in the deleted state?
432
    self.__deleted = False
433

    
434
    # Register with lock monitor
435
    if monitor:
436
      logging.debug("Adding lock %s to monitor", name)
437
      monitor.RegisterLock(self)
438

    
439
  def GetLockInfo(self, requested):
440
    """Retrieves information for querying locks.
441

442
    @type requested: set
443
    @param requested: Requested information, see C{query.LQ_*}
444

445
    """
446
    self.__lock.acquire()
447
    try:
448
      # Note: to avoid unintentional race conditions, no references to
449
      # modifiable objects should be returned unless they were created in this
450
      # function.
451
      mode = None
452
      owner_names = None
453

    
454
      if query.LQ_MODE in requested:
455
        if self.__deleted:
456
          mode = _DELETED_TEXT
457
          assert not (self.__exc or self.__shr)
458
        elif self.__exc:
459
          mode = _EXCLUSIVE_TEXT
460
        elif self.__shr:
461
          mode = _SHARED_TEXT
462

    
463
      # Current owner(s) are wanted
464
      if query.LQ_OWNER in requested:
465
        if self.__exc:
466
          owner = [self.__exc]
467
        else:
468
          owner = self.__shr
469

    
470
        if owner:
471
          assert not self.__deleted
472
          owner_names = [i.getName() for i in owner]
473

    
474
      # Pending acquires are wanted
475
      if query.LQ_PENDING in requested:
476
        pending = []
477

    
478
        # Sorting instead of copying and using heaq functions for simplicity
479
        for (_, prioqueue) in sorted(self.__pending):
480
          for cond in prioqueue:
481
            if cond.shared:
482
              pendmode = _SHARED_TEXT
483
            else:
484
              pendmode = _EXCLUSIVE_TEXT
485

    
486
            # List of names will be sorted in L{query._GetLockPending}
487
            pending.append((pendmode, [i.getName()
488
                                       for i in cond.get_waiting()]))
489
      else:
490
        pending = None
491

    
492
      return [(self.name, mode, owner_names, pending)]
493
    finally:
494
      self.__lock.release()
495

    
496
  def __check_deleted(self):
497
    """Raises an exception if the lock has been deleted.
498

499
    """
500
    if self.__deleted:
501
      raise errors.LockError("Deleted lock %s" % self.name)
502

    
503
  def __is_sharer(self):
504
    """Is the current thread sharing the lock at this time?
505

506
    """
507
    return threading.currentThread() in self.__shr
508

    
509
  def __is_exclusive(self):
510
    """Is the current thread holding the lock exclusively at this time?
511

512
    """
513
    return threading.currentThread() == self.__exc
514

    
515
  def __is_owned(self, shared=-1):
516
    """Is the current thread somehow owning the lock at this time?
517

518
    This is a private version of the function, which presumes you're holding
519
    the internal lock.
520

521
    """
522
    if shared < 0:
523
      return self.__is_sharer() or self.__is_exclusive()
524
    elif shared:
525
      return self.__is_sharer()
526
    else:
527
      return self.__is_exclusive()
528

    
529
  def _is_owned(self, shared=-1):
530
    """Is the current thread somehow owning the lock at this time?
531

532
    @param shared:
533
        - < 0: check for any type of ownership (default)
534
        - 0: check for exclusive ownership
535
        - > 0: check for shared ownership
536

537
    """
538
    self.__lock.acquire()
539
    try:
540
      return self.__is_owned(shared=shared)
541
    finally:
542
      self.__lock.release()
543

    
544
  is_owned = _is_owned
545

    
546
  def _count_pending(self):
547
    """Returns the number of pending acquires.
548

549
    @rtype: int
550

551
    """
552
    self.__lock.acquire()
553
    try:
554
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
555
    finally:
556
      self.__lock.release()
557

    
558
  def _check_empty(self):
559
    """Checks whether there are any pending acquires.
560

561
    @rtype: bool
562

563
    """
564
    self.__lock.acquire()
565
    try:
566
      # Order is important: __find_first_pending_queue modifies __pending
567
      (_, prioqueue) = self.__find_first_pending_queue()
568

    
569
      return not (prioqueue or
570
                  self.__pending or
571
                  self.__pending_by_prio or
572
                  self.__pending_shared)
573
    finally:
574
      self.__lock.release()
575

    
576
  def __do_acquire(self, shared):
577
    """Actually acquire the lock.
578

579
    """
580
    if shared:
581
      self.__shr.add(threading.currentThread())
582
    else:
583
      self.__exc = threading.currentThread()
584

    
585
  def __can_acquire(self, shared):
586
    """Determine whether lock can be acquired.
587

588
    """
589
    if shared:
590
      return self.__exc is None
591
    else:
592
      return len(self.__shr) == 0 and self.__exc is None
593

    
594
  def __find_first_pending_queue(self):
595
    """Tries to find the topmost queued entry with pending acquires.
596

597
    Removes empty entries while going through the list.
598

599
    """
600
    while self.__pending:
601
      (priority, prioqueue) = self.__pending[0]
602

    
603
      if prioqueue:
604
        return (priority, prioqueue)
605

    
606
      # Remove empty queue
607
      heapq.heappop(self.__pending)
608
      del self.__pending_by_prio[priority]
609
      assert priority not in self.__pending_shared
610

    
611
    return (None, None)
612

    
613
  def __is_on_top(self, cond):
614
    """Checks whether the passed condition is on top of the queue.
615

616
    The caller must make sure the queue isn't empty.
617

618
    """
619
    (_, prioqueue) = self.__find_first_pending_queue()
620

    
621
    return cond == prioqueue[0]
622

    
623
  def __acquire_unlocked(self, shared, timeout, priority):
624
    """Acquire a shared lock.
625

626
    @param shared: whether to acquire in shared mode; by default an
627
        exclusive lock will be acquired
628
    @param timeout: maximum waiting time before giving up
629
    @type priority: integer
630
    @param priority: Priority for acquiring lock
631

632
    """
633
    self.__check_deleted()
634

    
635
    # We cannot acquire the lock if we already have it
636
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
637
                                   " %s" % self.name)
638

    
639
    # Remove empty entries from queue
640
    self.__find_first_pending_queue()
641

    
642
    # Check whether someone else holds the lock or there are pending acquires.
643
    if not self.__pending and self.__can_acquire(shared):
644
      # Apparently not, can acquire lock directly.
645
      self.__do_acquire(shared)
646
      return True
647

    
648
    prioqueue = self.__pending_by_prio.get(priority, None)
649

    
650
    if shared:
651
      # Try to re-use condition for shared acquire
652
      wait_condition = self.__pending_shared.get(priority, None)
653
      assert (wait_condition is None or
654
              (wait_condition.shared and wait_condition in prioqueue))
655
    else:
656
      wait_condition = None
657

    
658
    if wait_condition is None:
659
      if prioqueue is None:
660
        assert priority not in self.__pending_by_prio
661

    
662
        prioqueue = []
663
        heapq.heappush(self.__pending, (priority, prioqueue))
664
        self.__pending_by_prio[priority] = prioqueue
665

    
666
      wait_condition = self.__condition_class(self.__lock, shared)
667
      prioqueue.append(wait_condition)
668

    
669
      if shared:
670
        # Keep reference for further shared acquires on same priority. This is
671
        # better than trying to find it in the list of pending acquires.
672
        assert priority not in self.__pending_shared
673
        self.__pending_shared[priority] = wait_condition
674

    
675
    try:
676
      # Wait until we become the topmost acquire in the queue or the timeout
677
      # expires.
678
      # TODO: Decrease timeout with spurious notifications
679
      while not (self.__is_on_top(wait_condition) and
680
                 self.__can_acquire(shared)):
681
        # Wait for notification
682
        wait_condition.wait(timeout)
683
        self.__check_deleted()
684

    
685
        # A lot of code assumes blocking acquires always succeed. Loop
686
        # internally for that case.
687
        if timeout is not None:
688
          break
689

    
690
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
691
        self.__do_acquire(shared)
692
        return True
693
    finally:
694
      # Remove condition from queue if there are no more waiters
695
      if not wait_condition.has_waiting():
696
        prioqueue.remove(wait_condition)
697
        if wait_condition.shared:
698
          # Remove from list of shared acquires if it wasn't while releasing
699
          # (e.g. on lock deletion)
700
          self.__pending_shared.pop(priority, None)
701

    
702
    return False
703

    
704
  def acquire(self, shared=0, timeout=None, priority=None,
705
              test_notify=None):
706
    """Acquire a shared lock.
707

708
    @type shared: integer (0/1) used as a boolean
709
    @param shared: whether to acquire in shared mode; by default an
710
        exclusive lock will be acquired
711
    @type timeout: float
712
    @param timeout: maximum waiting time before giving up
713
    @type priority: integer
714
    @param priority: Priority for acquiring lock
715
    @type test_notify: callable or None
716
    @param test_notify: Special callback function for unittesting
717

718
    """
719
    if priority is None:
720
      priority = _DEFAULT_PRIORITY
721

    
722
    self.__lock.acquire()
723
    try:
724
      # We already got the lock, notify now
725
      if __debug__ and callable(test_notify):
726
        test_notify()
727

    
728
      return self.__acquire_unlocked(shared, timeout, priority)
729
    finally:
730
      self.__lock.release()
731

    
732
  def downgrade(self):
733
    """Changes the lock mode from exclusive to shared.
734

735
    Pending acquires in shared mode on the same priority will go ahead.
736

737
    """
738
    self.__lock.acquire()
739
    try:
740
      assert self.__is_owned(), "Lock must be owned"
741

    
742
      if self.__is_exclusive():
743
        # Do nothing if the lock is already acquired in shared mode
744
        self.__exc = None
745
        self.__do_acquire(1)
746

    
747
        # Important: pending shared acquires should only jump ahead if there
748
        # was a transition from exclusive to shared, otherwise an owner of a
749
        # shared lock can keep calling this function to push incoming shared
750
        # acquires
751
        (priority, prioqueue) = self.__find_first_pending_queue()
752
        if prioqueue:
753
          # Is there a pending shared acquire on this priority?
754
          cond = self.__pending_shared.pop(priority, None)
755
          if cond:
756
            assert cond.shared
757
            assert cond in prioqueue
758

    
759
            # Ensure shared acquire is on top of queue
760
            if len(prioqueue) > 1:
761
              prioqueue.remove(cond)
762
              prioqueue.insert(0, cond)
763

    
764
            # Notify
765
            cond.notifyAll()
766

    
767
      assert not self.__is_exclusive()
768
      assert self.__is_sharer()
769

    
770
      return True
771
    finally:
772
      self.__lock.release()
773

    
774
  def release(self):
775
    """Release a Shared Lock.
776

777
    You must have acquired the lock, either in shared or in exclusive mode,
778
    before calling this function.
779

780
    """
781
    self.__lock.acquire()
782
    try:
783
      assert self.__is_exclusive() or self.__is_sharer(), \
784
        "Cannot release non-owned lock"
785

    
786
      # Autodetect release type
787
      if self.__is_exclusive():
788
        self.__exc = None
789
      else:
790
        self.__shr.remove(threading.currentThread())
791

    
792
      # Notify topmost condition in queue
793
      (priority, prioqueue) = self.__find_first_pending_queue()
794
      if prioqueue:
795
        cond = prioqueue[0]
796
        cond.notifyAll()
797
        if cond.shared:
798
          # Prevent further shared acquires from sneaking in while waiters are
799
          # notified
800
          self.__pending_shared.pop(priority, None)
801

    
802
    finally:
803
      self.__lock.release()
804

    
805
  def delete(self, timeout=None, priority=None):
806
    """Delete a Shared Lock.
807

808
    This operation will declare the lock for removal. First the lock will be
809
    acquired in exclusive mode if you don't already own it, then the lock
810
    will be put in a state where any future and pending acquire() fail.
811

812
    @type timeout: float
813
    @param timeout: maximum waiting time before giving up
814
    @type priority: integer
815
    @param priority: Priority for acquiring lock
816

817
    """
818
    if priority is None:
819
      priority = _DEFAULT_PRIORITY
820

    
821
    self.__lock.acquire()
822
    try:
823
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
824

    
825
      self.__check_deleted()
826

    
827
      # The caller is allowed to hold the lock exclusively already.
828
      acquired = self.__is_exclusive()
829

    
830
      if not acquired:
831
        acquired = self.__acquire_unlocked(0, timeout, priority)
832

    
833
        assert self.__is_exclusive() and not self.__is_sharer(), \
834
          "Lock wasn't acquired in exclusive mode"
835

    
836
      if acquired:
837
        self.__deleted = True
838
        self.__exc = None
839

    
840
        assert not (self.__exc or self.__shr), "Found owner during deletion"
841

    
842
        # Notify all acquires. They'll throw an error.
843
        for (_, prioqueue) in self.__pending:
844
          for cond in prioqueue:
845
            cond.notifyAll()
846

    
847
        assert self.__deleted
848

    
849
      return acquired
850
    finally:
851
      self.__lock.release()
852

    
853
  def _release_save(self):
854
    shared = self.__is_sharer()
855
    self.release()
856
    return shared
857

    
858
  def _acquire_restore(self, shared):
859
    self.acquire(shared=shared)
860

    
861

    
862
# Whenever we want to acquire a full LockSet we pass None as the value
863
# to acquire.  Hide this behind this nicely named constant.
864
ALL_SET = None
865

    
866

    
867
class _AcquireTimeout(Exception):
868
  """Internal exception to abort an acquire on a timeout.
869

870
  """
871

    
872

    
873
class LockSet:
874
  """Implements a set of locks.
875

876
  This abstraction implements a set of shared locks for the same resource type,
877
  distinguished by name. The user can lock a subset of the resources and the
878
  LockSet will take care of acquiring the locks always in the same order, thus
879
  preventing deadlock.
880

881
  All the locks needed in the same set must be acquired together, though.
882

883
  @type name: string
884
  @ivar name: the name of the lockset
885

886
  """
887
  def __init__(self, members, name, monitor=None):
888
    """Constructs a new LockSet.
889

890
    @type members: list of strings
891
    @param members: initial members of the set
892
    @type monitor: L{LockMonitor}
893
    @param monitor: Lock monitor with which to register member locks
894

895
    """
896
    assert members is not None, "members parameter is not a list"
897
    self.name = name
898

    
899
    # Lock monitor
900
    self.__monitor = monitor
901

    
902
    # Used internally to guarantee coherency
903
    self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)
904

    
905
    # The lockdict indexes the relationship name -> lock
906
    # The order-of-locking is implied by the alphabetical order of names
907
    self.__lockdict = {}
908

    
909
    for mname in members:
910
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
911
                                          monitor=monitor)
912

    
913
    # The owner dict contains the set of locks each thread owns. For
914
    # performance each thread can access its own key without a global lock on
915
    # this structure. It is paramount though that *no* other type of access is
916
    # done to this structure (eg. no looping over its keys). *_owner helper
917
    # function are defined to guarantee access is correct, but in general never
918
    # do anything different than __owners[threading.currentThread()], or there
919
    # will be trouble.
920
    self.__owners = {}
921

    
922
  def _GetLockName(self, mname):
923
    """Returns the name for a member lock.
924

925
    """
926
    return "%s/%s" % (self.name, mname)
927

    
928
  def _get_lock(self):
929
    """Returns the lockset-internal lock.
930

931
    """
932
    return self.__lock
933

    
934
  def _get_lockdict(self):
935
    """Returns the lockset-internal lock dictionary.
936

937
    Accessing this structure is only safe in single-thread usage or when the
938
    lockset-internal lock is held.
939

940
    """
941
    return self.__lockdict
942

    
943
  def _is_owned(self):
944
    """Is the current thread a current level owner?"""
945
    return threading.currentThread() in self.__owners
946

    
947
  def _add_owned(self, name=None):
948
    """Note the current thread owns the given lock"""
949
    if name is None:
950
      if not self._is_owned():
951
        self.__owners[threading.currentThread()] = set()
952
    else:
953
      if self._is_owned():
954
        self.__owners[threading.currentThread()].add(name)
955
      else:
956
        self.__owners[threading.currentThread()] = set([name])
957

    
958
  def _del_owned(self, name=None):
959
    """Note the current thread owns the given lock"""
960

    
961
    assert not (name is None and self.__lock._is_owned()), \
962
           "Cannot hold internal lock when deleting owner status"
963

    
964
    if name is not None:
965
      self.__owners[threading.currentThread()].remove(name)
966

    
967
    # Only remove the key if we don't hold the set-lock as well
968
    if (not self.__lock._is_owned() and
969
        not self.__owners[threading.currentThread()]):
970
      del self.__owners[threading.currentThread()]
971

    
972
  def _list_owned(self):
973
    """Get the set of resource names owned by the current thread"""
974
    if self._is_owned():
975
      return self.__owners[threading.currentThread()].copy()
976
    else:
977
      return set()
978

    
979
  def _release_and_delete_owned(self):
980
    """Release and delete all resources owned by the current thread"""
981
    for lname in self._list_owned():
982
      lock = self.__lockdict[lname]
983
      if lock._is_owned():
984
        lock.release()
985
      self._del_owned(name=lname)
986

    
987
  def __names(self):
988
    """Return the current set of names.
989

990
    Only call this function while holding __lock and don't iterate on the
991
    result after releasing the lock.
992

993
    """
994
    return self.__lockdict.keys()
995

    
996
  def _names(self):
997
    """Return a copy of the current set of elements.
998

999
    Used only for debugging purposes.
1000

1001
    """
1002
    # If we don't already own the set-level lock acquired
1003
    # we'll get it and note we need to release it later.
1004
    release_lock = False
1005
    if not self.__lock._is_owned():
1006
      release_lock = True
1007
      self.__lock.acquire(shared=1)
1008
    try:
1009
      result = self.__names()
1010
    finally:
1011
      if release_lock:
1012
        self.__lock.release()
1013
    return set(result)
1014

    
1015
  def acquire(self, names, timeout=None, shared=0, priority=None,
1016
              test_notify=None):
1017
    """Acquire a set of resource locks.
1018

1019
    @type names: list of strings (or string)
1020
    @param names: the names of the locks which shall be acquired
1021
        (special lock names, or instance/node names)
1022
    @type shared: integer (0/1) used as a boolean
1023
    @param shared: whether to acquire in shared mode; by default an
1024
        exclusive lock will be acquired
1025
    @type timeout: float or None
1026
    @param timeout: Maximum time to acquire all locks
1027
    @type priority: integer
1028
    @param priority: Priority for acquiring locks
1029
    @type test_notify: callable or None
1030
    @param test_notify: Special callback function for unittesting
1031

1032
    @return: Set of all locks successfully acquired or None in case of timeout
1033

1034
    @raise errors.LockError: when any lock we try to acquire has
1035
        been deleted before we succeed. In this case none of the
1036
        locks requested will be acquired.
1037

1038
    """
1039
    assert timeout is None or timeout >= 0.0
1040

    
1041
    # Check we don't already own locks at this level
1042
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
1043
                                  " (lockset %s)" % self.name)
1044

    
1045
    if priority is None:
1046
      priority = _DEFAULT_PRIORITY
1047

    
1048
    # We need to keep track of how long we spent waiting for a lock. The
1049
    # timeout passed to this function is over all lock acquires.
1050
    running_timeout = utils.RunningTimeout(timeout, False)
1051

    
1052
    try:
1053
      if names is not None:
1054
        # Support passing in a single resource to acquire rather than many
1055
        if isinstance(names, basestring):
1056
          names = [names]
1057

    
1058
        return self.__acquire_inner(names, False, shared, priority,
1059
                                    running_timeout.Remaining, test_notify)
1060

    
1061
      else:
1062
        # If no names are given acquire the whole set by not letting new names
1063
        # being added before we release, and getting the current list of names.
1064
        # Some of them may then be deleted later, but we'll cope with this.
1065
        #
1066
        # We'd like to acquire this lock in a shared way, as it's nice if
1067
        # everybody else can use the instances at the same time. If we are
1068
        # acquiring them exclusively though they won't be able to do this
1069
        # anyway, though, so we'll get the list lock exclusively as well in
1070
        # order to be able to do add() on the set while owning it.
1071
        if not self.__lock.acquire(shared=shared, priority=priority,
1072
                                   timeout=running_timeout.Remaining()):
1073
          raise _AcquireTimeout()
1074
        try:
1075
          # note we own the set-lock
1076
          self._add_owned()
1077

    
1078
          return self.__acquire_inner(self.__names(), True, shared, priority,
1079
                                      running_timeout.Remaining, test_notify)
1080
        except:
1081
          # We shouldn't have problems adding the lock to the owners list, but
1082
          # if we did we'll try to release this lock and re-raise exception.
1083
          # Of course something is going to be really wrong, after this.
1084
          self.__lock.release()
1085
          self._del_owned()
1086
          raise
1087

    
1088
    except _AcquireTimeout:
1089
      return None
1090

    
1091
  def __acquire_inner(self, names, want_all, shared, priority,
1092
                      timeout_fn, test_notify):
1093
    """Inner logic for acquiring a number of locks.
1094

1095
    @param names: Names of the locks to be acquired
1096
    @param want_all: Whether all locks in the set should be acquired
1097
    @param shared: Whether to acquire in shared mode
1098
    @param timeout_fn: Function returning remaining timeout
1099
    @param priority: Priority for acquiring locks
1100
    @param test_notify: Special callback function for unittesting
1101

1102
    """
1103
    acquire_list = []
1104

    
1105
    # First we look the locks up on __lockdict. We have no way of being sure
1106
    # they will still be there after, but this makes it a lot faster should
1107
    # just one of them be the already wrong. Using a sorted sequence to prevent
1108
    # deadlocks.
1109
    for lname in sorted(utils.UniqueSequence(names)):
1110
      try:
1111
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
1112
      except KeyError:
1113
        if want_all:
1114
          # We are acquiring all the set, it doesn't matter if this particular
1115
          # element is not there anymore.
1116
          continue
1117

    
1118
        raise errors.LockError("Non-existing lock %s in set %s (it may have"
1119
                               " been removed)" % (lname, self.name))
1120

    
1121
      acquire_list.append((lname, lock))
1122

    
1123
    # This will hold the locknames we effectively acquired.
1124
    acquired = set()
1125

    
1126
    try:
1127
      # Now acquire_list contains a sorted list of resources and locks we
1128
      # want.  In order to get them we loop on this (private) list and
1129
      # acquire() them.  We gave no real guarantee they will still exist till
1130
      # this is done but .acquire() itself is safe and will alert us if the
1131
      # lock gets deleted.
1132
      for (lname, lock) in acquire_list:
1133
        if __debug__ and callable(test_notify):
1134
          test_notify_fn = lambda: test_notify(lname)
1135
        else:
1136
          test_notify_fn = None
1137

    
1138
        timeout = timeout_fn()
1139

    
1140
        try:
1141
          # raises LockError if the lock was deleted
1142
          acq_success = lock.acquire(shared=shared, timeout=timeout,
1143
                                     priority=priority,
1144
                                     test_notify=test_notify_fn)
1145
        except errors.LockError:
1146
          if want_all:
1147
            # We are acquiring all the set, it doesn't matter if this
1148
            # particular element is not there anymore.
1149
            continue
1150

    
1151
          raise errors.LockError("Non-existing lock %s in set %s (it may"
1152
                                 " have been removed)" % (lname, self.name))
1153

    
1154
        if not acq_success:
1155
          # Couldn't get lock or timeout occurred
1156
          if timeout is None:
1157
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
1158
            # blocking.
1159
            raise errors.LockError("Failed to get lock %s (set %s)" %
1160
                                   (lname, self.name))
1161

    
1162
          raise _AcquireTimeout()
1163

    
1164
        try:
1165
          # now the lock cannot be deleted, we have it!
1166
          self._add_owned(name=lname)
1167
          acquired.add(lname)
1168

    
1169
        except:
1170
          # We shouldn't have problems adding the lock to the owners list, but
1171
          # if we did we'll try to release this lock and re-raise exception.
1172
          # Of course something is going to be really wrong after this.
1173
          if lock._is_owned():
1174
            lock.release()
1175
          raise
1176

    
1177
    except:
1178
      # Release all owned locks
1179
      self._release_and_delete_owned()
1180
      raise
1181

    
1182
    return acquired
1183

    
1184
  def downgrade(self, names=None):
1185
    """Downgrade a set of resource locks from exclusive to shared mode.
1186

1187
    The locks must have been acquired in exclusive mode.
1188

1189
    """
1190
    assert self._is_owned(), ("downgrade on lockset %s while not owning any"
1191
                              " lock" % self.name)
1192

    
1193
    # Support passing in a single resource to downgrade rather than many
1194
    if isinstance(names, basestring):
1195
      names = [names]
1196

    
1197
    owned = self._list_owned()
1198

    
1199
    if names is None:
1200
      names = owned
1201
    else:
1202
      names = set(names)
1203
      assert owned.issuperset(names), \
1204
        ("downgrade() on unheld resources %s (set %s)" %
1205
         (names.difference(owned), self.name))
1206

    
1207
    for lockname in names:
1208
      self.__lockdict[lockname].downgrade()
1209

    
1210
    # Do we own the lockset in exclusive mode?
1211
    if self.__lock._is_owned(shared=0):
1212
      # Have all locks been downgraded?
1213
      if not compat.any(lock._is_owned(shared=0)
1214
                        for lock in self.__lockdict.values()):
1215
        self.__lock.downgrade()
1216
        assert self.__lock._is_owned(shared=1)
1217

    
1218
    return True
1219

    
1220
  def release(self, names=None):
1221
    """Release a set of resource locks, at the same level.
1222

1223
    You must have acquired the locks, either in shared or in exclusive mode,
1224
    before releasing them.
1225

1226
    @type names: list of strings, or None
1227
    @param names: the names of the locks which shall be released
1228
        (defaults to all the locks acquired at that level).
1229

1230
    """
1231
    assert self._is_owned(), ("release() on lock set %s while not owner" %
1232
                              self.name)
1233

    
1234
    # Support passing in a single resource to release rather than many
1235
    if isinstance(names, basestring):
1236
      names = [names]
1237

    
1238
    if names is None:
1239
      names = self._list_owned()
1240
    else:
1241
      names = set(names)
1242
      assert self._list_owned().issuperset(names), (
1243
               "release() on unheld resources %s (set %s)" %
1244
               (names.difference(self._list_owned()), self.name))
1245

    
1246
    # First of all let's release the "all elements" lock, if set.
1247
    # After this 'add' can work again
1248
    if self.__lock._is_owned():
1249
      self.__lock.release()
1250
      self._del_owned()
1251

    
1252
    for lockname in names:
1253
      # If we are sure the lock doesn't leave __lockdict without being
1254
      # exclusively held we can do this...
1255
      self.__lockdict[lockname].release()
1256
      self._del_owned(name=lockname)
1257

    
1258
  def add(self, names, acquired=0, shared=0):
1259
    """Add a new set of elements to the set
1260

1261
    @type names: list of strings
1262
    @param names: names of the new elements to add
1263
    @type acquired: integer (0/1) used as a boolean
1264
    @param acquired: pre-acquire the new resource?
1265
    @type shared: integer (0/1) used as a boolean
1266
    @param shared: is the pre-acquisition shared?
1267

1268
    """
1269
    # Check we don't already own locks at this level
1270
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1271
      ("Cannot add locks if the set %s is only partially owned, or shared" %
1272
       self.name)
1273

    
1274
    # Support passing in a single resource to add rather than many
1275
    if isinstance(names, basestring):
1276
      names = [names]
1277

    
1278
    # If we don't already own the set-level lock acquired in an exclusive way
1279
    # we'll get it and note we need to release it later.
1280
    release_lock = False
1281
    if not self.__lock._is_owned():
1282
      release_lock = True
1283
      self.__lock.acquire()
1284

    
1285
    try:
1286
      invalid_names = set(self.__names()).intersection(names)
1287
      if invalid_names:
1288
        # This must be an explicit raise, not an assert, because assert is
1289
        # turned off when using optimization, and this can happen because of
1290
        # concurrency even if the user doesn't want it.
1291
        raise errors.LockError("duplicate add(%s) on lockset %s" %
1292
                               (invalid_names, self.name))
1293

    
1294
      for lockname in names:
1295
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1296

    
1297
        if acquired:
1298
          # No need for priority or timeout here as this lock has just been
1299
          # created
1300
          lock.acquire(shared=shared)
1301
          # now the lock cannot be deleted, we have it!
1302
          try:
1303
            self._add_owned(name=lockname)
1304
          except:
1305
            # We shouldn't have problems adding the lock to the owners list,
1306
            # but if we did we'll try to release this lock and re-raise
1307
            # exception.  Of course something is going to be really wrong,
1308
            # after this.  On the other hand the lock hasn't been added to the
1309
            # __lockdict yet so no other threads should be pending on it. This
1310
            # release is just a safety measure.
1311
            lock.release()
1312
            raise
1313

    
1314
        self.__lockdict[lockname] = lock
1315

    
1316
    finally:
1317
      # Only release __lock if we were not holding it previously.
1318
      if release_lock:
1319
        self.__lock.release()
1320

    
1321
    return True
1322

    
1323
  def remove(self, names):
1324
    """Remove elements from the lock set.
1325

1326
    You can either not hold anything in the lockset or already hold a superset
1327
    of the elements you want to delete, exclusively.
1328

1329
    @type names: list of strings
1330
    @param names: names of the resource to remove.
1331

1332
    @return: a list of locks which we removed; the list is always
1333
        equal to the names list if we were holding all the locks
1334
        exclusively
1335

1336
    """
1337
    # Support passing in a single resource to remove rather than many
1338
    if isinstance(names, basestring):
1339
      names = [names]
1340

    
1341
    # If we own any subset of this lock it must be a superset of what we want
1342
    # to delete. The ownership must also be exclusive, but that will be checked
1343
    # by the lock itself.
1344
    assert not self._is_owned() or self._list_owned().issuperset(names), (
1345
      "remove() on acquired lockset %s while not owning all elements" %
1346
      self.name)
1347

    
1348
    removed = []
1349

    
1350
    for lname in names:
1351
      # Calling delete() acquires the lock exclusively if we don't already own
1352
      # it, and causes all pending and subsequent lock acquires to fail. It's
1353
      # fine to call it out of order because delete() also implies release(),
1354
      # and the assertion above guarantees that if we either already hold
1355
      # everything we want to delete, or we hold none.
1356
      try:
1357
        self.__lockdict[lname].delete()
1358
        removed.append(lname)
1359
      except (KeyError, errors.LockError):
1360
        # This cannot happen if we were already holding it, verify:
1361
        assert not self._is_owned(), ("remove failed while holding lockset %s"
1362
                                      % self.name)
1363
      else:
1364
        # If no LockError was raised we are the ones who deleted the lock.
1365
        # This means we can safely remove it from lockdict, as any further or
1366
        # pending delete() or acquire() will fail (and nobody can have the lock
1367
        # since before our call to delete()).
1368
        #
1369
        # This is done in an else clause because if the exception was thrown
1370
        # it's the job of the one who actually deleted it.
1371
        del self.__lockdict[lname]
1372
        # And let's remove it from our private list if we owned it.
1373
        if self._is_owned():
1374
          self._del_owned(name=lname)
1375

    
1376
    return removed
1377

    
1378

    
1379
# Locking levels, must be acquired in increasing order.
1380
# Current rules are:
1381
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1382
#   acquired before performing any operation, either in shared or in exclusive
1383
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
1384
#   avoided.
1385
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1386
#   If you need more than one node, or more than one instance, acquire them at
1387
#   the same time.
1388
LEVEL_CLUSTER = 0
1389
LEVEL_INSTANCE = 1
1390
LEVEL_NODEGROUP = 2
1391
LEVEL_NODE = 3
1392

    
1393
LEVELS = [LEVEL_CLUSTER,
1394
          LEVEL_INSTANCE,
1395
          LEVEL_NODEGROUP,
1396
          LEVEL_NODE]
1397

    
1398
# Lock levels which are modifiable
1399
LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]
1400

    
1401
LEVEL_NAMES = {
1402
  LEVEL_CLUSTER: "cluster",
1403
  LEVEL_INSTANCE: "instance",
1404
  LEVEL_NODEGROUP: "nodegroup",
1405
  LEVEL_NODE: "node",
1406
  }
1407

    
1408
# Constant for the big ganeti lock
1409
BGL = 'BGL'
1410

    
1411

    
1412
class GanetiLockManager:
1413
  """The Ganeti Locking Library
1414

1415
  The purpose of this small library is to manage locking for ganeti clusters
1416
  in a central place, while at the same time doing dynamic checks against
1417
  possible deadlocks. It will also make it easier to transition to a different
1418
  lock type should we migrate away from python threads.
1419

1420
  """
1421
  _instance = None
1422

    
1423
  def __init__(self, nodes, nodegroups, instances):
1424
    """Constructs a new GanetiLockManager object.
1425

1426
    There should be only a GanetiLockManager object at any time, so this
1427
    function raises an error if this is not the case.
1428

1429
    @param nodes: list of node names
1430
    @param nodegroups: list of nodegroup uuids
1431
    @param instances: list of instance names
1432

1433
    """
1434
    assert self.__class__._instance is None, \
1435
           "double GanetiLockManager instance"
1436

    
1437
    self.__class__._instance = self
1438

    
1439
    self._monitor = LockMonitor()
1440

    
1441
    # The keyring contains all the locks, at their level and in the correct
1442
    # locking order.
1443
    self.__keyring = {
1444
      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
1445
      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
1446
      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor),
1447
      LEVEL_INSTANCE: LockSet(instances, "instances",
1448
                              monitor=self._monitor),
1449
      }
1450

    
1451
  def AddToLockMonitor(self, provider):
1452
    """Registers a new lock with the monitor.
1453

1454
    See L{LockMonitor.RegisterLock}.
1455

1456
    """
1457
    return self._monitor.RegisterLock(provider)
1458

    
1459
  def QueryLocks(self, fields):
1460
    """Queries information from all locks.
1461

1462
    See L{LockMonitor.QueryLocks}.
1463

1464
    """
1465
    return self._monitor.QueryLocks(fields)
1466

    
1467
  def OldStyleQueryLocks(self, fields):
1468
    """Queries information from all locks, returning old-style data.
1469

1470
    See L{LockMonitor.OldStyleQueryLocks}.
1471

1472
    """
1473
    return self._monitor.OldStyleQueryLocks(fields)
1474

    
1475
  def _names(self, level):
1476
    """List the lock names at the given level.
1477

1478
    This can be used for debugging/testing purposes.
1479

1480
    @param level: the level whose list of locks to get
1481

1482
    """
1483
    assert level in LEVELS, "Invalid locking level %s" % level
1484
    return self.__keyring[level]._names()
1485

    
1486
  def _is_owned(self, level):
1487
    """Check whether we are owning locks at the given level
1488

1489
    """
1490
    return self.__keyring[level]._is_owned()
1491

    
1492
  is_owned = _is_owned
1493

    
1494
  def _list_owned(self, level):
1495
    """Get the set of owned locks at the given level
1496

1497
    """
1498
    return self.__keyring[level]._list_owned()
1499

    
1500
  list_owned = _list_owned
1501

    
1502
  def _upper_owned(self, level):
1503
    """Check that we don't own any lock at a level greater than the given one.
1504

1505
    """
1506
    # This way of checking only works if LEVELS[i] = i, which we check for in
1507
    # the test cases.
1508
    return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1509

    
1510
  def _BGL_owned(self): # pylint: disable=C0103
1511
    """Check if the current thread owns the BGL.
1512

1513
    Both an exclusive or a shared acquisition work.
1514

1515
    """
1516
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1517

    
1518
  @staticmethod
1519
  def _contains_BGL(level, names): # pylint: disable=C0103
1520
    """Check if the level contains the BGL.
1521

1522
    Check if acting on the given level and set of names will change
1523
    the status of the Big Ganeti Lock.
1524

1525
    """
1526
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1527

    
1528
  def acquire(self, level, names, timeout=None, shared=0, priority=None):
1529
    """Acquire a set of resource locks, at the same level.
1530

1531
    @type level: member of locking.LEVELS
1532
    @param level: the level at which the locks shall be acquired
1533
    @type names: list of strings (or string)
1534
    @param names: the names of the locks which shall be acquired
1535
        (special lock names, or instance/node names)
1536
    @type shared: integer (0/1) used as a boolean
1537
    @param shared: whether to acquire in shared mode; by default
1538
        an exclusive lock will be acquired
1539
    @type timeout: float
1540
    @param timeout: Maximum time to acquire all locks
1541
    @type priority: integer
1542
    @param priority: Priority for acquiring lock
1543

1544
    """
1545
    assert level in LEVELS, "Invalid locking level %s" % level
1546

    
1547
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1548
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1549
    # so even if we've migrated we need to at least share the BGL to be
1550
    # compatible with them. Of course if we own the BGL exclusively there's no
1551
    # point in acquiring any other lock, unless perhaps we are half way through
1552
    # the migration of the current opcode.
1553
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1554
            "You must own the Big Ganeti Lock before acquiring any other")
1555

    
1556
    # Check we don't own locks at the same or upper levels.
1557
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1558
           " while owning some at a greater one")
1559

    
1560
    # Acquire the locks in the set.
1561
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
1562
                                         priority=priority)
1563

    
1564
  def downgrade(self, level, names=None):
1565
    """Downgrade a set of resource locks from exclusive to shared mode.
1566

1567
    You must have acquired the locks in exclusive mode.
1568

1569
    @type level: member of locking.LEVELS
1570
    @param level: the level at which the locks shall be downgraded
1571
    @type names: list of strings, or None
1572
    @param names: the names of the locks which shall be downgraded
1573
        (defaults to all the locks acquired at the level)
1574

1575
    """
1576
    assert level in LEVELS, "Invalid locking level %s" % level
1577

    
1578
    return self.__keyring[level].downgrade(names=names)
1579

    
1580
  def release(self, level, names=None):
1581
    """Release a set of resource locks, at the same level.
1582

1583
    You must have acquired the locks, either in shared or in exclusive
1584
    mode, before releasing them.
1585

1586
    @type level: member of locking.LEVELS
1587
    @param level: the level at which the locks shall be released
1588
    @type names: list of strings, or None
1589
    @param names: the names of the locks which shall be released
1590
        (defaults to all the locks acquired at that level)
1591

1592
    """
1593
    assert level in LEVELS, "Invalid locking level %s" % level
1594
    assert (not self._contains_BGL(level, names) or
1595
            not self._upper_owned(LEVEL_CLUSTER)), (
1596
            "Cannot release the Big Ganeti Lock while holding something"
1597
            " at upper levels (%r)" %
1598
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
1599
                              for i in self.__keyring.keys()]), ))
1600

    
1601
    # Release will complain if we don't own the locks already
1602
    return self.__keyring[level].release(names)
1603

    
1604
  def add(self, level, names, acquired=0, shared=0):
1605
    """Add locks at the specified level.
1606

1607
    @type level: member of locking.LEVELS_MOD
1608
    @param level: the level at which the locks shall be added
1609
    @type names: list of strings
1610
    @param names: names of the locks to acquire
1611
    @type acquired: integer (0/1) used as a boolean
1612
    @param acquired: whether to acquire the newly added locks
1613
    @type shared: integer (0/1) used as a boolean
1614
    @param shared: whether the acquisition will be shared
1615

1616
    """
1617
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1618
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1619
           " operations")
1620
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1621
           " while owning some at a greater one")
1622
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1623

    
1624
  def remove(self, level, names):
1625
    """Remove locks from the specified level.
1626

1627
    You must either already own the locks you are trying to remove
1628
    exclusively or not own any lock at an upper level.
1629

1630
    @type level: member of locking.LEVELS_MOD
1631
    @param level: the level at which the locks shall be removed
1632
    @type names: list of strings
1633
    @param names: the names of the locks which shall be removed
1634
        (special lock names, or instance/node names)
1635

1636
    """
1637
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1638
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1639
           " operations")
1640
    # Check we either own the level or don't own anything from here
1641
    # up. LockSet.remove() will check the case in which we don't own
1642
    # all the needed resources, or we have a shared ownership.
1643
    assert self._is_owned(level) or not self._upper_owned(level), (
1644
           "Cannot remove locks at a level while not owning it or"
1645
           " owning some at a greater one")
1646
    return self.__keyring[level].remove(names)
1647

    
1648

    
1649
def _MonitorSortKey((item, idx, num)):
1650
  """Sorting key function.
1651

1652
  Sort by name, registration order and then order of information. This provides
1653
  a stable sort order over different providers, even if they return the same
1654
  name.
1655

1656
  """
1657
  (name, _, _, _) = item
1658

    
1659
  return (utils.NiceSortKey(name), num, idx)
1660

    
1661

    
1662
class LockMonitor(object):
1663
  _LOCK_ATTR = "_lock"
1664

    
1665
  def __init__(self):
1666
    """Initializes this class.
1667

1668
    """
1669
    self._lock = SharedLock("LockMonitor")
1670

    
1671
    # Counter for stable sorting
1672
    self._counter = itertools.count(0)
1673

    
1674
    # Tracked locks. Weak references are used to avoid issues with circular
1675
    # references and deletion.
1676
    self._locks = weakref.WeakKeyDictionary()
1677

    
1678
  @ssynchronized(_LOCK_ATTR)
1679
  def RegisterLock(self, provider):
1680
    """Registers a new lock.
1681

1682
    @param provider: Object with a callable method named C{GetLockInfo}, taking
1683
      a single C{set} containing the requested information items
1684
    @note: It would be nicer to only receive the function generating the
1685
      requested information but, as it turns out, weak references to bound
1686
      methods (e.g. C{self.GetLockInfo}) are tricky; there are several
1687
      workarounds, but none of the ones I found works properly in combination
1688
      with a standard C{WeakKeyDictionary}
1689

1690
    """
1691
    assert provider not in self._locks, "Duplicate registration"
1692

    
1693
    # There used to be a check for duplicate names here. As it turned out, when
1694
    # a lock is re-created with the same name in a very short timeframe, the
1695
    # previous instance might not yet be removed from the weakref dictionary.
1696
    # By keeping track of the order of incoming registrations, a stable sort
1697
    # ordering can still be guaranteed.
1698

    
1699
    self._locks[provider] = self._counter.next()
1700

    
1701
  def _GetLockInfo(self, requested):
1702
    """Get information from all locks.
1703

1704
    """
1705
    # Must hold lock while getting consistent list of tracked items
1706
    self._lock.acquire(shared=1)
1707
    try:
1708
      items = self._locks.items()
1709
    finally:
1710
      self._lock.release()
1711

    
1712
    return [(info, idx, num)
1713
            for (provider, num) in items
1714
            for (idx, info) in enumerate(provider.GetLockInfo(requested))]
1715

    
1716
  def _Query(self, fields):
1717
    """Queries information from all locks.
1718

1719
    @type fields: list of strings
1720
    @param fields: List of fields to return
1721

1722
    """
1723
    qobj = query.Query(query.LOCK_FIELDS, fields)
1724

    
1725
    # Get all data with internal lock held and then sort by name and incoming
1726
    # order
1727
    lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
1728
                      key=_MonitorSortKey)
1729

    
1730
    # Extract lock information and build query data
1731
    return (qobj, query.LockQueryData(map(compat.fst, lockinfo)))
1732

    
1733
  def QueryLocks(self, fields):
1734
    """Queries information from all locks.
1735

1736
    @type fields: list of strings
1737
    @param fields: List of fields to return
1738

1739
    """
1740
    (qobj, ctx) = self._Query(fields)
1741

    
1742
    # Prepare query response
1743
    return query.GetQueryResponse(qobj, ctx)
1744

    
1745
  def OldStyleQueryLocks(self, fields):
1746
    """Queries information from all locks, returning old-style data.
1747

1748
    @type fields: list of strings
1749
    @param fields: List of fields to return
1750

1751
    """
1752
    (qobj, ctx) = self._Query(fields)
1753

    
1754
    return qobj.OldStyleQuery(ctx)