Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 70567db0

History | View | Annotate | Download (54.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import errno
32
import weakref
33
import logging
34
import heapq
35
import itertools
36
import time
37

    
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import compat
41
from ganeti import query
42

    
43

    
44
_EXCLUSIVE_TEXT = "exclusive"
45
_SHARED_TEXT = "shared"
46
_DELETED_TEXT = "deleted"
47

    
48
_DEFAULT_PRIORITY = 0
49

    
50

    
51
def ssynchronized(mylock, shared=0):
  """Decorator running a function while holding a shared lock.

  The decorated function is executed with the lock held, in exclusive mode by
  default or in shared mode if C{shared} is true. The lock has to be a
  L{SharedLock} or an object accepting C{acquire(shared=...)} and
  C{release()}. The lock is released again even if the function raises.

  @type mylock: lockable object or string
  @param mylock: the lock itself, or the name of the attribute holding the
      lock on the first positional argument ("self") of the decorated function
  @param shared: passed unchanged to the lock's C{acquire} method

  """
  def _resolve_lock(args):
    """Returns the lock object to use for one call.

    """
    if not isinstance(mylock, basestring):
      return mylock

    # The lock was given by attribute name, look it up on "self" (args[0])
    assert args, "cannot ssynchronize on non-class method: self not found"
    return getattr(args[0], mylock)

  def wrap(fn):
    def sync_function(*args, **kwargs):
      held = _resolve_lock(args)
      held.acquire(shared=shared)
      try:
        return fn(*args, **kwargs)
      finally:
        held.release()

    return sync_function

  return wrap
77

    
78

    
79
class _SingleNotifyPipeConditionWaiter(object):
80
  """Helper class for SingleNotifyPipeCondition
81

82
  """
83
  __slots__ = [
84
    "_fd",
85
    "_poller",
86
    ]
87

    
88
  def __init__(self, poller, fd):
89
    """Constructor for _SingleNotifyPipeConditionWaiter
90

91
    @type poller: select.poll
92
    @param poller: Poller object
93
    @type fd: int
94
    @param fd: File descriptor to wait for
95

96
    """
97
    object.__init__(self)
98
    self._poller = poller
99
    self._fd = fd
100

    
101
  def __call__(self, timeout):
102
    """Wait for something to happen on the pipe.
103

104
    @type timeout: float or None
105
    @param timeout: Timeout for waiting (can be None)
106

107
    """
108
    running_timeout = utils.RunningTimeout(timeout, True)
109

    
110
    while True:
111
      remaining_time = running_timeout.Remaining()
112

    
113
      if remaining_time is not None:
114
        if remaining_time < 0.0:
115
          break
116

    
117
        # Our calculation uses seconds, poll() wants milliseconds
118
        remaining_time *= 1000
119

    
120
      try:
121
        result = self._poller.poll(remaining_time)
122
      except EnvironmentError, err:
123
        if err.errno != errno.EINTR:
124
          raise
125
        result = None
126

    
127
      # Check whether we were notified
128
      if result and result[0][0] == self._fd:
129
        break
130

    
131

    
132
class _BaseCondition(object):
133
  """Base class containing common code for conditions.
134

135
  Some of this code is taken from python's threading module.
136

137
  """
138
  __slots__ = [
139
    "_lock",
140
    "acquire",
141
    "release",
142
    "_is_owned",
143
    "_acquire_restore",
144
    "_release_save",
145
    ]
146

    
147
  def __init__(self, lock):
148
    """Constructor for _BaseCondition.
149

150
    @type lock: threading.Lock
151
    @param lock: condition base lock
152

153
    """
154
    object.__init__(self)
155

    
156
    try:
157
      self._release_save = lock._release_save
158
    except AttributeError:
159
      self._release_save = self._base_release_save
160
    try:
161
      self._acquire_restore = lock._acquire_restore
162
    except AttributeError:
163
      self._acquire_restore = self._base_acquire_restore
164
    try:
165
      self._is_owned = lock._is_owned
166
    except AttributeError:
167
      self._is_owned = self._base_is_owned
168

    
169
    self._lock = lock
170

    
171
    # Export the lock's acquire() and release() methods
172
    self.acquire = lock.acquire
173
    self.release = lock.release
174

    
175
  def _base_is_owned(self):
176
    """Check whether lock is owned by current thread.
177

178
    """
179
    if self._lock.acquire(0):
180
      self._lock.release()
181
      return False
182
    return True
183

    
184
  def _base_release_save(self):
185
    self._lock.release()
186

    
187
  def _base_acquire_restore(self, _):
188
    self._lock.acquire()
189

    
190
  def _check_owned(self):
191
    """Raise an exception if the current thread doesn't own the lock.
192

193
    """
194
    if not self._is_owned():
195
      raise RuntimeError("cannot work with un-aquired lock")
196

    
197

    
198
class SingleNotifyPipeCondition(_BaseCondition):
  """Condition which can only be notified once.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, with the following differences:
    - notifyAll can only be called once, and no wait can happen after that
    - notify is not supported, only notifyAll

  The pipe and poller are created by the first waiter and torn down again when
  the last waiter leaves; a condition nobody waited on never opens any file
  descriptors.

  """

  __slots__ = [
    "_poller",
    "_read_fd",
    "_write_fd",
    "_nwaiters",
    "_notified",
    ]

  # Class of the callable doing the actual waiting; it is instantiated with
  # (poller, read_fd) and called with the timeout
  _waiter_class = _SingleNotifyPipeConditionWaiter

  def __init__(self, lock):
    """Constructor for SingleNotifyPipeCondition

    @param lock: condition base lock, passed on to L{_BaseCondition}

    """
    _BaseCondition.__init__(self, lock)
    # Number of threads currently inside wait()
    self._nwaiters = 0
    # Set by notifyAll(); afterwards the condition can't be used anymore
    self._notified = False
    # Pipe file descriptors and poller, only set while there are waiters
    self._read_fd = None
    self._write_fd = None
    self._poller = None

  def _check_unnotified(self):
    """Throws an exception if already notified.

    @raise RuntimeError: if L{notifyAll} was already called

    """
    if self._notified:
      raise RuntimeError("cannot use already notified condition")

  def _Cleanup(self):
    """Cleanup open file descriptors, if any.

    Also drops the poller so that the pipe state is back to "not created".

    """
    if self._read_fd is not None:
      os.close(self._read_fd)
      self._read_fd = None

    # The write side is already None if notifyAll() closed it
    if self._write_fd is not None:
      os.close(self._write_fd)
      self._write_fd = None
    self._poller = None

  def wait(self, timeout):
    """Wait for a notification.

    The lock must be owned by the caller; it is released while waiting and
    re-acquired before returning. Nothing is returned, so the caller can't
    distinguish a notification from a timeout.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)
    @raise RuntimeError: if the lock isn't owned or the condition was already
        notified

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
      # The first waiter creates the pipe and the poller
      if self._poller is None:
        (self._read_fd, self._write_fd) = os.pipe()
        self._poller = select.poll()
        # Waiters are woken by the hang-up reported once notifyAll() has
        # closed the writing side
        self._poller.register(self._read_fd, select.POLLHUP)

      wait_fn = self._waiter_class(self._poller, self._read_fd)
      state = self._release_save()
      try:
        # Wait for notification
        wait_fn(timeout)
      finally:
        # Re-acquire lock
        self._acquire_restore(state)
    finally:
      # The lock is held again at this point; the last waiter to leave closes
      # the file descriptors
      self._nwaiters -= 1
      if self._nwaiters == 0:
        self._Cleanup()

  def notifyAll(self): # pylint: disable=C0103
    """Close the writing side of the pipe to notify all waiters.

    If nobody is waiting there is no pipe and only the "notified" flag is set.
    The reading side is left for the last waiter to close.

    @raise RuntimeError: if the lock isn't owned or the condition was already
        notified

    """
    self._check_owned()
    self._check_unnotified()
    self._notified = True
    if self._write_fd is not None:
      os.close(self._write_fd)
      self._write_fd = None
290

    
291

    
292
class PipeCondition(_BaseCondition):
  """Group-only non-polling condition keeping track of its waiters.

  Waiting with a timeout is implemented using pipes and poll instead of
  polling. The class is nearly compatible with Python's threading.Condition,
  with the restrictions that only notifyAll is available and that the lock
  must be non-recursive. In addition it can tell whether, and which, threads
  are currently waiting.

  Every notification uses up one L{SingleNotifyPipeCondition}, which is then
  replaced by a fresh one for subsequent waiters.

  """
  __slots__ = [
    "_waiters",
    "_single_condition",
    ]

  _single_condition_class = SingleNotifyPipeCondition

  def __init__(self, lock):
    """Sets up an empty waiter set and the first one-shot condition.

    @param lock: condition base lock

    """
    _BaseCondition.__init__(self, lock)
    self._waiters = set()
    self._single_condition = self._single_condition_class(self._lock)

  def wait(self, timeout):
    """Waits until notified or until the timeout expires.

    The calling thread is recorded as waiter for the duration of the call.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # Remember the one-shot condition we're going to wait on; a notifying
    # thread swaps in a new one while we're still blocked on this one
    current = self._single_condition
    me = threading.currentThread()

    self._waiters.add(me)
    try:
      current.wait(timeout)
    finally:
      self._check_owned()
      self._waiters.remove(me)

  def notifyAll(self): # pylint: disable=C0103
    """Wakes up all threads waiting at this moment.

    """
    self._check_owned()

    spent = self._single_condition
    spent.notifyAll()

    # A one-shot condition can't be reused, later waiters get a new one
    self._single_condition = self._single_condition_class(self._lock)

  def get_waiting(self):
    """Returns the waiting threads.

    Note that the internal set itself is returned, not a copy.

    """
    self._check_owned()

    return self._waiters

  def has_waiting(self):
    """Tells whether at least one thread is waiting.

    @rtype: bool

    """
    self._check_owned()

    if self._waiters:
      return True
    return False

  def __repr__(self):
    cls = self.__class__
    details = (cls.__module__, cls.__name__, self._waiters, id(self))
    return "<%s.%s waiters=%s at %#x>" % details
365

    
366

    
367
class _PipeConditionWithMode(PipeCondition):
  # PipeCondition which additionally remembers the acquire mode ("shared")
  # passed at construction time
  __slots__ = [
    "shared",
    ]

  def __init__(self, lock, shared):
    """Initializes this class.

    @param lock: condition base lock, passed on to L{PipeCondition}
    @param shared: acquire mode to remember; stored unchanged as C{shared}

    """
    self.shared = shared
    PipeCondition.__init__(self, lock)
378

    
379

    
380
class SharedLock(object):
  """Implements a shared lock.

  Multiple threads can acquire the lock in a shared way by calling
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
  threads can call C{acquire(shared=0)}.

  Notes on data structures: C{__pending} contains a priority queue (heapq) of
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
  ...]}. Each per-priority queue contains a normal in-order list of conditions
  to be notified when the lock can be acquired. Shared locks are grouped
  together by priority and the condition for them is stored in
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
  references for the per-priority queues indexed by priority for faster access.

  All state is protected by an internal C{threading.Lock}; the private
  (double-underscore) helpers expect the caller to hold it.

  @type name: string
  @ivar name: the name of the lock

  """
  __slots__ = [
    "__weakref__",
    "__deleted",
    "__exc",
    "__lock",
    "__pending",
    "__pending_by_prio",
    "__pending_shared",
    "__shr",
    "__time_fn",
    "name",
    ]

  # Condition class used for pending acquires; instantiated with
  # (internal lock, shared)
  __condition_class = _PipeConditionWithMode

  def __init__(self, name, monitor=None, _time_fn=time.time):
    """Construct a new SharedLock.

    @param name: the name of the lock
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register
    @param _time_fn: function returning the current time, replaceable for
        unittests

    """
    object.__init__(self)

    self.name = name

    # Used for unittesting
    self.__time_fn = _time_fn

    # Internal lock
    self.__lock = threading.Lock()

    # Queue containing waiting acquires
    self.__pending = []
    self.__pending_by_prio = {}
    self.__pending_shared = {}

    # Current lock holders
    self.__shr = set()
    self.__exc = None

    # is this lock in the deleted state?
    self.__deleted = False

    # Register with lock monitor
    if monitor:
      logging.debug("Adding lock %s to monitor", name)
      monitor.RegisterLock(self)

  def __repr__(self):
    return ("<%s.%s name=%s at %#x>" %
            (self.__class__.__module__, self.__class__.__name__,
             self.name, id(self)))

  def GetLockInfo(self, requested):
    """Retrieves information for querying locks.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}
    @rtype: list
    @return: list with a single C{(name, mode, owner_names, pending)} tuple;
        fields which weren't requested (or don't apply) are None

    """
    self.__lock.acquire()
    try:
      # Note: to avoid unintentional race conditions, no references to
      # modifiable objects should be returned unless they were created in this
      # function.
      mode = None
      owner_names = None

      if query.LQ_MODE in requested:
        if self.__deleted:
          mode = _DELETED_TEXT
          assert not (self.__exc or self.__shr)
        elif self.__exc:
          mode = _EXCLUSIVE_TEXT
        elif self.__shr:
          mode = _SHARED_TEXT

      # Current owner(s) are wanted
      if query.LQ_OWNER in requested:
        if self.__exc:
          owner = [self.__exc]
        else:
          owner = self.__shr

        # Only thread names are returned, never the internal set itself
        if owner:
          assert not self.__deleted
          owner_names = [i.getName() for i in owner]

      # Pending acquires are wanted
      if query.LQ_PENDING in requested:
        pending = []

        # Sorting instead of copying and using heapq functions for simplicity
        for (_, prioqueue) in sorted(self.__pending):
          for cond in prioqueue:
            if cond.shared:
              pendmode = _SHARED_TEXT
            else:
              pendmode = _EXCLUSIVE_TEXT

            # List of names will be sorted in L{query._GetLockPending}
            pending.append((pendmode, [i.getName()
                                       for i in cond.get_waiting()]))
      else:
        pending = None

      return [(self.name, mode, owner_names, pending)]
    finally:
      self.__lock.release()

  def __check_deleted(self):
    """Raises an exception if the lock has been deleted.

    @raise errors.LockError: if the lock is in the deleted state

    """
    if self.__deleted:
      raise errors.LockError("Deleted lock %s" % self.name)

  def __is_sharer(self):
    """Is the current thread sharing the lock at this time?

    """
    return threading.currentThread() in self.__shr

  def __is_exclusive(self):
    """Is the current thread holding the lock exclusively at this time?

    """
    return threading.currentThread() == self.__exc

  def __is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    This is a private version of the function, which presumes you're holding
    the internal lock.

    @param shared: see L{_is_owned}

    """
    if shared < 0:
      return self.__is_sharer() or self.__is_exclusive()
    elif shared:
      return self.__is_sharer()
    else:
      return self.__is_exclusive()

  def _is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership

    """
    self.__lock.acquire()
    try:
      return self.__is_owned(shared=shared)
    finally:
      self.__lock.release()

  # Public alias for the ownership check
  is_owned = _is_owned

  def _count_pending(self):
    """Returns the number of pending acquires.

    Counts the conditions in all per-priority queues; a condition shared by
    several waiting threads is counted once.

    @rtype: int

    """
    self.__lock.acquire()
    try:
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
    finally:
      self.__lock.release()

  def _check_empty(self):
    """Checks whether there are any pending acquires.

    @rtype: bool
    @return: True if none of the pending-acquire structures has any entry

    """
    self.__lock.acquire()
    try:
      # Order is important: __find_first_pending_queue modifies __pending
      (_, prioqueue) = self.__find_first_pending_queue()

      return not (prioqueue or
                  self.__pending or
                  self.__pending_by_prio or
                  self.__pending_shared)
    finally:
      self.__lock.release()

  def __do_acquire(self, shared):
    """Actually acquire the lock.

    Only records the current thread as owner; whether the lock can be acquired
    must have been checked by the caller.

    """
    if shared:
      self.__shr.add(threading.currentThread())
    else:
      self.__exc = threading.currentThread()

  def __can_acquire(self, shared):
    """Determine whether lock can be acquired.

    A shared acquire only requires that there's no exclusive owner, an
    exclusive acquire requires that there are no owners at all.

    """
    if shared:
      return self.__exc is None
    else:
      return len(self.__shr) == 0 and self.__exc is None

  def __find_first_pending_queue(self):
    """Tries to find the topmost queued entry with pending acquires.

    Removes empty entries while going through the list.

    @return: C{(priority, prioqueue)} of the first non-empty queue or
        C{(None, None)} if there are no pending acquires

    """
    while self.__pending:
      (priority, prioqueue) = self.__pending[0]

      if prioqueue:
        return (priority, prioqueue)

      # Remove empty queue
      heapq.heappop(self.__pending)
      del self.__pending_by_prio[priority]
      assert priority not in self.__pending_shared

    return (None, None)

  def __is_on_top(self, cond):
    """Checks whether the passed condition is on top of the queue.

    The caller must make sure the queue isn't empty.

    """
    (_, prioqueue) = self.__find_first_pending_queue()

    return cond == prioqueue[0]

  def __acquire_unlocked(self, shared, timeout, priority):
    """Acquire a shared lock.

    The caller must hold the internal lock; it is released while waiting for
    a notification and held again when this function returns.

    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: whether the lock was acquired; False is only returned if a
        timeout was given
    @raise errors.LockError: if the lock is (or gets) deleted

    """
    self.__check_deleted()

    # We cannot acquire the lock if we already have it
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
                                   " %s" % self.name)

    # Remove empty entries from queue
    self.__find_first_pending_queue()

    # Check whether someone else holds the lock or there are pending acquires.
    if not self.__pending and self.__can_acquire(shared):
      # Apparently not, can acquire lock directly.
      self.__do_acquire(shared)
      return True

    prioqueue = self.__pending_by_prio.get(priority, None)

    if shared:
      # Try to re-use condition for shared acquire
      wait_condition = self.__pending_shared.get(priority, None)
      assert (wait_condition is None or
              (wait_condition.shared and wait_condition in prioqueue))
    else:
      wait_condition = None

    if wait_condition is None:
      if prioqueue is None:
        assert priority not in self.__pending_by_prio

        # First pending acquire on this priority, create its queue
        prioqueue = []
        heapq.heappush(self.__pending, (priority, prioqueue))
        self.__pending_by_prio[priority] = prioqueue

      wait_condition = self.__condition_class(self.__lock, shared)
      prioqueue.append(wait_condition)

      if shared:
        # Keep reference for further shared acquires on same priority. This is
        # better than trying to find it in the list of pending acquires.
        assert priority not in self.__pending_shared
        self.__pending_shared[priority] = wait_condition

    wait_start = self.__time_fn()
    acquired = False

    try:
      # Wait until we become the topmost acquire in the queue or the timeout
      # expires.
      while True:
        if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
          self.__do_acquire(shared)
          acquired = True
          break

        # A lot of code assumes blocking acquires always succeed, therefore we
        # can never return False for a blocking acquire
        if (timeout is not None and
            utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
          break

        # Wait for notification
        # NOTE(review): every wait is given the full timeout, not the time
        # remaining since wait_start, so after an early wake-up the total
        # waiting time can exceed the timeout -- confirm whether intended
        wait_condition.wait(timeout)
        self.__check_deleted()
    finally:
      # Remove condition from queue if there are no more waiters
      if not wait_condition.has_waiting():
        prioqueue.remove(wait_condition)
        if wait_condition.shared:
          # Remove from list of shared acquires if it wasn't while releasing
          # (e.g. on lock deletion)
          self.__pending_shared.pop(priority, None)

    return acquired

  def acquire(self, shared=0, timeout=None, priority=None,
              test_notify=None):
    """Acquire a shared lock.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @rtype: bool
    @return: whether the lock was acquired
    @raise errors.LockError: if the lock is (or gets) deleted

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      # We already got the lock, notify now
      if __debug__ and callable(test_notify):
        test_notify()

      return self.__acquire_unlocked(shared, timeout, priority)
    finally:
      self.__lock.release()

  def downgrade(self):
    """Changes the lock mode from exclusive to shared.

    Pending acquires in shared mode on the same priority will go ahead.

    @return: always True

    """
    self.__lock.acquire()
    try:
      assert self.__is_owned(), "Lock must be owned"

      if self.__is_exclusive():
        # Do nothing if the lock is already acquired in shared mode
        self.__exc = None
        self.__do_acquire(1)

        # Important: pending shared acquires should only jump ahead if there
        # was a transition from exclusive to shared, otherwise an owner of a
        # shared lock can keep calling this function to push incoming shared
        # acquires
        (priority, prioqueue) = self.__find_first_pending_queue()
        if prioqueue:
          # Is there a pending shared acquire on this priority?
          cond = self.__pending_shared.pop(priority, None)
          if cond:
            assert cond.shared
            assert cond in prioqueue

            # Ensure shared acquire is on top of queue
            if len(prioqueue) > 1:
              prioqueue.remove(cond)
              prioqueue.insert(0, cond)

            # Notify
            cond.notifyAll()

      assert not self.__is_exclusive()
      assert self.__is_sharer()

      return True
    finally:
      self.__lock.release()

  def release(self):
    """Release a Shared Lock.

    You must have acquired the lock, either in shared or in exclusive mode,
    before calling this function.

    """
    self.__lock.acquire()
    try:
      assert self.__is_exclusive() or self.__is_sharer(), \
        "Cannot release non-owned lock"

      # Autodetect release type
      if self.__is_exclusive():
        self.__exc = None
        notify = True
      else:
        self.__shr.remove(threading.currentThread())
        notify = not self.__shr

      # Notify topmost condition in queue if there are no owners left (for
      # shared locks)
      if notify:
        self.__notify_topmost()
    finally:
      self.__lock.release()

  def __notify_topmost(self):
    """Notifies topmost condition in queue of pending acquires.

    The caller must hold the internal lock.

    """
    (priority, prioqueue) = self.__find_first_pending_queue()
    if prioqueue:
      cond = prioqueue[0]
      cond.notifyAll()
      if cond.shared:
        # Prevent further shared acquires from sneaking in while waiters are
        # notified
        self.__pending_shared.pop(priority, None)

  def _notify_topmost(self):
    """Exported version of L{__notify_topmost}.

    Takes the internal lock around the call.

    """
    self.__lock.acquire()
    try:
      return self.__notify_topmost()
    finally:
      self.__lock.release()

  def delete(self, timeout=None, priority=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: whether the lock was acquired and marked as deleted
    @raise errors.LockError: if the lock was already deleted

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

      self.__check_deleted()

      # The caller is allowed to hold the lock exclusively already.
      acquired = self.__is_exclusive()

      if not acquired:
        acquired = self.__acquire_unlocked(0, timeout, priority)

      if acquired:
        assert self.__is_exclusive() and not self.__is_sharer(), \
          "Lock wasn't acquired in exclusive mode"

        # Mark as deleted and drop the exclusive ownership in one go; the lock
        # isn't released in the usual way
        self.__deleted = True
        self.__exc = None

        assert not (self.__exc or self.__shr), "Found owner during deletion"

        # Notify all acquires. They'll throw an error.
        for (_, prioqueue) in self.__pending:
          for cond in prioqueue:
            cond.notifyAll()

        assert self.__deleted

      return acquired
    finally:
      self.__lock.release()

  def _release_save(self):
    # Picked up by _BaseCondition when a condition is built on this lock:
    # releases the lock and returns whether it was held in shared mode, to be
    # passed to _acquire_restore later on.
    # NOTE(review): the sharer check is done without holding the internal
    # lock -- confirm this is fine
    shared = self.__is_sharer()
    self.release()
    return shared

  def _acquire_restore(self, shared):
    # Counterpart of _release_save: re-acquires the lock (blocking, default
    # priority) in the mode given by "shared"
    self.acquire(shared=shared)
897

    
898

    
899
# Whenever we want to acquire a full LockSet we pass None as the value
900
# to acquire.  Hide this behind this nicely named constant.
901
ALL_SET = None
902

    
903

    
904
class _AcquireTimeout(Exception):
  """Internal exception to abort an acquire on a timeout.

  Private to this module, it carries no additional data.

  """
908

    
909

    
910
class LockSet:
911
  """Implements a set of locks.
912

913
  This abstraction implements a set of shared locks for the same resource type,
914
  distinguished by name. The user can lock a subset of the resources and the
915
  LockSet will take care of acquiring the locks always in the same order, thus
916
  preventing deadlock.
917

918
  All the locks needed in the same set must be acquired together, though.
919

920
  @type name: string
921
  @ivar name: the name of the lockset
922

923
  """
924
  def __init__(self, members, name, monitor=None):
    """Constructs a new LockSet.

    @type members: list of strings
    @param members: initial members of the set
    @type name: string
    @param name: name of the lockset; used as prefix of the member lock names
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register member locks

    """
    assert members is not None, "members parameter is not a list"
    self.name = name

    # Lock monitor, remembered for locks created after construction
    self.__monitor = monitor

    # Set-level lock, used internally to guarantee coherency
    self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)

    # The lockdict maps name -> lock. The order-of-locking is implied by the
    # alphabetical order of names.
    self.__lockdict = dict((mname, SharedLock(self._GetLockName(mname),
                                              monitor=monitor))
                           for mname in members)

    # Maps each thread to the set of lock names it owns. For performance every
    # thread accesses only its own key, without a global lock on the
    # structure. It is paramount that *no* other kind of access is done (e.g.
    # no looping over the keys). Use the *_owned helper methods; in general
    # never do anything other than __owners[threading.currentThread()], or
    # there will be trouble.
    self.__owners = {}
958

    
959
  def _GetLockName(self, mname):
960
    """Returns the name for a member lock.
961

962
    """
963
    return "%s/%s" % (self.name, mname)
964

    
965
  def _get_lock(self):
966
    """Returns the lockset-internal lock.
967

968
    """
969
    return self.__lock
970

    
971
  def _get_lockdict(self):
972
    """Returns the lockset-internal lock dictionary.
973

974
    Accessing this structure is only safe in single-thread usage or when the
975
    lockset-internal lock is held.
976

977
    """
978
    return self.__lockdict
979

    
980
  def _is_owned(self):
981
    """Is the current thread a current level owner?"""
982
    return threading.currentThread() in self.__owners
983

    
984
  def _add_owned(self, name=None):
985
    """Note the current thread owns the given lock"""
986
    if name is None:
987
      if not self._is_owned():
988
        self.__owners[threading.currentThread()] = set()
989
    else:
990
      if self._is_owned():
991
        self.__owners[threading.currentThread()].add(name)
992
      else:
993
        self.__owners[threading.currentThread()] = set([name])
994

    
995
  def _del_owned(self, name=None):
996
    """Note the current thread owns the given lock"""
997

    
998
    assert not (name is None and self.__lock._is_owned()), \
999
           "Cannot hold internal lock when deleting owner status"
1000

    
1001
    if name is not None:
1002
      self.__owners[threading.currentThread()].remove(name)
1003

    
1004
    # Only remove the key if we don't hold the set-lock as well
1005
    if (not self.__lock._is_owned() and
1006
        not self.__owners[threading.currentThread()]):
1007
      del self.__owners[threading.currentThread()]
1008

    
1009
  def _list_owned(self):
1010
    """Get the set of resource names owned by the current thread"""
1011
    if self._is_owned():
1012
      return self.__owners[threading.currentThread()].copy()
1013
    else:
1014
      return set()
1015

    
1016
  def _release_and_delete_owned(self):
1017
    """Release and delete all resources owned by the current thread"""
1018
    for lname in self._list_owned():
1019
      lock = self.__lockdict[lname]
1020
      if lock._is_owned():
1021
        lock.release()
1022
      self._del_owned(name=lname)
1023

    
1024
  def __names(self):
1025
    """Return the current set of names.
1026

1027
    Only call this function while holding __lock and don't iterate on the
1028
    result after releasing the lock.
1029

1030
    """
1031
    return self.__lockdict.keys()
1032

    
1033
  def _names(self):
1034
    """Return a copy of the current set of elements.
1035

1036
    Used only for debugging purposes.
1037

1038
    """
1039
    # If we don't already own the set-level lock acquired
1040
    # we'll get it and note we need to release it later.
1041
    release_lock = False
1042
    if not self.__lock._is_owned():
1043
      release_lock = True
1044
      self.__lock.acquire(shared=1)
1045
    try:
1046
      result = self.__names()
1047
    finally:
1048
      if release_lock:
1049
        self.__lock.release()
1050
    return set(result)
1051

    
1052
  def acquire(self, names, timeout=None, shared=0, priority=None,
              test_notify=None):
    """Acquire a set of resource locks.

    @type names: list of strings (or string), or None
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names); None (L{ALL_SET})
        acquires the whole set
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring locks
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting

    @return: Set of all locks successfully acquired or None in case of timeout

    @raise errors.LockError: when any lock we try to acquire has
        been deleted before we succeed. In this case none of the
        locks requested will be acquired.

    """
    assert timeout is None or timeout >= 0.0

    # Check we don't already own locks at this level
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
                                  " (lockset %s)" % self.name)

    if priority is None:
      priority = _DEFAULT_PRIORITY

    # The timeout covers all lock acquires together, so track how much of it
    # is left while we wait for the individual locks.
    deadline = utils.RunningTimeout(timeout, False)

    try:
      if names is None:
        # Acquire the whole set: take the set-level lock so that no new names
        # can be added before we release, then lock the names existing right
        # now. Some of them may be deleted meanwhile; that is coped with.
        #
        # The set-level lock is taken in the same mode as the member locks.
        # Sharing it would be nicer for everybody else, but with exclusive
        # member locks they couldn't use the instances anyway, and holding it
        # exclusively allows add() on the set while owning it.
        got_set_lock = self.__lock.acquire(shared=shared, priority=priority,
                                           timeout=deadline.Remaining())
        if not got_set_lock:
          raise _AcquireTimeout()

        try:
          # note we own the set-lock
          self._add_owned()

          return self.__acquire_inner(self.__names(), True, shared, priority,
                                      deadline.Remaining, test_notify)
        except:
          # Whatever went wrong (a timeout included), give the set-level lock
          # back and forget our ownership before passing the exception on.
          self.__lock.release()
          self._del_owned()
          raise

      # Support passing in a single resource to acquire rather than many
      if isinstance(names, basestring):
        names = [names]

      return self.__acquire_inner(names, False, shared, priority,
                                  deadline.Remaining, test_notify)

    except _AcquireTimeout:
      return None
1127

    
1128
  def __acquire_inner(self, names, want_all, shared, priority,
                      timeout_fn, test_notify):
    """Inner logic for acquiring a number of locks.

    @param names: Names of the locks to be acquired
    @param want_all: Whether all locks in the set should be acquired
    @param shared: Whether to acquire in shared mode
    @param timeout_fn: Function returning remaining timeout
    @param priority: Priority for acquiring locks
    @param test_notify: Special callback function for unittesting

    @return: set with the names of the locks acquired
    @raise errors.LockError: if a requested lock doesn't exist (anymore) and
        not the whole set was asked for
    @raise _AcquireTimeout: if a lock couldn't be acquired in time

    """
    missing_msg = "Non-existing lock %s in set %s (it may have been removed)"

    # Look all locks up in __lockdict first. Nothing guarantees they will
    # still be there afterwards, but this fails much faster should one of the
    # names already be wrong. The names are sorted to prevent deadlocks.
    wanted = []
    for lname in sorted(utils.UniqueSequence(names)):
      try:
        lock = self.__lockdict[lname]
      except KeyError:
        if not want_all:
          raise errors.LockError(missing_msg % (lname, self.name))
        # Acquiring the whole set: an element which is gone doesn't matter
      else:
        wanted.append((lname, lock))

    # Names of the locks we effectively acquired
    acquired = set()

    try:
      # Walk the (private) sorted list and acquire() each lock. The locks may
      # be deleted before we get to them, but acquire() itself is safe and
      # raises an error in that case.
      for (lname, lock) in wanted:
        notify_cb = None
        if __debug__ and callable(test_notify):
          notify_cb = lambda: test_notify(lname)

        timeout = timeout_fn()

        try:
          # raises LockError if the lock was deleted
          got_it = lock.acquire(shared=shared, timeout=timeout,
                                priority=priority,
                                test_notify=notify_cb)
        except errors.LockError:
          if want_all:
            # Acquiring the whole set: an element which is gone doesn't matter
            continue

          raise errors.LockError(missing_msg % (lname, self.name))

        if not got_it:
          # Couldn't get lock or timeout occurred
          if timeout is None:
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
            # blocking.
            raise errors.LockError("Failed to get lock %s (set %s)" %
                                   (lname, self.name))

          raise _AcquireTimeout()

        try:
          # now the lock cannot be deleted, we have it!
          self._add_owned(name=lname)
          acquired.add(lname)
        except:
          # Recording the ownership shouldn't fail; if it does, release this
          # lock (it isn't in the owners list) and pass the exception on.
          if lock._is_owned():
            lock.release()
          raise

    except:
      # Release all owned locks
      self._release_and_delete_owned()
      raise

    return acquired
1220

    
1221
  def downgrade(self, names=None):
    """Downgrade a set of resource locks from exclusive to shared mode.

    The locks must have been acquired in exclusive mode.

    @type names: list of strings (or string), or None
    @param names: names of the locks to downgrade (defaults to all locks
        owned by the current thread in this set)
    @return: always True

    """
    assert self._is_owned(), ("downgrade on lockset %s while not owning any"
                              " lock" % self.name)

    owned = self._list_owned()

    if names is None:
      targets = owned
    else:
      # Support passing in a single resource to downgrade rather than many
      if isinstance(names, basestring):
        names = [names]
      targets = set(names)
      assert owned.issuperset(targets), \
        ("downgrade() on unheld resources %s (set %s)" %
         (targets.difference(owned), self.name))

    for lockname in targets:
      self.__lockdict[lockname].downgrade()

    # If we own the lockset itself exclusively and no member lock is owned
    # exclusively anymore, the set-level lock is downgraded too.
    if (self.__lock._is_owned(shared=0) and
        not compat.any(lock._is_owned(shared=0)
                       for lock in self.__lockdict.values())):
      self.__lock.downgrade()
      assert self.__lock._is_owned(shared=1)

    return True
1256

    
1257
  def release(self, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive mode,
    before releasing them.

    @type names: list of strings (or string), or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level).

    """
    assert self._is_owned(), ("release() on lock set %s while not owner" %
                              self.name)

    if names is None:
      to_release = self._list_owned()
    else:
      # Support passing in a single resource to release rather than many
      if isinstance(names, basestring):
        names = [names]
      to_release = set(names)
      assert self._list_owned().issuperset(to_release), (
               "release() on unheld resources %s (set %s)" %
               (to_release.difference(self._list_owned()), self.name))

    # The "all elements" lock goes first, if we hold it, whichever names were
    # given. After this 'add' can work again.
    if self.__lock._is_owned():
      self.__lock.release()
      self._del_owned()

    for lockname in to_release:
      # This relies on a lock not leaving __lockdict without being held
      # exclusively.
      self.__lockdict[lockname].release()
      self._del_owned(name=lockname)
