Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 1f864b60

History | View | Annotate | Download (39.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
# Disable "Invalid name ..." message
22
# pylint: disable-msg=C0103
23

    
24
"""Module implementing the Ganeti locking code."""
25

    
26
import os
27
import select
28
import threading
29
import time
30
import errno
31

    
32
from ganeti import errors
33
from ganeti import utils
34

    
35

    
36
def ssynchronized(lock, shared=0):
  """Shared Synchronization decorator.

  Calls the function holding the given lock, either in exclusive or shared
  mode. It requires the passed lock to be a SharedLock (or support its
  semantics).

  The returned wrapper keeps the name and docstring of the decorated function.

  @param lock: lock object offering C{acquire(shared=...)} and C{release()}
  @param shared: value passed as C{shared} to C{lock.acquire()}
  @return: decorator; the decorated function returns whatever the wrapped
      function returns, and the lock is released even if it raises

  """
  # Imported locally so this function doesn't depend on a module-level import
  import functools

  def wrap(fn):
    @functools.wraps(fn)
    def sync_function(*args, **kwargs):
      lock.acquire(shared=shared)
      try:
        return fn(*args, **kwargs)
      finally:
        lock.release()
    return sync_function
  return wrap
53

    
54

    
55
class RunningTimeout(object):
  """Tracks how much of an overall timeout is left over several operations.

  The clock is started by the first call to L{Remaining} which has a timeout
  to calculate, not by the constructor.

  """
  __slots__ = [
    "_allow_negative",
    "_start_time",
    "_time_fn",
    "_timeout",
    ]

  def __init__(self, timeout, allow_negative, _time_fn=time.time):
    """Initializes this class.

    @type timeout: float
    @param timeout: Timeout duration (None means there is no timeout)
    @type allow_negative: bool
    @param allow_negative: Whether to return values below zero
    @param _time_fn: Time function for unittests
    @raise ValueError: if the timeout is negative

    """
    object.__init__(self)

    if timeout is not None:
      if timeout < 0.0:
        raise ValueError("Timeout must not be negative")

    self._start_time = None
    self._time_fn = _time_fn
    self._allow_negative = allow_negative
    self._timeout = timeout

  def Remaining(self):
    """Returns the remaining timeout.

    @return: None if no timeout was given, otherwise the time left; the
        value is clamped to zero unless negative values were allowed

    """
    total = self._timeout
    if total is None:
      return None

    # The first calculation defines the start time
    if self._start_time is None:
      self._start_time = self._time_fn()

    left = self._start_time + total - self._time_fn()

    if self._allow_negative:
      return left

    # Never report less than zero
    return max(0.0, left)
106

    
107

    
108
class _SingleNotifyPipeConditionWaiter(object):
  """Helper class for SingleNotifyPipeCondition

  Instances are callables which block on the poller until the watched file
  descriptor is reported or the timeout has expired.

  """
  __slots__ = [
    "_fd",
    "_poller",
    ]

  def __init__(self, poller, fd):
    """Constructor for _SingleNotifyPipeConditionWaiter

    @type poller: select.poll
    @param poller: Poller object
    @type fd: int
    @param fd: File descriptor to wait for

    """
    object.__init__(self)
    self._poller = poller
    self._fd = fd

  def __call__(self, timeout):
    """Wait for something to happen on the pipe.

    @type timeout: float or None
    @param timeout: Timeout for waiting in seconds (can be None)

    """
    running_timeout = RunningTimeout(timeout, True)

    while True:
      remaining_time = running_timeout.Remaining()

      if remaining_time is not None:
        if remaining_time < 0.0:
          break

        # The calculation is done in seconds, poll() wants milliseconds.
        # Passing seconds makes poll() return far too early and turns this
        # loop into a busy wait until the deadline.
        remaining_time *= 1000

      try:
        result = self._poller.poll(remaining_time)
      except (EnvironmentError, select.error) as err:
        # On Python 2 poll() raises select.error, which is not an
        # EnvironmentError and carries the error number only in args[0]
        code = getattr(err, "errno", None)
        if code is None and err.args:
          code = err.args[0]
        if code != errno.EINTR:
          raise
        # Interrupted by a signal; recalculate the timeout and try again
        result = None

      # Check whether we were notified
      if result and result[0][0] == self._fd:
        break
155

    
156

    
157
class _BaseCondition(object):
  """Base class containing common code for conditions.

  Some of this code is taken from python's threading module.

  """
  __slots__ = [
    "_lock",
    "acquire",
    "release",
    ]

  def __init__(self, lock):
    """Constructor for _BaseCondition.

    @type lock: threading.Lock
    @param lock: condition base lock

    """
    object.__init__(self)

    # Recursive locks are not supported; they are recognized by the helper
    # methods they expose for condition objects
    for rlock_attr in ("_acquire_restore", "_release_save"):
      assert not hasattr(lock, rlock_attr)

    self._lock = lock

    # Make the lock's own acquire() and release() available on the condition
    self.acquire = lock.acquire
    self.release = lock.release

  def _is_owned(self):
    """Check whether lock is owned by current thread.

    The check is a non-blocking acquire: if it succeeds the lock was free,
    so it is handed back immediately and False is returned.

    """
    got_lock = self._lock.acquire(0)
    if got_lock:
      self._lock.release()

    return not got_lock

  def _check_owned(self):
    """Raise an exception if the current thread doesn't own the lock.

    @raise RuntimeError: if the lock is found to be free

    """
    if self._is_owned():
      return

    raise RuntimeError("cannot work with un-aquired lock")
204

    
205

    
206
class SingleNotifyPipeCondition(_BaseCondition):
  """Condition which can only be notified once.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, with the following differences:
    - notifyAll can only be called once, and no wait can happen after that
    - notify is not supported, only notifyAll

  """

  __slots__ = _BaseCondition.__slots__ + [
    "_poller",
    "_read_fd",
    "_write_fd",
    "_nwaiters",
    "_notified",
    ]

  _waiter_class = _SingleNotifyPipeConditionWaiter

  def __init__(self, lock):
    """Constructor for SingleNotifyPipeCondition

    The pipe and the poller are only created once somebody waits.

    """
    _BaseCondition.__init__(self, lock)
    self._nwaiters = 0
    self._notified = False
    self._read_fd = None
    self._write_fd = None
    self._poller = None

  def _check_unnotified(self):
    """Throws an exception if already notified.

    @raise RuntimeError: if notifyAll was called before

    """
    if not self._notified:
      return

    raise RuntimeError("cannot use already notified condition")

  def _Cleanup(self):
    """Cleanup open file descriptors, if any.

    Both pipe ends are closed (read end first) and the poller is dropped.

    """
    for fd_attr in ("_read_fd", "_write_fd"):
      fd = getattr(self, fd_attr)
      if fd is not None:
        os.close(fd)
        setattr(self, fd_attr, None)

    self._poller = None

  def wait(self, timeout=None):
    """Wait for a notification.

    The lock must be held by the caller; it is released while waiting and
    taken again before returning.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
      # The first waiter sets up the pipe and the poller watching its read end
      if self._poller is None:
        (self._read_fd, self._write_fd) = os.pipe()
        self._poller = select.poll()
        self._poller.register(self._read_fd, select.POLLHUP)

      waiter = self._waiter_class(self._poller, self._read_fd)
      self.release()
      try:
        # Block until notified or timed out
        waiter(timeout)
      finally:
        # The caller expects to hold the lock again
        self.acquire()
    finally:
      self._nwaiters -= 1
      # The last waiter to leave closes the file descriptors
      if not self._nwaiters:
        self._Cleanup()

  def notifyAll(self):
    """Close the writing side of the pipe to notify all waiters.

    If nobody has waited yet there is no pipe and only the notified flag is
    set.

    """
    self._check_owned()
    self._check_unnotified()
    self._notified = True

    write_fd = self._write_fd
    if write_fd is not None:
      os.close(write_fd)
      self._write_fd = None
298

    
299

    
300
class PipeCondition(_BaseCondition):
  """Group-only non-polling condition with counters.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, but only supports notifyAll and
  non-recursive locks. As an additional feature it's able to report whether
  there are any waiting threads.

  """
  __slots__ = _BaseCondition.__slots__ + [
    "_nwaiters",
    "_single_condition",
    ]

  _single_condition_class = SingleNotifyPipeCondition

  def __init__(self, lock):
    """Initializes this class.

    @param lock: condition base lock, shared with the internal single-use
        condition

    """
    _BaseCondition.__init__(self, lock)
    self._nwaiters = 0
    self._single_condition = self._single_condition_class(self._lock)

  def wait(self, timeout=None):
    """Wait for a notification.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # Remember which single-use condition we wait on; notifyAll() installs a
    # new one while we're waiting.
    current_condition = self._single_condition

    assert self._nwaiters >= 0
    self._nwaiters += 1
    try:
      current_condition.wait(timeout)
    finally:
      assert self._nwaiters > 0
      self._nwaiters -= 1

  def notifyAll(self):
    """Notify all currently waiting threads.

    The used-up single-notify condition is replaced by a fresh one
    afterwards.

    """
    self._check_owned()
    used_condition = self._single_condition
    used_condition.notifyAll()
    self._single_condition = self._single_condition_class(self._lock)

  def has_waiting(self):
    """Returns whether there are active waiters.

    """
    self._check_owned()

    return self._nwaiters != 0
361

    
362

    
363
class _CountingCondition(object):
  """Wrapper for Python's built-in threading.Condition class.

  This wrapper keeps a count of active waiters. We can't access the internal
  "__waiters" attribute of threading.Condition because it's not thread-safe.

  """
  __slots__ = [
    "_cond",
    "_nwaiters",
    ]

  def __init__(self, lock):
    """Initializes this class.

    @param lock: lock handed to the wrapped threading.Condition

    """
    object.__init__(self)
    self._cond = threading.Condition(lock=lock)
    self._nwaiters = 0

  def notifyAll(self):
    """Notifies the condition.

    """
    wrapped = self._cond
    return wrapped.notifyAll()

  def wait(self, timeout=None):
    """Waits for the condition to be notified.

    The waiter counter is raised for the duration of the wait.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    assert self._nwaiters >= 0

    self._nwaiters += 1
    try:
      result = self._cond.wait(timeout=timeout)
    finally:
      self._nwaiters -= 1

    return result

  def has_waiting(self):
    """Returns whether there are active waiters.

    """
    return self._nwaiters != 0
409

    
410

    
411
class SharedLock(object):
  """Implements a shared lock.

  Multiple threads can acquire the lock in a shared way by calling
  acquire(shared=1). In order to acquire the lock in an exclusive way
  threads can call acquire(shared=0).

  The lock prevents starvation but does not guarantee that threads will acquire
  the shared lock in the order they queued for it, just that they will
  eventually do so.

  """
  __slots__ = [
    "__active_shr_c",
    "__inactive_shr_c",
    "__deleted",
    "__exc",
    "__lock",
    "__pending",
    "__shr",
    ]

  __condition_class = PipeCondition

  def __init__(self):
    """Construct a new SharedLock.

    """
    object.__init__(self)

    # Internal lock
    self.__lock = threading.Lock()

    # Queue containing waiting acquires
    self.__pending = []

    # Active and inactive conditions for shared locks
    self.__active_shr_c = self.__condition_class(self.__lock)
    self.__inactive_shr_c = self.__condition_class(self.__lock)

    # Current lock holders
    self.__shr = set()
    self.__exc = None

    # is this lock in the deleted state?
    self.__deleted = False

  def __check_deleted(self):
    """Raises an exception if the lock has been deleted.

    @raise errors.LockError: if the lock is in the deleted state

    """
    if self.__deleted:
      raise errors.LockError("Deleted lock")

  def __is_sharer(self):
    """Is the current thread sharing the lock at this time?

    """
    return threading.currentThread() in self.__shr

  def __is_exclusive(self):
    """Is the current thread holding the lock exclusively at this time?

    """
    return threading.currentThread() == self.__exc

  def __is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    This is a private version of the function, which presumes you're holding
    the internal lock.

    @param shared: see L{_is_owned}

    """
    if shared < 0:
      return self.__is_sharer() or self.__is_exclusive()
    elif shared:
      return self.__is_sharer()
    else:
      return self.__is_exclusive()

  def _is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership

    """
    self.__lock.acquire()
    try:
      return self.__is_owned(shared=shared)
    finally:
      self.__lock.release()

  def _count_pending(self):
    """Returns the number of pending acquires.

    @rtype: int

    """
    self.__lock.acquire()
    try:
      return len(self.__pending)
    finally:
      self.__lock.release()

  def __do_acquire(self, shared):
    """Actually acquire the lock.

    Only records the current thread as holder; the caller must have checked
    that the lock can be acquired.

    """
    if shared:
      self.__shr.add(threading.currentThread())
    else:
      self.__exc = threading.currentThread()

  def __can_acquire(self, shared):
    """Determine whether lock can be acquired.

    Shared mode only requires that there is no exclusive holder, exclusive
    mode requires that there is no holder at all.

    """
    if shared:
      return self.__exc is None
    else:
      return len(self.__shr) == 0 and self.__exc is None

  def __is_on_top(self, cond):
    """Checks whether the passed condition is on top of the queue.

    The caller must make sure the queue isn't empty.

    """
    return self.__pending[0] == cond

  def __acquire_unlocked(self, shared, timeout):
    """Acquire a shared lock.

    The internal lock must be held by the caller.

    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @param timeout: maximum waiting time before giving up
    @return: True if the lock was acquired, False otherwise
    @raise errors.LockError: if the lock is or gets deleted

    """
    self.__check_deleted()

    # We cannot acquire the lock if we already have it
    assert not self.__is_owned(), "double acquire() on a non-recursive lock"

    # Check whether someone else holds the lock or there are pending acquires.
    if not self.__pending and self.__can_acquire(shared):
      # Apparently not, can acquire lock directly.
      self.__do_acquire(shared)
      return True

    if shared:
      # All shared acquires queued together wait on the same condition
      wait_condition = self.__active_shr_c

      # Check if we're not yet in the queue
      if wait_condition not in self.__pending:
        self.__pending.append(wait_condition)
    else:
      # Every exclusive acquire gets its own condition
      wait_condition = self.__condition_class(self.__lock)
      # Always add to queue
      self.__pending.append(wait_condition)

    try:
      # Wait until we become the topmost acquire in the queue or the timeout
      # expires.
      while not (self.__is_on_top(wait_condition) and
                 self.__can_acquire(shared)):
        # Wait for notification
        wait_condition.wait(timeout)
        self.__check_deleted()

        # A lot of code assumes blocking acquires always succeed. Loop
        # internally for that case.
        if timeout is not None:
          break

      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
        self.__do_acquire(shared)
        return True
    finally:
      # Remove condition from queue if there are no more waiters
      if not wait_condition.has_waiting() and not self.__deleted:
        self.__pending.remove(wait_condition)

    return False

  def acquire(self, shared=0, timeout=None, test_notify=None):
    """Acquire a shared lock.

    @type shared: int
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @return: True if the lock was acquired, False otherwise
    @raise errors.LockError: if the lock is or gets deleted

    """
    self.__lock.acquire()
    try:
      # We already got the lock, notify now
      if __debug__ and callable(test_notify):
        test_notify()

      return self.__acquire_unlocked(shared, timeout)
    finally:
      self.__lock.release()

  def release(self):
    """Release a Shared Lock.

    You must have acquired the lock, either in shared or in exclusive mode,
    before calling this function.

    """
    self.__lock.acquire()
    try:
      assert self.__is_exclusive() or self.__is_sharer(), \
        "Cannot release non-owned lock"

      # Autodetect release type
      if self.__is_exclusive():
        self.__exc = None
      else:
        self.__shr.remove(threading.currentThread())

      # Notify topmost condition in queue
      if self.__pending:
        first_condition = self.__pending[0]
        first_condition.notifyAll()

        # Shared acquires arriving from now on must use the other condition
        if first_condition == self.__active_shr_c:
          self.__active_shr_c = self.__inactive_shr_c
          self.__inactive_shr_c = first_condition

    finally:
      self.__lock.release()

  def delete(self, timeout=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @return: True if the lock was deleted, False if it couldn't be acquired
        before the timeout expired
    @raise errors.LockError: if the lock is already deleted

    """
    self.__lock.acquire()
    try:
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

      self.__check_deleted()

      # The caller is allowed to hold the lock exclusively already.
      acquired = self.__is_exclusive()

      if not acquired:
        acquired = self.__acquire_unlocked(0, timeout)

      if acquired:
        # Checked only after a successful acquire; a timed out acquire must
        # return False to the caller instead of failing this assertion
        assert self.__is_exclusive() and not self.__is_sharer(), \
          "Lock wasn't acquired in exclusive mode"

        self.__deleted = True
        self.__exc = None

        # Notify all acquires. They'll throw an error.
        while self.__pending:
          self.__pending.pop().notifyAll()

      return acquired
    finally:
      self.__lock.release()
687

    
688

    
689
# Passing None as the names to LockSet.acquire() requests the whole set;
# this constant gives that special value a readable name.
ALL_SET = None
692

    
693

    
694
class _AcquireTimeout(Exception):
  """Internal exception used to abort an acquire whose timeout expired.

  """
698

    
699

    
700
class LockSet:
701
  """Implements a set of locks.
702

703
  This abstraction implements a set of shared locks for the same resource type,
704
  distinguished by name. The user can lock a subset of the resources and the
705
  LockSet will take care of acquiring the locks always in the same order, thus
706
  preventing deadlock.
707

708
  All the locks needed in the same set must be acquired together, though.
709

710
  """
711
  def __init__(self, members=None):
    """Constructs a new LockSet.

    @param members: initial members of the set; one lock is created per name

    """
    # Set-level lock, used internally to guarantee coherency.
    self.__lock = SharedLock()

    # The lockdict indexes the relationship name -> lock
    # The order-of-locking is implied by the alphabetical order of names
    if members is None:
      members = ()
    self.__lockdict = dict((name, SharedLock()) for name in members)

    # The owner dict contains the set of locks each thread owns. For
    # performance each thread can access its own key without a global lock on
    # this structure. It is paramount though that *no* other type of access is
    # done to this structure (eg. no looping over its keys). *_owner helper
    # function are defined to guarantee access is correct, but in general never
    # do anything different than __owners[threading.currentThread()], or there
    # will be trouble.
    self.__owners = {}
736

    
737
  def _is_owned(self):
738
    """Is the current thread a current level owner?"""
739
    return threading.currentThread() in self.__owners
740

    
741
  def _add_owned(self, name=None):
742
    """Note the current thread owns the given lock"""
743
    if name is None:
744
      if not self._is_owned():
745
        self.__owners[threading.currentThread()] = set()
746
    else:
747
      if self._is_owned():
748
        self.__owners[threading.currentThread()].add(name)
749
      else:
750
        self.__owners[threading.currentThread()] = set([name])
751

    
752
  def _del_owned(self, name=None):
753
    """Note the current thread owns the given lock"""
754

    
755
    assert not (name is None and self.__lock._is_owned()), \
756
           "Cannot hold internal lock when deleting owner status"
757

    
758
    if name is not None:
759
      self.__owners[threading.currentThread()].remove(name)
760

    
761
    # Only remove the key if we don't hold the set-lock as well
762
    if (not self.__lock._is_owned() and
763
        not self.__owners[threading.currentThread()]):
764
      del self.__owners[threading.currentThread()]
765

    
766
  def _list_owned(self):
767
    """Get the set of resource names owned by the current thread"""
768
    if self._is_owned():
769
      return self.__owners[threading.currentThread()].copy()
770
    else:
771
      return set()
772

    
773
  def _release_and_delete_owned(self):
774
    """Release and delete all resources owned by the current thread"""
775
    for lname in self._list_owned():
776
      self.__lockdict[lname].release()
777
      self._del_owned(name=lname)
778

    
779
  def __names(self):
780
    """Return the current set of names.
781

782
    Only call this function while holding __lock and don't iterate on the
783
    result after releasing the lock.
784

785
    """
786
    return self.__lockdict.keys()
787

    
788
  def _names(self):
789
    """Return a copy of the current set of elements.
790

791
    Used only for debugging purposes.
792

793
    """
794
    # If we don't already own the set-level lock acquired
795
    # we'll get it and note we need to release it later.
796
    release_lock = False
797
    if not self.__lock._is_owned():
798
      release_lock = True
799
      self.__lock.acquire(shared=1)
800
    try:
801
      result = self.__names()
802
    finally:
803
      if release_lock:
804
        self.__lock.release()
805
    return set(result)
806

    
807
  def acquire(self, names, timeout=None, shared=0, test_notify=None):
    """Acquire a set of resource locks.

    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names); None acquires the
        whole set
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting

    @return: Set of all locks successfully acquired or None in case of timeout

    @raise errors.LockError: when any lock we try to acquire has
        been deleted before we succeed. In this case none of the
        locks requested will be acquired.

    """
    assert timeout is None or timeout >= 0.0

    # Check we don't already own locks at this level
    assert not self._is_owned(), "Cannot acquire locks in the same set twice"

    # The timeout covers all lock acquires together, hence the time already
    # spent waiting must be tracked.
    running_timeout = RunningTimeout(timeout, False)
    remaining_fn = running_timeout.Remaining

    try:
      if names is None:
        # No names given: acquire the whole set. Holding the set-lock keeps
        # new names from being added before we release; the list of current
        # names is fetched afterwards. Some of them may be deleted later,
        # which the inner function copes with.
        #
        # The set-lock is taken in the same mode as the locks themselves: if
        # they're acquired exclusively nobody else can use them anyway, and
        # it allows doing add() on the set while owning it.
        if not self.__lock.acquire(shared=shared, timeout=remaining_fn()):
          raise _AcquireTimeout()

        try:
          # note we own the set-lock
          self._add_owned()

          return self.__acquire_inner(self.__names(), True, shared,
                                      remaining_fn, test_notify)
        except:
          # Whatever went wrong (including a timeout in the inner function),
          # give the set-lock back and drop the owner entry before passing
          # the exception on.
          self.__lock.release()
          self._del_owned()
          raise

      # Support passing in a single resource to acquire rather than many
      if isinstance(names, basestring):
        names = [names]
      else:
        names = sorted(names)

      return self.__acquire_inner(names, False, shared,
                                  remaining_fn, test_notify)

    except _AcquireTimeout:
      return None
875

    
876
  def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
    """Inner logic for acquiring a number of locks.

    @param names: Names of the locks to be acquired
    @param want_all: Whether all locks in the set should be acquired
    @param shared: Whether to acquire in shared mode
    @param timeout_fn: Function returning remaining timeout
    @param test_notify: Special callback function for unittesting
    @return: set with the names of the acquired locks
    @raise errors.LockError: if a lock doesn't exist (anymore), unless the
        whole set is wanted
    @raise _AcquireTimeout: if a lock couldn't be acquired in time

    """
    notify_enabled = __debug__ and callable(test_notify)

    # Look all locks up in __lockdict first. There's no guarantee they will
    # still be there later on, but it fails a lot faster should one of the
    # names already be wrong.
    acquire_list = []
    for lname in utils.UniqueSequence(names):
      try:
        lock = self.__lockdict[lname]
      except KeyError:
        if want_all:
          # We are acquiring all the set, it doesn't matter if this particular
          # element is not there anymore.
          continue

        raise errors.LockError("Non-existing lock in set (%s)" % lname)

      acquire_list.append((lname, lock))

    # This will hold the locknames we effectively acquired.
    acquired = set()

    try:
      # Go through the (private) list of names and locks in order and
      # acquire() each of them. The locks may be deleted in the meantime, but
      # .acquire() itself is safe and reports a deleted lock.
      for (lname, lock) in acquire_list:
        if notify_enabled:
          test_notify_fn = lambda lname=lname: test_notify(lname)
        else:
          test_notify_fn = None

        timeout = timeout_fn()

        try:
          # raises LockError if the lock was deleted
          acq_success = lock.acquire(shared=shared, timeout=timeout,
                                     test_notify=test_notify_fn)
        except errors.LockError:
          if want_all:
            # We are acquiring all the set, it doesn't matter if this
            # particular element is not there anymore.
            continue

          raise errors.LockError("Non-existing lock in set (%s)" % lname)

        if not acq_success:
          # Couldn't get lock or timeout occurred
          if timeout is None:
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
            # blocking.
            raise errors.LockError("Failed to get lock %s" % lname)

          raise _AcquireTimeout()

        try:
          # now the lock cannot be deleted, we have it!
          self._add_owned(name=lname)
          acquired.add(lname)

        except:
          # Recording the ownership shouldn't fail; if it does, try to give
          # the lock back before re-raising. Something is really wrong then.
          if lock._is_owned():
            lock.release()
          raise

    except:
      # Release all owned locks
      self._release_and_delete_owned()
      raise

    return acquired
960

    
961
  def release(self, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive mode,
    before releasing them.

    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level).

    """
    assert self._is_owned(), "release() on lock set while not owner"

    if names is None:
      to_release = self._list_owned()
    else:
      # Support passing in a single resource to release rather than many
      if isinstance(names, basestring):
        names = [names]

      to_release = set(names)
      assert self._list_owned().issuperset(to_release), (
               "release() on unheld resources %s" %
               to_release.difference(self._list_owned()))

    # The "all elements" lock goes first, if held. After this 'add' can work
    # again.
    if self.__lock._is_owned():
      self.__lock.release()
      self._del_owned()

    for lockname in to_release:
      # If we are sure the lock doesn't leave __lockdict without being
      # exclusively held we can do this...
      self.__lockdict[lockname].release()
      self._del_owned(name=lockname)
996

    
997
  def add(self, names, acquired=0, shared=0):
    """Add a new set of elements to the set

    @param names: name, or iterable of names, of the new elements to add;
        one-shot iterables (e.g. generators) are accepted as well
    @param acquired: pre-acquire the new resource?
    @param shared: is the pre-acquisition shared?
    @return: True
    @raise errors.LockError: if any of the names is already in the set

    """
    # Check we don't already own locks at this level
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
      "Cannot add locks if the set is only partially owned, or shared"

    # Support passing in a single resource to add rather than many
    if isinstance(names, basestring):
      names = [names]
    else:
      # The names are walked twice below (duplicate check, then creation).
      # Materialize them once so that a one-shot iterable is not used up by
      # the duplicate check, which would silently add nothing.
      names = list(names)

    # If we don't already own the set-level lock acquired in an exclusive way
    # we'll get it and note we need to release it later.
    release_lock = False
    if not self.__lock._is_owned():
      release_lock = True
      self.__lock.acquire()

    try:
      invalid_names = set(self.__names()).intersection(names)
      if invalid_names:
        # This must be an explicit raise, not an assert, because assert is
        # turned off when using optimization, and this can happen because of
        # concurrency even if the user doesn't want it.
        raise errors.LockError("duplicate add() (%s)" % invalid_names)

      for lockname in names:
        lock = SharedLock()

        if acquired:
          lock.acquire(shared=shared)
          # now the lock cannot be deleted, we have it!
          try:
            self._add_owned(name=lockname)
          except:
            # Deliberately a bare except: whatever went wrong is re-raised
            # after the cleanup.
            # We shouldn't have problems adding the lock to the owners list,
            # but if we did we'll try to release this lock and re-raise
            # exception.  Of course something is going to be really wrong,
            # after this.  On the other hand the lock hasn't been added to the
            # __lockdict yet so no other threads should be pending on it. This
            # release is just a safety measure.
            lock.release()
            raise

        # Publish the lock only once it is fully set up
        self.__lockdict[lockname] = lock

    finally:
      # Only release __lock if we were not holding it previously.
      if release_lock:
        self.__lock.release()

    return True
1054

    
1055
  def remove(self, names):
    """Remove elements from the lock set.

    You can either not hold anything in the lockset or already hold a superset
    of the elements you want to delete, exclusively.

    @param names: name, or iterable of names, of the resources to remove;
        one-shot iterables (e.g. generators) are accepted as well

    @return: a list of locks which we removed; the list is always
        equal to the names list if we were holding all the locks
        exclusively

    """
    # Support passing in a single resource to remove rather than many
    if isinstance(names, basestring):
      names = [names]
    else:
      # The assertion below walks the names before the removal loop does.
      # Materialize them once so that a one-shot iterable is not exhausted
      # by the check (which would also make behaviour depend on whether
      # assertions are enabled).
      names = list(names)

    # If we own any subset of this lock it must be a superset of what we want
    # to delete. The ownership must also be exclusive, but that will be checked
    # by the lock itself.
    assert not self._is_owned() or self._list_owned().issuperset(names), (
      "remove() on acquired lockset while not owning all elements")

    removed = []

    for lname in names:
      # Calling delete() acquires the lock exclusively if we don't already own
      # it, and causes all pending and subsequent lock acquires to fail. It's
      # fine to call it out of order because delete() also implies release(),
      # and the assertion above guarantees that we either already hold
      # everything we want to delete, or we hold none.
      try:
        self.__lockdict[lname].delete()
      except (KeyError, errors.LockError):
        # This cannot happen if we were already holding it, verify:
        assert not self._is_owned(), "remove failed while holding lockset"
      else:
        # If no LockError was raised we are the ones who deleted the lock.
        # This means we can safely remove it from lockdict, as any further or
        # pending delete() or acquire() will fail (and nobody can have the lock
        # since before our call to delete()).
        #
        # This is done in an else clause because if the exception was thrown
        # it's the job of the one who actually deleted it.
        removed.append(lname)
        del self.__lockdict[lname]
        # And let's remove it from our private list if we owned it.
        if self._is_owned():
          self._del_owned(name=lname)

    return removed
1106

    
1107

    
1108
# Locking levels; locks must be acquired in increasing level order.
# The current rules are:
#   - LEVEL_CLUSTER holds the Big Ganeti Lock (BGL), which must be acquired,
#     in shared or in exclusive mode, before performing any operation.
#     Acquiring the BGL in exclusive mode is discouraged and should be
#     avoided.
#   - LEVEL_NODE and LEVEL_INSTANCE hold the node and instance locks. If you
#     need more than one node, or more than one instance, acquire them at
#     the same time.
LEVEL_CLUSTER = 0
LEVEL_INSTANCE = 1
LEVEL_NODE = 2

LEVELS = [LEVEL_CLUSTER, LEVEL_INSTANCE, LEVEL_NODE]

# Lock levels which are modifiable
LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]

LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODE: "node",
  }

# Constant for the big ganeti lock
BGL = "BGL"
1136

    
1137

    
1138
class GanetiLockManager:
1139
  """The Ganeti Locking Library
1140

1141
  The purpose of this small library is to manage locking for ganeti clusters
1142
  in a central place, while at the same time doing dynamic checks against
1143
  possible deadlocks. It will also make it easier to transition to a different
1144
  lock type should we migrate away from python threads.
1145

1146
  """
1147
  _instance = None
1148

    
1149
  def __init__(self, nodes=None, instances=None):
    """Set up the lock manager and the lock set of every level.

    Only one GanetiLockManager is meant to exist at any time; creating a
    second one trips an assertion.

    @param nodes: list of node names
    @param instances: list of instance names

    """
    cls = self.__class__
    assert cls._instance is None, \
           "double GanetiLockManager instance"

    cls._instance = self

    # The keyring maps every level to the lock set holding that level's locks;
    # the cluster level only ever contains the BGL.
    keyring = {}
    keyring[LEVEL_CLUSTER] = LockSet([BGL])
    keyring[LEVEL_NODE] = LockSet(nodes)
    keyring[LEVEL_INSTANCE] = LockSet(instances)
    self.__keyring = keyring
1171

    
1172
  def _names(self, level):
    """Return the names of the locks at the given level.

    Meant for debugging/testing purposes.

    @param level: the level whose lock names are wanted; it must be a
        member of LEVELS
    @return: whatever the level's lock set reports through _names()

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    lockset = self.__keyring[level]
    return lockset._names()
1182

    
1183
  def _is_owned(self, level):
    """Tell whether we are owning locks at the given level.

    @param level: the level to look at
    @return: the answer of that level's lock set _is_owned() check

    """
    lockset = self.__keyring[level]
    return lockset._is_owned()
1188

    
1189
  is_owned = _is_owned
1190

    
1191
  def _list_owned(self, level):
    """Return the set of locks we own at the given level.

    @param level: the level to look at
    @return: the result of that level's lock set _list_owned() call

    """
    lockset = self.__keyring[level]
    return lockset._list_owned()
1196

    
1197
  def _upper_owned(self, level):
    """Check whether we own any lock at a level greater than the given one.

    @param level: reference level; only the levels following it in LEVELS
        are examined
    @return: the result of utils.any() over the per-level ownership checks,
        i.e. true if something is owned above the given level

    """
    # Slicing LEVELS by level + 1 only works if LEVELS[i] = i, which we check
    # for in the test cases.
    higher_levels = LEVELS[level + 1:]
    return utils.any((self._is_owned(lvl) for lvl in higher_levels))
1204

    
1205
  def _BGL_owned(self):
    """Check if the BGL is among the locks we own at cluster level.

    Both an exclusive or a shared acquisition work.

    """
    cluster_owned = self.__keyring[LEVEL_CLUSTER]._list_owned()
    return BGL in cluster_owned
1212

    
1213
  def _contains_BGL(self, level, names):
    """Check if acting on the given level and names involves the BGL.

    That is the case only at cluster level, and only when the names are
    either unspecified (None) or include the BGL.

    @param level: the level being acted upon
    @param names: the lock names being acted upon, or None

    """
    # The BGL lives at cluster level only
    if level != LEVEL_CLUSTER:
      return False

    return names is None or BGL in names
1221

    
1222
  def acquire(self, level, names, timeout=None, shared=0):
    """Acquire a set of resource locks, all at the same level.

    @param level: the level at which the locks shall be acquired;
        it must be a member of LEVELS.
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @return: whatever the level's lock set acquire() returns

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Either this very call acquires the Big Ganeti Lock, or we must own it
    # already. Some "legacy" opcodes need to be sure they are run
    # non-concurrently, so even migrated opcodes need to at least share the
    # BGL to be compatible with them. Of course if we own the BGL exclusively
    # there's no point in acquiring any other lock, unless perhaps we are half
    # way through the migration of the current opcode.
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
            "You must own the Big Ganeti Lock before acquiring any other")

    # Nothing may be owned at a level greater than the requested one.
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
           " while owning some at a greater one")

    # Hand the actual acquisition over to the level's lock set.
    lockset = self.__keyring[level]
    return lockset.acquire(names, shared=shared, timeout=timeout)
1252

    
1253
  def release(self, level, names=None):
    """Release locks held at the given level.

    The locks must have been acquired beforehand, either in shared or in
    exclusive mode.

    @param level: the level at which the locks shall be released;
        it must be a member of LEVELS
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)
    @return: whatever the level's lock set release() returns

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    # The BGL must not be given up while something is still held at the
    # levels above it; the per-level ownership listing in the message is only
    # built when the check fails.
    assert not (self._contains_BGL(level, names) and
                self._upper_owned(LEVEL_CLUSTER)), (
            "Cannot release the Big Ganeti Lock while holding something"
            " at upper levels (%r)" %
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[lvl],
                                         self._list_owned(lvl))
                              for lvl in self.__keyring.keys()]), ))

    # The lock set's release will complain if we don't own the locks already
    lockset = self.__keyring[level]
    return lockset.release(names)
1275

    
1276
  def add(self, level, names, acquired=0, shared=0):
    """Add new locks at the specified level.

    The BGL must be owned, and nothing may be owned at a greater level.

    @param level: the level at which the locks shall be added;
        it must be a member of LEVELS_MOD.
    @param names: names of the locks to add
    @param acquired: whether to acquire the newly added locks
    @param shared: whether the acquisition will be shared
    @return: whatever the level's lock set add() returns

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
           " while owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.add(names, acquired=acquired, shared=shared)
1292

    
1293
  def remove(self, level, names):
    """Remove locks from the specified level.

    You must either already own the locks you are trying to remove
    exclusively or not own any lock at an upper level.

    @param level: the level at which the locks shall be removed;
        it must be a member of LEVELS_MOD
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)
    @return: whatever the level's lock set remove() returns

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    # Either we own something at this level, or we own nothing at any greater
    # level. The lock set's remove() deals with the cases in which we don't
    # own all the needed resources, or only have a shared ownership.
    assert self._is_owned(level) or not self._upper_owned(level), (
           "Cannot remove locks at a level while not owning it or"
           " owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.remove(names)