Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ e02ee261

History | View | Annotate | Download (55.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import errno
32
import weakref
33
import logging
34
import heapq
35
import itertools
36
import time
37

    
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import compat
41
from ganeti import query
42

    
43

    
44
_EXCLUSIVE_TEXT = "exclusive"
45
_SHARED_TEXT = "shared"
46
_DELETED_TEXT = "deleted"
47

    
48
_DEFAULT_PRIORITY = 0
49

    
50

    
51
def ssynchronized(mylock, shared=0):
52
  """Shared Synchronization decorator.
53

54
  Calls the function holding the given lock, either in exclusive or shared
55
  mode. It requires the passed lock to be a SharedLock (or support its
56
  semantics).
57

58
  @type mylock: lockable object or string
59
  @param mylock: lock to acquire or class member name of the lock to acquire
60

61
  """
62
  def wrap(fn):
63
    def sync_function(*args, **kwargs):
64
      if isinstance(mylock, basestring):
65
        assert args, "cannot ssynchronize on non-class method: self not found"
66
        # args[0] is "self"
67
        lock = getattr(args[0], mylock)
68
      else:
69
        lock = mylock
70
      lock.acquire(shared=shared)
71
      try:
72
        return fn(*args, **kwargs)
73
      finally:
74
        lock.release()
75
    return sync_function
76
  return wrap
77

    
78

    
79
class _SingleNotifyPipeConditionWaiter(object):
80
  """Helper class for SingleNotifyPipeCondition
81

82
  """
83
  __slots__ = [
84
    "_fd",
85
    "_poller",
86
    ]
87

    
88
  def __init__(self, poller, fd):
89
    """Constructor for _SingleNotifyPipeConditionWaiter
90

91
    @type poller: select.poll
92
    @param poller: Poller object
93
    @type fd: int
94
    @param fd: File descriptor to wait for
95

96
    """
97
    object.__init__(self)
98
    self._poller = poller
99
    self._fd = fd
100

    
101
  def __call__(self, timeout):
102
    """Wait for something to happen on the pipe.
103

104
    @type timeout: float or None
105
    @param timeout: Timeout for waiting (can be None)
106

107
    """
108
    running_timeout = utils.RunningTimeout(timeout, True)
109

    
110
    while True:
111
      remaining_time = running_timeout.Remaining()
112

    
113
      if remaining_time is not None:
114
        if remaining_time < 0.0:
115
          break
116

    
117
        # Our calculation uses seconds, poll() wants milliseconds
118
        remaining_time *= 1000
119

    
120
      try:
121
        result = self._poller.poll(remaining_time)
122
      except EnvironmentError, err:
123
        if err.errno != errno.EINTR:
124
          raise
125
        result = None
126

    
127
      # Check whether we were notified
128
      if result and result[0][0] == self._fd:
129
        break
130

    
131

    
132
class _BaseCondition(object):
133
  """Base class containing common code for conditions.
134

135
  Some of this code is taken from python's threading module.
136

137
  """
138
  __slots__ = [
139
    "_lock",
140
    "acquire",
141
    "release",
142
    "_is_owned",
143
    "_acquire_restore",
144
    "_release_save",
145
    ]
146

    
147
  def __init__(self, lock):
148
    """Constructor for _BaseCondition.
149

150
    @type lock: threading.Lock
151
    @param lock: condition base lock
152

153
    """
154
    object.__init__(self)
155

    
156
    try:
157
      self._release_save = lock._release_save
158
    except AttributeError:
159
      self._release_save = self._base_release_save
160
    try:
161
      self._acquire_restore = lock._acquire_restore
162
    except AttributeError:
163
      self._acquire_restore = self._base_acquire_restore
164
    try:
165
      self._is_owned = lock.is_owned
166
    except AttributeError:
167
      self._is_owned = self._base_is_owned
168

    
169
    self._lock = lock
170

    
171
    # Export the lock's acquire() and release() methods
172
    self.acquire = lock.acquire
173
    self.release = lock.release
174

    
175
  def _base_is_owned(self):
176
    """Check whether lock is owned by current thread.
177

178
    """
179
    if self._lock.acquire(0):
180
      self._lock.release()
181
      return False
182
    return True
183

    
184
  def _base_release_save(self):
185
    self._lock.release()
186

    
187
  def _base_acquire_restore(self, _):
188
    self._lock.acquire()
189

    
190
  def _check_owned(self):
191
    """Raise an exception if the current thread doesn't own the lock.
192

193
    """
194
    if not self._is_owned():
195
      raise RuntimeError("cannot work with un-aquired lock")
196

    
197

    
198
class SingleNotifyPipeCondition(_BaseCondition):
199
  """Condition which can only be notified once.
200

201
  This condition class uses pipes and poll, internally, to be able to wait for
202
  notification with a timeout, without resorting to polling. It is almost
203
  compatible with Python's threading.Condition, with the following differences:
204
    - notifyAll can only be called once, and no wait can happen after that
205
    - notify is not supported, only notifyAll
206

207
  """
208

    
209
  __slots__ = [
210
    "_poller",
211
    "_read_fd",
212
    "_write_fd",
213
    "_nwaiters",
214
    "_notified",
215
    ]
216

    
217
  _waiter_class = _SingleNotifyPipeConditionWaiter
218

    
219
  def __init__(self, lock):
220
    """Constructor for SingleNotifyPipeCondition
221

222
    """
223
    _BaseCondition.__init__(self, lock)
224
    self._nwaiters = 0
225
    self._notified = False
226
    self._read_fd = None
227
    self._write_fd = None
228
    self._poller = None
229

    
230
  def _check_unnotified(self):
231
    """Throws an exception if already notified.
232

233
    """
234
    if self._notified:
235
      raise RuntimeError("cannot use already notified condition")
236

    
237
  def _Cleanup(self):
238
    """Cleanup open file descriptors, if any.
239

240
    """
241
    if self._read_fd is not None:
242
      os.close(self._read_fd)
243
      self._read_fd = None
244

    
245
    if self._write_fd is not None:
246
      os.close(self._write_fd)
247
      self._write_fd = None
248
    self._poller = None
249

    
250
  def wait(self, timeout):
251
    """Wait for a notification.
252

253
    @type timeout: float or None
254
    @param timeout: Waiting timeout (can be None)
255

256
    """
257
    self._check_owned()
258
    self._check_unnotified()
259

    
260
    self._nwaiters += 1
261
    try:
262
      if self._poller is None:
263
        (self._read_fd, self._write_fd) = os.pipe()
264
        self._poller = select.poll()
265
        self._poller.register(self._read_fd, select.POLLHUP)
266

    
267
      wait_fn = self._waiter_class(self._poller, self._read_fd)
268
      state = self._release_save()
269
      try:
270
        # Wait for notification
271
        wait_fn(timeout)
272
      finally:
273
        # Re-acquire lock
274
        self._acquire_restore(state)
275
    finally:
276
      self._nwaiters -= 1
277
      if self._nwaiters == 0:
278
        self._Cleanup()
279

    
280
  def notifyAll(self): # pylint: disable=C0103
281
    """Close the writing side of the pipe to notify all waiters.
282

283
    """
284
    self._check_owned()
285
    self._check_unnotified()
286
    self._notified = True
287
    if self._write_fd is not None:
288
      os.close(self._write_fd)
289
      self._write_fd = None
290

    
291

    
292
class PipeCondition(_BaseCondition):
293
  """Group-only non-polling condition with counters.
294

295
  This condition class uses pipes and poll, internally, to be able to wait for
296
  notification with a timeout, without resorting to polling. It is almost
297
  compatible with Python's threading.Condition, but only supports notifyAll and
298
  non-recursive locks. As an additional features it's able to report whether
299
  there are any waiting threads.
300

301
  """
302
  __slots__ = [
303
    "_waiters",
304
    "_single_condition",
305
    ]
306

    
307
  _single_condition_class = SingleNotifyPipeCondition
308

    
309
  def __init__(self, lock):
310
    """Initializes this class.
311

312
    """
313
    _BaseCondition.__init__(self, lock)
314
    self._waiters = set()
315
    self._single_condition = self._single_condition_class(self._lock)
316

    
317
  def wait(self, timeout):
318
    """Wait for a notification.
319

320
    @type timeout: float or None
321
    @param timeout: Waiting timeout (can be None)
322

323
    """
324
    self._check_owned()
325

    
326
    # Keep local reference to the pipe. It could be replaced by another thread
327
    # notifying while we're waiting.
328
    cond = self._single_condition
329

    
330
    self._waiters.add(threading.currentThread())
331
    try:
332
      cond.wait(timeout)
333
    finally:
334
      self._check_owned()
335
      self._waiters.remove(threading.currentThread())
336

    
337
  def notifyAll(self): # pylint: disable=C0103
338
    """Notify all currently waiting threads.
339

340
    """
341
    self._check_owned()
342
    self._single_condition.notifyAll()
343
    self._single_condition = self._single_condition_class(self._lock)
344

    
345
  def get_waiting(self):
346
    """Returns a list of all waiting threads.
347

348
    """
349
    self._check_owned()
350

    
351
    return self._waiters
352

    
353
  def has_waiting(self):
354
    """Returns whether there are active waiters.
355

356
    """
357
    self._check_owned()
358

    
359
    return bool(self._waiters)
360

    
361
  def __repr__(self):
362
    return ("<%s.%s waiters=%s at %#x>" %
363
            (self.__class__.__module__, self.__class__.__name__,
364
             self._waiters, id(self)))
365

    
366

    
367
class _PipeConditionWithMode(PipeCondition):
368
  __slots__ = [
369
    "shared",
370
    ]
371

    
372
  def __init__(self, lock, shared):
373
    """Initializes this class.
374

375
    """
376
    self.shared = shared
377
    PipeCondition.__init__(self, lock)
378

    
379

    
380
class SharedLock(object):
381
  """Implements a shared lock.
382

383
  Multiple threads can acquire the lock in a shared way by calling
384
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
385
  threads can call C{acquire(shared=0)}.
386

387
  Notes on data structures: C{__pending} contains a priority queue (heapq) of
388
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
389
  ...]}. Each per-priority queue contains a normal in-order list of conditions
390
  to be notified when the lock can be acquired. Shared locks are grouped
391
  together by priority and the condition for them is stored in
392
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
393
  references for the per-priority queues indexed by priority for faster access.
394

395
  @type name: string
396
  @ivar name: the name of the lock
397

398
  """
399
  __slots__ = [
400
    "__weakref__",
401
    "__deleted",
402
    "__exc",
403
    "__lock",
404
    "__pending",
405
    "__pending_by_prio",
406
    "__pending_shared",
407
    "__shr",
408
    "__time_fn",
409
    "name",
410
    ]
411

    
412
  __condition_class = _PipeConditionWithMode
413

    
414
  def __init__(self, name, monitor=None, _time_fn=time.time):
415
    """Construct a new SharedLock.
416

417
    @param name: the name of the lock
418
    @type monitor: L{LockMonitor}
419
    @param monitor: Lock monitor with which to register
420

421
    """
422
    object.__init__(self)
423

    
424
    self.name = name
425

    
426
    # Used for unittesting
427
    self.__time_fn = _time_fn
428

    
429
    # Internal lock
430
    self.__lock = threading.Lock()
431

    
432
    # Queue containing waiting acquires
433
    self.__pending = []
434
    self.__pending_by_prio = {}
435
    self.__pending_shared = {}
436

    
437
    # Current lock holders
438
    self.__shr = set()
439
    self.__exc = None
440

    
441
    # is this lock in the deleted state?
442
    self.__deleted = False
443

    
444
    # Register with lock monitor
445
    if monitor:
446
      logging.debug("Adding lock %s to monitor", name)
447
      monitor.RegisterLock(self)
448

    
449
  def __repr__(self):
450
    return ("<%s.%s name=%s at %#x>" %
451
            (self.__class__.__module__, self.__class__.__name__,
452
             self.name, id(self)))
453

    
454
  def GetLockInfo(self, requested):
455
    """Retrieves information for querying locks.
456

457
    @type requested: set
458
    @param requested: Requested information, see C{query.LQ_*}
459

460
    """
461
    self.__lock.acquire()
462
    try:
463
      # Note: to avoid unintentional race conditions, no references to
464
      # modifiable objects should be returned unless they were created in this
465
      # function.
466
      mode = None
467
      owner_names = None
468

    
469
      if query.LQ_MODE in requested:
470
        if self.__deleted:
471
          mode = _DELETED_TEXT
472
          assert not (self.__exc or self.__shr)
473
        elif self.__exc:
474
          mode = _EXCLUSIVE_TEXT
475
        elif self.__shr:
476
          mode = _SHARED_TEXT
477

    
478
      # Current owner(s) are wanted
479
      if query.LQ_OWNER in requested:
480
        if self.__exc:
481
          owner = [self.__exc]
482
        else:
483
          owner = self.__shr
484

    
485
        if owner:
486
          assert not self.__deleted
487
          owner_names = [i.getName() for i in owner]
488

    
489
      # Pending acquires are wanted
490
      if query.LQ_PENDING in requested:
491
        pending = []
492

    
493
        # Sorting instead of copying and using heaq functions for simplicity
494
        for (_, prioqueue) in sorted(self.__pending):
495
          for cond in prioqueue:
496
            if cond.shared:
497
              pendmode = _SHARED_TEXT
498
            else:
499
              pendmode = _EXCLUSIVE_TEXT
500

    
501
            # List of names will be sorted in L{query._GetLockPending}
502
            pending.append((pendmode, [i.getName()
503
                                       for i in cond.get_waiting()]))
504
      else:
505
        pending = None
506

    
507
      return [(self.name, mode, owner_names, pending)]
508
    finally:
509
      self.__lock.release()
510

    
511
  def __check_deleted(self):
512
    """Raises an exception if the lock has been deleted.
513

514
    """
515
    if self.__deleted:
516
      raise errors.LockError("Deleted lock %s" % self.name)
517

    
518
  def __is_sharer(self):
519
    """Is the current thread sharing the lock at this time?
520

521
    """
522
    return threading.currentThread() in self.__shr
523

    
524
  def __is_exclusive(self):
525
    """Is the current thread holding the lock exclusively at this time?
526

527
    """
528
    return threading.currentThread() == self.__exc
529

    
530
  def __is_owned(self, shared=-1):
531
    """Is the current thread somehow owning the lock at this time?
532

533
    This is a private version of the function, which presumes you're holding
534
    the internal lock.
535

536
    """
537
    if shared < 0:
538
      return self.__is_sharer() or self.__is_exclusive()
539
    elif shared:
540
      return self.__is_sharer()
541
    else:
542
      return self.__is_exclusive()
543

    
544
  def is_owned(self, shared=-1):
545
    """Is the current thread somehow owning the lock at this time?
546

547
    @param shared:
548
        - < 0: check for any type of ownership (default)
549
        - 0: check for exclusive ownership
550
        - > 0: check for shared ownership
551

552
    """
553
    self.__lock.acquire()
554
    try:
555
      return self.__is_owned(shared=shared)
556
    finally:
557
      self.__lock.release()
558

    
559
  #: Necessary to remain compatible with threading.Condition, which tries to
560
  #: retrieve a locks' "_is_owned" attribute
561
  _is_owned = is_owned
562

    
563
  def _count_pending(self):
564
    """Returns the number of pending acquires.
565

566
    @rtype: int
567

568
    """
569
    self.__lock.acquire()
570
    try:
571
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
572
    finally:
573
      self.__lock.release()
574

    
575
  def _check_empty(self):
576
    """Checks whether there are any pending acquires.
577

578
    @rtype: bool
579

580
    """
581
    self.__lock.acquire()
582
    try:
583
      # Order is important: __find_first_pending_queue modifies __pending
584
      (_, prioqueue) = self.__find_first_pending_queue()
585

    
586
      return not (prioqueue or
587
                  self.__pending or
588
                  self.__pending_by_prio or
589
                  self.__pending_shared)
590
    finally:
591
      self.__lock.release()
592

    
593
  def __do_acquire(self, shared):
594
    """Actually acquire the lock.
595

596
    """
597
    if shared:
598
      self.__shr.add(threading.currentThread())
599
    else:
600
      self.__exc = threading.currentThread()
601

    
602
  def __can_acquire(self, shared):
603
    """Determine whether lock can be acquired.
604

605
    """
606
    if shared:
607
      return self.__exc is None
608
    else:
609
      return len(self.__shr) == 0 and self.__exc is None
610

    
611
  def __find_first_pending_queue(self):
612
    """Tries to find the topmost queued entry with pending acquires.
613

614
    Removes empty entries while going through the list.
615

616
    """
617
    while self.__pending:
618
      (priority, prioqueue) = self.__pending[0]
619

    
620
      if prioqueue:
621
        return (priority, prioqueue)
622

    
623
      # Remove empty queue
624
      heapq.heappop(self.__pending)
625
      del self.__pending_by_prio[priority]
626
      assert priority not in self.__pending_shared
627

    
628
    return (None, None)
629

    
630
  def __is_on_top(self, cond):
631
    """Checks whether the passed condition is on top of the queue.
632

633
    The caller must make sure the queue isn't empty.
634

635
    """
636
    (_, prioqueue) = self.__find_first_pending_queue()
637

    
638
    return cond == prioqueue[0]
639

    
640
  def __acquire_unlocked(self, shared, timeout, priority):
641
    """Acquire a shared lock.
642

643
    @param shared: whether to acquire in shared mode; by default an
644
        exclusive lock will be acquired
645
    @param timeout: maximum waiting time before giving up
646
    @type priority: integer
647
    @param priority: Priority for acquiring lock
648

649
    """
650
    self.__check_deleted()
651

    
652
    # We cannot acquire the lock if we already have it
653
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
654
                                   " %s" % self.name)