1294

    
1295
  def add(self, names, acquired=0, shared=0):
    """Add a new set of elements to the set

    @type names: list of strings (or string)
    @param names: names of the new elements to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: pre-acquire the new resource?
    @type shared: integer (0/1) used as a boolean
    @param shared: is the pre-acquisition shared?

    @return: always True
    @raise errors.LockError: if any of the names is already in the set

    """
    # Either we own nothing at this level or we own the set exclusively
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
      ("Cannot add locks if the set %s is only partially owned, or shared" %
       self.name)

    # Support passing in a single resource to add rather than many
    if isinstance(names, basestring):
      names = [names]

    # Take the set-level lock exclusively unless we own it already, and
    # remember whether it is ours to release at the end.
    took_set_lock = not self.__lock._is_owned()
    if took_set_lock:
      self.__lock.acquire()

    try:
      clashes = set(self.__names()).intersection(names)
      if clashes:
        # This must be an explicit raise, not an assert, because assert is
        # turned off when using optimization, and this can happen because of
        # concurrency even if the user doesn't want it.
        raise errors.LockError("duplicate add(%s) on lockset %s" %
                               (clashes, self.name))

      for lockname in names:
        new_lock = SharedLock(self._GetLockName(lockname),
                              monitor=self.__monitor)

        if acquired:
          # No need for priority or timeout here as this lock has just been
          # created
          new_lock.acquire(shared=shared)
          # now the lock cannot be deleted, we have it!
          try:
            self._add_owned(name=lockname)
          except:
            # Recording the ownership shouldn't fail. The lock isn't in
            # __lockdict yet, so no other thread should be pending on it;
            # releasing it before re-raising is just a safety measure.
            new_lock.release()
            raise

        self.__lockdict[lockname] = new_lock

    finally:
      # Only release __lock if we were not holding it previously.
      if took_set_lock:
        self.__lock.release()

    return True
