Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ c0c3fa27

History | View | Annotate | Download (40.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable-msg=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import time
32
import errno
33

    
34
from ganeti import errors
35
from ganeti import utils
36
from ganeti import compat
37

    
38

    
39
def ssynchronized(mylock, shared=0):
40
  """Shared Synchronization decorator.
41

42
  Calls the function holding the given lock, either in exclusive or shared
43
  mode. It requires the passed lock to be a SharedLock (or support its
44
  semantics).
45

46
  @type mylock: lockable object or string
47
  @param mylock: lock to acquire or class member name of the lock to acquire
48

49
  """
50
  def wrap(fn):
51
    def sync_function(*args, **kwargs):
52
      if isinstance(mylock, basestring):
53
        assert args, "cannot ssynchronize on non-class method: self not found"
54
        # args[0] is "self"
55
        lock = getattr(args[0], mylock)
56
      else:
57
        lock = mylock
58
      lock.acquire(shared=shared)
59
      try:
60
        return fn(*args, **kwargs)
61
      finally:
62
        lock.release()
63
    return sync_function
64
  return wrap
65

    
66

    
67
class RunningTimeout(object):
68
  """Class to calculate remaining timeout when doing several operations.
69

70
  """
71
  __slots__ = [
72
    "_allow_negative",
73
    "_start_time",
74
    "_time_fn",
75
    "_timeout",
76
    ]
77

    
78
  def __init__(self, timeout, allow_negative, _time_fn=time.time):
79
    """Initializes this class.
80

81
    @type timeout: float
82
    @param timeout: Timeout duration
83
    @type allow_negative: bool
84
    @param allow_negative: Whether to return values below zero
85
    @param _time_fn: Time function for unittests
86

87
    """
88
    object.__init__(self)
89

    
90
    if timeout is not None and timeout < 0.0:
91
      raise ValueError("Timeout must not be negative")
92

    
93
    self._timeout = timeout
94
    self._allow_negative = allow_negative
95
    self._time_fn = _time_fn
96

    
97
    self._start_time = None
98

    
99
  def Remaining(self):
100
    """Returns the remaining timeout.
101

102
    """
103
    if self._timeout is None:
104
      return None
105

    
106
    # Get start time on first calculation
107
    if self._start_time is None:
108
      self._start_time = self._time_fn()
109

    
110
    # Calculate remaining time
111
    remaining_timeout = self._start_time + self._timeout - self._time_fn()
112

    
113
    if not self._allow_negative:
114
      # Ensure timeout is always >= 0
115
      return max(0.0, remaining_timeout)
116

    
117
    return remaining_timeout
118

    
119

    
120
class _SingleNotifyPipeConditionWaiter(object):
121
  """Helper class for SingleNotifyPipeCondition
122

123
  """
124
  __slots__ = [
125
    "_fd",
126
    "_poller",
127
    ]
128

    
129
  def __init__(self, poller, fd):
130
    """Constructor for _SingleNotifyPipeConditionWaiter
131

132
    @type poller: select.poll
133
    @param poller: Poller object
134
    @type fd: int
135
    @param fd: File descriptor to wait for
136

137
    """
138
    object.__init__(self)
139
    self._poller = poller
140
    self._fd = fd
141

    
142
  def __call__(self, timeout):
143
    """Wait for something to happen on the pipe.
144

145
    @type timeout: float or None
146
    @param timeout: Timeout for waiting (can be None)
147

148
    """
149
    running_timeout = RunningTimeout(timeout, True)
150

    
151
    while True:
152
      remaining_time = running_timeout.Remaining()
153

    
154
      if remaining_time is not None:
155
        if remaining_time < 0.0:
156
          break
157

    
158
        # Our calculation uses seconds, poll() wants milliseconds
159
        remaining_time *= 1000
160

    
161
      try:
162
        result = self._poller.poll(remaining_time)
163
      except EnvironmentError, err:
164
        if err.errno != errno.EINTR:
165
          raise
166
        result = None
167

    
168
      # Check whether we were notified
169
      if result and result[0][0] == self._fd:
170
        break
171

    
172

    
173
class _BaseCondition(object):
174
  """Base class containing common code for conditions.
175

176
  Some of this code is taken from python's threading module.
177

178
  """
179
  __slots__ = [
180
    "_lock",
181
    "acquire",
182
    "release",
183
    "_is_owned",
184
    "_acquire_restore",
185
    "_release_save",
186
    ]
187

    
188
  def __init__(self, lock):
189
    """Constructor for _BaseCondition.
190

191
    @type lock: threading.Lock
192
    @param lock: condition base lock
193

194
    """
195
    object.__init__(self)
196

    
197
    try:
198
      self._release_save = lock._release_save
199
    except AttributeError:
200
      self._release_save = self._base_release_save
201
    try:
202
      self._acquire_restore = lock._acquire_restore
203
    except AttributeError:
204
      self._acquire_restore = self._base_acquire_restore
205
    try:
206
      self._is_owned = lock._is_owned
207
    except AttributeError:
208
      self._is_owned = self._base_is_owned
209

    
210
    self._lock = lock
211

    
212
    # Export the lock's acquire() and release() methods
213
    self.acquire = lock.acquire
214
    self.release = lock.release
215

    
216
  def _base_is_owned(self):
217
    """Check whether lock is owned by current thread.
218

219
    """
220
    if self._lock.acquire(0):
221
      self._lock.release()
222
      return False
223
    return True
224

    
225
  def _base_release_save(self):
226
    self._lock.release()
227

    
228
  def _base_acquire_restore(self, _):
229
    self._lock.acquire()
230

    
231
  def _check_owned(self):
232
    """Raise an exception if the current thread doesn't own the lock.
233

234
    """
235
    if not self._is_owned():
236
      raise RuntimeError("cannot work with un-aquired lock")
237

    
238

    
239
class SingleNotifyPipeCondition(_BaseCondition):
240
  """Condition which can only be notified once.
241

242
  This condition class uses pipes and poll, internally, to be able to wait for
243
  notification with a timeout, without resorting to polling. It is almost
244
  compatible with Python's threading.Condition, with the following differences:
245
    - notifyAll can only be called once, and no wait can happen after that
246
    - notify is not supported, only notifyAll
247

248
  """
249

    
250
  __slots__ = [
251
    "_poller",
252
    "_read_fd",
253
    "_write_fd",
254
    "_nwaiters",
255
    "_notified",
256
    ]
257

    
258
  _waiter_class = _SingleNotifyPipeConditionWaiter
259

    
260
  def __init__(self, lock):
261
    """Constructor for SingleNotifyPipeCondition
262

263
    """
264
    _BaseCondition.__init__(self, lock)
265
    self._nwaiters = 0
266
    self._notified = False
267
    self._read_fd = None
268
    self._write_fd = None
269
    self._poller = None
270

    
271
  def _check_unnotified(self):
272
    """Throws an exception if already notified.
273

274
    """
275
    if self._notified:
276
      raise RuntimeError("cannot use already notified condition")
277

    
278
  def _Cleanup(self):
279
    """Cleanup open file descriptors, if any.
280

281
    """
282
    if self._read_fd is not None:
283
      os.close(self._read_fd)
284
      self._read_fd = None
285

    
286
    if self._write_fd is not None:
287
      os.close(self._write_fd)
288
      self._write_fd = None
289
    self._poller = None
290

    
291
  def wait(self, timeout=None):
292
    """Wait for a notification.
293

294
    @type timeout: float or None
295
    @param timeout: Waiting timeout (can be None)
296

297
    """
298
    self._check_owned()
299
    self._check_unnotified()
300

    
301
    self._nwaiters += 1
302
    try:
303
      if self._poller is None:
304
        (self._read_fd, self._write_fd) = os.pipe()
305
        self._poller = select.poll()
306
        self._poller.register(self._read_fd, select.POLLHUP)
307

    
308
      wait_fn = self._waiter_class(self._poller, self._read_fd)
309
      state = self._release_save()
310
      try:
311
        # Wait for notification
312
        wait_fn(timeout)
313
      finally:
314
        # Re-acquire lock
315
        self._acquire_restore(state)
316
    finally:
317
      self._nwaiters -= 1
318
      if self._nwaiters == 0:
319
        self._Cleanup()
320

    
321
  def notifyAll(self): # pylint: disable-msg=C0103
322
    """Close the writing side of the pipe to notify all waiters.
323

324
    """
325
    self._check_owned()
326
    self._check_unnotified()
327
    self._notified = True
328
    if self._write_fd is not None:
329
      os.close(self._write_fd)
330
      self._write_fd = None
331

    
332

    
333
class PipeCondition(_BaseCondition):
334
  """Group-only non-polling condition with counters.
335

336
  This condition class uses pipes and poll, internally, to be able to wait for
337
  notification with a timeout, without resorting to polling. It is almost
338
  compatible with Python's threading.Condition, but only supports notifyAll and
339
  non-recursive locks. As an additional features it's able to report whether
340
  there are any waiting threads.
341

342
  """
343
  __slots__ = [
344
    "_nwaiters",
345
    "_single_condition",
346
    ]
347

    
348
  _single_condition_class = SingleNotifyPipeCondition
349

    
350
  def __init__(self, lock):
351
    """Initializes this class.
352

353
    """
354
    _BaseCondition.__init__(self, lock)
355
    self._nwaiters = 0
356
    self._single_condition = self._single_condition_class(self._lock)
357

    
358
  def wait(self, timeout=None):
359
    """Wait for a notification.
360

361
    @type timeout: float or None
362
    @param timeout: Waiting timeout (can be None)
363

364
    """
365
    self._check_owned()
366

    
367
    # Keep local reference to the pipe. It could be replaced by another thread
368
    # notifying while we're waiting.
369
    my_condition = self._single_condition
370

    
371
    assert self._nwaiters >= 0
372
    self._nwaiters += 1
373
    try:
374
      my_condition.wait(timeout)
375
    finally:
376
      assert self._nwaiters > 0
377
      self._nwaiters -= 1
378

    
379
  def notifyAll(self): # pylint: disable-msg=C0103
380
    """Notify all currently waiting threads.
381

382
    """
383
    self._check_owned()
384
    self._single_condition.notifyAll()
385
    self._single_condition = self._single_condition_class(self._lock)
386

    
387
  def has_waiting(self):
388
    """Returns whether there are active waiters.
389

390
    """
391
    self._check_owned()
392

    
393
    return bool(self._nwaiters)
394

    
395

    
396
class SharedLock(object):
397
  """Implements a shared lock.
398

399
  Multiple threads can acquire the lock in a shared way, calling
400
  acquire_shared().  In order to acquire the lock in an exclusive way threads
401
  can call acquire_exclusive().
402

403
  The lock prevents starvation but does not guarantee that threads will acquire
404
  the shared lock in the order they queued for it, just that they will
405
  eventually do so.
406

407
  """
408
  __slots__ = [
409
    "__active_shr_c",
410
    "__inactive_shr_c",
411
    "__deleted",
412
    "__exc",
413
    "__lock",
414
    "__pending",
415
    "__shr",
416
    ]
417

    
418
  __condition_class = PipeCondition
419

    
420
  def __init__(self):
421
    """Construct a new SharedLock.
422

423
    """
424
    object.__init__(self)
425

    
426
    # Internal lock
427
    self.__lock = threading.Lock()
428

    
429
    # Queue containing waiting acquires
430
    self.__pending = []
431

    
432
    # Active and inactive conditions for shared locks
433
    self.__active_shr_c = self.__condition_class(self.__lock)
434
    self.__inactive_shr_c = self.__condition_class(self.__lock)
435

    
436
    # Current lock holders
437
    self.__shr = set()
438
    self.__exc = None
439

    
440
    # is this lock in the deleted state?
441
    self.__deleted = False
442

    
443
  def __check_deleted(self):
444
    """Raises an exception if the lock has been deleted.
445

446
    """
447
    if self.__deleted:
448
      raise errors.LockError("Deleted lock")
449

    
450
  def __is_sharer(self):
451
    """Is the current thread sharing the lock at this time?
452

453
    """
454
    return threading.currentThread() in self.__shr
455

    
456
  def __is_exclusive(self):
457
    """Is the current thread holding the lock exclusively at this time?
458

459
    """
460
    return threading.currentThread() == self.__exc
461

    
462
  def __is_owned(self, shared=-1):
463
    """Is the current thread somehow owning the lock at this time?
464

465
    This is a private version of the function, which presumes you're holding
466
    the internal lock.
467

468
    """
469
    if shared < 0:
470
      return self.__is_sharer() or self.__is_exclusive()
471
    elif shared:
472
      return self.__is_sharer()
473
    else:
474
      return self.__is_exclusive()
475

    
476
  def _is_owned(self, shared=-1):
477
    """Is the current thread somehow owning the lock at this time?
478

479
    @param shared:
480
        - < 0: check for any type of ownership (default)
481
        - 0: check for exclusive ownership
482
        - > 0: check for shared ownership
483

484
    """
485
    self.__lock.acquire()
486
    try:
487
      return self.__is_owned(shared=shared)
488
    finally:
489
      self.__lock.release()
490

    
491
  def _count_pending(self):
492
    """Returns the number of pending acquires.
493

494
    @rtype: int
495

496
    """
497
    self.__lock.acquire()
498
    try:
499
      return len(self.__pending)
500
    finally:
501
      self.__lock.release()
502

    
503
  def __do_acquire(self, shared):
504
    """Actually acquire the lock.
505

506
    """
507
    if shared:
508
      self.__shr.add(threading.currentThread())
509
    else:
510
      self.__exc = threading.currentThread()
511

    
512
  def __can_acquire(self, shared):
513
    """Determine whether lock can be acquired.
514

515
    """
516
    if shared:
517
      return self.__exc is None
518
    else:
519
      return len(self.__shr) == 0 and self.__exc is None
520

    
521
  def __is_on_top(self, cond):
522
    """Checks whether the passed condition is on top of the queue.
523

524
    The caller must make sure the queue isn't empty.
525

526
    """
527
    return self.__pending[0] == cond
528

    
529
  def __acquire_unlocked(self, shared, timeout):
530
    """Acquire a shared lock.
531

532
    @param shared: whether to acquire in shared mode; by default an
533
        exclusive lock will be acquired
534
    @param timeout: maximum waiting time before giving up
535

536
    """
537
    self.__check_deleted()
538

    
539
    # We cannot acquire the lock if we already have it
540
    assert not self.__is_owned(), "double acquire() on a non-recursive lock"
541

    
542
    # Check whether someone else holds the lock or there are pending acquires.
543
    if not self.__pending and self.__can_acquire(shared):
544
      # Apparently not, can acquire lock directly.
545
      self.__do_acquire(shared)
546
      return True
547

    
548
    if shared:
549
      wait_condition = self.__active_shr_c
550

    
551
      # Check if we're not yet in the queue
552
      if wait_condition not in self.__pending:
553
        self.__pending.append(wait_condition)
554
    else:
555
      wait_condition = self.__condition_class(self.__lock)
556
      # Always add to queue
557
      self.__pending.append(wait_condition)
558

    
559
    try:
560
      # Wait until we become the topmost acquire in the queue or the timeout
561
      # expires.
562
      while not (self.__is_on_top(wait_condition) and
563
                 self.__can_acquire(shared)):
564
        # Wait for notification
565
        wait_condition.wait(timeout)
566
        self.__check_deleted()
567

    
568
        # A lot of code assumes blocking acquires always succeed. Loop
569
        # internally for that case.
570
        if timeout is not None:
571
          break
572

    
573
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
574
        self.__do_acquire(shared)
575
        return True
576
    finally:
577
      # Remove condition from queue if there are no more waiters
578
      if not wait_condition.has_waiting() and not self.__deleted:
579
        self.__pending.remove(wait_condition)
580

    
581
    return False
582

    
583
  def acquire(self, shared=0, timeout=None, test_notify=None):
584
    """Acquire a shared lock.
585

586
    @type shared: integer (0/1) used as a boolean
587
    @param shared: whether to acquire in shared mode; by default an
588
        exclusive lock will be acquired
589
    @type timeout: float
590
    @param timeout: maximum waiting time before giving up
591
    @type test_notify: callable or None
592
    @param test_notify: Special callback function for unittesting
593

594
    """
595
    self.__lock.acquire()
596
    try:
597
      # We already got the lock, notify now
598
      if __debug__ and callable(test_notify):
599
        test_notify()
600

    
601
      return self.__acquire_unlocked(shared, timeout)
602
    finally:
603
      self.__lock.release()
604

    
605
  def release(self):
606
    """Release a Shared Lock.
607

608
    You must have acquired the lock, either in shared or in exclusive mode,
609
    before calling this function.
610

611
    """
612
    self.__lock.acquire()
613
    try:
614
      assert self.__is_exclusive() or self.__is_sharer(), \
615
        "Cannot release non-owned lock"
616

    
617
      # Autodetect release type
618
      if self.__is_exclusive():
619
        self.__exc = None
620
      else:
621
        self.__shr.remove(threading.currentThread())
622

    
623
      # Notify topmost condition in queue
624
      if self.__pending:
625
        first_condition = self.__pending[0]
626
        first_condition.notifyAll()
627

    
628
        if first_condition == self.__active_shr_c:
629
          self.__active_shr_c = self.__inactive_shr_c
630
          self.__inactive_shr_c = first_condition
631

    
632
    finally:
633
      self.__lock.release()
634

    
635
  def delete(self, timeout=None):
636
    """Delete a Shared Lock.
637

638
    This operation will declare the lock for removal. First the lock will be
639
    acquired in exclusive mode if you don't already own it, then the lock
640
    will be put in a state where any future and pending acquire() fail.
641

642
    @type timeout: float
643
    @param timeout: maximum waiting time before giving up
644

645
    """
646
    self.__lock.acquire()
647
    try:
648
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
649

    
650
      self.__check_deleted()
651

    
652
      # The caller is allowed to hold the lock exclusively already.
653
      acquired = self.__is_exclusive()
654

    
655
      if not acquired:
656
        acquired = self.__acquire_unlocked(0, timeout)
657

    
658
        assert self.__is_exclusive() and not self.__is_sharer(), \
659
          "Lock wasn't acquired in exclusive mode"
660

    
661
      if acquired:
662
        self.__deleted = True
663
        self.__exc = None
664

    
665
        # Notify all acquires. They'll throw an error.
666
        while self.__pending:
667
          self.__pending.pop().notifyAll()
668

    
669
      return acquired
670
    finally:
671
      self.__lock.release()
672

    
673
  def _release_save(self):
674
    shared = self.__is_sharer()
675
    self.release()
676
    return shared
677

    
678
  def _acquire_restore(self, shared):
679
    self.acquire(shared=shared)
680

    
681

    
682
# Whenever we want to acquire a full LockSet we pass None as the value
683
# to acquire.  Hide this behind this nicely named constant.
684
ALL_SET = None
685

    
686

    
687
class _AcquireTimeout(Exception):
688
  """Internal exception to abort an acquire on a timeout.
689

690
  """
691

    
692

    
693
class LockSet:
694
  """Implements a set of locks.
695

696
  This abstraction implements a set of shared locks for the same resource type,
697
  distinguished by name. The user can lock a subset of the resources and the
698
  LockSet will take care of acquiring the locks always in the same order, thus
699
  preventing deadlock.
700

701
  All the locks needed in the same set must be acquired together, though.
702

703
  """
704
  def __init__(self, members=None):
705
    """Constructs a new LockSet.
706

707
    @type members: list of strings
708
    @param members: initial members of the set
709

710
    """
711
    # Used internally to guarantee coherency.
712
    self.__lock = SharedLock()
713

    
714
    # The lockdict indexes the relationship name -> lock
715
    # The order-of-locking is implied by the alphabetical order of names
716
    self.__lockdict = {}
717

    
718
    if members is not None:
719
      for name in members:
720
        self.__lockdict[name] = SharedLock()
721

    
722
    # The owner dict contains the set of locks each thread owns. For
723
    # performance each thread can access its own key without a global lock on
724
    # this structure. It is paramount though that *no* other type of access is
725
    # done to this structure (eg. no looping over its keys). *_owner helper
726
    # function are defined to guarantee access is correct, but in general never
727
    # do anything different than __owners[threading.currentThread()], or there
728
    # will be trouble.
729
    self.__owners = {}
730

    
731
  def _is_owned(self):
732
    """Is the current thread a current level owner?"""
733
    return threading.currentThread() in self.__owners
734

    
735
  def _add_owned(self, name=None):
736
    """Note the current thread owns the given lock"""
737
    if name is None:
738
      if not self._is_owned():
739
        self.__owners[threading.currentThread()] = set()
740
    else:
741
      if self._is_owned():
742
        self.__owners[threading.currentThread()].add(name)
743
      else:
744
        self.__owners[threading.currentThread()] = set([name])
745

    
746
  def _del_owned(self, name=None):
747
    """Note the current thread owns the given lock"""
748

    
749
    assert not (name is None and self.__lock._is_owned()), \
750
           "Cannot hold internal lock when deleting owner status"
751

    
752
    if name is not None:
753
      self.__owners[threading.currentThread()].remove(name)
754

    
755
    # Only remove the key if we don't hold the set-lock as well
756
    if (not self.__lock._is_owned() and
757
        not self.__owners[threading.currentThread()]):
758
      del self.__owners[threading.currentThread()]
759

    
760
  def _list_owned(self):
761
    """Get the set of resource names owned by the current thread"""
762
    if self._is_owned():
763
      return self.__owners[threading.currentThread()].copy()
764
    else:
765
      return set()
766

    
767
  def _release_and_delete_owned(self):
768
    """Release and delete all resources owned by the current thread"""
769
    for lname in self._list_owned():
770
      lock = self.__lockdict[lname]
771
      if lock._is_owned():
772
        lock.release()
773
      self._del_owned(name=lname)
774

    
775
  def __names(self):
776
    """Return the current set of names.
777

778
    Only call this function while holding __lock and don't iterate on the
779
    result after releasing the lock.
780

781
    """
782
    return self.__lockdict.keys()
783

    
784
  def _names(self):
785
    """Return a copy of the current set of elements.
786

787
    Used only for debugging purposes.
788

789
    """
790
    # If we don't already own the set-level lock acquired
791
    # we'll get it and note we need to release it later.
792
    release_lock = False
793
    if not self.__lock._is_owned():
794
      release_lock = True
795
      self.__lock.acquire(shared=1)
796
    try:
797
      result = self.__names()
798
    finally:
799
      if release_lock:
800
        self.__lock.release()
801
    return set(result)
802

    
803
  def acquire(self, names, timeout=None, shared=0, test_notify=None):
804
    """Acquire a set of resource locks.
805

806
    @type names: list of strings (or string)
807
    @param names: the names of the locks which shall be acquired
808
        (special lock names, or instance/node names)
809
    @type shared: integer (0/1) used as a boolean
810
    @param shared: whether to acquire in shared mode; by default an
811
        exclusive lock will be acquired
812
    @type timeout: float or None
813
    @param timeout: Maximum time to acquire all locks
814
    @type test_notify: callable or None
815
    @param test_notify: Special callback function for unittesting
816

817
    @return: Set of all locks successfully acquired or None in case of timeout
818

819
    @raise errors.LockError: when any lock we try to acquire has
820
        been deleted before we succeed. In this case none of the
821
        locks requested will be acquired.
822

823
    """
824
    assert timeout is None or timeout >= 0.0
825

    
826
    # Check we don't already own locks at this level
827
    assert not self._is_owned(), "Cannot acquire locks in the same set twice"
828

    
829
    # We need to keep track of how long we spent waiting for a lock. The
830
    # timeout passed to this function is over all lock acquires.
831
    running_timeout = RunningTimeout(timeout, False)
832

    
833
    try:
834
      if names is not None:
835
        # Support passing in a single resource to acquire rather than many
836
        if isinstance(names, basestring):
837
          names = [names]
838

    
839
        return self.__acquire_inner(names, False, shared,
840
                                    running_timeout.Remaining, test_notify)
841

    
842
      else:
843
        # If no names are given acquire the whole set by not letting new names
844
        # being added before we release, and getting the current list of names.
845
        # Some of them may then be deleted later, but we'll cope with this.
846
        #
847
        # We'd like to acquire this lock in a shared way, as it's nice if
848
        # everybody else can use the instances at the same time. If are
849
        # acquiring them exclusively though they won't be able to do this
850
        # anyway, though, so we'll get the list lock exclusively as well in
851
        # order to be able to do add() on the set while owning it.
852
        if not self.__lock.acquire(shared=shared,
853
                                   timeout=running_timeout.Remaining()):
854
          raise _AcquireTimeout()
855
        try:
856
          # note we own the set-lock
857
          self._add_owned()
858

    
859
          return self.__acquire_inner(self.__names(), True, shared,
860
                                      running_timeout.Remaining, test_notify)
861
        except:
862
          # We shouldn't have problems adding the lock to the owners list, but
863
          # if we did we'll try to release this lock and re-raise exception.
864
          # Of course something is going to be really wrong, after this.
865
          self.__lock.release()
866
          self._del_owned()
867
          raise
868

    
869
    except _AcquireTimeout:
870
      return None
871

    
872
  def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
873
    """Inner logic for acquiring a number of locks.
874

875
    @param names: Names of the locks to be acquired
876
    @param want_all: Whether all locks in the set should be acquired
877
    @param shared: Whether to acquire in shared mode
878
    @param timeout_fn: Function returning remaining timeout
879
    @param test_notify: Special callback function for unittesting
880

881
    """
882
    acquire_list = []
883

    
884
    # First we look the locks up on __lockdict. We have no way of being sure
885
    # they will still be there after, but this makes it a lot faster should
886
    # just one of them be the already wrong. Using a sorted sequence to prevent
887
    # deadlocks.
888
    for lname in sorted(utils.UniqueSequence(names)):
889
      try:
890
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
891
      except KeyError:
892
        if want_all:
893
          # We are acquiring all the set, it doesn't matter if this particular
894
          # element is not there anymore.
895
          continue
896

    
897
        raise errors.LockError("Non-existing lock in set (%s)" % lname)
898

    
899
      acquire_list.append((lname, lock))
900

    
901
    # This will hold the locknames we effectively acquired.
902
    acquired = set()
903

    
904
    try:
905
      # Now acquire_list contains a sorted list of resources and locks we
906
      # want.  In order to get them we loop on this (private) list and
907
      # acquire() them.  We gave no real guarantee they will still exist till
908
      # this is done but .acquire() itself is safe and will alert us if the
909
      # lock gets deleted.
910
      for (lname, lock) in acquire_list:
911
        if __debug__ and callable(test_notify):
912
          test_notify_fn = lambda: test_notify(lname)
913
        else:
914
          test_notify_fn = None
915

    
916
        timeout = timeout_fn()
917

    
918
        try:
919
          # raises LockError if the lock was deleted
920
          acq_success = lock.acquire(shared=shared, timeout=timeout,
921
                                     test_notify=test_notify_fn)
922
        except errors.LockError:
923
          if want_all:
924
            # We are acquiring all the set, it doesn't matter if this
925
            # particular element is not there anymore.
926
            continue
927

    
928
          raise errors.LockError("Non-existing lock in set (%s)" % lname)
929

    
930
        if not acq_success:
931
          # Couldn't get lock or timeout occurred
932
          if timeout is None:
933
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
934
            # blocking.
935
            raise errors.LockError("Failed to get lock %s" % lname)
936

    
937
          raise _AcquireTimeout()
938

    
939
        try:
940
          # now the lock cannot be deleted, we have it!
941
          self._add_owned(name=lname)
942
          acquired.add(lname)
943

    
944
        except:
945
          # We shouldn't have problems adding the lock to the owners list, but
946
          # if we did we'll try to release this lock and re-raise exception.
947
          # Of course something is going to be really wrong after this.
948
          if lock._is_owned():
949
            lock.release()
950
          raise
951

    
952
    except:
953
      # Release all owned locks
954
      self._release_and_delete_owned()
955
      raise
956

    
957
    return acquired
958

    
959
  def release(self, names=None):
960
    """Release a set of resource locks, at the same level.
961

962
    You must have acquired the locks, either in shared or in exclusive mode,
963
    before releasing them.
964

965
    @type names: list of strings, or None
966
    @param names: the names of the locks which shall be released
967
        (defaults to all the locks acquired at that level).
968

969
    """
970
    assert self._is_owned(), "release() on lock set while not owner"
971

    
972
    # Support passing in a single resource to release rather than many
973
    if isinstance(names, basestring):
974
      names = [names]
975

    
976
    if names is None:
977
      names = self._list_owned()
978
    else:
979
      names = set(names)
980
      assert self._list_owned().issuperset(names), (
981
               "release() on unheld resources %s" %
982
               names.difference(self._list_owned()))
983

    
984
    # First of all let's release the "all elements" lock, if set.
985
    # After this 'add' can work again
986
    if self.__lock._is_owned():
987
      self.__lock.release()
988
      self._del_owned()
989

    
990
    for lockname in names:
991
      # If we are sure the lock doesn't leave __lockdict without being
992
      # exclusively held we can do this...
993
      self.__lockdict[lockname].release()
994
      self._del_owned(name=lockname)
995

    
996
  def add(self, names, acquired=0, shared=0):
997
    """Add a new set of elements to the set
998

999
    @type names: list of strings
1000
    @param names: names of the new elements to add
1001
    @type acquired: integer (0/1) used as a boolean
1002
    @param acquired: pre-acquire the new resource?
1003
    @type shared: integer (0/1) used as a boolean
1004
    @param shared: is the pre-acquisition shared?
1005

1006
    """
1007
    # Check we don't already own locks at this level
1008
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1009
      "Cannot add locks if the set is only partially owned, or shared"
1010

    
1011
    # Support passing in a single resource to add rather than many
1012
    if isinstance(names, basestring):
1013
      names = [names]
1014

    
1015
    # If we don't already own the set-level lock acquired in an exclusive way
1016
    # we'll get it and note we need to release it later.
1017
    release_lock = False
1018
    if not self.__lock._is_owned():
1019
      release_lock = True
1020
      self.__lock.acquire()
1021

    
1022
    try:
1023
      invalid_names = set(self.__names()).intersection(names)
1024
      if invalid_names:
1025
        # This must be an explicit raise, not an assert, because assert is
1026
        # turned off when using optimization, and this can happen because of
1027
        # concurrency even if the user doesn't want it.
1028
        raise errors.LockError("duplicate add() (%s)" % invalid_names)
1029

    
1030
      for lockname in names:
1031
        lock = SharedLock()
1032

    
1033
        if acquired:
1034
          lock.acquire(shared=shared)
1035
          # now the lock cannot be deleted, we have it!
1036
          try:
1037
            self._add_owned(name=lockname)
1038
          except:
1039
            # We shouldn't have problems adding the lock to the owners list,
1040
            # but if we did we'll try to release this lock and re-raise
1041
            # exception.  Of course something is going to be really wrong,
1042
            # after this.  On the other hand the lock hasn't been added to the
1043
            # __lockdict yet so no other threads should be pending on it. This
1044
            # release is just a safety measure.
1045
            lock.release()
1046
            raise
1047

    
1048
        self.__lockdict[lockname] = lock
1049

    
1050
    finally:
1051
      # Only release __lock if we were not holding it previously.
1052
      if release_lock:
1053
        self.__lock.release()
1054

    
1055
    return True
1056

    
1057
  def remove(self, names):
1058
    """Remove elements from the lock set.
1059

1060
    You can either not hold anything in the lockset or already hold a superset
1061
    of the elements you want to delete, exclusively.
1062

1063
    @type names: list of strings
1064
    @param names: names of the resource to remove.
1065

1066
    @return: a list of locks which we removed; the list is always
1067
        equal to the names list if we were holding all the locks
1068
        exclusively
1069

1070
    """
1071
    # Support passing in a single resource to remove rather than many
1072
    if isinstance(names, basestring):
1073
      names = [names]
1074

    
1075
    # If we own any subset of this lock it must be a superset of what we want
1076
    # to delete. The ownership must also be exclusive, but that will be checked
1077
    # by the lock itself.
1078
    assert not self._is_owned() or self._list_owned().issuperset(names), (
1079
      "remove() on acquired lockset while not owning all elements")
1080

    
1081
    removed = []
1082

    
1083
    for lname in names:
1084
      # Calling delete() acquires the lock exclusively if we don't already own
1085
      # it, and causes all pending and subsequent lock acquires to fail. It's
1086
      # fine to call it out of order because delete() also implies release(),
1087
      # and the assertion above guarantees that if we either already hold
1088
      # everything we want to delete, or we hold none.
1089
      try:
1090
        self.__lockdict[lname].delete()
1091
        removed.append(lname)
1092
      except (KeyError, errors.LockError):
1093
        # This cannot happen if we were already holding it, verify:
1094
        assert not self._is_owned(), "remove failed while holding lockset"
1095
      else:
1096
        # If no LockError was raised we are the ones who deleted the lock.
1097
        # This means we can safely remove it from lockdict, as any further or
1098
        # pending delete() or acquire() will fail (and nobody can have the lock
1099
        # since before our call to delete()).
1100
        #
1101
        # This is done in an else clause because if the exception was thrown
1102
        # it's the job of the one who actually deleted it.
1103
        del self.__lockdict[lname]
1104
        # And let's remove it from our private list if we owned it.
1105
        if self._is_owned():
1106
          self._del_owned(name=lname)
1107

    
1108
    return removed
1109

    
1110

    
1111
# Locking levels, must be acquired in increasing order.
1112
# Current rules are:
1113
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1114
#   acquired before performing any operation, either in shared or in exclusive
1115
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
1116
#   avoided.
1117
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1118
#   If you need more than one node, or more than one instance, acquire them at
1119
#   the same time.
1120
LEVEL_CLUSTER = 0
1121
LEVEL_INSTANCE = 1
1122
LEVEL_NODE = 2
1123

    
1124
LEVELS = [LEVEL_CLUSTER,
1125
          LEVEL_INSTANCE,
1126
          LEVEL_NODE]
1127

    
1128
# Lock levels which are modifiable
1129
LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1130

    
1131
LEVEL_NAMES = {
1132
  LEVEL_CLUSTER: "cluster",
1133
  LEVEL_INSTANCE: "instance",
1134
  LEVEL_NODE: "node",
1135
  }
1136

    
1137
# Constant for the big ganeti lock
1138
BGL = 'BGL'
1139

    
1140

    
1141
class GanetiLockManager:
1142
  """The Ganeti Locking Library
1143

1144
  The purpose of this small library is to manage locking for ganeti clusters
1145
  in a central place, while at the same time doing dynamic checks against
1146
  possible deadlocks. It will also make it easier to transition to a different
1147
  lock type should we migrate away from python threads.
1148

1149
  """
1150
  _instance = None
1151

    
1152
  def __init__(self, nodes=None, instances=None):
1153
    """Constructs a new GanetiLockManager object.
1154

1155
    There should be only a GanetiLockManager object at any time, so this
1156
    function raises an error if this is not the case.
1157

1158
    @param nodes: list of node names
1159
    @param instances: list of instance names
1160

1161
    """
1162
    assert self.__class__._instance is None, \
1163
           "double GanetiLockManager instance"
1164

    
1165
    self.__class__._instance = self
1166

    
1167
    # The keyring contains all the locks, at their level and in the correct
1168
    # locking order.
1169
    self.__keyring = {
1170
      LEVEL_CLUSTER: LockSet([BGL]),
1171
      LEVEL_NODE: LockSet(nodes),
1172
      LEVEL_INSTANCE: LockSet(instances),
1173
    }
1174

    
1175
  def _names(self, level):
1176
    """List the lock names at the given level.
1177

1178
    This can be used for debugging/testing purposes.
1179

1180
    @param level: the level whose list of locks to get
1181

1182
    """
1183
    assert level in LEVELS, "Invalid locking level %s" % level
1184
    return self.__keyring[level]._names()
1185

    
1186
  def _is_owned(self, level):
1187
    """Check whether we are owning locks at the given level
1188

1189
    """
1190
    return self.__keyring[level]._is_owned()
1191

    
1192
  is_owned = _is_owned
1193

    
1194
  def _list_owned(self, level):
1195
    """Get the set of owned locks at the given level
1196

1197
    """
1198
    return self.__keyring[level]._list_owned()
1199

    
1200
  def _upper_owned(self, level):
1201
    """Check that we don't own any lock at a level greater than the given one.
1202

1203
    """
1204
    # This way of checking only works if LEVELS[i] = i, which we check for in
1205
    # the test cases.
1206
    return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1207

    
1208
  def _BGL_owned(self): # pylint: disable-msg=C0103
1209
    """Check if the current thread owns the BGL.
1210

1211
    Both an exclusive or a shared acquisition work.
1212

1213
    """
1214
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1215

    
1216
  @staticmethod
1217
  def _contains_BGL(level, names): # pylint: disable-msg=C0103
1218
    """Check if the level contains the BGL.
1219

1220
    Check if acting on the given level and set of names will change
1221
    the status of the Big Ganeti Lock.
1222

1223
    """
1224
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1225

    
1226
  def acquire(self, level, names, timeout=None, shared=0):
1227
    """Acquire a set of resource locks, at the same level.
1228

1229
    @type level: member of locking.LEVELS
1230
    @param level: the level at which the locks shall be acquired
1231
    @type names: list of strings (or string)
1232
    @param names: the names of the locks which shall be acquired
1233
        (special lock names, or instance/node names)
1234
    @type shared: integer (0/1) used as a boolean
1235
    @param shared: whether to acquire in shared mode; by default
1236
        an exclusive lock will be acquired
1237
    @type timeout: float
1238
    @param timeout: Maximum time to acquire all locks
1239

1240
    """
1241
    assert level in LEVELS, "Invalid locking level %s" % level
1242

    
1243
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1244
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1245
    # so even if we've migrated we need to at least share the BGL to be
1246
    # compatible with them. Of course if we own the BGL exclusively there's no
1247
    # point in acquiring any other lock, unless perhaps we are half way through
1248
    # the migration of the current opcode.
1249
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1250
            "You must own the Big Ganeti Lock before acquiring any other")