655

    
656
    # Remove empty entries from queue
657
    self.__find_first_pending_queue()
658

    
659
    # Check whether someone else holds the lock or there are pending acquires.
660
    if not self.__pending and self.__can_acquire(shared):
661
      # Apparently not, can acquire lock directly.
662
      self.__do_acquire(shared)
663
      return True
664

    
665
    prioqueue = self.__pending_by_prio.get(priority, None)
666

    
667
    if shared:
668
      # Try to re-use condition for shared acquire
669
      wait_condition = self.__pending_shared.get(priority, None)
670
      assert (wait_condition is None or
671
              (wait_condition.shared and wait_condition in prioqueue))
672
    else:
673
      wait_condition = None
674

    
675
    if wait_condition is None:
676
      if prioqueue is None:
677
        assert priority not in self.__pending_by_prio
678

    
679
        prioqueue = []
680
        heapq.heappush(self.__pending, (priority, prioqueue))
681
        self.__pending_by_prio[priority] = prioqueue
682

    
683
      wait_condition = self.__condition_class(self.__lock, shared)
684
      prioqueue.append(wait_condition)
685

    
686
      if shared:
687
        # Keep reference for further shared acquires on same priority. This is
688
        # better than trying to find it in the list of pending acquires.
689
        assert priority not in self.__pending_shared