1359

    
1360
  def remove(self, names):
    """Remove elements from the lock set.

    You can either not hold anything in the lockset or already hold a superset
    of the elements you want to delete, exclusively.

    @type names: list of strings (or string)
    @param names: names of the resource to remove.

    @return: a list of locks which we removed; the list is always
        equal to the names list if we were holding all the locks
        exclusively

    """
    # Support passing in a single resource to remove rather than many
    if isinstance(names, basestring):
      names = [names]

    # If we own any subset of this lock it must be a superset of what we want
    # to delete. The ownership must also be exclusive, but that will be checked
    # by the lock itself.
    assert not self._is_owned() or self._list_owned().issuperset(names), (
      "remove() on acquired lockset %s while not owning all elements" %
      self.name)

    removed = []

    for lname in names:
      # delete() acquires the lock exclusively if we don't own it yet and
      # makes all pending and subsequent acquires fail. Calling it out of
      # order is fine because delete() also implies release(), and the
      # assertion above guarantees that we either already hold everything we
      # want to delete or hold nothing.
      try:
        self.__lockdict[lname].delete()
      except (KeyError, errors.LockError):
        # This cannot happen if we were already holding it, verify:
        assert not self._is_owned(), ("remove failed while holding lockset %s"
                                      % self.name)
        # Somebody else deleted the lock; cleaning up is their job
        continue

      # No error means we are the ones who deleted the lock, so it can safely
      # be dropped from lockdict: any further or pending delete() or acquire()
      # will fail, and nobody can have held the lock since before our call to
      # delete().
      removed.append(lname)
      del self.__lockdict[lname]

      # And let's remove it from our private list if we owned it.
      if self._is_owned():
        self._del_owned(name=lname)

    return removed
