Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ cea881e5

History | View | Annotate | Download (40.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable-msg=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import time
32
import errno
33

    
34
from ganeti import errors
35
from ganeti import utils
36
from ganeti import compat
37

    
38

    
39
def ssynchronized(lock, shared=0):
  """Shared Synchronization decorator.

  Calls the function holding the given lock, either in exclusive or shared
  mode. It requires the passed lock to be a SharedLock (or support its
  semantics).

  @param lock: lock object providing C{acquire(shared=...)} and C{release()}
  @type shared: integer (0/1) used as a boolean
  @param shared: whether the lock is taken in shared mode
  @return: decorator which wraps a function so it runs with the lock held

  """
  def wrap(fn):
    def sync_function(*args, **kwargs):
      lock.acquire(shared=shared)
      try:
        return fn(*args, **kwargs)
      finally:
        # Always give the lock back, even if the wrapped function raised
        lock.release()

    # Keep the wrapped function's identity for introspection and tracebacks
    sync_function.__name__ = getattr(fn, "__name__", sync_function.__name__)
    sync_function.__doc__ = getattr(fn, "__doc__", None)
    return sync_function
  return wrap
56

    
57

    
58
class RunningTimeout(object):
  """Class to calculate remaining timeout when doing several operations.

  The clock is started lazily: the start time is recorded on the first call
  to L{Remaining}, not when the object is constructed.

  """
  __slots__ = [
    "_allow_negative",
    "_start_time",
    "_time_fn",
    "_timeout",
    ]

  def __init__(self, timeout, allow_negative, _time_fn=time.time):
    """Initializes this class.

    @type timeout: float
    @param timeout: Timeout duration (None means no timeout)
    @type allow_negative: bool
    @param allow_negative: Whether to return values below zero
    @param _time_fn: Time function for unittests
    @raise ValueError: if a negative timeout is passed

    """
    object.__init__(self)

    if timeout is not None and timeout < 0.0:
      raise ValueError("Timeout must not be negative")

    self._timeout = timeout
    self._allow_negative = allow_negative
    self._time_fn = _time_fn

    # Set on the first call to Remaining()
    self._start_time = None

  def Remaining(self):
    """Returns the remaining timeout.

    @rtype: float or None
    @return: None if no timeout was configured, otherwise the number of
        seconds left; clamped to 0.0 unless negative values were allowed

    """
    if self._timeout is None:
      return None

    # Get start time on first calculation
    if self._start_time is None:
      self._start_time = self._time_fn()

    # Calculate remaining time
    remaining_timeout = self._start_time + self._timeout - self._time_fn()

    if not self._allow_negative:
      # Ensure timeout is always >= 0
      return max(0.0, remaining_timeout)

    return remaining_timeout
109

    
110

    
111
class _SingleNotifyPipeConditionWaiter(object):
112
  """Helper class for SingleNotifyPipeCondition
113

114
  """
115
  __slots__ = [
116
    "_fd",
117
    "_poller",
118
    ]
119

    
120
  def __init__(self, poller, fd):
121
    """Constructor for _SingleNotifyPipeConditionWaiter
122

123
    @type poller: select.poll
124
    @param poller: Poller object
125
    @type fd: int
126
    @param fd: File descriptor to wait for
127

128
    """
129
    object.__init__(self)
130
    self._poller = poller
131
    self._fd = fd
132

    
133
  def __call__(self, timeout):
134
    """Wait for something to happen on the pipe.
135

136
    @type timeout: float or None
137
    @param timeout: Timeout for waiting (can be None)
138

139
    """
140
    running_timeout = RunningTimeout(timeout, True)
141

    
142
    while True:
143
      remaining_time = running_timeout.Remaining()
144

    
145
      if remaining_time is not None:
146
        if remaining_time < 0.0:
147
          break
148

    
149
        # Our calculation uses seconds, poll() wants milliseconds
150
        remaining_time *= 1000
151

    
152
      try:
153
        result = self._poller.poll(remaining_time)
154
      except EnvironmentError, err:
155
        if err.errno != errno.EINTR:
156
          raise
157
        result = None
158

    
159
      # Check whether we were notified
160
      if result and result[0][0] == self._fd:
161
        break
162

    
163

    
164
class _BaseCondition(object):
  """Base class containing common code for conditions.

  Some of this code is taken from python's threading module.

  """
  __slots__ = [
    "_lock",
    "acquire",
    "release",
    ]

  def __init__(self, lock):
    """Constructor for _BaseCondition.

    @type lock: threading.Lock
    @param lock: condition base lock

    """
    object.__init__(self)

    # Recursive locks are not supported
    assert not hasattr(lock, "_acquire_restore")
    assert not hasattr(lock, "_release_save")

    self._lock = lock

    # Export the lock's acquire() and release() methods
    self.acquire = lock.acquire
    self.release = lock.release

  def _is_owned(self):
    """Check whether the lock is currently held.

    This is done by trying a non-blocking acquire. The check can only tell
    that the lock is held by someone; callers rely on it as "held by the
    current thread".

    @rtype: bool

    """
    if self._lock.acquire(0):
      # We got it, hence nobody was holding it; undo the acquire
      self._lock.release()
      return False

    return True

  def _check_owned(self):
    """Raise an exception if the current thread doesn't own the lock.

    @raise RuntimeError: if the lock is not held

    """
    if not self._is_owned():
      raise RuntimeError("cannot work with un-aquired lock")
211

    
212

    
213
class SingleNotifyPipeCondition(_BaseCondition):
  """Condition which can only be notified once.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, with the following differences:
    - notifyAll can only be called once, and no wait can happen after that
    - notify is not supported, only notifyAll

  The pipe is created by the first waiter and its descriptors are closed
  when the last waiter leaves.

  """

  __slots__ = [
    "_poller",
    "_read_fd",
    "_write_fd",
    "_nwaiters",
    "_notified",
    ]

  _waiter_class = _SingleNotifyPipeConditionWaiter

  def __init__(self, lock):
    """Constructor for SingleNotifyPipeCondition

    @type lock: threading.Lock
    @param lock: condition base lock

    """
    _BaseCondition.__init__(self, lock)
    self._nwaiters = 0
    self._notified = False
    self._read_fd = None
    self._write_fd = None
    self._poller = None

  def _check_unnotified(self):
    """Throws an exception if already notified.

    @raise RuntimeError: if notifyAll was already called

    """
    if self._notified:
      raise RuntimeError("cannot use already notified condition")

  def _Cleanup(self):
    """Cleanup open file descriptors, if any.

    """
    if self._read_fd is not None:
      os.close(self._read_fd)
      self._read_fd = None

    if self._write_fd is not None:
      os.close(self._write_fd)
      self._write_fd = None
    self._poller = None

  def wait(self, timeout=None):
    """Wait for a notification.

    The base lock must be held when calling; it is released while waiting
    and re-acquired before returning.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
      if self._poller is None:
        # First waiter: set up the pipe. Closing the write side later makes
        # poll() report a hangup on the read side.
        (self._read_fd, self._write_fd) = os.pipe()
        self._poller = select.poll()
        self._poller.register(self._read_fd, select.POLLHUP)

      wait_fn = self._waiter_class(self._poller, self._read_fd)
      self.release()
      try:
        # Wait for notification
        wait_fn(timeout)
      finally:
        # Re-acquire lock
        self.acquire()
    finally:
      self._nwaiters -= 1
      if self._nwaiters == 0:
        # Last waiter closes the descriptors
        self._Cleanup()

  def notifyAll(self): # pylint: disable-msg=C0103
    """Close the writing side of the pipe to notify all waiters.

    """
    self._check_owned()
    self._check_unnotified()
    self._notified = True
    if self._write_fd is not None:
      os.close(self._write_fd)
      self._write_fd = None
305

    
306

    
307
class PipeCondition(_BaseCondition):
  """Group-only non-polling condition with counters.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, but only supports notifyAll and
  non-recursive locks. As an additional feature it's able to report whether
  there are any waiting threads.

  """
  __slots__ = [
    "_nwaiters",
    "_single_condition",
    ]

  _single_condition_class = SingleNotifyPipeCondition

  def __init__(self, lock):
    """Initializes this class.

    @type lock: threading.Lock
    @param lock: condition base lock

    """
    _BaseCondition.__init__(self, lock)
    self._nwaiters = 0
    self._single_condition = self._single_condition_class(self._lock)

  def wait(self, timeout=None):
    """Wait for a notification.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # Keep local reference to the pipe. It could be replaced by another thread
    # notifying while we're waiting.
    my_condition = self._single_condition

    assert self._nwaiters >= 0
    self._nwaiters += 1
    try:
      my_condition.wait(timeout)
    finally:
      assert self._nwaiters > 0
      self._nwaiters -= 1

  def notifyAll(self): # pylint: disable-msg=C0103
    """Notify all currently waiting threads.

    """
    self._check_owned()
    self._single_condition.notifyAll()
    # A single-notify condition cannot be reused, prepare a fresh one for
    # future waiters
    self._single_condition = self._single_condition_class(self._lock)

  def has_waiting(self):
    """Returns whether there are active waiters.

    @rtype: bool

    """
    self._check_owned()

    return bool(self._nwaiters)
368

    
369

    
370
class _CountingCondition(object):
  """Wrapper for Python's built-in threading.Condition class.

  This wrapper keeps a count of active waiters. We can't access the internal
  "__waiters" attribute of threading.Condition because it's not thread-safe.

  """
  __slots__ = [
    "_cond",
    "_nwaiters",
    ]

  def __init__(self, lock):
    """Initializes this class.

    @param lock: lock passed on to threading.Condition

    """
    object.__init__(self)
    self._cond = threading.Condition(lock=lock)
    self._nwaiters = 0

  def notifyAll(self): # pylint: disable-msg=C0103
    """Notifies the condition.

    """
    return self._cond.notifyAll()

  def wait(self, timeout=None):
    """Waits for the condition to be notified.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    assert self._nwaiters >= 0

    self._nwaiters += 1
    try:
      return self._cond.wait(timeout=timeout)
    finally:
      # Decrement even if the wait raised
      self._nwaiters -= 1

  def has_waiting(self):
    """Returns whether there are active waiters.

    @rtype: bool

    """
    return bool(self._nwaiters)
416

    
417

    
418
class SharedLock(object):
  """Implements a shared lock.

  Multiple threads can acquire the lock in a shared way, calling
  acquire(shared=1).  In order to acquire the lock in an exclusive way
  threads can call acquire() (shared=0 is the default).

  The lock prevents starvation but does not guarantee that threads will acquire
  the shared lock in the order they queued for it, just that they will
  eventually do so.

  """
  __slots__ = [
    "__active_shr_c",
    "__inactive_shr_c",
    "__deleted",
    "__exc",
    "__lock",
    "__pending",
    "__shr",
    ]

  __condition_class = PipeCondition

  def __init__(self):
    """Construct a new SharedLock.

    """
    object.__init__(self)

    # Internal lock
    self.__lock = threading.Lock()

    # Queue containing waiting acquires
    self.__pending = []

    # Active and inactive conditions for shared locks
    self.__active_shr_c = self.__condition_class(self.__lock)
    self.__inactive_shr_c = self.__condition_class(self.__lock)

    # Current lock holders
    self.__shr = set()
    self.__exc = None

    # is this lock in the deleted state?
    self.__deleted = False

  def __check_deleted(self):
    """Raises an exception if the lock has been deleted.

    @raise errors.LockError: if the lock is in the deleted state

    """
    if self.__deleted:
      raise errors.LockError("Deleted lock")

  def __is_sharer(self):
    """Is the current thread sharing the lock at this time?

    """
    return threading.currentThread() in self.__shr

  def __is_exclusive(self):
    """Is the current thread holding the lock exclusively at this time?

    """
    return threading.currentThread() == self.__exc

  def __is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    This is a private version of the function, which presumes you're holding
    the internal lock.

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership

    """
    if shared < 0:
      return self.__is_sharer() or self.__is_exclusive()
    elif shared:
      return self.__is_sharer()
    else:
      return self.__is_exclusive()

  def _is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership

    """
    self.__lock.acquire()
    try:
      return self.__is_owned(shared=shared)
    finally:
      self.__lock.release()

  def _count_pending(self):
    """Returns the number of pending acquires.

    @rtype: int

    """
    self.__lock.acquire()
    try:
      return len(self.__pending)
    finally:
      self.__lock.release()

  def __do_acquire(self, shared):
    """Actually acquire the lock.

    Only records the current thread as holder; the caller must have checked
    that the lock can be acquired.

    """
    if shared:
      self.__shr.add(threading.currentThread())
    else:
      self.__exc = threading.currentThread()

  def __can_acquire(self, shared):
    """Determine whether lock can be acquired.

    """
    if shared:
      return self.__exc is None
    else:
      return len(self.__shr) == 0 and self.__exc is None

  def __is_on_top(self, cond):
    """Checks whether the passed condition is on top of the queue.

    The caller must make sure the queue isn't empty.

    """
    return self.__pending[0] == cond

  def __acquire_unlocked(self, shared, timeout):
    """Acquire a shared lock.

    The internal lock must be held by the caller.

    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @param timeout: maximum waiting time before giving up
    @rtype: bool
    @return: whether the lock was acquired
    @raise errors.LockError: if the lock is or gets deleted

    """
    self.__check_deleted()

    # We cannot acquire the lock if we already have it
    assert not self.__is_owned(), "double acquire() on a non-recursive lock"

    # Check whether someone else holds the lock or there are pending acquires.
    if not self.__pending and self.__can_acquire(shared):
      # Apparently not, can acquire lock directly.
      self.__do_acquire(shared)
      return True

    if shared:
      # All shared acquires queued at the same time wait on one condition
      wait_condition = self.__active_shr_c

      # Check if we're not yet in the queue
      if wait_condition not in self.__pending:
        self.__pending.append(wait_condition)
    else:
      # Every exclusive acquire gets a condition of its own
      wait_condition = self.__condition_class(self.__lock)
      # Always add to queue
      self.__pending.append(wait_condition)

    try:
      # Wait until we become the topmost acquire in the queue or the timeout
      # expires.
      while not (self.__is_on_top(wait_condition) and
                 self.__can_acquire(shared)):
        # Wait for notification
        wait_condition.wait(timeout)
        self.__check_deleted()

        # A lot of code assumes blocking acquires always succeed. Loop
        # internally for that case.
        if timeout is not None:
          break

      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
        self.__do_acquire(shared)
        return True
    finally:
      # Remove condition from queue if there are no more waiters
      if not wait_condition.has_waiting() and not self.__deleted:
        self.__pending.remove(wait_condition)

    return False

  def acquire(self, shared=0, timeout=None, test_notify=None):
    """Acquire a shared lock.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @rtype: bool
    @return: whether the lock was acquired
    @raise errors.LockError: if the lock is or gets deleted

    """
    self.__lock.acquire()
    try:
      # We already got the lock, notify now
      if __debug__ and callable(test_notify):
        test_notify()

      return self.__acquire_unlocked(shared, timeout)
    finally:
      self.__lock.release()

  def release(self):
    """Release a Shared Lock.

    You must have acquired the lock, either in shared or in exclusive mode,
    before calling this function.

    """
    self.__lock.acquire()
    try:
      assert self.__is_exclusive() or self.__is_sharer(), \
        "Cannot release non-owned lock"

      # Autodetect release type
      if self.__is_exclusive():
        self.__exc = None
      else:
        self.__shr.remove(threading.currentThread())

      # Notify topmost condition in queue
      if self.__pending:
        first_condition = self.__pending[0]
        first_condition.notifyAll()

        if first_condition == self.__active_shr_c:
          # Shared acquires arriving from now on queue on the other
          # condition, behind whatever is already pending
          self.__active_shr_c = self.__inactive_shr_c
          self.__inactive_shr_c = first_condition

    finally:
      self.__lock.release()

  def delete(self, timeout=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @rtype: bool
    @return: whether the lock was deleted; False if the exclusive acquire
        timed out
    @raise errors.LockError: if the lock was already deleted

    """
    self.__lock.acquire()
    try:
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

      self.__check_deleted()

      # The caller is allowed to hold the lock exclusively already.
      acquired = self.__is_exclusive()

      if not acquired:
        acquired = self.__acquire_unlocked(0, timeout)

      if acquired:
        # Only check ownership once we know the acquire succeeded; a timed
        # out acquire must result in False being returned, not in a failed
        # assertion.
        assert self.__is_exclusive() and not self.__is_sharer(), \
          "Lock wasn't acquired in exclusive mode"

        self.__deleted = True
        self.__exc = None

        # Notify all acquires. They'll throw an error.
        while self.__pending:
          self.__pending.pop().notifyAll()

      return acquired
    finally:
      self.__lock.release()
694

    
695

    
696
# Whenever we want to acquire a full LockSet we pass None as the value
# to acquire.  Hide this behind this nicely named constant.
ALL_SET = None
699

    
700

    
701
class _AcquireTimeout(Exception):
  """Internal exception to abort an acquire on a timeout.

  """
705

    
706

    
707
class LockSet:
708
  """Implements a set of locks.
709

710
  This abstraction implements a set of shared locks for the same resource type,
711
  distinguished by name. The user can lock a subset of the resources and the
712
  LockSet will take care of acquiring the locks always in the same order, thus
713
  preventing deadlock.
714

715
  All the locks needed in the same set must be acquired together, though.
716

717
  """
718
  def __init__(self, members=None):
    """Constructs a new LockSet.

    @type members: list of strings
    @param members: initial members of the set

    """
    # Used internally to guarantee coherency.
    self.__lock = SharedLock()

    # The lockdict indexes the relationship name -> lock
    # The order-of-locking is implied by the alphabetical order of names
    self.__lockdict = {}

    if members is not None:
      for name in members:
        self.__lockdict[name] = SharedLock()

    # The owner dict contains the set of locks each thread owns. For
    # performance each thread can access its own key without a global lock on
    # this structure. It is paramount though that *no* other type of access is
    # done to this structure (eg. no looping over its keys). *_owner helper
    # function are defined to guarantee access is correct, but in general never
    # do anything different than __owners[threading.currentThread()], or there
    # will be trouble.
    self.__owners = {}
744

    
745
  def _is_owned(self):
746
    """Is the current thread a current level owner?"""
747
    return threading.currentThread() in self.__owners
748

    
749
  def _add_owned(self, name=None):
750
    """Note the current thread owns the given lock"""
751
    if name is None:
752
      if not self._is_owned():
753
        self.__owners[threading.currentThread()] = set()
754
    else:
755
      if self._is_owned():
756
        self.__owners[threading.currentThread()].add(name)
757
      else:
758
        self.__owners[threading.currentThread()] = set([name])
759

    
760
  def _del_owned(self, name=None):
761
    """Note the current thread owns the given lock"""
762

    
763
    assert not (name is None and self.__lock._is_owned()), \
764
           "Cannot hold internal lock when deleting owner status"
765

    
766
    if name is not None:
767
      self.__owners[threading.currentThread()].remove(name)
768

    
769
    # Only remove the key if we don't hold the set-lock as well
770
    if (not self.__lock._is_owned() and
771
        not self.__owners[threading.currentThread()]):
772
      del self.__owners[threading.currentThread()]
773

    
774
  def _list_owned(self):
775
    """Get the set of resource names owned by the current thread"""
776
    if self._is_owned():
777
      return self.__owners[threading.currentThread()].copy()
778
    else:
779
      return set()
780

    
781
  def _release_and_delete_owned(self):
782
    """Release and delete all resources owned by the current thread"""
783
    for lname in self._list_owned():
784
      lock = self.__lockdict[lname]
785
      if lock._is_owned():
786
        lock.release()
787
      self._del_owned(name=lname)
788

    
789
  def __names(self):
790
    """Return the current set of names.
791

792
    Only call this function while holding __lock and don't iterate on the
793
    result after releasing the lock.
794

795
    """
796
    return self.__lockdict.keys()
797

    
798
  def _names(self):
799
    """Return a copy of the current set of elements.
800

801
    Used only for debugging purposes.
802

803
    """
804
    # If we don't already own the set-level lock acquired
805
    # we'll get it and note we need to release it later.
806
    release_lock = False
807
    if not self.__lock._is_owned():
808
      release_lock = True
809
      self.__lock.acquire(shared=1)
810
    try:
811
      result = self.__names()
812
    finally:
813
      if release_lock:
814
        self.__lock.release()
815
    return set(result)
816

    
817
  def acquire(self, names, timeout=None, shared=0, test_notify=None):
    """Acquire a set of resource locks.

    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names); None acquires the
        whole set
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting

    @return: Set of all locks successfully acquired or None in case of timeout

    @raise errors.LockError: when any lock we try to acquire has
        been deleted before we succeed. In this case none of the
        locks requested will be acquired.

    """
    assert timeout is None or timeout >= 0.0

    # Check we don't already own locks at this level
    assert not self._is_owned(), "Cannot acquire locks in the same set twice"

    # We need to keep track of how long we spent waiting for a lock. The
    # timeout passed to this function is over all lock acquires.
    running_timeout = RunningTimeout(timeout, False)

    try:
      if names is not None:
        # Support passing in a single resource to acquire rather than many
        if isinstance(names, basestring):
          names = [names]

        return self.__acquire_inner(names, False, shared,
                                    running_timeout.Remaining, test_notify)

      else:
        # If no names are given acquire the whole set by not letting new names
        # being added before we release, and getting the current list of names.
        # Some of them may then be deleted later, but we'll cope with this.
        #
        # We'd like to acquire this lock in a shared way, as it's nice if
        # everybody else can use the instances at the same time. If we are
        # acquiring them exclusively though they won't be able to do this
        # anyway, so we'll get the list lock exclusively as well in
        # order to be able to do add() on the set while owning it.
        if not self.__lock.acquire(shared=shared,
                                   timeout=running_timeout.Remaining()):
          raise _AcquireTimeout()
        try:
          # note we own the set-lock
          self._add_owned()

          return self.__acquire_inner(self.__names(), True, shared,
                                      running_timeout.Remaining, test_notify)
        except:
          # We shouldn't have problems adding the lock to the owners list, but
          # if we did we'll try to release this lock and re-raise exception.
          # Of course something is going to be really wrong, after this.
          self.__lock.release()
          self._del_owned()
          raise

    except _AcquireTimeout:
      # Timeouts are reported to the caller through the return value
      return None
885

    
886
  def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
    """Inner logic for acquiring a number of locks.

    @param names: Names of the locks to be acquired
    @param want_all: Whether all locks in the set should be acquired
    @param shared: Whether to acquire in shared mode
    @param timeout_fn: Function returning remaining timeout
    @param test_notify: Special callback function for unittesting
    @rtype: set
    @return: names of the locks which were acquired
    @raise errors.LockError: if a requested lock doesn't exist or was
        deleted (unless want_all is set)
    @raise _AcquireTimeout: if a lock couldn't be acquired in time

    """
    acquire_list = []

    # First we look the locks up on __lockdict. We have no way of being sure
    # they will still be there after, but this makes it a lot faster should
    # just one of them be already wrong. Using a sorted sequence to prevent
    # deadlocks.
    for lname in sorted(utils.UniqueSequence(names)):
      try:
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
      except KeyError:
        if want_all:
          # We are acquiring all the set, it doesn't matter if this particular
          # element is not there anymore.
          continue

        raise errors.LockError("Non-existing lock in set (%s)" % lname)

      acquire_list.append((lname, lock))

    # This will hold the locknames we effectively acquired.
    acquired = set()

    try:
      # Now acquire_list contains a sorted list of resources and locks we
      # want.  In order to get them we loop on this (private) list and
      # acquire() them.  We gave no real guarantee they will still exist till
      # this is done but .acquire() itself is safe and will alert us if the
      # lock gets deleted.
      for (lname, lock) in acquire_list:
        if __debug__ and callable(test_notify):
          # Only ever called from within the lock.acquire() call below,
          # hence it sees this iteration's lname
          test_notify_fn = lambda: test_notify(lname)
        else:
          test_notify_fn = None

        timeout = timeout_fn()

        try:
          # raises LockError if the lock was deleted
          acq_success = lock.acquire(shared=shared, timeout=timeout,
                                     test_notify=test_notify_fn)
        except errors.LockError:
          if want_all:
            # We are acquiring all the set, it doesn't matter if this
            # particular element is not there anymore.
            continue

          raise errors.LockError("Non-existing lock in set (%s)" % lname)

        if not acq_success:
          # Couldn't get lock or timeout occurred
          if timeout is None:
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
            # blocking.
            raise errors.LockError("Failed to get lock %s" % lname)

          raise _AcquireTimeout()

        try:
          # now the lock cannot be deleted, we have it!
          self._add_owned(name=lname)
          acquired.add(lname)

        except:
          # We shouldn't have problems adding the lock to the owners list, but
          # if we did we'll try to release this lock and re-raise exception.
          # Of course something is going to be really wrong after this.
          if lock._is_owned():
            lock.release()
          raise

    except:
      # Release all owned locks
      self._release_and_delete_owned()
      raise

    return acquired
972

    
973
  def release(self, names=None):
    """Release a set of resource locks, at the same level.

    The locks must have been acquired beforehand, in either shared or
    exclusive mode.

    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level).

    """
    assert self._is_owned(), "release() on lock set while not owner"

    if names is None:
      # No explicit selection: release everything owned at this level
      to_release = self._list_owned()
    else:
      # A single resource may be passed in as a bare string
      if isinstance(names, basestring):
        names = [names]

      to_release = set(names)
      assert self._list_owned().issuperset(to_release), (
               "release() on unheld resources %s" %
               to_release.difference(self._list_owned()))

    # The "all elements" lock, if held, is released first; after this
    # 'add' can work again
    set_lock = self.__lock
    if set_lock._is_owned():
      set_lock.release()
      self._del_owned()

    # This is only valid as long as a lock never leaves __lockdict without
    # being exclusively held
    for lname in to_release:
      self.__lockdict[lname].release()
      self._del_owned(name=lname)
1009

    
1010
  def add(self, names, acquired=0, shared=0):
    """Add a new set of elements to the set

    @type names: list of strings
    @param names: names of the new elements to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: pre-acquire the new resource?
    @type shared: integer (0/1) used as a boolean
    @param shared: is the pre-acquisition shared?
    @return: always True
    @raise errors.LockError: if any of the given names is already in the set

    """
    # Either nothing is owned at this level, or the set-level lock is held
    # with shared=0
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
      "Cannot add locks if the set is only partially owned, or shared"

    # A single resource may be passed in as a bare string
    if isinstance(names, basestring):
      names = [names]

    # Take the set-level lock unless it is reported as already owned, and
    # remember whether it has to be given back at the end.
    took_set_lock = not self.__lock._is_owned()
    if took_set_lock:
      self.__lock.acquire()

    try:
      clashing = set(self.__names()).intersection(names)
      if clashing:
        # Deliberately a real exception rather than an assert: asserts are
        # disabled under optimization, and concurrency can trigger this even
        # when the caller did nothing wrong.
        raise errors.LockError("duplicate add() (%s)" % clashing)

      for new_name in names:
        new_lock = SharedLock()

        if acquired:
          new_lock.acquire(shared=shared)
          # From here on the lock cannot be deleted, as we hold it
          try:
            self._add_owned(name=new_name)
          except:
            # Registering ownership is not expected to fail. If it does, the
            # lock is released purely as a safety measure (it is not in
            # __lockdict yet, so no other thread should be waiting on it)
            # and the original exception is propagated.
            new_lock.release()
            raise

        self.__lockdict[new_name] = new_lock

    finally:
      # Give the set-level lock back only if this call acquired it
      if took_set_lock:
        self.__lock.release()

    return True
1070

    
1071
  def remove(self, names):
    """Remove elements from the lock set.

    The caller may either hold nothing in the lockset, or already hold,
    exclusively, a superset of the elements to be deleted.

    @type names: list of strings
    @param names: names of the resource to remove.

    @return: a list of locks which we removed; the list is always
        equal to the names list if we were holding all the locks
        exclusively

    """
    # A single resource may be passed in as a bare string
    if isinstance(names, basestring):
      names = [names]

    # Whatever we own here must be a superset of what is being deleted. The
    # ownership also has to be exclusive, but the lock itself checks that.
    assert not self._is_owned() or self._list_owned().issuperset(names), (
      "remove() on acquired lockset while not owning all elements")

    deleted = []

    for lname in names:
      # delete() acquires the lock exclusively if we don't already own it and
      # makes all pending and subsequent acquires fail. Calling it out of
      # order is fine because delete() also implies release(), and the
      # assertion above guarantees we hold either everything to delete or
      # nothing at all.
      try:
        self.__lockdict[lname].delete()
      except (KeyError, errors.LockError):
        # Somebody else deleted it; cleaning up is then their job. This
        # cannot happen if we were already holding the lock, verify:
        assert not self._is_owned(), "remove failed while holding lockset"
        continue

      # No error was raised, so we are the ones who deleted the lock. It is
      # safe to drop it from lockdict, as any further or pending delete() or
      # acquire() will fail (and nobody can have held the lock since before
      # our call to delete()).
      deleted.append(lname)
      del self.__lockdict[lname]

      # Also forget it in our own ownership list, if we owned it
      if self._is_owned():
        self._del_owned(name=lname)

    return deleted
1123

    
1124

    
1125
# Locking levels; locks must be acquired in increasing level order.
# Current rules are:
#   - LEVEL_CLUSTER holds the Big Ganeti Lock (BGL), which must be acquired,
#     either in shared or in exclusive mode, before performing any operation.
#     Acquiring the BGL in exclusive mode is discouraged and should be
#     avoided.
#   - LEVEL_NODE and LEVEL_INSTANCE hold the node and instance locks. If you
#     need more than one node, or more than one instance, acquire them at the
#     same time.
LEVEL_CLUSTER = 0
LEVEL_INSTANCE = 1
LEVEL_NODE = 2

# All levels, in acquisition order
LEVELS = [LEVEL_CLUSTER, LEVEL_INSTANCE, LEVEL_NODE]

# Lock levels which are modifiable
LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]