690
        self.__pending_shared[priority] = wait_condition
691

    
692
    wait_start = self.__time_fn()
693
    acquired = False
694

    
695
    try:
696
      # Wait until we become the topmost acquire in the queue or the timeout
697
      # expires.
698
      while True:
699
        if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
700
          self.__do_acquire(shared)
701
          acquired = True
702
          break
703

    
704
        # A lot of code assumes blocking acquires always succeed, therefore we
705
        # can never return False for a blocking acquire
706
        if (timeout is not None and
707
            utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
708
          break
709

    
710
        # Wait for notification
711
        wait_condition.wait(timeout)
712
        self.__check_deleted()
713
    finally:
714
      # Remove condition from queue if there are no more waiters
715
      if not wait_condition.has_waiting():
716
        prioqueue.remove(wait_condition)
717
        if wait_condition.shared:
718
          # Remove from list of shared acquires if it wasn't while releasing
719
          # (e.g. on lock deletion)
720
          self.__pending_shared.pop(priority, None)
721

    
722
    return acquired
723

    
724
  def acquire(self, shared=0, timeout=None, priority=None,
725
              test_notify=None):
726
    """Acquire a shared lock.
727

728
    @type shared: integer (0/1) used as a boolean
729
    @param shared: whether to acquire in shared mode; by default an
730
        exclusive lock will be acquired
731
    @type timeout: float
732
    @param timeout: maximum waiting time before giving up
733
    @type priority: integer
734
    @param priority: Priority for acquiring lock
735
    @type test_notify: callable or None
736
    @param test_notify: Special callback function for unittesting
737

738
    """
739
    if priority is None:
740
      priority = _DEFAULT_PRIORITY
741

    
742
    self.__lock.acquire()
743
    try:
744
      # We already got the lock, notify now
745
      if __debug__ and callable(test_notify):
746
        test_notify()
747

    
748
      return self.__acquire_unlocked(shared, timeout, priority)
749
    finally:
750
      self.__lock.release()
751

    
752
  def downgrade(self):
753
    """Changes the lock mode from exclusive to shared.
754

755
    Pending acquires in shared mode on the same priority will go ahead.
756

757
    """
758
    self.__lock.acquire()
759
    try:
760
      assert self.__is_owned(), "Lock must be owned"
761

    
762
      if self.__is_exclusive():
763
        # Do nothing if the lock is already acquired in shared mode
764
        self.__exc = None
765
        self.__do_acquire(1)
766

    
767
        # Important: pending shared acquires should only jump ahead if there
768
        # was a transition from exclusive to shared, otherwise an owner of a
769
        # shared lock can keep calling this function to push incoming shared
770
        # acquires
771
        (priority, prioqueue) = self.__find_first_pending_queue()
772
        if prioqueue:
773
          # Is there a pending shared acquire on this priority?
774
          cond = self.__pending_shared.pop(priority, None)
775
          if cond:
776
            assert cond.shared
777
            assert cond in prioqueue
778

    
779
            # Ensure shared acquire is on top of queue
780
            if len(prioqueue) > 1:
781
              prioqueue.remove(cond)
782
              prioqueue.insert(0, cond)
783

    
784
            # Notify
785
            cond.notifyAll()
786

    
787
      assert not self.__is_exclusive()
788
      assert self.__is_sharer()
789

    
790
      return True
791
    finally:
792
      self.__lock.release()
793

    
794
  def release(self):
795
    """Release a Shared Lock.
796

797
    You must have acquired the lock, either in shared or in exclusive mode,
798
    before calling this function.
799

800
    """
801
    self.__lock.acquire()
802
    try:
803
      assert self.__is_exclusive() or self.__is_sharer(), \
804
        "Cannot release non-owned lock"
805

    
806
      # Autodetect release type
807
      if self.__is_exclusive():
808
        self.__exc = None
809
        notify = True
810
      else:
811
        self.__shr.remove(threading.currentThread())
812
        notify = not self.__shr
813

    
814
      # Notify topmost condition in queue if there are no owners left (for
815
      # shared locks)
816
      if notify:
817
        self.__notify_topmost()
818
    finally:
819
      self.__lock.release()
820

    
821
  def __notify_topmost(self):
822
    """Notifies topmost condition in queue of pending acquires.
823

824
    """
825
    (priority, prioqueue) = self.__find_first_pending_queue()
826
    if prioqueue:
827
      cond = prioqueue[0]
828
      cond.notifyAll()
829
      if cond.shared:
830
        # Prevent further shared acquires from sneaking in while waiters are
831
        # notified
832
        self.__pending_shared.pop(priority, None)
833

    
834
  def _notify_topmost(self):
835
    """Exported version of L{__notify_topmost}.
836

837
    """
838
    self.__lock.acquire()
839
    try:
840
      return self.__notify_topmost()
841
    finally:
842
      self.__lock.release()
843

    
844
  def delete(self, timeout=None, priority=None):
845
    """Delete a Shared Lock.
846

847
    This operation will declare the lock for removal. First the lock will be
848
    acquired in exclusive mode if you don't already own it, then the lock
849
    will be put in a state where any future and pending acquire() fail.
850

851
    @type timeout: float
852
    @param timeout: maximum waiting time before giving up
853
    @type priority: integer
854
    @param priority: Priority for acquiring lock
855

856
    """
857
    if priority is None:
858
      priority = _DEFAULT_PRIORITY
859

    
860
    self.__lock.acquire()
861
    try:
862
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
863

    
864
      self.__check_deleted()
865

    
866
      # The caller is allowed to hold the lock exclusively already.
867
      acquired = self.__is_exclusive()
868

    
869
      if not acquired:
870
        acquired = self.__acquire_unlocked(0, timeout, priority)
871

    
872
      if acquired:
873
        assert self.__is_exclusive() and not self.__is_sharer(), \
874
          "Lock wasn't acquired in exclusive mode"
875

    
876
        self.__deleted = True
877
        self.__exc = None
878

    
879
        assert not (self.__exc or self.__shr), "Found owner during deletion"
880

    
881
        # Notify all acquires. They'll throw an error.
882
        for (_, prioqueue) in self.__pending:
883
          for cond in prioqueue:
884
            cond.notifyAll()
885

    
886
        assert self.__deleted
887

    
888
      return acquired
889
    finally:
890
      self.__lock.release()
891

    
892
  def _release_save(self):
893
    shared = self.__is_sharer()
894
    self.release()
895
    return shared
896

    
897
  def _acquire_restore(self, shared):
898
    self.acquire(shared=shared)
899

    
900

    
901
# Whenever we want to acquire a full LockSet we pass None as the value
902
# to acquire.  Hide this behind this nicely named constant.
903
ALL_SET = None
904

    
905

    
906
class _AcquireTimeout(Exception):
907
  """Internal exception to abort an acquire on a timeout.
908

909
  """
910

    
911

    
912
class LockSet:
913
  """Implements a set of locks.
914

915
  This abstraction implements a set of shared locks for the same resource type,
916
  distinguished by name. The user can lock a subset of the resources and the
917
  LockSet will take care of acquiring the locks always in the same order, thus
918
  preventing deadlock.
919

920
  All the locks needed in the same set must be acquired together, though.
921

922
  @type name: string
923
  @ivar name: the name of the lockset
924

925
  """
926
  def __init__(self, members, name, monitor=None):
927
    """Constructs a new LockSet.
928

929
    @type members: list of strings
930
    @param members: initial members of the set
931
    @type monitor: L{LockMonitor}
932
    @param monitor: Lock monitor with which to register member locks
933

934
    """
935
    assert members is not None, "members parameter is not a list"
936
    self.name = name
937

    
938
    # Lock monitor
939
    self.__monitor = monitor
940

    
941
    # Used internally to guarantee coherency
942
    self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)