1414

    
1415

    
1416
# Locking levels, must be acquired in increasing order.
# Current rules are:
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
#   acquired before performing any operation, either in shared or in exclusive
#   mode. Acquiring the BGL in exclusive mode is discouraged and should be
#   avoided.
#   - at levels LEVEL_INSTANCE, LEVEL_NODEGROUP and LEVEL_NODE reside the
#   instance, node group and node locks. If you need more than one lock at
#   one of these levels, acquire them at the same time.
LEVEL_CLUSTER = 0
LEVEL_INSTANCE = 1
LEVEL_NODEGROUP = 2
LEVEL_NODE = 3

LEVELS = [
  LEVEL_CLUSTER,
  LEVEL_INSTANCE,
  LEVEL_NODEGROUP,
  LEVEL_NODE,
  ]

# Lock levels which are modifiable
LEVELS_MOD = [
  LEVEL_NODE,
  LEVEL_NODEGROUP,
  LEVEL_INSTANCE,
  ]

# Human-readable name of every level
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODEGROUP: "nodegroup",
  LEVEL_NODE: "node",
  }

# Constant for the big ganeti lock
BGL = "BGL"
1447

    
1448

    
1449
class GanetiLockManager:
1450
  """The Ganeti Locking Library
1451

1452
  The purpose of this small library is to manage locking for ganeti clusters
1453
  in a central place, while at the same time doing dynamic checks against
1454
  possible deadlocks. It will also make it easier to transition to a different
1455
  lock type should we migrate away from python threads.
1456

1457
  """