1251

    
1252
    # Check we don't own locks at the same or upper levels.
1253
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1254
           " while owning some at a greater one")
1255

    
1256
    # Acquire the locks in the set.
1257
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
1258

    
1259
  def release(self, level, names=None):
1260
    """Release a set of resource locks, at the same level.
1261

1262
    You must have acquired the locks, either in shared or in exclusive
1263
    mode, before releasing them.
1264

1265
    @type level: member of locking.LEVELS
1266
    @param level: the level at which the locks shall be released
1267
    @type names: list of strings, or None
1268
    @param names: the names of the locks which shall be released
1269
        (defaults to all the locks acquired at that level)
1270

1271
    """
1272
    assert level in LEVELS, "Invalid locking level %s" % level
1273
    assert (not self._contains_BGL(level, names) or
1274
            not self._upper_owned(LEVEL_CLUSTER)), (
1275
            "Cannot release the Big Ganeti Lock while holding something"
1276
            " at upper levels (%r)" %
1277
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
1278
                              for i in self.__keyring.keys()]), ))
1279

    
1280
    # Release will complain if we don't own the locks already
1281
    return self.__keyring[level].release(names)
1282

    
1283
  def add(self, level, names, acquired=0, shared=0):
1284
    """Add locks at the specified level.
1285

1286
    @type level: member of locking.LEVELS_MOD
1287
    @param level: the level at which the locks shall be added
1288
    @type names: list of strings
1289
    @param names: names of the locks to acquire
1290
    @type acquired: integer (0/1) used as a boolean
1291
    @param acquired: whether to acquire the newly added locks
1292
    @type shared: integer (0/1) used as a boolean
1293
    @param shared: whether the acquisition will be shared
1294

1295
    """
1296
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1297
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1298
           " operations")
1299
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1300
           " while owning some at a greater one")
1301
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1302

    
1303
  def remove(self, level, names):
1304
    """Remove locks from the specified level.
1305

1306
    You must either already own the locks you are trying to remove
1307
    exclusively or not own any lock at an upper level.
1308

1309
    @type level: member of locking.LEVELS_MOD
1310
    @param level: the level at which the locks shall be removed
1311
    @type names: list of strings
1312
    @param names: the names of the locks which shall be removed
1313
        (special lock names, or instance/node names)
1314

1315
    """
1316
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1317
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1318
           " operations")
1319
    # Check we either own the level or don't own anything from here
1320
    # up. LockSet.remove() will check the case in which we don't own
1321
    # all the needed resources, or we have a shared ownership.
1322
    assert self._is_owned(level) or not self._upper_owned(level), (
1323
           "Cannot remove locks at a level while not owning it or"
1324
           " owning some at a greater one")
1325
    return self.__keyring[level].remove(names)