943

    
944
    # The lockdict indexes the relationship name -> lock
945
    # The order-of-locking is implied by the alphabetical order of names
946
    self.__lockdict = {}
947

    
948
    for mname in members:
949
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
950
                                          monitor=monitor)
951

    
952
    # The owner dict contains the set of locks each thread owns. For
953
    # performance each thread can access its own key without a global lock on
954
    # this structure. It is paramount though that *no* other type of access is
955
    # done to this structure (eg. no looping over its keys). *_owner helper
956
    # function are defined to guarantee access is correct, but in general never
957
    # do anything different than __owners[threading.currentThread()], or there
958
    # will be trouble.
959
    self.__owners = {}
960

    
961
  def _GetLockName(self, mname):
962
    """Returns the name for a member lock.
963

964
    """
965
    return "%s/%s" % (self.name, mname)
966

    
967
  def _get_lock(self):
968
    """Returns the lockset-internal lock.
969

970
    """
971
    return self.__lock
972

    
973
  def _get_lockdict(self):
974
    """Returns the lockset-internal lock dictionary.
975

976
    Accessing this structure is only safe in single-thread usage or when the
977
    lockset-internal lock is held.
978

979
    """
980
    return self.__lockdict
981

    
982
  def is_owned(self):
983
    """Is the current thread a current level owner?
984

985
    @note: Use L{check_owned} to check if a specific lock is held
986

987
    """
988
    return threading.currentThread() in self.__owners
989

    
990
  def check_owned(self, names, shared=-1):
991
    """Check if locks are owned in a specific mode.
992

993
    @type names: sequence or string
994
    @param names: Lock names (or a single lock name)
995
    @param shared: See L{SharedLock.is_owned}
996
    @rtype: bool
997
    @note: Use L{is_owned} to check if the current thread holds I{any} lock and
998
      L{list_owned} to get the names of all owned locks
999

1000
    """
1001
    if isinstance(names, basestring):
1002
      names = [names]
1003

    
1004
    # Avoid check if no locks are owned anyway
1005
    if names and self.is_owned():
1006
      candidates = []
1007

    
1008
      # Gather references to all locks (in case they're deleted in the meantime)
1009
      for lname in names:
1010
        try:
1011
          lock = self.__lockdict[lname]
1012
        except KeyError:
1013
          raise errors.LockError("Non-existing lock '%s' in set '%s' (it may"
1014
                                 " have been removed)" % (lname, self.name))
1015
        else:
1016
          candidates.append(lock)
1017

    
1018
      return compat.all(lock.is_owned(shared=shared) for lock in candidates)
1019
    else:
1020
      return False
1021

    
1022
  def _add_owned(self, name=None):
1023
    """Note the current thread owns the given lock"""
1024
    if name is None:
1025
      if not self.is_owned():