1458
  _instance = None
1459

    
1460
  def __init__(self, nodes, nodegroups, instances):
    """Constructs a new GanetiLockManager object.

    There should be only a GanetiLockManager object at any time, so this
    function raises an error if this is not the case.

    @param nodes: list of node names
    @param nodegroups: list of nodegroup uuids
    @param instances: list of instance names

    """
    cls = self.__class__
    assert cls._instance is None, \
           "double GanetiLockManager instance"

    cls._instance = self

    monitor = LockMonitor()
    self._monitor = monitor

    # The keyring holds one LockSet per locking level; every lockset shares
    # the same lock monitor.
    keyring = {}
    keyring[LEVEL_CLUSTER] = LockSet([BGL], "BGL", monitor=monitor)
    keyring[LEVEL_NODE] = LockSet(nodes, "nodes", monitor=monitor)
    keyring[LEVEL_NODEGROUP] = LockSet(nodegroups, "nodegroups",
                                       monitor=monitor)
    keyring[LEVEL_INSTANCE] = LockSet(instances, "instances", monitor=monitor)
    self.__keyring = keyring
1487

    
1488
  def AddToLockMonitor(self, provider):
1489
    """Registers a new lock with the monitor.
1490

1491
    See L{LockMonitor.RegisterLock}.
1492

1493
    """
