Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ ec44d893

History | View | Annotate | Download (40.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable-msg=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import time
32
import errno
33

    
34
from ganeti import errors
35
from ganeti import utils
36

    
37

    
38
def ssynchronized(lock, shared=0):
  """Shared Synchronization decorator.

  Calls the function holding the given lock, either in exclusive or shared
  mode. It requires the passed lock to be a SharedLock (or support its
  semantics).

  @param lock: lock object offering C{acquire(shared=...)} and C{release()}
  @type shared: integer (0/1) used as a boolean
  @param shared: whether the lock is taken in shared mode
  @return: decorator which makes a function run with the lock held

  """
  # Local import: keeps this function independent of the module import block
  import functools

  def wrap(fn):
    # Preserve the decorated function's name and docstring on the wrapper
    @functools.wraps(fn)
    def sync_function(*args, **kwargs):
      lock.acquire(shared=shared)
      try:
        return fn(*args, **kwargs)
      finally:
        # Released even if fn() raised
        lock.release()
    return sync_function
  return wrap
55

    
56

    
57
class RunningTimeout(object):
  """Tracks how much of a timeout is left over several operations.

  The clock is started by the first call to L{Remaining} which has a timeout
  to compute, not by the constructor.

  """
  __slots__ = [
    "_allow_negative",
    "_start_time",
    "_time_fn",
    "_timeout",
    ]

  def __init__(self, timeout, allow_negative, _time_fn=time.time):
    """Initializes this class.

    @type timeout: float
    @param timeout: Timeout duration; None means "no timeout"
    @type allow_negative: bool
    @param allow_negative: Whether L{Remaining} may return values below zero
    @param _time_fn: Time function for unittests
    @raise ValueError: if a negative timeout is passed

    """
    object.__init__(self)

    if timeout is not None:
      if timeout < 0.0:
        raise ValueError("Timeout must not be negative")

    # Not started yet; set lazily by Remaining()
    self._start_time = None

    self._time_fn = _time_fn
    self._allow_negative = allow_negative
    self._timeout = timeout

  def Remaining(self):
    """Returns the remaining timeout.

    @return: None if no timeout was configured, otherwise the time left;
        clamped to 0.0 unless negative values were allowed

    """
    total = self._timeout
    if total is None:
      return None

    now_fn = self._time_fn

    # The first calculation defines the starting point
    if self._start_time is None:
      self._start_time = now_fn()

    left = self._start_time + total - now_fn()

    if self._allow_negative:
      return left

    # Never report less than zero in this mode
    return max(0.0, left)
108

    
109

    
110
class _SingleNotifyPipeConditionWaiter(object):
111
  """Helper class for SingleNotifyPipeCondition
112

113
  """
114
  __slots__ = [
115
    "_fd",
116
    "_poller",
117
    ]
118

    
119
  def __init__(self, poller, fd):
120
    """Constructor for _SingleNotifyPipeConditionWaiter
121

122
    @type poller: select.poll
123
    @param poller: Poller object
124
    @type fd: int
125
    @param fd: File descriptor to wait for
126

127
    """
128
    object.__init__(self)
129
    self._poller = poller
130
    self._fd = fd
131

    
132
  def __call__(self, timeout):
133
    """Wait for something to happen on the pipe.
134

135
    @type timeout: float or None
136
    @param timeout: Timeout for waiting (can be None)
137

138
    """
139
    running_timeout = RunningTimeout(timeout, True)
140

    
141
    while True:
142
      remaining_time = running_timeout.Remaining()
143

    
144
      if remaining_time is not None and remaining_time < 0.0:
145
        break
146

    
147
      try:
148
        result = self._poller.poll(remaining_time)
149
      except EnvironmentError, err:
150
        if err.errno != errno.EINTR:
151
          raise
152
        result = None
153

    
154
      # Check whether we were notified
155
      if result and result[0][0] == self._fd:
156
        break
157

    
158

    
159
class _BaseCondition(object):
160
  """Base class containing common code for conditions.
161

162
  Some of this code is taken from python's threading module.
163

164
  """
165
  __slots__ = [
166
    "_lock",
167
    "acquire",
168
    "release",
169
    ]
170

    
171
  def __init__(self, lock):
172
    """Constructor for _BaseCondition.
173

174
    @type lock: threading.Lock
175
    @param lock: condition base lock
176

177
    """
178
    object.__init__(self)
179

    
180
    # Recursive locks are not supported
181
    assert not hasattr(lock, "_acquire_restore")
182
    assert not hasattr(lock, "_release_save")
183

    
184
    self._lock = lock
185

    
186
    # Export the lock's acquire() and release() methods
187
    self.acquire = lock.acquire
188
    self.release = lock.release
189

    
190
  def _is_owned(self):
191
    """Check whether lock is owned by current thread.
192

193
    """
194
    if self._lock.acquire(0):
195
      self._lock.release()
196
      return False
197

    
198
    return True
199

    
200
  def _check_owned(self):
201
    """Raise an exception if the current thread doesn't own the lock.
202

203
    """
204
    if not self._is_owned():
205
      raise RuntimeError("cannot work with un-aquired lock")
206

    
207

    
208
class SingleNotifyPipeCondition(_BaseCondition):
  """Condition which can only be notified once.

  Waiters poll the read side of a pipe for a hang-up and notifyAll() closes
  the write side. That way waiting with a timeout is possible without busy
  polling. The class is almost compatible with Python's threading.Condition,
  with the following differences:
    - notifyAll can only be called once, and no wait can happen after that
    - notify is not supported, only notifyAll

  """

  __slots__ = [
    "_poller",
    "_read_fd",
    "_write_fd",
    "_nwaiters",
    "_notified",
    ]

  _waiter_class = _SingleNotifyPipeConditionWaiter

  def __init__(self, lock):
    """Constructor for SingleNotifyPipeCondition

    The pipe and the poller are only created once the first waiter shows up.

    """
    _BaseCondition.__init__(self, lock)
    self._poller = None
    self._read_fd = None
    self._write_fd = None
    self._notified = False
    self._nwaiters = 0

  def _check_unnotified(self):
    """Throws an exception if already notified.

    @raise RuntimeError: if notifyAll() was called before

    """
    if not self._notified:
      return

    raise RuntimeError("cannot use already notified condition")

  def _Cleanup(self):
    """Cleanup open file descriptors, if any.

    """
    # Read side first, then write side
    for fd_attr in ("_read_fd", "_write_fd"):
      fd = getattr(self, fd_attr)
      if fd is not None:
        os.close(fd)
        setattr(self, fd_attr, None)

    self._poller = None

  def wait(self, timeout=None):
    """Wait for a notification.

    The lock must be held when calling; it is released while waiting and
    acquired again before returning.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
      if self._poller is None:
        # First waiter, set up the pipe and watch its read side
        (rfd, wfd) = os.pipe()
        self._read_fd = rfd
        self._write_fd = wfd
        self._poller = select.poll()
        self._poller.register(self._read_fd, select.POLLHUP)

      waiter = self._waiter_class(self._poller, self._read_fd)

      # Don't hold the lock while blocking
      self.release()
      try:
        waiter(timeout)
      finally:
        # The caller expects to own the lock again
        self.acquire()
    finally:
      self._nwaiters -= 1
      # Whoever leaves last closes the file descriptors
      if not self._nwaiters:
        self._Cleanup()

  def notifyAll(self): # pylint: disable-msg=C0103
    """Close the writing side of the pipe to notify all waiters.

    """
    self._check_owned()
    self._check_unnotified()
    self._notified = True

    wfd = self._write_fd
    if wfd is not None:
      os.close(wfd)
      self._write_fd = None
300

    
301

    
302
class PipeCondition(_BaseCondition):
  """Group-only non-polling condition with counters.

  Every notification round is delegated to a fresh single-notify condition,
  which uses pipes and poll to wait with a timeout without busy polling. The
  class is almost compatible with Python's threading.Condition, but only
  supports notifyAll and non-recursive locks. As an additional feature it is
  able to report whether there are any waiting threads.

  """
  __slots__ = [
    "_nwaiters",
    "_single_condition",
    ]

  _single_condition_class = SingleNotifyPipeCondition

  def __init__(self, lock):
    """Initializes this class.

    @param lock: condition base lock, shared with the inner condition

    """
    _BaseCondition.__init__(self, lock)
    self._single_condition = self._single_condition_class(self._lock)
    self._nwaiters = 0

  def wait(self, timeout=None):
    """Wait for a notification.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # notifyAll() swaps in a new inner condition, possibly while we're
    # waiting, hence we hold on to the one which was current on entry
    current = self._single_condition

    assert self._nwaiters >= 0
    self._nwaiters += 1
    try:
      current.wait(timeout)
    finally:
      assert self._nwaiters > 0
      self._nwaiters -= 1

  def notifyAll(self): # pylint: disable-msg=C0103
    """Notify all currently waiting threads.

    """
    self._check_owned()

    used_up = self._single_condition
    used_up.notifyAll()

    # The old condition can't be reused, later waiters get a new one
    self._single_condition = self._single_condition_class(self._lock)

  def has_waiting(self):
    """Returns whether there are active waiters.

    @rtype: bool

    """
    self._check_owned()

    waiters = self._nwaiters
    return bool(waiters)
363

    
364

    
365
class _CountingCondition(object):
366
  """Wrapper for Python's built-in threading.Condition class.
367

368
  This wrapper keeps a count of active waiters. We can't access the internal
369
  "__waiters" attribute of threading.Condition because it's not thread-safe.
370

371
  """
372
  __slots__ = [
373
    "_cond",
374
    "_nwaiters",
375
    ]
376

    
377
  def __init__(self, lock):
378
    """Initializes this class.
379

380
    """
381
    object.__init__(self)
382
    self._cond = threading.Condition(lock=lock)
383
    self._nwaiters = 0
384

    
385
  def notifyAll(self): # pylint: disable-msg=C0103
386
    """Notifies the condition.
387

388
    """
389
    return self._cond.notifyAll()
390

    
391
  def wait(self, timeout=None):
392
    """Waits for the condition to be notified.
393

394
    @type timeout: float or None
395
    @param timeout: Waiting timeout (can be None)
396

397
    """
398
    assert self._nwaiters >= 0
399

    
400
    self._nwaiters += 1
401
    try:
402
      return self._cond.wait(timeout=timeout)
403
    finally:
404
      self._nwaiters -= 1
405

    
406
  def has_waiting(self):
407
    """Returns whether there are active waiters.
408

409
    """
410
    return bool(self._nwaiters)
411

    
412

    
413
class SharedLock(object):
  """Implements a shared lock.

  Multiple threads can hold the lock in a shared way at the same time, by
  calling C{acquire(shared=1)}.  In order to hold the lock in an exclusive way
  threads call C{acquire(shared=0)}, which is the default.

  The lock prevents starvation but does not guarantee that threads will acquire
  the shared lock in the order they queued for it, just that they will
  eventually do so.

  """
  __slots__ = [
    "__active_shr_c",
    "__inactive_shr_c",
    "__deleted",
    "__exc",
    "__lock",
    "__pending",
    "__shr",
    ]

  __condition_class = PipeCondition

  def __init__(self):
    """Construct a new SharedLock.

    """
    object.__init__(self)

    # Internal lock
    self.__lock = threading.Lock()

    # Queue containing waiting acquires
    self.__pending = []

    # Active and inactive conditions for shared locks
    self.__active_shr_c = self.__condition_class(self.__lock)
    self.__inactive_shr_c = self.__condition_class(self.__lock)

    # Current lock holders
    self.__shr = set()
    self.__exc = None

    # is this lock in the deleted state?
    self.__deleted = False

  def __check_deleted(self):
    """Raises an exception if the lock has been deleted.

    @raise errors.LockError: if the lock is in the deleted state

    """
    if self.__deleted:
      raise errors.LockError("Deleted lock")

  def __is_sharer(self):
    """Is the current thread sharing the lock at this time?

    """
    return threading.currentThread() in self.__shr

  def __is_exclusive(self):
    """Is the current thread holding the lock exclusively at this time?

    """
    return threading.currentThread() == self.__exc

  def __is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    This is a private version of the function, which presumes you're holding
    the internal lock.

    @param shared: same meaning as in L{_is_owned}

    """
    if shared < 0:
      return self.__is_sharer() or self.__is_exclusive()
    elif shared:
      return self.__is_sharer()
    else:
      return self.__is_exclusive()

  def _is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership

    """
    self.__lock.acquire()
    try:
      return self.__is_owned(shared=shared)
    finally:
      self.__lock.release()

  def _count_pending(self):
    """Returns the number of pending acquires.

    @rtype: int

    """
    self.__lock.acquire()
    try:
      return len(self.__pending)
    finally:
      self.__lock.release()

  def __do_acquire(self, shared):
    """Actually acquire the lock.

    Only records the current thread as holder; the caller must have checked
    that acquiring is possible.

    """
    if shared:
      self.__shr.add(threading.currentThread())
    else:
      self.__exc = threading.currentThread()

  def __can_acquire(self, shared):
    """Determine whether lock can be acquired.

    Sharing only requires that there's no exclusive holder, while an
    exclusive acquire requires that there are no holders at all.

    """
    if shared:
      return self.__exc is None
    else:
      return len(self.__shr) == 0 and self.__exc is None

  def __is_on_top(self, cond):
    """Checks whether the passed condition is on top of the queue.

    The caller must make sure the queue isn't empty.

    """
    return self.__pending[0] == cond

  def __acquire_unlocked(self, shared, timeout):
    """Acquire a shared lock.

    The caller must hold the internal lock.

    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @param timeout: maximum waiting time before giving up
    @return: True if the lock was acquired, False otherwise
    @raise errors.LockError: if the lock is or gets deleted

    """
    self.__check_deleted()

    # We cannot acquire the lock if we already have it
    assert not self.__is_owned(), "double acquire() on a non-recursive lock"

    # Check whether someone else holds the lock or there are pending acquires.
    if not self.__pending and self.__can_acquire(shared):
      # Apparently not, can acquire lock directly.
      self.__do_acquire(shared)
      return True

    if shared:
      # All queued sharers wait on the same condition
      wait_condition = self.__active_shr_c

      # Check if we're not yet in the queue
      if wait_condition not in self.__pending:
        self.__pending.append(wait_condition)
    else:
      # Every exclusive acquire gets a condition of its own
      wait_condition = self.__condition_class(self.__lock)
      # Always add to queue
      self.__pending.append(wait_condition)

    try:
      # Wait until we become the topmost acquire in the queue or the timeout
      # expires.
      while not (self.__is_on_top(wait_condition) and
                 self.__can_acquire(shared)):
        # Wait for notification
        wait_condition.wait(timeout)
        self.__check_deleted()

        # A lot of code assumes blocking acquires always succeed. Loop
        # internally for that case.
        if timeout is not None:
          break

      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
        self.__do_acquire(shared)
        return True
    finally:
      # Remove condition from queue if there are no more waiters
      if not wait_condition.has_waiting() and not self.__deleted:
        self.__pending.remove(wait_condition)

    return False

  def acquire(self, shared=0, timeout=None, test_notify=None):
    """Acquire a shared lock.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @return: True if the lock was acquired, False otherwise
    @raise errors.LockError: if the lock is or gets deleted

    """
    self.__lock.acquire()
    try:
      # We already got the lock, notify now
      if __debug__ and callable(test_notify):
        test_notify()

      return self.__acquire_unlocked(shared, timeout)
    finally:
      self.__lock.release()

  def release(self):
    """Release a Shared Lock.

    You must have acquired the lock, either in shared or in exclusive mode,
    before calling this function.

    """
    self.__lock.acquire()
    try:
      assert self.__is_exclusive() or self.__is_sharer(), \
        "Cannot release non-owned lock"

      # Autodetect release type
      if self.__is_exclusive():
        self.__exc = None
      else:
        self.__shr.remove(threading.currentThread())

      # Notify topmost condition in queue
      if self.__pending:
        first_condition = self.__pending[0]
        first_condition.notifyAll()

        # Sharers queueing from now on use the other condition
        if first_condition == self.__active_shr_c:
          self.__active_shr_c = self.__inactive_shr_c
          self.__inactive_shr_c = first_condition

    finally:
      self.__lock.release()

  def delete(self, timeout=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @return: True if the lock was deleted, False if it couldn't be acquired
    @raise errors.LockError: if the lock is or gets deleted by someone else

    """
    self.__lock.acquire()
    try:
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

      self.__check_deleted()

      # The caller is allowed to hold the lock exclusively already.
      acquired = self.__is_exclusive()

      if not acquired:
        acquired = self.__acquire_unlocked(0, timeout)

      if acquired:
        # Only check ownership if we got the lock; after a timed-out acquire
        # we must return False instead of failing the assertion
        assert self.__is_exclusive() and not self.__is_sharer(), \
          "Lock wasn't acquired in exclusive mode"

        self.__deleted = True
        self.__exc = None

        # Notify all acquires. They'll throw an error.
        while self.__pending:
          self.__pending.pop().notifyAll()

      return acquired
    finally:
      self.__lock.release()
689

    
690

    
691
# Whenever we want to acquire a full LockSet we pass None as the value
692
# to acquire.  Hide this behind this nicely named constant.
693
# NOTE: LockSet.acquire() tests "names is not None" to tell a list of names
# from a request for the whole set, hence this has to stay None.
ALL_SET = None
694

    
695

    
696
class _AcquireTimeout(Exception):
697
  """Internal exception to abort an acquire on a timeout.
698

699
  """
700

    
701

    
702
class LockSet:
703
  """Implements a set of locks.
704

705
  This abstraction implements a set of shared locks for the same resource type,
706
  distinguished by name. The user can lock a subset of the resources and the
707
  LockSet will take care of acquiring the locks always in the same order, thus
708
  preventing deadlock.
709

710
  All the locks needed in the same set must be acquired together, though.
711

712
  """
713
  def __init__(self, members=None):
    """Constructs a new LockSet.

    @type members: list of strings
    @param members: initial members of the set

    """
    # Set-level lock, used internally to guarantee coherency.
    self.__lock = SharedLock()

    # Maps each thread to the set of lock names it owns. For performance each
    # thread can access its own key without a global lock on this structure.
    # It is paramount though that *no* other type of access is done to this
    # structure (eg. no looping over its keys). The *_owner helper functions
    # are defined to guarantee access is correct, but in general never do
    # anything different than __owners[threading.currentThread()], or there
    # will be trouble.
    self.__owners = {}

    # Maps name -> lock; the order of locking is implied by the alphabetical
    # order of the names
    if members is None:
      self.__lockdict = {}
    else:
      self.__lockdict = dict([(name, SharedLock()) for name in members])
739

    
740
  def _is_owned(self):
741
    """Is the current thread a current level owner?"""
742
    return threading.currentThread() in self.__owners
743

    
744
  def _add_owned(self, name=None):
745
    """Note the current thread owns the given lock"""
746
    if name is None:
747
      if not self._is_owned():
748
        self.__owners[threading.currentThread()] = set()
749
    else:
750
      if self._is_owned():
751
        self.__owners[threading.currentThread()].add(name)
752
      else:
753
        self.__owners[threading.currentThread()] = set([name])
754

    
755
  def _del_owned(self, name=None):
756
    """Note the current thread owns the given lock"""
757

    
758
    assert not (name is None and self.__lock._is_owned()), \
759
           "Cannot hold internal lock when deleting owner status"
760

    
761
    if name is not None:
762
      self.__owners[threading.currentThread()].remove(name)
763

    
764
    # Only remove the key if we don't hold the set-lock as well
765
    if (not self.__lock._is_owned() and
766
        not self.__owners[threading.currentThread()]):
767
      del self.__owners[threading.currentThread()]
768

    
769
  def _list_owned(self):
770
    """Get the set of resource names owned by the current thread"""
771
    if self._is_owned():
772
      return self.__owners[threading.currentThread()].copy()
773
    else:
774
      return set()
775

    
776
  def _release_and_delete_owned(self):
777
    """Release and delete all resources owned by the current thread"""
778
    for lname in self._list_owned():
779
      lock = self.__lockdict[lname]
780
      if lock._is_owned():
781
        lock.release()
782
      self._del_owned(name=lname)
783

    
784
  def __names(self):
785
    """Return the current set of names.
786

787
    Only call this function while holding __lock and don't iterate on the
788
    result after releasing the lock.
789

790
    """
791
    return self.__lockdict.keys()
792

    
793
  def _names(self):
794
    """Return a copy of the current set of elements.
795

796
    Used only for debugging purposes.
797

798
    """
799
    # If we don't already own the set-level lock acquired
800
    # we'll get it and note we need to release it later.
801
    release_lock = False
802
    if not self.__lock._is_owned():
803
      release_lock = True
804
      self.__lock.acquire(shared=1)
805
    try:
806
      result = self.__names()
807
    finally:
808
      if release_lock:
809
        self.__lock.release()
810
    return set(result)
811

    
812
  def acquire(self, names, timeout=None, shared=0, test_notify=None):
    """Acquire a set of resource locks.

    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names); None requests all
        locks in the set
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting

    @return: Set of all locks successfully acquired or None in case of timeout

    @raise errors.LockError: when any lock we try to acquire has
        been deleted before we succeed. In this case none of the
        locks requested will be acquired.

    """
    assert timeout is None or timeout >= 0.0

    # Check we don't already own locks at this level
    assert not self._is_owned(), "Cannot acquire locks in the same set twice"

    # The timeout passed to this function covers all lock acquires together,
    # hence the time spent waiting has to be tracked over all of them.
    remaining_fn = RunningTimeout(timeout, False).Remaining

    try:
      if names is None:
        # Acquire the whole set: prevent new names from being added before we
        # release, then use the list of names as it is now. Some of them may
        # be deleted later, but the inner function copes with this.
        #
        # The set-lock is taken in the same mode as the locks themselves. If
        # they are acquired exclusively nobody else could use them anyway, and
        # holding the set-lock exclusively allows add() on the set while
        # owning it.
        if not self.__lock.acquire(shared=shared, timeout=remaining_fn()):
          raise _AcquireTimeout()

        try:
          # note we own the set-lock
          self._add_owned()

          return self.__acquire_inner(self.__names(), True, shared,
                                      remaining_fn, test_notify)
        except:
          # Whatever went wrong (timeouts included), give the set-lock back
          # and forget about it before passing the exception on.
          self.__lock.release()
          self._del_owned()
          raise

      # Support passing in a single resource to acquire rather than many
      if isinstance(names, basestring):
        names = [names]

      return self.__acquire_inner(names, False, shared, remaining_fn,
                                  test_notify)

    except _AcquireTimeout:
      return None
880

    
881
  def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
    """Inner logic for acquiring a number of locks.

    @param names: Names of the locks to be acquired
    @param want_all: Whether all locks in the set should be acquired
    @param shared: Whether to acquire in shared mode
    @param timeout_fn: Function returning remaining timeout
    @param test_notify: Special callback function for unittesting
    @return: Set with the names of the locks which were acquired
    @raise errors.LockError: if a requested lock doesn't exist (anymore) and
        not the whole set was asked for
    @raise _AcquireTimeout: if a lock couldn't be acquired in time

    """
    # Look all locks up first. There's no guarantee they will still be there
    # later, but this way a wrong name is noticed before anything has been
    # acquired. The names are sorted to prevent deadlocks.
    wanted = []

    for lname in sorted(utils.UniqueSequence(names)):
      try:
        lock = self.__lockdict[lname]
      except KeyError:
        if not want_all:
          raise errors.LockError("Non-existing lock in set (%s)" % lname)
        # When acquiring the whole set a vanished element is simply skipped
      else:
        wanted.append((lname, lock))

    # Names of the locks we effectively acquired
    acquired = set()

    try:
      # Acquire in the sorted order established above. The locks may be
      # deleted in the meantime, but acquire() will tell us about that.
      for (lname, lock) in wanted:
        test_notify_fn = None
        if __debug__ and callable(test_notify):
          test_notify_fn = lambda: test_notify(lname)

        timeout = timeout_fn()

        try:
          # raises LockError if the lock was deleted
          acq_success = lock.acquire(shared=shared, timeout=timeout,
                                     test_notify=test_notify_fn)
        except errors.LockError:
          if not want_all:
            raise errors.LockError("Non-existing lock in set (%s)" % lname)

          # When acquiring the whole set a deleted element is simply skipped
          continue

        if not acq_success:
          if timeout is not None:
            # Ran out of time
            raise _AcquireTimeout()

          # This shouldn't happen as SharedLock.acquire(timeout=None) is
          # blocking.
          raise errors.LockError("Failed to get lock %s" % lname)

        try:
          # now the lock cannot be deleted, we have it!
          self._add_owned(name=lname)
          acquired.add(lname)

        except:
          # Recording the ownership shouldn't fail; if it does anyway the
          # lock is given back before the exception is passed on.
          if lock._is_owned():
            lock.release()
          raise

    except:
      # All or nothing: give back whatever was acquired so far
      self._release_and_delete_owned()
      raise

    return acquired
967

    
968
  def release(self, names=None):
    """Release a set of resource locks, at the same level.

    The locks must already be held, in either shared or exclusive mode.
    If the set-level lock is held too, it is dropped before the individual
    locks are released.

    @type names: list of strings, or None
    @param names: the names of the locks which shall be released; a single
        string is accepted as shorthand for a one-element list, and None
        means all the locks owned at that level

    """
    assert self._is_owned(), "release() on lock set while not owner"

    if names is None:
      # No explicit selection: release everything we own in this set
      to_release = self._list_owned()
    else:
      # Support passing in a single resource to release rather than many
      if isinstance(names, basestring):
        names = [names]

      to_release = set(names)
      assert self._list_owned().issuperset(to_release), (
               "release() on unheld resources %s" %
               to_release.difference(self._list_owned()))

    # The "all elements" lock goes first, if we hold it; once it is gone
    # 'add' can work again.
    set_lock = self.__lock
    if set_lock._is_owned():
      set_lock.release()
      self._del_owned()

    for lname in to_release:
      # This is only safe as long as a lock never leaves __lockdict without
      # being exclusively held.
      self.__lockdict[lname].release()
      self._del_owned(name=lname)
1004

    
1005
  def add(self, names, acquired=0, shared=0):
    """Add a new set of elements to the set.

    The set-level lock is held while the new locks are created; if it was
    not already owned on entry it is acquired here and released again before
    returning.

    @type names: list of strings
    @param names: names of the new elements to add; a single string is
        accepted as shorthand for a one-element list
    @type acquired: integer (0/1) used as a boolean
    @param acquired: pre-acquire the new resource?
    @type shared: integer (0/1) used as a boolean
    @param shared: is the pre-acquisition shared?
    @return: True
    @raise errors.LockError: if any of the names is already in the set

    """
    # Either we own nothing at this level, or we own the whole set
    # exclusively; anything in between is not allowed.
    assert not self._is_owned() or self.__lock._is_owned(shared=0), (
      "Cannot add locks if the set is only partially owned, or shared")

    # Support passing in a single resource to add rather than many
    if isinstance(names, basestring):
      names = [names]

    # Take the set-level lock unless we already hold it, and remember
    # whether it is ours to give back at the end.
    must_release = not self.__lock._is_owned()
    if must_release:
      self.__lock.acquire()

    try:
      duplicates = set(self.__names()).intersection(names)
      if duplicates:
        # This must be an explicit raise, not an assert, because assert is
        # turned off when using optimization, and this can happen because of
        # concurrency even if the user doesn't want it.
        raise errors.LockError("duplicate add() (%s)" % duplicates)

      for new_name in names:
        new_lock = SharedLock()

        if acquired:
          new_lock.acquire(shared=shared)
          # From here on the lock cannot be deleted: we hold it
          try:
            self._add_owned(name=new_name)
          except:
            # Registering the lock as owned is not expected to fail; if it
            # does, something is badly wrong. The lock is not in __lockdict
            # yet, so no other thread should be waiting on it, and this
            # release is just a safety measure before re-raising.
            new_lock.release()
            raise

        self.__lockdict[new_name] = new_lock

    finally:
      # Only give the set-level lock back if we took it ourselves
      if must_release:
        self.__lock.release()

    return True
1065

    
1066
  def remove(self, names):
    """Remove elements from the lock set.

    You can either not hold anything in the lockset or already hold a superset
    of the elements you want to delete, exclusively.

    @type names: list of strings
    @param names: names of the resource to remove; a single string is
        accepted as shorthand for a one-element list

    @return: a list of locks which we removed; the list is always
        equal to the names list if we were holding all the locks
        exclusively

    """
    # Support passing in a single resource to remove rather than many
    if isinstance(names, basestring):
      names = [names]

    # Whatever we own in this set must cover everything we want to delete.
    # The ownership must also be exclusive, but the lock itself checks that.
    assert not self._is_owned() or self._list_owned().issuperset(names), (
      "remove() on acquired lockset while not owning all elements")

    removed = []

    for lname in names:
      # delete() acquires the lock exclusively if we don't already own it,
      # and makes all pending and subsequent acquires fail. Calling it out of
      # order is fine because delete() also implies release(), and the
      # assertion above guarantees we hold either everything we want to
      # delete or nothing at all.
      try:
        self.__lockdict[lname].delete()
      except (KeyError, errors.LockError):
        # Somebody else got there first, which cannot happen if we were
        # already holding the lock; cleaning up is the job of whoever
        # actually deleted it.
        assert not self._is_owned(), "remove failed while holding lockset"
        continue

      removed.append(lname)

      # We are the ones who deleted the lock, so dropping it from lockdict
      # is safe: any further or pending delete() or acquire() will fail, and
      # nobody can have held the lock since before our delete() call.
      del self.__lockdict[lname]

      # Also forget it in our private list of owned locks, if we have one
      if self._is_owned():
        self._del_owned(name=lname)

    return removed
1118

    
1119

    
1120
# Locking levels; locks must be acquired in increasing level order.
# Current rules are:
#   - LEVEL_CLUSTER holds the Big Ganeti Lock (BGL), which must be acquired,
#     in shared or exclusive mode, before performing any operation. Acquiring
#     the BGL in exclusive mode is discouraged and should be avoided.
#   - LEVEL_NODE and LEVEL_INSTANCE hold the node and instance locks. If you
#     need more than one node, or more than one instance, acquire them at
#     the same time.
(LEVEL_CLUSTER,
 LEVEL_INSTANCE,
 LEVEL_NODE) = range(3)

# Every level, listed in increasing order (so LEVELS[i] == i)
LEVELS = [LEVEL_CLUSTER, LEVEL_INSTANCE, LEVEL_NODE]

# Lock levels which are modifiable
LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]

# Human-readable name of each level
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODE: "node",
  }

# Name of the Big Ganeti Lock
BGL = "BGL"
1148

    
1149

    
1150
class GanetiLockManager:
  """The Ganeti Locking Library

  Manages the locking for ganeti clusters in one central place and performs
  dynamic checks against possible deadlocks while doing so. Having a single
  entry point should also make it easier to switch to a different lock type,
  should we migrate away from python threads.

  """
  # The one manager object created so far, or None if there is none yet
  _instance = None

  def __init__(self, nodes=None, instances=None):
    """Constructs a new GanetiLockManager object.

    Only one GanetiLockManager object may exist at any time; creating a
    second one trips an assertion.

    @param nodes: list of node names
    @param instances: list of instance names

    """
    cls = self.__class__
    assert cls._instance is None, \
           "double GanetiLockManager instance"

    cls._instance = self

    cluster_locks = LockSet([BGL])
    node_locks = LockSet(nodes)
    instance_locks = LockSet(instances)

    # The keyring maps each locking level to the lock set living there
    self.__keyring = {
      LEVEL_CLUSTER: cluster_locks,
      LEVEL_NODE: node_locks,
      LEVEL_INSTANCE: instance_locks,
    }

  def _names(self, level):
    """List the lock names at the given level.

    This can be used for debugging/testing purposes.

    @param level: the level whose list of locks to get

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    lockset = self.__keyring[level]
    return lockset._names()

  def _is_owned(self, level):
    """Check whether we are owning locks at the given level

    """
    lockset = self.__keyring[level]
    return lockset._is_owned()

  # Public alias of the ownership check
  is_owned = _is_owned

  def _list_owned(self, level):
    """Get the set of owned locks at the given level

    """
    lockset = self.__keyring[level]
    return lockset._list_owned()

  def _upper_owned(self, level):
    """Check whether we own any lock at a level greater than the given one.

    @return: the result of L{utils.any} over the ownership status of every
        level above C{level}

    """
    # Slicing by level only works if LEVELS[i] = i, which we check for in
    # the test cases.
    higher_levels = LEVELS[level + 1:]
    return utils.any((self._is_owned(lvl) for lvl in higher_levels))

  def _BGL_owned(self): # pylint: disable-msg=C0103
    """Check if the current thread owns the BGL.

    Both an exclusive or a shared acquisition work.

    """
    owned_at_cluster = self.__keyring[LEVEL_CLUSTER]._list_owned()
    return BGL in owned_at_cluster

  @staticmethod
  def _contains_BGL(level, names): # pylint: disable-msg=C0103
    """Check if the level contains the BGL.

    Tells whether acting on the given level and set of names will change
    the status of the Big Ganeti Lock; at the cluster level, names=None
    counts as including it.

    """
    if level != LEVEL_CLUSTER:
      return False
    return names is None or BGL in names

  def acquire(self, level, names, timeout=None, shared=0):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @return: whatever the lock set's acquire() returns for this level

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # We must either be acquiring the Big Ganeti Lock right now or own it
    # already. Some "legacy" opcodes need to be sure they are run
    # non-concurrently, so even migrated code has to at least share the BGL
    # to stay compatible with them. Owning the BGL exclusively makes other
    # locks pointless, except perhaps half way through the migration of the
    # current opcode.
    bgl_ok = self._contains_BGL(level, names) or self._BGL_owned()
    assert bgl_ok, (
            "You must own the Big Ganeti Lock before acquiring any other")

    # Nothing may be owned at any level above the requested one
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
           " while owning some at a greater one")

    # Hand the actual work over to the lock set of that level
    lockset = self.__keyring[level]
    return lockset.acquire(names, shared=shared, timeout=timeout)

  def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive
    mode, before releasing them.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # The BGL may only go once nothing above the cluster level is held. The
    # message, built only on failure, lists what is owned at every level.
    assert not (self._contains_BGL(level, names) and
                self._upper_owned(LEVEL_CLUSTER)), (
            "Cannot release the Big Ganeti Lock while holding something"
            " at upper levels (%r)" %
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
                              for i in self.__keyring.keys()]), ))

    # Release will complain if we don't own the locks already
    lockset = self.__keyring[level]
    return lockset.release(names)

  def add(self, level, names, acquired=0, shared=0):
    """Add locks at the specified level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
           " while owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.add(names, acquired=acquired, shared=shared)

  def remove(self, level, names):
    """Remove locks from the specified level.

    You must either already own the locks you are trying to remove
    exclusively or not own any lock at an upper level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")

    # Either we own something at this level, or we own nothing above it.
    # LockSet.remove() deals with the cases where we don't own all the
    # needed resources, or only have a shared ownership.
    assert self._is_owned(level) or not self._upper_owned(level), (
           "Cannot remove locks at a level while not owning it or"
           " owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.remove(names)