Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 7c4c22f5

History | View | Annotate | Download (41.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable-msg=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import time
32
import errno
33

    
34
from ganeti import errors
35
from ganeti import utils
36
from ganeti import compat
37

    
38

    
39
def ssynchronized(mylock, shared=0):
  """Decorator running a function while holding a shared lock.

  The decorated function is executed with the lock held, either in exclusive
  (the default) or in shared mode, and the lock is released again once the
  function returns or raises. The lock must be a SharedLock or an object
  supporting the same C{acquire(shared=...)}/C{release()} semantics.

  @type mylock: lockable object or string
  @param mylock: lock to acquire; a string is interpreted as the name of an
      attribute, holding the lock, on the first positional argument ("self")
  @type shared: integer (0/1) used as a boolean
  @param shared: whether to acquire the lock in shared mode

  """
  def wrap(fn):
    def sync_function(*args, **kwargs):
      lock = mylock
      if isinstance(lock, basestring):
        # A name was given: look the lock up on "self", i.e. args[0]
        assert args, "cannot ssynchronize on non-class method: self not found"
        lock = getattr(args[0], lock)

      lock.acquire(shared=shared)
      try:
        return fn(*args, **kwargs)
      finally:
        # Always give the lock back, even if the function raised
        lock.release()

    return sync_function

  return wrap
65

    
66

    
67
class RunningTimeout(object):
  """Tracks how much of a timeout is left across several operations.

  The clock starts ticking on the first call to L{Remaining}, not when the
  object is created.

  """
  __slots__ = [
    "_allow_negative",
    "_start_time",
    "_time_fn",
    "_timeout",
    ]

  def __init__(self, timeout, allow_negative, _time_fn=time.time):
    """Initializes this class.

    @type timeout: float
    @param timeout: Timeout duration; None means there is no timeout
    @type allow_negative: bool
    @param allow_negative: Whether to return values below zero
    @param _time_fn: Time function for unittests
    @raise ValueError: if a negative timeout is passed

    """
    object.__init__(self)

    if not (timeout is None or timeout >= 0.0):
      raise ValueError("Timeout must not be negative")

    self._timeout = timeout
    self._allow_negative = allow_negative
    self._time_fn = _time_fn

    # Set lazily by the first call to Remaining()
    self._start_time = None

  def Remaining(self):
    """Returns the remaining timeout.

    @rtype: float or None
    @return: None if no timeout was configured, otherwise the time left;
        the value is clamped at zero unless negative values were allowed

    """
    if self._timeout is None:
      return None

    # The first calculation defines the starting point
    if self._start_time is None:
      self._start_time = self._time_fn()

    deadline = self._start_time + self._timeout
    time_left = deadline - self._time_fn()

    if self._allow_negative:
      return time_left

    # Never report less than zero
    return max(0.0, time_left)
118

    
119

    
120
class _SingleNotifyPipeConditionWaiter(object):
121
  """Helper class for SingleNotifyPipeCondition
122

123
  """
124
  __slots__ = [
125
    "_fd",
126
    "_poller",
127
    ]
128

    
129
  def __init__(self, poller, fd):
130
    """Constructor for _SingleNotifyPipeConditionWaiter
131

132
    @type poller: select.poll
133
    @param poller: Poller object
134
    @type fd: int
135
    @param fd: File descriptor to wait for
136

137
    """
138
    object.__init__(self)
139
    self._poller = poller
140
    self._fd = fd
141

    
142
  def __call__(self, timeout):
143
    """Wait for something to happen on the pipe.
144

145
    @type timeout: float or None
146
    @param timeout: Timeout for waiting (can be None)
147

148
    """
149
    running_timeout = RunningTimeout(timeout, True)
150

    
151
    while True:
152
      remaining_time = running_timeout.Remaining()
153

    
154
      if remaining_time is not None:
155
        if remaining_time < 0.0:
156
          break
157

    
158
        # Our calculation uses seconds, poll() wants milliseconds
159
        remaining_time *= 1000
160

    
161
      try:
162
        result = self._poller.poll(remaining_time)
163
      except EnvironmentError, err:
164
        if err.errno != errno.EINTR:
165
          raise
166
        result = None
167

    
168
      # Check whether we were notified
169
      if result and result[0][0] == self._fd:
170
        break
171

    
172

    
173
class _BaseCondition(object):
174
  """Base class containing common code for conditions.
175

176
  Some of this code is taken from python's threading module.
177

178
  """
179
  __slots__ = [
180
    "_lock",
181
    "acquire",
182
    "release",
183
    "_is_owned",
184
    "_acquire_restore",
185
    "_release_save",
186
    ]
187

    
188
  def __init__(self, lock):
189
    """Constructor for _BaseCondition.
190

191
    @type lock: threading.Lock
192
    @param lock: condition base lock
193

194
    """
195
    object.__init__(self)
196

    
197
    try:
198
      self._release_save = lock._release_save
199
    except AttributeError:
200
      self._release_save = self._base_release_save
201
    try:
202
      self._acquire_restore = lock._acquire_restore
203
    except AttributeError:
204
      self._acquire_restore = self._base_acquire_restore
205
    try:
206
      self._is_owned = lock._is_owned
207
    except AttributeError:
208
      self._is_owned = self._base_is_owned
209

    
210
    self._lock = lock
211

    
212
    # Export the lock's acquire() and release() methods
213
    self.acquire = lock.acquire
214
    self.release = lock.release
215

    
216
  def _base_is_owned(self):
217
    """Check whether lock is owned by current thread.
218

219
    """
220
    if self._lock.acquire(0):
221
      self._lock.release()
222
      return False
223
    return True
224

    
225
  def _base_release_save(self):
226
    self._lock.release()
227

    
228
  def _base_acquire_restore(self, _):
229
    self._lock.acquire()
230

    
231
  def _check_owned(self):
232
    """Raise an exception if the current thread doesn't own the lock.
233

234
    """
235
    if not self._is_owned():
236
      raise RuntimeError("cannot work with un-aquired lock")
237

    
238

    
239
class SingleNotifyPipeCondition(_BaseCondition):
  """Condition which can only be notified once.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, with the following differences:
    - notifyAll can only be called once, and no wait can happen after that
    - notify is not supported, only notifyAll

  """

  __slots__ = [
    "_poller",
    "_read_fd",
    "_write_fd",
    "_nwaiters",
    "_notified",
    ]

  _waiter_class = _SingleNotifyPipeConditionWaiter

  def __init__(self, lock):
    """Constructor for SingleNotifyPipeCondition

    The pipe and the poller are only created once the first waiter shows up.

    """
    _BaseCondition.__init__(self, lock)
    self._nwaiters = 0
    self._notified = False
    self._read_fd = None
    self._write_fd = None
    self._poller = None

  def _check_unnotified(self):
    """Throws an exception if already notified.

    @raise RuntimeError: if notifyAll was already called

    """
    if not self._notified:
      return

    raise RuntimeError("cannot use already notified condition")

  def _Cleanup(self):
    """Cleanup open file descriptors, if any.

    """
    # Read side first, then write side
    for attr in ("_read_fd", "_write_fd"):
      fd = getattr(self, attr)
      if fd is not None:
        os.close(fd)
        setattr(self, attr, None)

    self._poller = None

  def wait(self, timeout=None):
    """Wait for a notification.

    The lock must be held by the caller; it is released while waiting and
    re-acquired before returning.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
      if self._poller is None:
        # First waiter, set up the pipe; closing its write side later on
        # makes the read side report POLLHUP
        (self._read_fd, self._write_fd) = os.pipe()
        self._poller = select.poll()
        self._poller.register(self._read_fd, select.POLLHUP)

      waiter = self._waiter_class(self._poller, self._read_fd)
      saved_state = self._release_save()
      try:
        # Wait for notification
        waiter(timeout)
      finally:
        # Re-acquire lock
        self._acquire_restore(saved_state)
    finally:
      self._nwaiters -= 1
      if self._nwaiters == 0:
        # Last waiter gone, nobody needs the descriptors anymore
        self._Cleanup()

  def notifyAll(self): # pylint: disable-msg=C0103
    """Close the writing side of the pipe to notify all waiters.

    """
    self._check_owned()
    self._check_unnotified()
    self._notified = True

    write_fd = self._write_fd
    if write_fd is not None:
      os.close(write_fd)
      self._write_fd = None
331

    
332

    
333
class PipeCondition(_BaseCondition):
  """Group-only non-polling condition with counters.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, but only supports notifyAll and
  non-recursive locks. As an additional feature it's able to report whether
  there are any waiting threads.

  """
  __slots__ = [
    "_nwaiters",
    "_single_condition",
    ]

  _single_condition_class = SingleNotifyPipeCondition

  def __init__(self, lock):
    """Initializes this class.

    """
    _BaseCondition.__init__(self, lock)
    self._nwaiters = 0
    self._single_condition = self._single_condition_class(self._lock)

  def wait(self, timeout=None):
    """Wait for a notification.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # Remember which single-use condition we wait on; a notifying thread
    # swaps self._single_condition for a fresh one while we're waiting.
    current = self._single_condition

    assert self._nwaiters >= 0
    self._nwaiters += 1
    try:
      current.wait(timeout)
    finally:
      assert self._nwaiters > 0
      self._nwaiters -= 1

  def notifyAll(self): # pylint: disable-msg=C0103
    """Notify all currently waiting threads.

    """
    self._check_owned()

    # Wake up everybody on the current condition, then prepare a new one for
    # future waiters as a single-use condition can't be notified twice
    self._single_condition.notifyAll()
    self._single_condition = self._single_condition_class(self._lock)

  def has_waiting(self):
    """Returns whether there are active waiters.

    """
    self._check_owned()

    return self._nwaiters != 0
394

    
395

    
396
class SharedLock(object):
  """Implements a shared lock.

  Multiple threads can acquire the lock in a shared way, calling
  acquire(shared=1).  In order to acquire the lock in an exclusive way
  threads can call acquire() (or acquire(shared=0)).

  The lock prevents starvation but does not guarantee that threads will acquire
  the shared lock in the order they queued for it, just that they will
  eventually do so.

  @type name: string
  @ivar name: the name of the lock

  """
  __slots__ = [
    "__active_shr_c",
    "__inactive_shr_c",
    "__deleted",
    "__exc",
    "__lock",
    "__pending",
    "__shr",
    "name",
    ]

  __condition_class = PipeCondition

  def __init__(self, name):
    """Construct a new SharedLock.

    @param name: the name of the lock

    """
    object.__init__(self)

    self.name = name

    # Internal lock
    self.__lock = threading.Lock()

    # Queue containing waiting acquires
    self.__pending = []

    # Active and inactive conditions for shared locks
    self.__active_shr_c = self.__condition_class(self.__lock)
    self.__inactive_shr_c = self.__condition_class(self.__lock)

    # Current lock holders
    self.__shr = set()
    self.__exc = None

    # is this lock in the deleted state?
    self.__deleted = False

  def __check_deleted(self):
    """Raises an exception if the lock has been deleted.

    @raise errors.LockError: if the lock is in the deleted state

    """
    if self.__deleted:
      raise errors.LockError("Deleted lock %s" % self.name)

  def __is_sharer(self):
    """Is the current thread sharing the lock at this time?

    """
    return threading.currentThread() in self.__shr

  def __is_exclusive(self):
    """Is the current thread holding the lock exclusively at this time?

    """
    return threading.currentThread() == self.__exc

  def __is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    This is a private version of the function, which presumes you're holding
    the internal lock.

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership

    """
    if shared < 0:
      return self.__is_sharer() or self.__is_exclusive()
    elif shared:
      return self.__is_sharer()
    else:
      return self.__is_exclusive()

  def _is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership

    """
    self.__lock.acquire()
    try:
      return self.__is_owned(shared=shared)
    finally:
      self.__lock.release()

  def _count_pending(self):
    """Returns the number of pending acquires.

    @rtype: int

    """
    self.__lock.acquire()
    try:
      return len(self.__pending)
    finally:
      self.__lock.release()

  def __do_acquire(self, shared):
    """Actually acquire the lock.

    Only records the current thread as holder; callers must have checked
    that the lock can be acquired.

    """
    if shared:
      self.__shr.add(threading.currentThread())
    else:
      self.__exc = threading.currentThread()

  def __can_acquire(self, shared):
    """Determine whether lock can be acquired.

    A shared acquire is possible as long as nobody holds the lock
    exclusively, an exclusive one only if there's no holder at all.

    """
    if shared:
      return self.__exc is None
    else:
      return len(self.__shr) == 0 and self.__exc is None

  def __is_on_top(self, cond):
    """Checks whether the passed condition is on top of the queue.

    The caller must make sure the queue isn't empty.

    """
    return self.__pending[0] == cond

  def __acquire_unlocked(self, shared, timeout):
    """Acquire a shared lock.

    The internal lock must be held by the caller.

    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @param timeout: maximum waiting time before giving up
    @rtype: bool
    @return: True if the lock was acquired, False otherwise
    @raise errors.LockError: if the lock is or gets deleted

    """
    self.__check_deleted()

    # We cannot acquire the lock if we already have it
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
                                   " %s" % self.name)

    # Check whether someone else holds the lock or there are pending acquires.
    if not self.__pending and self.__can_acquire(shared):
      # Apparently not, can acquire lock directly.
      self.__do_acquire(shared)
      return True

    if shared:
      # All shared acquires queued together wait on the same condition
      wait_condition = self.__active_shr_c

      # Check if we're not yet in the queue
      if wait_condition not in self.__pending:
        self.__pending.append(wait_condition)
    else:
      # Each exclusive acquire gets its own condition
      wait_condition = self.__condition_class(self.__lock)
      # Always add to queue
      self.__pending.append(wait_condition)

    try:
      # Wait until we become the topmost acquire in the queue or the timeout
      # expires.
      while not (self.__is_on_top(wait_condition) and
                 self.__can_acquire(shared)):
        # Wait for notification
        wait_condition.wait(timeout)
        self.__check_deleted()

        # A lot of code assumes blocking acquires always succeed. Loop
        # internally for that case.
        if timeout is not None:
          break

      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
        self.__do_acquire(shared)
        return True
    finally:
      # Remove condition from queue if there are no more waiters
      if not wait_condition.has_waiting() and not self.__deleted:
        self.__pending.remove(wait_condition)

    return False

  def acquire(self, shared=0, timeout=None, test_notify=None):
    """Acquire a shared lock.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @rtype: bool
    @return: True if the lock was acquired, False otherwise
    @raise errors.LockError: if the lock is or gets deleted

    """
    self.__lock.acquire()
    try:
      # We already got the lock, notify now
      if __debug__ and callable(test_notify):
        test_notify()

      return self.__acquire_unlocked(shared, timeout)
    finally:
      self.__lock.release()

  def release(self):
    """Release a Shared Lock.

    You must have acquired the lock, either in shared or in exclusive mode,
    before calling this function.

    """
    self.__lock.acquire()
    try:
      assert self.__is_exclusive() or self.__is_sharer(), \
        "Cannot release non-owned lock"

      # Autodetect release type
      if self.__is_exclusive():
        self.__exc = None
      else:
        self.__shr.remove(threading.currentThread())

      # Notify topmost condition in queue
      if self.__pending:
        first_condition = self.__pending[0]
        first_condition.notifyAll()

        # If the shared condition was notified, swap it with the inactive
        # one so that shared acquires arriving from now on queue up anew
        if first_condition == self.__active_shr_c:
          self.__active_shr_c = self.__inactive_shr_c
          self.__inactive_shr_c = first_condition

    finally:
      self.__lock.release()

  def delete(self, timeout=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @rtype: bool
    @return: True if the lock was deleted, False if it couldn't be acquired
        within the timeout
    @raise errors.LockError: if the lock is already deleted

    """
    self.__lock.acquire()
    try:
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

      self.__check_deleted()

      # The caller is allowed to hold the lock exclusively already.
      acquired = self.__is_exclusive()

      if not acquired:
        acquired = self.__acquire_unlocked(0, timeout)

      if acquired:
        # Only check the holder if we actually got the lock; a timed out
        # acquire must make us return False, not fail the assertion
        assert self.__is_exclusive() and not self.__is_sharer(), \
          "Lock wasn't acquired in exclusive mode"

        self.__deleted = True
        self.__exc = None

        # Notify all acquires. They'll throw an error.
        while self.__pending:
          self.__pending.pop().notifyAll()

      return acquired
    finally:
      self.__lock.release()

  def _release_save(self):
    """Release the lock, returning whether it was held in shared mode.

    The return value is meant to be passed to L{_acquire_restore}.

    """
    shared = self.__is_sharer()
    self.release()
    return shared

  def _acquire_restore(self, shared):
    """Re-acquire the lock in the mode saved by L{_release_save}.

    """
    self.acquire(shared=shared)
689

    
690

    
691
# Whenever we want to acquire a full LockSet we pass None as the value
692
# to acquire.  Hide this behind this nicely named constant.
693
ALL_SET = None
694

    
695

    
696
class _AcquireTimeout(Exception):
697
  """Internal exception to abort an acquire on a timeout.
698

699
  """
700

    
701

    
702
class LockSet:
703
  """Implements a set of locks.
704

705
  This abstraction implements a set of shared locks for the same resource type,
706
  distinguished by name. The user can lock a subset of the resources and the
707
  LockSet will take care of acquiring the locks always in the same order, thus
708
  preventing deadlock.
709

710
  All the locks needed in the same set must be acquired together, though.
711

712
  @type name: string
713
  @ivar name: the name of the lockset
714

715
  """
716
  def __init__(self, members, name):
    """Constructs a new LockSet.

    @type members: list of strings
    @param members: initial members of the set
    @type name: string
    @param name: the name of the lockset; member locks are named
        "<set name>/<member name>"

    """
    assert members is not None, "members parameter is not a list"
    self.name = name

    # Set-level lock, used internally to guarantee coherency.
    self.__lock = SharedLock(name)

    # The lockdict indexes the relationship name -> lock
    # The order-of-locking is implied by the alphabetical order of names
    self.__lockdict = dict((mname, SharedLock("%s/%s" % (name, mname)))
                           for mname in members)

    # The owner dict contains the set of locks each thread owns. For
    # performance each thread can access its own key without a global lock on
    # this structure. It is paramount though that *no* other type of access is
    # done to this structure (eg. no looping over its keys). *_owner helper
    # function are defined to guarantee access is correct, but in general never
    # do anything different than __owners[threading.currentThread()], or there
    # will be trouble.
    self.__owners = {}
744

    
745
  def _is_owned(self):
746
    """Is the current thread a current level owner?"""
747
    return threading.currentThread() in self.__owners
748

    
749
  def _add_owned(self, name=None):
750
    """Note the current thread owns the given lock"""
751
    if name is None:
752
      if not self._is_owned():
753
        self.__owners[threading.currentThread()] = set()
754
    else:
755
      if self._is_owned():
756
        self.__owners[threading.currentThread()].add(name)
757
      else:
758
        self.__owners[threading.currentThread()] = set([name])
759

    
760
  def _del_owned(self, name=None):
761
    """Note the current thread owns the given lock"""
762

    
763
    assert not (name is None and self.__lock._is_owned()), \
764
           "Cannot hold internal lock when deleting owner status"
765

    
766
    if name is not None:
767
      self.__owners[threading.currentThread()].remove(name)
768

    
769
    # Only remove the key if we don't hold the set-lock as well
770
    if (not self.__lock._is_owned() and
771
        not self.__owners[threading.currentThread()]):
772
      del self.__owners[threading.currentThread()]
773

    
774
  def _list_owned(self):
775
    """Get the set of resource names owned by the current thread"""
776
    if self._is_owned():
777
      return self.__owners[threading.currentThread()].copy()
778
    else:
779
      return set()
780

    
781
  def _release_and_delete_owned(self):
782
    """Release and delete all resources owned by the current thread"""
783
    for lname in self._list_owned():
784
      lock = self.__lockdict[lname]
785
      if lock._is_owned():
786
        lock.release()
787
      self._del_owned(name=lname)
788

    
789
  def __names(self):
790
    """Return the current set of names.
791

792
    Only call this function while holding __lock and don't iterate on the
793
    result after releasing the lock.
794

795
    """
796
    return self.__lockdict.keys()
797

    
798
  def _names(self):
799
    """Return a copy of the current set of elements.
800

801
    Used only for debugging purposes.
802

803
    """
804
    # If we don't already own the set-level lock acquired
805
    # we'll get it and note we need to release it later.
806
    release_lock = False
807
    if not self.__lock._is_owned():
808
      release_lock = True
809
      self.__lock.acquire(shared=1)
810
    try:
811
      result = self.__names()
812
    finally:
813
      if release_lock:
814
        self.__lock.release()
815
    return set(result)
816

    
817
  def acquire(self, names, timeout=None, shared=0, test_notify=None):
    """Acquire a set of resource locks.

    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names); None requests all
        locks in the set
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting

    @return: Set of all locks successfully acquired or None in case of timeout

    @raise errors.LockError: when any lock we try to acquire has
        been deleted before we succeed. In this case none of the
        locks requested will be acquired.

    """
    assert timeout is None or timeout >= 0.0

    # Check we don't already own locks at this level
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
                                  " (lockset %s)" % self.name)

    # The timeout covers all lock acquires together, hence we need to keep
    # track of the time already spent waiting.
    running_timeout = RunningTimeout(timeout, False)

    try:
      if names is None:
        # If no names are given acquire the whole set by not letting new names
        # being added before we release, and getting the current list of names.
        # Some of them may then be deleted later, but we'll cope with this.
        #
        # We'd like to acquire this lock in a shared way, as it's nice if
        # everybody else can use the instances at the same time. If we are
        # acquiring them exclusively though they won't be able to do this
        # anyway, so we'll get the list lock exclusively as well in order to
        # be able to do add() on the set while owning it.
        got_set_lock = self.__lock.acquire(shared=shared,
                                           timeout=running_timeout.Remaining())
        if not got_set_lock:
          raise _AcquireTimeout()

        try:
          # note we own the set-lock
          self._add_owned()

          return self.__acquire_inner(self.__names(), True, shared,
                                      running_timeout.Remaining, test_notify)
        except:
          # We shouldn't have problems adding the lock to the owners list, but
          # if we did we'll try to release this lock and re-raise exception.
          # Of course something is going to be really wrong, after this.
          self.__lock.release()
          self._del_owned()
          raise

      # Support passing in a single resource to acquire rather than many
      if isinstance(names, basestring):
        names = [names]

      return self.__acquire_inner(names, False, shared,
                                  running_timeout.Remaining, test_notify)

    except _AcquireTimeout:
      # Timeouts are reported to the caller through the return value
      return None
886

    
887
  def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
    """Inner logic for acquiring a number of locks.

    @param names: Names of the locks to be acquired
    @param want_all: Whether all locks in the set should be acquired
    @param shared: Whether to acquire in shared mode
    @param timeout_fn: Function returning remaining timeout
    @param test_notify: Special callback function for unittesting
    @return: Set with the names of the locks acquired
    @raise errors.LockError: if a requested lock doesn't exist (unless
        C{want_all} is set) or couldn't be acquired without a timeout
    @raise _AcquireTimeout: if the timeout expired

    """
    # First we look the locks up on __lockdict. We have no way of being sure
    # they will still be there after, but this makes it a lot faster should
    # just one of them be already wrong. Using a sorted sequence to prevent
    # deadlocks.
    acquire_list = []
    for lname in sorted(utils.UniqueSequence(names)):
      try:
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
      except KeyError:
        if not want_all:
          raise errors.LockError("Non-existing lock %s in set %s" %
                                 (lname, self.name))
        # We are acquiring all the set, it doesn't matter if this particular
        # element is not there anymore.
      else:
        acquire_list.append((lname, lock))

    # This will hold the locknames we effectively acquired.
    acquired = set()

    try:
      # Now acquire_list contains a sorted list of resources and locks we
      # want.  In order to get them we loop on this (private) list and
      # acquire() them.  We gave no real guarantee they will still exist till
      # this is done but .acquire() itself is safe and will alert us if the
      # lock gets deleted.
      for (lname, lock) in acquire_list:
        test_notify_fn = None
        if __debug__ and callable(test_notify):
          test_notify_fn = lambda: test_notify(lname)

        timeout = timeout_fn()

        try:
          # raises LockError if the lock was deleted
          acq_success = lock.acquire(shared=shared, timeout=timeout,
                                     test_notify=test_notify_fn)
        except errors.LockError:
          if want_all:
            # We are acquiring all the set, it doesn't matter if this
            # particular element is not there anymore.
            continue

          raise errors.LockError("Non-existing lock %s in set %s" %
                                 (lname, self.name))

        if not acq_success:
          # Couldn't get lock or timeout occurred
          if timeout is None:
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
            # blocking.
            raise errors.LockError("Failed to get lock %s (set %s)" %
                                   (lname, self.name))

          raise _AcquireTimeout()

        try:
          # now the lock cannot be deleted, we have it!
          self._add_owned(name=lname)
          acquired.add(lname)

        except:
          # We shouldn't have problems adding the lock to the owners list, but
          # if we did we'll try to release this lock and re-raise exception.
          # Of course something is going to be really wrong after this.
          if lock._is_owned():
            lock.release()
          raise

    except:
      # Something went wrong, give back whatever we got so far
      self._release_and_delete_owned()
      raise

    return acquired
976

    
977
  def release(self, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive mode,
    before releasing them.

    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level).

    """
    assert self._is_owned(), ("release() on lock set %s while not owner" %
                              self.name)

    # Support passing in a single resource to release rather than many
    if isinstance(names, basestring):
      names = [names]

    if names is None:
      names = self._list_owned()
    else:
      names = set(names)
      assert self._list_owned().issuperset(names), (
               "release() on unheld resources %s (set %s)" %
               (names.difference(self._list_owned()), self.name))

    # First of all let's release the "all elements" lock, if set.
    # After this 'add' can work again
    if self.__lock._is_owned():
      self.__lock.release()
      self._del_owned()

    for lockname in names:
      # If we are sure the lock doesn't leave __lockdict without being
      # exclusively held we can do this...
      self.__lockdict[lockname].release()
      self._del_owned(name=lockname)