1494
    return self._monitor.RegisterLock(provider)
1495

    
1496
  def QueryLocks(self, fields):
1497
    """Queries information from all locks.
1498

1499
    See L{LockMonitor.QueryLocks}.
1500

1501
    """
1502
    return self._monitor.QueryLocks(fields)
1503

    
1504
  def OldStyleQueryLocks(self, fields):
1505
    """Queries information from all locks, returning old-style data.
1506

1507
    See L{LockMonitor.OldStyleQueryLocks}.
1508

1509
    """
1510
    return self._monitor.OldStyleQueryLocks(fields)
1511

    
1512
  def _names(self, level):
    """List the lock names at the given level.

    This can be used for debugging/testing purposes.

    @param level: the level whose list of locks to get
    @return: whatever the level's lockset returns from its C{_names} method

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    lockset = self.__keyring[level]
    return lockset._names()
1522

    
1523
  def _is_owned(self, level):
1524
    """Check whether we are owning locks at the given level
1525

1526
    """
1527
    return self.__keyring[level]._is_owned()
1528

    
1529
  is_owned = _is_owned
1530

    
1531
  def _list_owned(self, level):
1532
    """Get the set of owned locks at the given level
1533

1534
    """
1535
    return self.__keyring[level]._list_owned()
1536

    
1537
  list_owned = _list_owned
1538

    
1539
  def _upper_owned(self, level):
    """Check whether we own any lock at a level greater than the given one.

    @param level: the reference level
    @return: True if locks are owned at any level above C{level}

    """
    # Slicing LEVELS by the level value only works if LEVELS[i] = i, which we
    # check for in the test cases.
    upper_levels = LEVELS[level + 1:]
    return compat.any((self._is_owned(lvl) for lvl in upper_levels))
1546

    
1547
  def _BGL_owned(self): # pylint: disable=C0103
    """Check if the current thread owns the BGL.

    Both an exclusive or a shared acquisition work.

    """
    cluster_owned = self.__keyring[LEVEL_CLUSTER]._list_owned()
    return BGL in cluster_owned
1554

    
1555
  @staticmethod
  def _contains_BGL(level, names): # pylint: disable=C0103
    """Check if the level contains the BGL.

    Check if acting on the given level and set of names will change
    the status of the Big Ganeti Lock.

    @param level: the locking level acted upon
    @param names: the lock names acted upon, None standing for all of them

    """
    if level != LEVEL_CLUSTER:
      return False

    return names is None or BGL in names
1564

    
1565
  def acquire(self, level, names, timeout=None, shared=0, priority=None):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring lock

    @return: the result of L{LockSet.acquire} for the level's lockset

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Either this call acquires the Big Ganeti Lock or we must own it already.
    # Some "legacy" opcodes need to be sure they are run non-concurrently, so
    # even migrated code has to at least share the BGL to be compatible with
    # them. Owning the BGL exclusively makes acquiring any other lock
    # pointless, unless perhaps we are half way through the migration of the
    # current opcode.
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
            "You must own the Big Ganeti Lock before acquiring any other")

    # Check we don't own locks at upper levels.
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
           " while owning some at a greater one")

    # Acquire the locks in the set.
    lockset = self.__keyring[level]
    return lockset.acquire(names, shared=shared, timeout=timeout,
                           priority=priority)
1600

    
1601
  def downgrade(self, level, names=None):
    """Downgrade a set of resource locks from exclusive to shared mode.

    You must have acquired the locks in exclusive mode.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be downgraded
    @type names: list of strings, or None
    @param names: the names of the locks which shall be downgraded
        (defaults to all the locks acquired at the level)

    @return: the result of L{LockSet.downgrade} for the level's lockset

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    lockset = self.__keyring[level]
    return lockset.downgrade(names=names)