1026
        self.__owners[threading.currentThread()] = set()
1027
    else:
1028
      if self.is_owned():
1029
        self.__owners[threading.currentThread()].add(name)
1030
      else:
1031
        self.__owners[threading.currentThread()] = set([name])
1032

    
1033
  def _del_owned(self, name=None):
1034
    """Note the current thread owns the given lock"""
1035

    
1036
    assert not (name is None and self.__lock.is_owned()), \
1037
           "Cannot hold internal lock when deleting owner status"
1038

    
1039
    if name is not None:
1040
      self.__owners[threading.currentThread()].remove(name)
1041

    
1042
    # Only remove the key if we don't hold the set-lock as well
1043
    if (not self.__lock.is_owned() and
1044
        not self.__owners[threading.currentThread()]):
1045
      del self.__owners[threading.currentThread()]
1046

    
1047
  def list_owned(self):
1048
    """Get the set of resource names owned by the current thread"""
1049
    if self.is_owned():
1050
      return self.__owners[threading.currentThread()].copy()
1051
    else:
1052
      return set()
1053

    
1054
  def _release_and_delete_owned(self):
1055
    """Release and delete all resources owned by the current thread"""
1056
    for lname in self.list_owned():
1057
      lock = self.__lockdict[lname]
1058
      if lock.is_owned():
1059
        lock.release()
1060
      self._del_owned(name=lname)
1061

    
1062
  def __names(self):
1063
    """Return the current set of names.
1064

1065
    Only call this function while holding __lock and don't iterate on the
1066
    result after releasing the lock.
1067

1068
    """
1069
    return self.__lockdict.keys()
1070

    
1071
  def _names(self):
1072
    """Return a copy of the current set of elements.
1073

1074
    Used only for debugging purposes.
1075

1076
    """
1077
    # If we don't already own the set-level lock acquired
1078
    # we'll get it and note we need to release it later.
1079
    release_lock = False
1080
    if not self.__lock.is_owned():
1081
      release_lock = True
1082
      self.__lock.acquire(shared=1)
1083
    try:
1084
      result = self.__names()
1085
    finally:
1086
      if release_lock:
1087
        self.__lock.release()
1088
    return set(result)
1089

    
1090
  def acquire(self, names, timeout=None, shared=0, priority=None,
1091
              test_notify=None):
1092
    """Acquire a set of resource locks.
1093

1094
    @type names: list of strings (or string)
1095
    @param names: the names of the locks which shall be acquired
1096
        (special lock names, or instance/node names)
1097
    @type shared: integer (0/1) used as a boolean
1098
    @param shared: whether to acquire in shared mode; by default an
1099
        exclusive lock will be acquired
1100
    @type timeout: float or None
1101
    @param timeout: Maximum time to acquire all locks
1102
    @type priority: integer
1103
    @param priority: Priority for acquiring locks
1104
    @type test_notify: callable or None
1105
    @param test_notify: Special callback function for unittesting
1106

1107
    @return: Set of all locks successfully acquired or None in case of timeout
1108

1109
    @raise errors.LockError: when any lock we try to acquire has
1110
        been deleted before we succeed. In this case none of the
1111
        locks requested will be acquired.
1112

1113
    """
1114
    assert timeout is None or timeout >= 0.0
1115

    
1116
    # Check we don't already own locks at this level
1117
    assert not self.is_owned(), ("Cannot acquire locks in the same set twice"
1118
                                 " (lockset %s)" % self.name)
1119

    
1120
    if priority is None:
1121
      priority = _DEFAULT_PRIORITY
1122

    
1123
    # We need to keep track of how long we spent waiting for a lock. The
1124
    # timeout passed to this function is over all lock acquires.
1125
    running_timeout = utils.RunningTimeout(timeout, False)
1126

    
1127
    try:
1128
      if names is not None:
1129
        # Support passing in a single resource to acquire rather than many
1130
        if isinstance(names, basestring):
1131
          names = [names]
1132

    
1133
        return self.__acquire_inner(names, False, shared, priority,
1134
                                    running_timeout.Remaining, test_notify)
1135

    
1136
      else:
1137
        # If no names are given acquire the whole set by not letting new names
1138
        # being added before we release, and getting the current list of names.
1139
        # Some of them may then be deleted later, but we'll cope with this.
1140
        #
1141
        # We'd like to acquire this lock in a shared way, as it's nice if
1142
        # everybody else can use the instances at the same time. If we are
1143
        # acquiring them exclusively though they won't be able to do this
1144
        # anyway, though, so we'll get the list lock exclusively as well in
1145
        # order to be able to do add() on the set while owning it.
1146
        if not self.__lock.acquire(shared=shared, priority=priority,
1147
                                   timeout=running_timeout.Remaining()):
1148
          raise _AcquireTimeout()
1149
        try:
1150
          # note we own the set-lock
1151
          self._add_owned()
1152

    
1153
          return self.__acquire_inner(self.__names(), True, shared, priority,
1154
                                      running_timeout.Remaining, test_notify)
1155
        except:
1156
          # We shouldn't have problems adding the lock to the owners list, but
1157
          # if we did we'll try to release this lock and re-raise exception.
1158
          # Of course something is going to be really wrong, after this.
1159
          self.__lock.release()
1160
          self._del_owned()
1161
          raise
1162

    
1163
    except _AcquireTimeout:
1164
      return None
1165

    
1166
  def __acquire_inner(self, names, want_all, shared, priority,
1167
                      timeout_fn, test_notify):
1168
    """Inner logic for acquiring a number of locks.
1169

1170
    @param names: Names of the locks to be acquired
1171
    @param want_all: Whether all locks in the set should be acquired
1172
    @param shared: Whether to acquire in shared mode
1173
    @param timeout_fn: Function returning remaining timeout
1174
    @param priority: Priority for acquiring locks
1175
    @param test_notify: Special callback function for unittesting
1176

1177
    """
1178
    acquire_list = []
1179

    
1180
    # First we look the locks up on __lockdict. We have no way of being sure
1181
    # they will still be there after, but this makes it a lot faster should
1182
    # just one of them be the already wrong. Using a sorted sequence to prevent
1183
    # deadlocks.
1184
    for lname in sorted(utils.UniqueSequence(names)):
1185
      try:
1186
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
1187
      except KeyError:
1188
        if want_all:
1189
          # We are acquiring all the set, it doesn't matter if this particular
1190
          # element is not there anymore.
1191
          continue
1192

    
1193
        raise errors.LockError("Non-existing lock %s in set %s (it may have"
1194
                               " been removed)" % (lname, self.name))
1195

    
1196
      acquire_list.append((lname, lock))
1197

    
1198
    # This will hold the locknames we effectively acquired.
1199
    acquired = set()
1200

    
1201
    try:
1202
      # Now acquire_list contains a sorted list of resources and locks we
1203
      # want.  In order to get them we loop on this (private) list and
1204
      # acquire() them.  We gave no real guarantee they will still exist till
1205
      # this is done but .acquire() itself is safe and will alert us if the
1206
      # lock gets deleted.
1207
      for (lname, lock) in acquire_list:
1208
        if __debug__ and callable(test_notify):
1209
          test_notify_fn = lambda: test_notify(lname)
1210
        else:
1211
          test_notify_fn = None
1212

    
1213
        timeout = timeout_fn()
1214

    
1215
        try:
1216
          # raises LockError if the lock was deleted
1217
          acq_success = lock.acquire(shared=shared, timeout=timeout,
1218
                                     priority=priority,
1219
                                     test_notify=test_notify_fn)