1014

    
1015
  def add(self, names, acquired=0, shared=0):
    """Add a new set of elements to the set

    @type names: list of strings (a single string is also accepted)
    @param names: names of the new elements to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: pre-acquire the new resource?
    @type shared: integer (0/1) used as a boolean
    @param shared: is the pre-acquisition shared?
    @return: True
    @raise errors.LockError: if any of the names is already in the set

    """
    # Check we don't already own locks at this level
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
      ("Cannot add locks if the set %s is only partially owned, or shared" %
       self.name)

    # Support passing in a single resource to add rather than many
    if isinstance(names, basestring):
      names = [names]

    # If we don't already own the set-level lock acquired in an exclusive way
    # we'll get it and note we need to release it later.
    release_lock = False
    if not self.__lock._is_owned():
      release_lock = True
      self.__lock.acquire()

    try:
      invalid_names = set(self.__names()).intersection(names)
      if invalid_names:
        # This must be an explicit raise, not an assert, because assert is
        # turned off when using optimization, and this can happen because of
        # concurrency even if the user doesn't want it.
        raise errors.LockError("duplicate add(%s) on lockset %s" %
                               (invalid_names, self.name))

      for lockname in names:
        lock = SharedLock("%s/%s" % (self.name, lockname))

        if acquired:
          lock.acquire(shared=shared)
          # now the lock cannot be deleted, we have it!
          try:
            self._add_owned(name=lockname)
          except:
            # We shouldn't have problems adding the lock to the owners list,
            # but if we did we'll try to release this lock and re-raise
            # exception.  Of course something is going to be really wrong,
            # after this.  On the other hand the lock hasn't been added to the
            # __lockdict yet so no other threads should be pending on it. This
            # release is just a safety measure.
            lock.release()
            raise

        self.__lockdict[lockname] = lock

    finally:
      # Only release __lock if we were not holding it previously.
      if release_lock:
        self.__lock.release()

    return True