1616

    
1617
  def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive
    mode, before releasing them.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # The BGL may only go once nothing is held at the levels above it
    assert not (self._contains_BGL(level, names) and
                self._upper_owned(LEVEL_CLUSTER)), (
            "Cannot release the Big Ganeti Lock while holding something"
            " at upper levels (%r)" %
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
                              for i in self.__keyring.keys()]), ))

    # Release will complain if we don't own the locks already
    lockset = self.__keyring[level]
    return lockset.release(names)
1640

    
1641
  def add(self, level, names, acquired=0, shared=0):
    """Add locks at the specified level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared

    @return: the result of L{LockSet.add} for the level's lockset

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
           " while owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.add(names, acquired=acquired, shared=shared)
1660

    
1661
  def remove(self, level, names):
    """Remove locks from the specified level.

    You must either already own the locks you are trying to remove
    exclusively or not own any lock at an upper level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)

    @return: the result of L{LockSet.remove} for the level's lockset

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")

    # Either we own the level, or we own nothing from here up.
    # LockSet.remove() deals with the cases of not owning all the needed
    # resources and of a shared ownership.
    assert self._is_owned(level) or not self._upper_owned(level), (
           "Cannot remove locks at a level while not owning it or"
           " owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.remove(names)
1684

    
1685

    
1686
def _MonitorSortKey(entry):
  """Sorting key function.

  Sort by name, registration order and then order of information. This provides
  a stable sort order over different providers, even if they return the same
  name.

  @type entry: tuple
  @param entry: C{(item, idx, num)} triple; C{item} is a four-element tuple
    whose first element is the name, C{idx} is the position of the item within
    its provider's output and C{num} is the provider's registration number
  @rtype: tuple
  @return: C{(utils.NiceSortKey(name), num, idx)}

  """
  # The triple is unpacked here instead of in the signature: tuple parameters
  # are Python 2 only syntax (removed by PEP 3113), while this form is valid
  # everywhere and still accepts exactly one positional argument.
  (item, idx, num) = entry
  (name, _, _, _) = item

  return (utils.NiceSortKey(name), num, idx)
1697

    
1698

    
1699
class LockMonitor(object):
  """Keeps track of lock information providers and answers lock queries.

  Providers are stored through weak references together with a number taken
  from a counter at registration time. Queries collect the information of all
  tracked providers and sort it using L{_MonitorSortKey}.

  """
  _LOCK_ATTR = "_lock"

  def __init__(self):
    """Creates the internal lock, the counter and the provider table.

    """
    self._lock = SharedLock("LockMonitor")

    # Source of registration numbers; they make the sort order stable
    self._counter = itertools.count(0)

    # Provider -> registration number. Weak references are used so that
    # circular references and deletion of providers cause no issues.
    self._locks = weakref.WeakKeyDictionary()

  @ssynchronized(_LOCK_ATTR)
  def RegisterLock(self, provider):
    """Starts tracking a lock information provider.

    @param provider: Object with a callable method named C{GetLockInfo}, taking
      a single C{set} containing the requested information items
    @note: Receiving just the function which generates the information would
      be nicer. Weak references to bound methods (e.g. C{self.GetLockInfo})
      are tricky, however, and none of the known workarounds works properly
      together with a standard C{WeakKeyDictionary}.

    """
    assert provider not in self._locks, "Duplicate registration"

    # Duplicate names are deliberately not rejected. A lock re-created with
    # the same name shortly after its predecessor went away can arrive here
    # while the old instance is still in the weakref dictionary. Recording the
    # order of registrations is enough to keep the sort order stable.
    registration_number = self._counter.next()
    self._locks[provider] = registration_number

  def _GetLockInfo(self, requested):
    """Collects the requested information from every tracked provider.

    @param requested: passed unchanged to each provider's C{GetLockInfo}
    @rtype: list of tuples
    @return: C{(info, idx, num)} for each item returned by a provider, with
      C{idx} being the item's position in that provider's result and C{num}
      the number stored for the provider at registration

    """
    # The internal lock is held only while taking a consistent list of the
    # tracked providers; the providers themselves are queried afterwards.
    self._lock.acquire(shared=1)
    try:
      tracked = self._locks.items()
    finally:
      self._lock.release()

    collected = []
    for (provider, num) in tracked:
      for (idx, info) in enumerate(provider.GetLockInfo(requested)):
        collected.append((info, idx, num))

    return collected

  def _Query(self, fields):
    """Prepares a query over all locks.

    @type fields: list of strings
    @param fields: List of fields to return
    @rtype: tuple
    @return: C{(query object, lock query data)}

    """
    qobj = query.Query(query.LOCK_FIELDS, fields)

    # Gather everything first, then order by name and incoming order
    unsorted_info = self._GetLockInfo(qobj.RequestedData())
    ordered_info = sorted(unsorted_info, key=_MonitorSortKey)

    # Keep only the lock information of each entry for the query data
    lock_items = [compat.fst(entry) for entry in ordered_info]

    return (qobj, query.LockQueryData(lock_items))

  def QueryLocks(self, fields):
    """Runs a query over all locks and returns the query response.

    @type fields: list of strings
    @param fields: List of fields to return

    """
    (query_obj, query_data) = self._Query(fields)

    return query.GetQueryResponse(query_obj, query_data)

  def OldStyleQueryLocks(self, fields):
    """Runs a query over all locks and returns old-style data.

    @type fields: list of strings
    @param fields: List of fields to return

    """
    (query_obj, query_data) = self._Query(fields)

    return query_obj.OldStyleQuery(query_data)