1220
        except errors.LockError:
1221
          if want_all:
1222
            # We are acquiring all the set, it doesn't matter if this
1223
            # particular element is not there anymore.
1224
            continue
1225

    
1226
          raise errors.LockError("Non-existing lock %s in set %s (it may"
1227
                                 " have been removed)" % (lname, self.name))
1228

    
1229
        if not acq_success:
1230
          # Couldn't get lock or timeout occurred
1231
          if timeout is None:
1232
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
1233
            # blocking.
1234
            raise errors.LockError("Failed to get lock %s (set %s)" %
1235
                                   (lname, self.name))
1236

    
1237
          raise _AcquireTimeout()
1238

    
1239
        try:
1240
          # now the lock cannot be deleted, we have it!
1241
          self._add_owned(name=lname)
1242
          acquired.add(lname)
1243

    
1244
        except:
1245
          # We shouldn't have problems adding the lock to the owners list, but
1246
          # if we did we'll try to release this lock and re-raise exception.
1247
          # Of course something is going to be really wrong after this.
1248
          if lock.is_owned():
1249
            lock.release()
1250
          raise
1251

    
1252
    except:
1253
      # Release all owned locks
1254
      self._release_and_delete_owned()
1255
      raise
1256

    
1257
    return acquired
1258

    
1259
  def downgrade(self, names=None):
1260
    """Downgrade a set of resource locks from exclusive to shared mode.
1261

1262
    The locks must have been acquired in exclusive mode.
1263

1264
    """
1265
    assert self.is_owned(), ("downgrade on lockset %s while not owning any"
1266
                             " lock" % self.name)
1267

    
1268
    # Support passing in a single resource to downgrade rather than many
1269
    if isinstance(names, basestring):
1270
      names = [names]
1271

    
1272
    owned = self.list_owned()
1273

    
1274
    if names is None:
1275
      names = owned
1276
    else:
1277
      names = set(names)
1278
      assert owned.issuperset(names), \
1279
        ("downgrade() on unheld resources %s (set %s)" %
1280
         (names.difference(owned), self.name))
1281

    
1282
    for lockname in names:
1283
      self.__lockdict[lockname].downgrade()
1284

    
1285
    # Do we own the lockset in exclusive mode?
1286
    if self.__lock.is_owned(shared=0):
1287
      # Have all locks been downgraded?
1288
      if not compat.any(lock.is_owned(shared=0)
1289
                        for lock in self.__lockdict.values()):
1290
        self.__lock.downgrade()
1291
        assert self.__lock.is_owned(shared=1)
1292

    
1293
    return True
1294

    
1295
  def release(self, names=None):
1296
    """Release a set of resource locks, at the same level.
1297

1298
    You must have acquired the locks, either in shared or in exclusive mode,
1299
    before releasing them.
1300

1301
    @type names: list of strings, or None
1302
    @param names: the names of the locks which shall be released
1303
        (defaults to all the locks acquired at that level).
1304

1305
    """
1306
    assert self.is_owned(), ("release() on lock set %s while not owner" %
1307
                             self.name)
1308

    
1309
    # Support passing in a single resource to release rather than many
1310
    if isinstance(names, basestring):
1311
      names = [names]
1312

    
1313
    if names is None:
1314
      names = self.list_owned()
1315
    else:
1316
      names = set(names)
1317
      assert self.list_owned().issuperset(names), (
1318
               "release() on unheld resources %s (set %s)" %
1319
               (names.difference(self.list_owned()), self.name))
1320

    
1321
    # First of all let's release the "all elements" lock, if set.
1322
    # After this 'add' can work again
1323
    if self.__lock.is_owned():
1324
      self.__lock.release()
1325
      self._del_owned()
1326

    
1327
    for lockname in names:
1328
      # If we are sure the lock doesn't leave __lockdict without being
1329
      # exclusively held we can do this...
1330
      self.__lockdict[lockname].release()
1331
      self._del_owned(name=lockname)
1332

    
1333
  def add(self, names, acquired=0, shared=0):
1334
    """Add a new set of elements to the set
1335

1336
    @type names: list of strings
1337
    @param names: names of the new elements to add
1338
    @type acquired: integer (0/1) used as a boolean
1339
    @param acquired: pre-acquire the new resource?
1340
    @type shared: integer (0/1) used as a boolean
1341
    @param shared: is the pre-acquisition shared?
1342

1343
    """
1344
    # Check we don't already own locks at this level
1345
    assert not self.is_owned() or self.__lock.is_owned(shared=0), \
1346
      ("Cannot add locks if the set %s is only partially owned, or shared" %
1347
       self.name)
1348

    
1349
    # Support passing in a single resource to add rather than many
1350
    if isinstance(names, basestring):
1351
      names = [names]
1352

    
1353
    # If we don't already own the set-level lock acquired in an exclusive way
1354
    # we'll get it and note we need to release it later.
1355
    release_lock = False
1356
    if not self.__lock.is_owned():
1357
      release_lock = True
1358
      self.__lock.acquire()
1359

    
1360
    try:
1361
      invalid_names = set(self.__names()).intersection(names)
1362
      if invalid_names:
1363
        # This must be an explicit raise, not an assert, because assert is
1364
        # turned off when using optimization, and this can happen because of
1365
        # concurrency even if the user doesn't want it.
1366
        raise errors.LockError("duplicate add(%s) on lockset %s" %
1367
                               (invalid_names, self.name))
1368

    
1369
      for lockname in names:
1370
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1371

    
1372
        if acquired:
1373
          # No need for priority or timeout here as this lock has just been
1374
          # created
1375
          lock.acquire(shared=shared)
1376
          # now the lock cannot be deleted, we have it!
1377
          try:
1378
            self._add_owned(name=lockname)
1379
          except:
1380
            # We shouldn't have problems adding the lock to the owners list,
1381
            # but if we did we'll try to release this lock and re-raise
1382
            # exception.  Of course something is going to be really wrong,
1383
            # after this.  On the other hand the lock hasn't been added to the
1384
            # __lockdict yet so no other threads should be pending on it. This
1385
            # release is just a safety measure.
1386
            lock.release()
1387
            raise
1388

    
1389
        self.__lockdict[lockname] = lock
1390

    
1391
    finally:
1392
      # Only release __lock if we were not holding it previously.
1393
      if release_lock:
1394
        self.__lock.release()
1395

    
1396
    return True
1397

    
1398
  def remove(self, names):
1399
    """Remove elements from the lock set.
1400

1401
    You can either not hold anything in the lockset or already hold a superset
1402
    of the elements you want to delete, exclusively.
1403

1404
    @type names: list of strings
1405
    @param names: names of the resource to remove.
1406

1407
    @return: a list of locks which we removed; the list is always
1408
        equal to the names list if we were holding all the locks
1409
        exclusively
1410

1411
    """
1412
    # Support passing in a single resource to remove rather than many
1413
    if isinstance(names, basestring):
1414
      names = [names]
1415

    
1416
    # If we own any subset of this lock it must be a superset of what we want
1417
    # to delete. The ownership must also be exclusive, but that will be checked
1418
    # by the lock itself.
1419
    assert not self.is_owned() or self.list_owned().issuperset(names), (
1420
      "remove() on acquired lockset %s while not owning all elements" %
1421
      self.name)
1422

    
1423
    removed = []
1424

    
1425
    for lname in names:
1426
      # Calling delete() acquires the lock exclusively if we don't already own