1077

    
1078
  def remove(self, names):
    """Remove elements from the lock set.

    You can either not hold anything in the lockset or already hold a superset
    of the elements you want to delete, exclusively.

    @type names: list of strings (a single string is also accepted)
    @param names: names of the resource to remove.

    @return: a list of locks which we removed; the list is always
        equal to the names list if we were holding all the locks
        exclusively

    """
    # Support passing in a single resource to remove rather than many
    if isinstance(names, basestring):
      names = [names]

    # If we own any subset of this lock it must be a superset of what we want
    # to delete. The ownership must also be exclusive, but that will be checked
    # by the lock itself.
    assert not self._is_owned() or self._list_owned().issuperset(names), (
      "remove() on acquired lockset %s while not owning all elements" %
      self.name)

    removed = []

    for lname in names:
      # Calling delete() acquires the lock exclusively if we don't already own
      # it, and causes all pending and subsequent lock acquires to fail. It's
      # fine to call it out of order because delete() also implies release(),
      # and the assertion above guarantees that we either already hold
      # everything we want to delete, or we hold none.
      try:
        self.__lockdict[lname].delete()
        removed.append(lname)
      except (KeyError, errors.LockError):
        # This cannot happen if we were already holding it, verify:
        assert not self._is_owned(), ("remove failed while holding lockset %s"
                                      % self.name)
      else:
        # If no LockError was raised we are the ones who deleted the lock.
        # This means we can safely remove it from lockdict, as any further or
        # pending delete() or acquire() will fail (and nobody can have the lock
        # since before our call to delete()).
        #
        # This is done in an else clause because if the exception was thrown
        # it's the job of the one who actually deleted it.
        del self.__lockdict[lname]
        # And let's remove it from our private list if we owned it.
        if self._is_owned():
          self._del_owned(name=lname)

    return removed