# Human-readable name of each level
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODE: "node",
  }

# Constant for the big ganeti lock
BGL = "BGL"
1153

    
1154

    
1155
class GanetiLockManager(object):
  """The Ganeti Locking Library

  The purpose of this small library is to manage locking for ganeti clusters
  in a central place, while at the same time doing dynamic checks against
  possible deadlocks. It will also make it easier to transition to a different
  lock type should we migrate away from python threads.

  """
  # The single live manager object, or None if none was created yet
  _instance = None

  def __init__(self, nodes=None, instances=None):
    """Constructs a new GanetiLockManager object.

    There should be only a GanetiLockManager object at any time, so this
    function raises an error if this is not the case.

    @param nodes: list of node names
    @param instances: list of instance names

    """
    assert self.__class__._instance is None, \
           "double GanetiLockManager instance"

    self.__class__._instance = self

    # The keyring contains all the locks, one LockSet per level.
    self.__keyring = {
      LEVEL_CLUSTER: LockSet([BGL]),
      LEVEL_NODE: LockSet(nodes),
      LEVEL_INSTANCE: LockSet(instances),
    }

  def _names(self, level):
    """List the lock names at the given level.

    This can be used for debugging/testing purposes.

    @param level: the level whose list of locks to get

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    return self.__keyring[level]._names()

  def _is_owned(self, level):
    """Check whether we are owning locks at the given level

    @param level: the level to check

    """
    return self.__keyring[level]._is_owned()

  # Public alias of _is_owned
  is_owned = _is_owned

  def _list_owned(self, level):
    """Get the set of owned locks at the given level

    @param level: the level whose owned locks to list

    """
    return self.__keyring[level]._list_owned()

  def _upper_owned(self, level):
    """Check whether we own any lock at a level greater than the given one.

    @param level: the level above which ownership is checked
    @return: a true value if a lock is owned at any greater level

    """
    # This way of checking only works if LEVELS[i] = i, which we check for in
    # the test cases.
    return compat.any((self._is_owned(lvl) for lvl in LEVELS[level + 1:]))

  def _BGL_owned(self): # pylint: disable-msg=C0103
    """Check if the current thread owns the BGL.

    Both an exclusive or a shared acquisition work.

    """
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()

  @staticmethod
  def _contains_BGL(level, names): # pylint: disable-msg=C0103
    """Check if the level contains the BGL.

    Check if acting on the given level and set of names will change
    the status of the Big Ganeti Lock.

    @param level: the level being acted upon
    @param names: the lock names being acted upon, or None for all of them

    """
    return level == LEVEL_CLUSTER and (names is None or BGL in names)

  def acquire(self, level, names, timeout=None, shared=0):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @return: whatever the acquire() of the level's lock set returns

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Check that we are either acquiring the Big Ganeti Lock or we already own
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
    # so even if we've migrated we need to at least share the BGL to be
    # compatible with them. Of course if we own the BGL exclusively there's no
    # point in acquiring any other lock, unless perhaps we are half way through
    # the migration of the current opcode.
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
            "You must own the Big Ganeti Lock before acquiring any other")

    # Check we don't own locks at any upper level (the level being acquired
    # is not itself part of this check).
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
           " while owning some at a greater one")

    # Acquire the locks in the set.
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)

  def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive
    mode, before releasing them.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    # The failure message lists the owned locks level by level, in LEVELS
    # order, so that it reads the same way on every run.
    assert (not self._contains_BGL(level, names) or
            not self._upper_owned(LEVEL_CLUSTER)), (
            "Cannot release the Big Ganeti Lock while holding something"
            " at upper levels (%r)" %
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
                              for i in LEVELS]), ))

    # Release will complain if we don't own the locks already
    return self.__keyring[level].release(names)

  def add(self, level, names, acquired=0, shared=0):
    """Add locks at the specified level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
           " while owning some at a greater one")
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)

  def remove(self, level, names):
    """Remove locks from the specified level.

    You must either already own the locks you are trying to remove
    exclusively or not own any lock at an upper level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    # Check we either own the level or don't own anything from here
    # up. LockSet.remove() will check the case in which we don't own
    # all the needed resources, or we have a shared ownership.
    assert self._is_owned(level) or not self._upper_owned(level), (
           "Cannot remove locks at a level while not owning it or"
           " owning some at a greater one")
    return self.__keyring[level].remove(names)