1427
      # it, and causes all pending and subsequent lock acquires to fail. It's
1428
      # fine to call it out of order because delete() also implies release(),
1429
      # and the assertion above guarantees that if we either already hold
1430
      # everything we want to delete, or we hold none.
1431
      try:
1432
        self.__lockdict[lname].delete()
1433
        removed.append(lname)
1434
      except (KeyError, errors.LockError):
1435
        # This cannot happen if we were already holding it, verify:
1436
        assert not self.is_owned(), ("remove failed while holding lockset %s" %
1437
                                     self.name)
1438
      else:
1439
        # If no LockError was raised we are the ones who deleted the lock.
1440
        # This means we can safely remove it from lockdict, as any further or
1441
        # pending delete() or acquire() will fail (and nobody can have the lock
1442
        # since before our call to delete()).
1443
        #
1444
        # This is done in an else clause because if the exception was thrown
1445
        # it's the job of the one who actually deleted it.
1446
        del self.__lockdict[lname]
1447
        # And let's remove it from our private list if we owned it.
1448
        if self.is_owned():
1449
          self._del_owned(name=lname)
1450

    
1451
    return removed
1452

    
1453

    
1454
# Locking levels, must be acquired in increasing order.
1455
# Current rules are:
1456
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1457
#   acquired before performing any operation, either in shared or in exclusive
1458
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
1459
#   avoided.
1460
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1461
#   If you need more than one node, or more than one instance, acquire them at
1462
#   the same time.
1463
LEVEL_CLUSTER = 0
1464
LEVEL_INSTANCE = 1
1465
LEVEL_NODEGROUP = 2
1466
LEVEL_NODE = 3
1467
#: Level for node resources, used for operations with possibly high impact on
1468
#: the node's disks.
1469
LEVEL_NODE_RES = 4
1470

    
1471
LEVELS = [
1472
  LEVEL_CLUSTER,
1473
  LEVEL_INSTANCE,
1474
  LEVEL_NODEGROUP,
1475
  LEVEL_NODE,
1476
  LEVEL_NODE_RES,
1477
  ]
1478

    
1479
# Lock levels which are modifiable
1480
LEVELS_MOD = frozenset([
1481
  LEVEL_NODE_RES,
1482
  LEVEL_NODE,
1483
  LEVEL_NODEGROUP,
1484
  LEVEL_INSTANCE,
1485
  ])
1486

    
1487
#: Lock level names (make sure to use singular form)
1488
LEVEL_NAMES = {
1489
  LEVEL_CLUSTER: "cluster",
1490
  LEVEL_INSTANCE: "instance",
1491
  LEVEL_NODEGROUP: "nodegroup",
1492
  LEVEL_NODE: "node",
1493
  LEVEL_NODE_RES: "node-res",
1494
  }
1495

    
1496
# Constant for the big ganeti lock
1497
BGL = "BGL"
1498

    
1499

    
1500
class GanetiLockManager:
1501
  """The Ganeti Locking Library
1502

1503
  The purpose of this small library is to manage locking for ganeti clusters
1504
  in a central place, while at the same time doing dynamic checks against
1505
  possible deadlocks. It will also make it easier to transition to a different
1506
  lock type should we migrate away from python threads.
1507

1508
  """
1509
  _instance = None
1510

    
1511
  def __init__(self, nodes, nodegroups, instances):
1512
    """Constructs a new GanetiLockManager object.
1513

1514
    There should be only a GanetiLockManager object at any time, so this
1515
    function raises an error if this is not the case.
1516

1517
    @param nodes: list of node names
1518
    @param nodegroups: list of nodegroup uuids
1519
    @param instances: list of instance names
1520

1521
    """
1522
    assert self.__class__._instance is None, \
1523
           "double GanetiLockManager instance"
1524

    
1525
    self.__class__._instance = self
1526

    
1527
    self._monitor = LockMonitor()
1528

    
1529
    # The keyring contains all the locks, at their level and in the correct
1530
    # locking order.
1531
    self.__keyring = {
1532
      LEVEL_CLUSTER: LockSet([BGL], "cluster", monitor=self._monitor),
1533
      LEVEL_NODE: LockSet(nodes, "node", monitor=self._monitor),
1534
      LEVEL_NODE_RES: LockSet(nodes, "node-res", monitor=self._monitor),
1535
      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroup", monitor=self._monitor),
1536
      LEVEL_INSTANCE: LockSet(instances, "instance",
1537
                              monitor=self._monitor),
1538
      }
1539

    
1540
    assert compat.all(ls.name == LEVEL_NAMES[level]
1541
                      for (level, ls) in self.__keyring.items())
1542

    
1543
  def AddToLockMonitor(self, provider):
1544
    """Registers a new lock with the monitor.
1545

1546
    See L{LockMonitor.RegisterLock}.
1547

1548
    """
1549
    return self._monitor.RegisterLock(provider)
1550

    
1551
  def QueryLocks(self, fields):
1552
    """Queries information from all locks.
1553

1554
    See L{LockMonitor.QueryLocks}.
1555

1556
    """
1557
    return self._monitor.QueryLocks(fields)
1558

    
1559
  def _names(self, level):
1560
    """List the lock names at the given level.
1561

1562
    This can be used for debugging/testing purposes.
1563

1564
    @param level: the level whose list of locks to get
1565

1566
    """
1567
    assert level in LEVELS, "Invalid locking level %s" % level
1568
    return self.__keyring[level]._names()
1569

    
1570
  def is_owned(self, level):
1571
    """Check whether we are owning locks at the given level
1572

1573
    """
1574
    return self.__keyring[level].is_owned()
1575

    
1576
  def list_owned(self, level):
1577
    """Get the set of owned locks at the given level
1578

1579
    """
1580
    return self.__keyring[level].list_owned()
1581

    
1582
  def check_owned(self, level, names, shared=-1):
1583
    """Check if locks at a certain level are owned in a specific mode.
1584

1585
    @see: L{LockSet.check_owned}
1586

1587
    """
1588
    return self.__keyring[level].check_owned(names, shared=shared)
1589

    
1590
  def _upper_owned(self, level):
1591
    """Check that we don't own any lock at a level greater than the given one.
1592

1593
    """
1594
    # This way of checking only works if LEVELS[i] = i, which we check for in
1595
    # the test cases.
1596
    return compat.any((self.is_owned(l) for l in LEVELS[level + 1:]))
1597

    
1598
  def _BGL_owned(self): # pylint: disable=C0103
1599
    """Check if the current thread owns the BGL.
1600

1601
    Both an exclusive or a shared acquisition work.
1602

1603
    """
1604
    return BGL in self.__keyring[LEVEL_CLUSTER].list_owned()
1605

    
1606
  @staticmethod
1607
  def _contains_BGL(level, names): # pylint: disable=C0103
1608
    """Check if the level contains the BGL.
1609

1610
    Check if acting on the given level and set of names will change
1611
    the status of the Big Ganeti Lock.
1612

1613
    """
1614
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1615

    
1616
  def acquire(self, level, names, timeout=None, shared=0, priority=None):
1617
    """Acquire a set of resource locks, at the same level.
1618

1619
    @type level: member of locking.LEVELS
1620
    @param level: the level at which the locks shall be acquired
1621
    @type names: list of strings (or string)
1622
    @param names: the names of the locks which shall be acquired
1623
        (special lock names, or instance/node names)
1624
    @type shared: integer (0/1) used as a boolean
1625
    @param shared: whether to acquire in shared mode; by default
1626
        an exclusive lock will be acquired
1627
    @type timeout: float
1628
    @param timeout: Maximum time to acquire all locks
1629
    @type priority: integer
1630
    @param priority: Priority for acquiring lock
1631

1632
    """