1132

    
1133

    
1134
# Locking levels, must be acquired in increasing order.
# Current rules are:
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
#   acquired before performing any operation, either in shared or in exclusive
#   mode. Acquiring the BGL in exclusive mode is discouraged and should be
#   avoided.
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
#   If you need more than one node, or more than one instance, acquire them at
#   the same time.
LEVEL_CLUSTER = 0
LEVEL_INSTANCE = 1
LEVEL_NODE = 2

# Ordered list of all levels. GanetiLockManager._upper_owned slices this list
# by level value, so LEVELS[i] must stay equal to i.
LEVELS = [LEVEL_CLUSTER,
          LEVEL_INSTANCE,
          LEVEL_NODE]

# Lock levels which are modifiable
LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]

# Human-readable name of each level
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODE: "node",
  }

# Constant for the big ganeti lock
BGL = 'BGL'
1162

    
1163

    
1164
class GanetiLockManager:
1165
  """The Ganeti Locking Library
1166

1167
  The purpose of this small library is to manage locking for ganeti clusters
1168
  in a central place, while at the same time doing dynamic checks against
1169
  possible deadlocks. It will also make it easier to transition to a different
1170
  lock type should we migrate away from python threads.
1171

1172
  """
1173
  _instance = None
1174

    
1175
  def __init__(self, nodes=None, instances=None):
    """Constructs a new GanetiLockManager object.

    There should be only a GanetiLockManager object at any time, so this
    function asserts that no other instance has been registered on the class.

    @param nodes: list of node names
    @param instances: list of instance names

    """
    assert self.__class__._instance is None, \
           "double GanetiLockManager instance"

    # Register ourselves so that a second construction trips the assertion
    self.__class__._instance = self

    # The keyring contains all the locks, one lock set per level.
    self.__keyring = {
      LEVEL_CLUSTER: LockSet([BGL], "bgl lockset"),
      LEVEL_NODE: LockSet(nodes, "nodes lockset"),
      LEVEL_INSTANCE: LockSet(instances, "instances lockset"),
    }
1197

    
1198
  def _names(self, level):
    """List the lock names at the given level.

    This can be used for debugging/testing purposes.

    @param level: the level whose list of locks to get
    @return: whatever the lock set at that level returns from its own
        _names() method

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    return self.__keyring[level]._names()
1208

    
1209
  def _is_owned(self, level):
1210
    """Check whether we are owning locks at the given level
1211

1212
    """
1213
    return self.__keyring[level]._is_owned()
1214

    
1215
  is_owned = _is_owned
1216

    
1217
  def _list_owned(self, level):
1218
    """Get the set of owned locks at the given level
1219

1220
    """
1221
    return self.__keyring[level]._list_owned()
1222

    
1223
  def _upper_owned(self, level):
    """Check whether we own any lock at a level greater than the given one.

    @param level: the level above which ownership is checked
    @return: a true value if _is_owned() holds for at least one of the levels
        in LEVELS[level + 1:]

    """
    # This way of checking only works if LEVELS[i] = i, which we check for in
    # the test cases.
    return compat.any((self._is_owned(lvl) for lvl in LEVELS[level + 1:]))
1230

    
1231
  def _BGL_owned(self): # pylint: disable-msg=C0103
    """Check if the current thread owns the BGL.

    Both an exclusive or a shared acquisition work.

    @return: whether BGL is among the locks listed as owned at LEVEL_CLUSTER

    """
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1238

    
1239
  @staticmethod
  def _contains_BGL(level, names): # pylint: disable-msg=C0103
    """Check if the level contains the BGL.

    Check if acting on the given level and set of names will change
    the status of the Big Ganeti Lock.

    @param level: the locking level being acted upon
    @param names: the lock names being acted upon; None is treated as
        including the BGL
    @return: True only for LEVEL_CLUSTER when names is None or contains BGL

    """
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1248

    
1249
  def acquire(self, level, names, timeout=None, shared=0):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @return: the result of acquire() on the lock set of the given level

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Check that we are either acquiring the Big Ganeti Lock or we already own
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
    # so even if we've migrated we need to at least share the BGL to be
    # compatible with them. Of course if we own the BGL exclusively there's no
    # point in acquiring any other lock, unless perhaps we are half way through
    # the migration of the current opcode.
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
            "You must own the Big Ganeti Lock before acquiring any other")

    # Check we don't own locks at upper levels.
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
           " while owning some at a greater one")

    # Acquire the locks in the set.
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
1281

    
1282
  def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive
    mode, before releasing them.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)
    @return: the result of release() on the lock set of the given level

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    # The BGL may only be given up once nothing above LEVEL_CLUSTER is held;
    # the failure message lists what is owned at every level.
    assert (not self._contains_BGL(level, names) or
            not self._upper_owned(LEVEL_CLUSTER)), (
            "Cannot release the Big Ganeti Lock while holding something"
            " at upper levels (%r)" %
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
                              for i in self.__keyring.keys()]), ))

    # Release will complain if we don't own the locks already
    return self.__keyring[level].release(names)
1305

    
1306
  def add(self, level, names, acquired=0, shared=0):
    """Add locks at the specified level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared
    @return: the result of add() on the lock set of the given level

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
           " while owning some at a greater one")
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1325

    
1326
  def remove(self, level, names):
    """Remove locks from the specified level.

    You must own the BGL, and either already own locks at the given level
    or not own any lock at an upper level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)
    @return: the result of remove() on the lock set of the given level

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    # Check we either own the level or don't own anything from here
    # up. LockSet.remove() will check the case in which we don't own
    # all the needed resources, or we have a shared ownership.
    assert self._is_owned(level) or not self._upper_owned(level), (
           "Cannot remove locks at a level while not owning it or"
           " owning some at a greater one")
    return self.__keyring[level].remove(names)