1633
    assert level in LEVELS, "Invalid locking level %s" % level
1634

    
1635
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1636
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1637
    # so even if we've migrated we need to at least share the BGL to be
1638
    # compatible with them. Of course if we own the BGL exclusively there's no
1639
    # point in acquiring any other lock, unless perhaps we are half way through
1640
    # the migration of the current opcode.
1641
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1642
      "You must own the Big Ganeti Lock before acquiring any other")
1643

    
1644
    # Check we don't own locks at the same or upper levels.
1645
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1646
                                          " while owning some at a greater one")
1647

    
1648
    # Acquire the locks in the set.
1649
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
1650
                                         priority=priority)
1651

    
1652
  def downgrade(self, level, names=None):
1653
    """Downgrade a set of resource locks from exclusive to shared mode.
1654

1655
    You must have acquired the locks in exclusive mode.
1656

1657
    @type level: member of locking.LEVELS
1658
    @param level: the level at which the locks shall be downgraded
1659
    @type names: list of strings, or None
1660
    @param names: the names of the locks which shall be downgraded
1661
        (defaults to all the locks acquired at the level)
1662

1663
    """
1664
    assert level in LEVELS, "Invalid locking level %s" % level
1665

    
1666
    return self.__keyring[level].downgrade(names=names)
1667

    
1668
  def release(self, level, names=None):
1669
    """Release a set of resource locks, at the same level.
1670

1671
    You must have acquired the locks, either in shared or in exclusive
1672
    mode, before releasing them.
1673

1674
    @type level: member of locking.LEVELS
1675
    @param level: the level at which the locks shall be released
1676
    @type names: list of strings, or None
1677
    @param names: the names of the locks which shall be released
1678
        (defaults to all the locks acquired at that level)
1679

1680
    """
1681
    assert level in LEVELS, "Invalid locking level %s" % level
1682
    assert (not self._contains_BGL(level, names) or
1683
            not self._upper_owned(LEVEL_CLUSTER)), (
1684
              "Cannot release the Big Ganeti Lock while holding something"
1685
              " at upper levels (%r)" %
1686
              (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self.list_owned(i))
1687
                                for i in self.__keyring.keys()]), ))
1688

    
1689
    # Release will complain if we don't own the locks already
1690
    return self.__keyring[level].release(names)
1691

    
1692
  def add(self, level, names, acquired=0, shared=0):
1693
    """Add locks at the specified level.
1694

1695
    @type level: member of locking.LEVELS_MOD
1696
    @param level: the level at which the locks shall be added
1697
    @type names: list of strings
1698
    @param names: names of the locks to acquire
1699
    @type acquired: integer (0/1) used as a boolean
1700
    @param acquired: whether to acquire the newly added locks
1701
    @type shared: integer (0/1) used as a boolean
1702
    @param shared: whether the acquisition will be shared
1703

1704
    """
1705
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1706
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1707
                               " operations")
1708
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1709
                                          " while owning some at a greater one")
1710
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1711

    
1712
  def remove(self, level, names):
1713
    """Remove locks from the specified level.
1714

1715
    You must either already own the locks you are trying to remove
1716
    exclusively or not own any lock at an upper level.
1717

1718
    @type level: member of locking.LEVELS_MOD
1719
    @param level: the level at which the locks shall be removed
1720
    @type names: list of strings
1721
    @param names: the names of the locks which shall be removed
1722
        (special lock names, or instance/node names)
1723

1724
    """
1725
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1726
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1727
                               " operations")
1728
    # Check we either own the level or don't own anything from here
1729
    # up. LockSet.remove() will check the case in which we don't own
1730
    # all the needed resources, or we have a shared ownership.
1731
    assert self.is_owned(level) or not self._upper_owned(level), (
1732
           "Cannot remove locks at a level while not owning it or"
1733
           " owning some at a greater one")
1734
    return self.__keyring[level].remove(names)
1735

    
1736

    
1737
def _MonitorSortKey((item, idx, num)):
1738
  """Sorting key function.
1739

1740
  Sort by name, registration order and then order of information. This provides
1741
  a stable sort order over different providers, even if they return the same
1742
  name.
1743

1744
  """
1745
  (name, _, _, _) = item
1746

    
1747
  return (utils.NiceSortKey(name), num, idx)
1748

    
1749

    
1750
class LockMonitor(object):
1751
  _LOCK_ATTR = "_lock"
1752

    
1753
  def __init__(self):
1754
    """Initializes this class.
1755

1756
    """
1757
    self._lock = SharedLock("LockMonitor")
1758

    
1759
    # Counter for stable sorting
1760
    self._counter = itertools.count(0)
1761

    
1762
    # Tracked locks. Weak references are used to avoid issues with circular
1763
    # references and deletion.
1764
    self._locks = weakref.WeakKeyDictionary()
1765

    
1766
  @ssynchronized(_LOCK_ATTR)
1767
  def RegisterLock(self, provider):
1768
    """Registers a new lock.
1769

1770
    @param provider: Object with a callable method named C{GetLockInfo}, taking
1771
      a single C{set} containing the requested information items
1772
    @note: It would be nicer to only receive the function generating the
1773
      requested information but, as it turns out, weak references to bound
1774
      methods (e.g. C{self.GetLockInfo}) are tricky; there are several
1775
      workarounds, but none of the ones I found works properly in combination
1776
      with a standard C{WeakKeyDictionary}
1777

1778
    """
1779
    assert provider not in self._locks, "Duplicate registration"
1780

    
1781
    # There used to be a check for duplicate names here. As it turned out, when
1782
    # a lock is re-created with the same name in a very short timeframe, the
1783
    # previous instance might not yet be removed from the weakref dictionary.
1784
    # By keeping track of the order of incoming registrations, a stable sort
1785
    # ordering can still be guaranteed.
1786

    
1787
    self._locks[provider] = self._counter.next()
1788

    
1789
  def _GetLockInfo(self, requested):
1790
    """Get information from all locks.
1791

1792
    """
1793
    # Must hold lock while getting consistent list of tracked items
1794
    self._lock.acquire(shared=1)
1795
    try:
1796
      items = self._locks.items()
1797
    finally:
1798
      self._lock.release()
1799

    
1800
    return [(info, idx, num)
1801
            for (provider, num) in items
1802
            for (idx, info) in enumerate(provider.GetLockInfo(requested))]
1803

    
1804
  def _Query(self, fields):
1805
    """Queries information from all locks.
1806

1807
    @type fields: list of strings
1808
    @param fields: List of fields to return
1809

1810
    """
1811
    qobj = query.Query(query.LOCK_FIELDS, fields)
1812

    
1813
    # Get all data with internal lock held and then sort by name and incoming
1814
    # order
1815
    lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
1816
                      key=_MonitorSortKey)
1817

    
1818
    # Extract lock information and build query data
1819
    return (qobj, query.LockQueryData(map(compat.fst, lockinfo)))
1820

    
1821
  def QueryLocks(self, fields):
1822
    """Queries information from all locks.
1823

1824
    @type fields: list of strings
1825
    @param fields: List of fields to return
1826

1827
    """
1828
    (qobj, ctx) = self._Query(fields)
1829

    
1830
    # Prepare query response
1831
    return query.GetQueryResponse(qobj, ctx)