Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 008b92fa

History | View | Annotate | Download (35.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
import os
24
import select
25
import threading
26
import time
27
import errno
28

    
29
from ganeti import errors
30
from ganeti import utils
31

    
32

    
33
def ssynchronized(lock, shared=0):
34
  """Shared Synchronization decorator.
35

36
  Calls the function holding the given lock, either in exclusive or shared
37
  mode. It requires the passed lock to be a SharedLock (or support its
38
  semantics).
39

40
  """
41
  def wrap(fn):
42
    def sync_function(*args, **kwargs):
43
      lock.acquire(shared=shared)
44
      try:
45
        return fn(*args, **kwargs)
46
      finally:
47
        lock.release()
48
    return sync_function
49
  return wrap
50

    
51

    
52
class _SingleNotifyPipeConditionWaiter(object):
53
  """Helper class for SingleNotifyPipeCondition
54

55
  """
56
  __slots__ = [
57
    "_fd",
58
    "_poller",
59
    ]
60

    
61
  def __init__(self, poller, fd):
62
    """Constructor for _SingleNotifyPipeConditionWaiter
63

64
    @type poller: select.poll
65
    @param poller: Poller object
66
    @type fd: int
67
    @param fd: File descriptor to wait for
68

69
    """
70
    object.__init__(self)
71
    self._poller = poller
72
    self._fd = fd
73

    
74
  def __call__(self, timeout):
75
    """Wait for something to happen on the pipe.
76

77
    @type timeout: float or None
78
    @param timeout: Timeout for waiting (can be None)
79

80
    """
81
    start_time = time.time()
82
    remaining_time = timeout
83

    
84
    while timeout is None or remaining_time > 0:
85
      try:
86
        result = self._poller.poll(remaining_time)
87
      except EnvironmentError, err:
88
        if err.errno != errno.EINTR:
89
          raise
90
        result = None
91

    
92
      # Check whether we were notified
93
      if result and result[0][0] == self._fd:
94
        break
95

    
96
      # Re-calculate timeout if necessary
97
      if timeout is not None:
98
        remaining_time = start_time + timeout - time.time()
99

    
100

    
101
class _BaseCondition(object):
102
  """Base class containing common code for conditions.
103

104
  Some of this code is taken from python's threading module.
105

106
  """
107
  __slots__ = [
108
    "_lock",
109
    "acquire",
110
    "release",
111
    ]
112

    
113
  def __init__(self, lock):
114
    """Constructor for _BaseCondition.
115

116
    @type lock: L{threading.Lock}
117
    @param lock: condition base lock
118

119
    """
120
    object.__init__(self)
121

    
122
    # Recursive locks are not supported
123
    assert not hasattr(lock, "_acquire_restore")
124
    assert not hasattr(lock, "_release_save")
125

    
126
    self._lock = lock
127

    
128
    # Export the lock's acquire() and release() methods
129
    self.acquire = lock.acquire
130
    self.release = lock.release
131

    
132
  def _is_owned(self):
133
    """Check whether lock is owned by current thread.
134

135
    """
136
    if self._lock.acquire(0):
137
      self._lock.release()
138
      return False
139

    
140
    return True
141

    
142
  def _check_owned(self):
143
    """Raise an exception if the current thread doesn't own the lock.
144

145
    """
146
    if not self._is_owned():
147
      raise RuntimeError("cannot work with un-aquired lock")
148

    
149

    
150
class SingleNotifyPipeCondition(_BaseCondition):
151
  """Condition which can only be notified once.
152

153
  This condition class uses pipes and poll, internally, to be able to wait for
154
  notification with a timeout, without resorting to polling. It is almost
155
  compatible with Python's threading.Condition, with the following differences:
156
    - notifyAll can only be called once, and no wait can happen after that
157
    - notify is not supported, only notifyAll
158

159
  """
160

    
161
  __slots__ = _BaseCondition.__slots__ + [
162
    "_poller",
163
    "_read_fd",
164
    "_write_fd",
165
    "_nwaiters",
166
    "_notified",
167
    ]
168

    
169
  _waiter_class = _SingleNotifyPipeConditionWaiter
170

    
171
  def __init__(self, lock):
172
    """Constructor for SingleNotifyPipeCondition
173

174
    """
175
    _BaseCondition.__init__(self, lock)
176
    self._nwaiters = 0
177
    self._notified = False
178
    self._read_fd = None
179
    self._write_fd = None
180
    self._poller = None
181

    
182
  def _check_unnotified(self):
183
    if self._notified:
184
      raise RuntimeError("cannot use already notified condition")
185

    
186
  def _Cleanup(self):
187
    """Cleanup open file descriptors, if any.
188

189
    """
190
    if self._read_fd is not None:
191
      os.close(self._read_fd)
192
      self._read_fd = None
193

    
194
    if self._write_fd is not None:
195
      os.close(self._write_fd)
196
      self._write_fd = None
197
    self._poller = None
198

    
199
  def wait(self, timeout=None):
200
    """Wait for a notification.
201

202
    @type timeout: float or None
203
    @param timeout: Waiting timeout (can be None)
204

205
    """
206
    self._check_owned()
207
    self._check_unnotified()
208

    
209
    self._nwaiters += 1
210
    try:
211
      if self._poller is None:
212
        (self._read_fd, self._write_fd) = os.pipe()
213
        self._poller = select.poll()
214
        self._poller.register(self._read_fd, select.POLLHUP)
215

    
216
      wait_fn = self._waiter_class(self._poller, self._read_fd)
217
      self.release()
218
      try:
219
        # Wait for notification
220
        wait_fn(timeout)
221
      finally:
222
        # Re-acquire lock
223
        self.acquire()
224
    finally:
225
      self._nwaiters -= 1
226
      if self._nwaiters == 0:
227
        self._Cleanup()
228

    
229
  def notifyAll(self):
230
    """Close the writing side of the pipe to notify all waiters.
231

232
    """
233
    self._check_owned()
234
    self._check_unnotified()
235
    self._notified = True
236
    if self._write_fd is not None:
237
      os.close(self._write_fd)
238
      self._write_fd = None
239

    
240

    
241
class PipeCondition(_BaseCondition):
242
  """Group-only non-polling condition with counters.
243

244
  This condition class uses pipes and poll, internally, to be able to wait for
245
  notification with a timeout, without resorting to polling. It is almost
246
  compatible with Python's threading.Condition, but only supports notifyAll and
247
  non-recursive locks. As an additional features it's able to report whether
248
  there are any waiting threads.
249

250
  """
251
  __slots__ = _BaseCondition.__slots__ + [
252
    "_nwaiters",
253
    "_single_condition",
254
    ]
255

    
256
  _single_condition_class = SingleNotifyPipeCondition
257

    
258
  def __init__(self, lock):
259
    """Initializes this class.
260

261
    """
262
    _BaseCondition.__init__(self, lock)
263
    self._nwaiters = 0
264
    self._single_condition = self._single_condition_class(self._lock)
265

    
266
  def wait(self, timeout=None):
267
    """Wait for a notification.
268

269
    @type timeout: float or None
270
    @param timeout: Waiting timeout (can be None)
271

272
    """
273
    self._check_owned()
274

    
275
    # Keep local reference to the pipe. It could be replaced by another thread
276
    # notifying while we're waiting.
277
    my_condition = self._single_condition
278

    
279
    assert self._nwaiters >= 0
280
    self._nwaiters += 1
281
    try:
282
      my_condition.wait(timeout)
283
    finally:
284
      assert self._nwaiters > 0
285
      self._nwaiters -= 1
286

    
287
  def notifyAll(self):
288
    """Notify all currently waiting threads.
289

290
    """
291
    self._check_owned()
292
    self._single_condition.notifyAll()
293
    self._single_condition = self._single_condition_class(self._lock)
294

    
295
  def has_waiting(self):
296
    """Returns whether there are active waiters.
297

298
    """
299
    self._check_owned()
300

    
301
    return bool(self._nwaiters)
302

    
303

    
304
class _CountingCondition(object):
305
  """Wrapper for Python's built-in threading.Condition class.
306

307
  This wrapper keeps a count of active waiters. We can't access the internal
308
  "__waiters" attribute of threading.Condition because it's not thread-safe.
309

310
  """
311
  __slots__ = [
312
    "_cond",
313
    "_nwaiters",
314
    ]
315

    
316
  def __init__(self, lock):
317
    """Initializes this class.
318

319
    """
320
    object.__init__(self)
321
    self._cond = threading.Condition(lock=lock)
322
    self._nwaiters = 0
323

    
324
  def notifyAll(self):
325
    """Notifies the condition.
326

327
    """
328
    return self._cond.notifyAll()
329

    
330
  def wait(self, timeout=None):
331
    """Waits for the condition to be notified.
332

333
    @type timeout: float or None
334
    @param timeout: Waiting timeout (can be None)
335

336
    """
337
    assert self._nwaiters >= 0
338

    
339
    self._nwaiters += 1
340
    try:
341
      return self._cond.wait(timeout=timeout)
342
    finally:
343
      self._nwaiters -= 1
344

    
345
  def has_waiting(self):
346
    """Returns whether there are active waiters.
347

348
    """
349
    return bool(self._nwaiters)
350

    
351

    
352
class SharedLock(object):
353
  """Implements a shared lock.
354

355
  Multiple threads can acquire the lock in a shared way, calling
356
  acquire_shared().  In order to acquire the lock in an exclusive way threads
357
  can call acquire_exclusive().
358

359
  The lock prevents starvation but does not guarantee that threads will acquire
360
  the shared lock in the order they queued for it, just that they will
361
  eventually do so.
362

363
  """
364
  __slots__ = [
365
    "__active_shr_c",
366
    "__inactive_shr_c",
367
    "__deleted",
368
    "__exc",
369
    "__lock",
370
    "__pending",
371
    "__shr",
372
    ]
373

    
374
  __condition_class = PipeCondition
375

    
376
  def __init__(self):
377
    """Construct a new SharedLock.
378

379
    """
380
    object.__init__(self)
381

    
382
    # Internal lock
383
    self.__lock = threading.Lock()
384

    
385
    # Queue containing waiting acquires
386
    self.__pending = []
387

    
388
    # Active and inactive conditions for shared locks
389
    self.__active_shr_c = self.__condition_class(self.__lock)
390
    self.__inactive_shr_c = self.__condition_class(self.__lock)
391

    
392
    # Current lock holders
393
    self.__shr = set()
394
    self.__exc = None
395

    
396
    # is this lock in the deleted state?
397
    self.__deleted = False
398

    
399
  def __check_deleted(self):
400
    """Raises an exception if the lock has been deleted.
401

402
    """
403
    if self.__deleted:
404
      raise errors.LockError("Deleted lock")
405

    
406
  def __is_sharer(self):
407
    """Is the current thread sharing the lock at this time?
408

409
    """
410
    return threading.currentThread() in self.__shr
411

    
412
  def __is_exclusive(self):
413
    """Is the current thread holding the lock exclusively at this time?
414

415
    """
416
    return threading.currentThread() == self.__exc
417

    
418
  def __is_owned(self, shared=-1):
419
    """Is the current thread somehow owning the lock at this time?
420

421
    This is a private version of the function, which presumes you're holding
422
    the internal lock.
423

424
    """
425
    if shared < 0:
426
      return self.__is_sharer() or self.__is_exclusive()
427
    elif shared:
428
      return self.__is_sharer()
429
    else:
430
      return self.__is_exclusive()
431

    
432
  def _is_owned(self, shared=-1):
433
    """Is the current thread somehow owning the lock at this time?
434

435
    @param shared:
436
        - < 0: check for any type of ownership (default)
437
        - 0: check for exclusive ownership
438
        - > 0: check for shared ownership
439

440
    """
441
    self.__lock.acquire()
442
    try:
443
      return self.__is_owned(shared=shared)
444
    finally:
445
      self.__lock.release()
446

    
447
  def _count_pending(self):
448
    """Returns the number of pending acquires.
449

450
    @rtype: int
451

452
    """
453
    self.__lock.acquire()
454
    try:
455
      return len(self.__pending)
456
    finally:
457
      self.__lock.release()
458

    
459
  def __do_acquire(self, shared):
460
    """Actually acquire the lock.
461

462
    """
463
    if shared:
464
      self.__shr.add(threading.currentThread())
465
    else:
466
      self.__exc = threading.currentThread()
467

    
468
  def __can_acquire(self, shared):
469
    """Determine whether lock can be acquired.
470

471
    """
472
    if shared:
473
      return self.__exc is None
474
    else:
475
      return len(self.__shr) == 0 and self.__exc is None
476

    
477
  def __is_on_top(self, cond):
478
    """Checks whether the passed condition is on top of the queue.
479

480
    The caller must make sure the queue isn't empty.
481

482
    """
483
    return self.__pending[0] == cond
484

    
485
  def __acquire_unlocked(self, shared, timeout):
486
    """Acquire a shared lock.
487

488
    @param shared: whether to acquire in shared mode; by default an
489
        exclusive lock will be acquired
490
    @param timeout: maximum waiting time before giving up
491

492
    """
493
    self.__check_deleted()
494

    
495
    # We cannot acquire the lock if we already have it
496
    assert not self.__is_owned(), "double acquire() on a non-recursive lock"
497

    
498
    # Check whether someone else holds the lock or there are pending acquires.
499
    if not self.__pending and self.__can_acquire(shared):
500
      # Apparently not, can acquire lock directly.
501
      self.__do_acquire(shared)
502
      return True
503

    
504
    if shared:
505
      wait_condition = self.__active_shr_c
506

    
507
      # Check if we're not yet in the queue
508
      if wait_condition not in self.__pending:
509
        self.__pending.append(wait_condition)
510
    else:
511
      wait_condition = self.__condition_class(self.__lock)
512
      # Always add to queue
513
      self.__pending.append(wait_condition)
514

    
515
    try:
516
      # Wait until we become the topmost acquire in the queue or the timeout
517
      # expires.
518
      while not (self.__is_on_top(wait_condition) and
519
                 self.__can_acquire(shared)):
520
        # Wait for notification
521
        wait_condition.wait(timeout)
522
        self.__check_deleted()
523

    
524
        # A lot of code assumes blocking acquires always succeed. Loop
525
        # internally for that case.
526
        if timeout is not None:
527
          break
528

    
529
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
530
        self.__do_acquire(shared)
531
        return True
532
    finally:
533
      # Remove condition from queue if there are no more waiters
534
      if not wait_condition.has_waiting() and not self.__deleted:
535
        self.__pending.remove(wait_condition)
536

    
537
    return False
538

    
539
  def acquire(self, shared=0, timeout=None, test_notify=None):
540
    """Acquire a shared lock.
541

542
    @type shared: int
543
    @param shared: whether to acquire in shared mode; by default an
544
        exclusive lock will be acquired
545
    @type timeout: float
546
    @param timeout: maximum waiting time before giving up
547
    @type test_notify: callable or None
548
    @param test_notify: Special callback function for unittesting
549

550
    """
551
    self.__lock.acquire()
552
    try:
553
      # We already got the lock, notify now
554
      if __debug__ and callable(test_notify):
555
        test_notify()
556

    
557
      return self.__acquire_unlocked(shared, timeout)
558
    finally:
559
      self.__lock.release()
560

    
561
  def release(self):
562
    """Release a Shared Lock.
563

564
    You must have acquired the lock, either in shared or in exclusive mode,
565
    before calling this function.
566

567
    """
568
    self.__lock.acquire()
569
    try:
570
      assert self.__is_exclusive() or self.__is_sharer(), \
571
        "Cannot release non-owned lock"
572

    
573
      # Autodetect release type
574
      if self.__is_exclusive():
575
        self.__exc = None
576
      else:
577
        self.__shr.remove(threading.currentThread())
578

    
579
      # Notify topmost condition in queue
580
      if self.__pending:
581
        first_condition = self.__pending[0]
582
        first_condition.notifyAll()
583

    
584
        if first_condition == self.__active_shr_c:
585
          self.__active_shr_c = self.__inactive_shr_c
586
          self.__inactive_shr_c = first_condition
587

    
588
    finally:
589
      self.__lock.release()
590

    
591
  def delete(self, timeout=None):
592
    """Delete a Shared Lock.
593

594
    This operation will declare the lock for removal. First the lock will be
595
    acquired in exclusive mode if you don't already own it, then the lock
596
    will be put in a state where any future and pending acquire() fail.
597

598
    @type timeout: float
599
    @param timeout: maximum waiting time before giving up
600

601
    """
602
    self.__lock.acquire()
603
    try:
604
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
605

    
606
      self.__check_deleted()
607

    
608
      # The caller is allowed to hold the lock exclusively already.
609
      acquired = self.__is_exclusive()
610

    
611
      if not acquired:
612
        acquired = self.__acquire_unlocked(0, timeout)
613

    
614
        assert self.__is_exclusive() and not self.__is_sharer(), \
615
          "Lock wasn't acquired in exclusive mode"
616

    
617
      if acquired:
618
        self.__deleted = True
619
        self.__exc = None
620

    
621
        # Notify all acquires. They'll throw an error.
622
        while self.__pending:
623
          self.__pending.pop().notifyAll()
624

    
625
      return acquired
626
    finally:
627
      self.__lock.release()
628

    
629

    
630
# Whenever we want to acquire a full LockSet we pass None as the value
631
# to acquire.  Hide this behind this nicely named constant.
632
ALL_SET = None
633

    
634

    
635
class LockSet:
636
  """Implements a set of locks.
637

638
  This abstraction implements a set of shared locks for the same resource type,
639
  distinguished by name. The user can lock a subset of the resources and the
640
  LockSet will take care of acquiring the locks always in the same order, thus
641
  preventing deadlock.
642

643
  All the locks needed in the same set must be acquired together, though.
644

645
  """
646
  def __init__(self, members=None):
647
    """Constructs a new LockSet.
648

649
    @param members: initial members of the set
650

651
    """
652
    # Used internally to guarantee coherency.
653
    self.__lock = SharedLock()
654

    
655
    # The lockdict indexes the relationship name -> lock
656
    # The order-of-locking is implied by the alphabetical order of names
657
    self.__lockdict = {}
658

    
659
    if members is not None:
660
      for name in members:
661
        self.__lockdict[name] = SharedLock()
662

    
663
    # The owner dict contains the set of locks each thread owns. For
664
    # performance each thread can access its own key without a global lock on
665
    # this structure. It is paramount though that *no* other type of access is
666
    # done to this structure (eg. no looping over its keys). *_owner helper
667
    # function are defined to guarantee access is correct, but in general never
668
    # do anything different than __owners[threading.currentThread()], or there
669
    # will be trouble.
670
    self.__owners = {}
671

    
672
  def _is_owned(self):
673
    """Is the current thread a current level owner?"""
674
    return threading.currentThread() in self.__owners
675

    
676
  def _add_owned(self, name=None):
677
    """Note the current thread owns the given lock"""
678
    if name is None:
679
      if not self._is_owned():
680
        self.__owners[threading.currentThread()] = set()
681
    else:
682
      if self._is_owned():
683
        self.__owners[threading.currentThread()].add(name)
684
      else:
685
        self.__owners[threading.currentThread()] = set([name])
686

    
687
  def _del_owned(self, name=None):
688
    """Note the current thread owns the given lock"""
689

    
690
    if name is not None:
691
      self.__owners[threading.currentThread()].remove(name)
692

    
693
    # Only remove the key if we don't hold the set-lock as well
694
    if (not self.__lock._is_owned() and
695
        not self.__owners[threading.currentThread()]):
696
      del self.__owners[threading.currentThread()]
697

    
698
  def _list_owned(self):
699
    """Get the set of resource names owned by the current thread"""
700
    if self._is_owned():
701
      return self.__owners[threading.currentThread()].copy()
702
    else:
703
      return set()
704

    
705
  def __names(self):
706
    """Return the current set of names.
707

708
    Only call this function while holding __lock and don't iterate on the
709
    result after releasing the lock.
710

711
    """
712
    return self.__lockdict.keys()
713

    
714
  def _names(self):
715
    """Return a copy of the current set of elements.
716

717
    Used only for debugging purposes.
718

719
    """
720
    # If we don't already own the set-level lock acquired
721
    # we'll get it and note we need to release it later.
722
    release_lock = False
723
    if not self.__lock._is_owned():
724
      release_lock = True
725
      self.__lock.acquire(shared=1)
726
    try:
727
      result = self.__names()
728
    finally:
729
      if release_lock:
730
        self.__lock.release()
731
    return set(result)
732

    
733
  def acquire(self, names, timeout=None, shared=0):
734
    """Acquire a set of resource locks.
735

736
    @param names: the names of the locks which shall be acquired
737
        (special lock names, or instance/node names)
738
    @param shared: whether to acquire in shared mode; by default an
739
        exclusive lock will be acquired
740
    @type timeout: float
741
    @param timeout: Maximum time to acquire all locks
742

743
    @return: True when all the locks are successfully acquired
744

745
    @raise errors.LockError: when any lock we try to acquire has
746
        been deleted before we succeed. In this case none of the
747
        locks requested will be acquired.
748

749
    """
750
    if timeout is not None:
751
      raise NotImplementedError
752

    
753
    # Check we don't already own locks at this level
754
    assert not self._is_owned(), "Cannot acquire locks in the same set twice"
755

    
756
    if names is None:
757
      # If no names are given acquire the whole set by not letting new names
758
      # being added before we release, and getting the current list of names.
759
      # Some of them may then be deleted later, but we'll cope with this.
760
      #
761
      # We'd like to acquire this lock in a shared way, as it's nice if
762
      # everybody else can use the instances at the same time. If are acquiring
763
      # them exclusively though they won't be able to do this anyway, though,
764
      # so we'll get the list lock exclusively as well in order to be able to
765
      # do add() on the set while owning it.
766
      self.__lock.acquire(shared=shared)
767
      try:
768
        # note we own the set-lock
769
        self._add_owned()
770
        names = self.__names()
771
      except:
772
        # We shouldn't have problems adding the lock to the owners list, but
773
        # if we did we'll try to release this lock and re-raise exception.
774
        # Of course something is going to be really wrong, after this.
775
        self.__lock.release()
776
        raise
777

    
778
    try:
779
      # Support passing in a single resource to acquire rather than many
780
      if isinstance(names, basestring):
781
        names = [names]
782
      else:
783
        names = sorted(names)
784

    
785
      acquire_list = []
786
      # First we look the locks up on __lockdict. We have no way of being sure
787
      # they will still be there after, but this makes it a lot faster should
788
      # just one of them be the already wrong
789
      for lname in utils.UniqueSequence(names):
790
        try:
791
          lock = self.__lockdict[lname] # raises KeyError if lock is not there
792
          acquire_list.append((lname, lock))
793
        except (KeyError):
794
          if self.__lock._is_owned():
795
            # We are acquiring all the set, it doesn't matter if this
796
            # particular element is not there anymore.
797
            continue
798
          else:
799
            raise errors.LockError('non-existing lock in set (%s)' % lname)
800

    
801
      # This will hold the locknames we effectively acquired.
802
      acquired = set()
803
      # Now acquire_list contains a sorted list of resources and locks we want.
804
      # In order to get them we loop on this (private) list and acquire() them.
805
      # We gave no real guarantee they will still exist till this is done but
806
      # .acquire() itself is safe and will alert us if the lock gets deleted.
807
      for (lname, lock) in acquire_list:
808
        try:
809
          lock.acquire(shared=shared) # raises LockError if the lock is deleted
810
          # now the lock cannot be deleted, we have it!
811
          self._add_owned(name=lname)
812
          acquired.add(lname)
813
        except (errors.LockError):
814
          if self.__lock._is_owned():
815
            # We are acquiring all the set, it doesn't matter if this
816
            # particular element is not there anymore.
817
            continue
818
          else:
819
            name_fail = lname
820
            for lname in self._list_owned():
821
              self.__lockdict[lname].release()
822
              self._del_owned(name=lname)
823
            raise errors.LockError('non-existing lock in set (%s)' % name_fail)
824
        except:
825
          # We shouldn't have problems adding the lock to the owners list, but
826
          # if we did we'll try to release this lock and re-raise exception.
827
          # Of course something is going to be really wrong, after this.
828
          if lock._is_owned():
829
            lock.release()
830
          raise
831

    
832
    except:
833
      # If something went wrong and we had the set-lock let's release it...
834
      if self.__lock._is_owned():
835
        self.__lock.release()
836
      raise
837

    
838
    return acquired
839

    
840
  def release(self, names=None):
841
    """Release a set of resource locks, at the same level.
842

843
    You must have acquired the locks, either in shared or in exclusive mode,
844
    before releasing them.
845

846
    @param names: the names of the locks which shall be released
847
        (defaults to all the locks acquired at that level).
848

849
    """
850
    assert self._is_owned(), "release() on lock set while not owner"
851

    
852
    # Support passing in a single resource to release rather than many
853
    if isinstance(names, basestring):
854
      names = [names]
855

    
856
    if names is None:
857
      names = self._list_owned()
858
    else:
859
      names = set(names)
860
      assert self._list_owned().issuperset(names), (
861
               "release() on unheld resources %s" %
862
               names.difference(self._list_owned()))
863

    
864
    # First of all let's release the "all elements" lock, if set.
865
    # After this 'add' can work again
866
    if self.__lock._is_owned():
867
      self.__lock.release()
868
      self._del_owned()
869

    
870
    for lockname in names:
871
      # If we are sure the lock doesn't leave __lockdict without being
872
      # exclusively held we can do this...
873
      self.__lockdict[lockname].release()
874
      self._del_owned(name=lockname)
875

    
876
  def add(self, names, acquired=0, shared=0):
877
    """Add a new set of elements to the set
878

879
    @param names: names of the new elements to add
880
    @param acquired: pre-acquire the new resource?
881
    @param shared: is the pre-acquisition shared?
882

883
    """
884
    # Check we don't already own locks at this level
885
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
886
      "Cannot add locks if the set is only partially owned, or shared"
887

    
888
    # Support passing in a single resource to add rather than many
889
    if isinstance(names, basestring):
890
      names = [names]
891

    
892
    # If we don't already own the set-level lock acquired in an exclusive way
893
    # we'll get it and note we need to release it later.
894
    release_lock = False
895
    if not self.__lock._is_owned():
896
      release_lock = True
897
      self.__lock.acquire()
898

    
899
    try:
900
      invalid_names = set(self.__names()).intersection(names)
901
      if invalid_names:
902
        # This must be an explicit raise, not an assert, because assert is
903
        # turned off when using optimization, and this can happen because of
904
        # concurrency even if the user doesn't want it.
905
        raise errors.LockError("duplicate add() (%s)" % invalid_names)
906

    
907
      for lockname in names:
908
        lock = SharedLock()
909

    
910
        if acquired:
911
          lock.acquire(shared=shared)
912
          # now the lock cannot be deleted, we have it!
913
          try:
914
            self._add_owned(name=lockname)
915
          except:
916
            # We shouldn't have problems adding the lock to the owners list,
917
            # but if we did we'll try to release this lock and re-raise
918
            # exception.  Of course something is going to be really wrong,
919
            # after this.  On the other hand the lock hasn't been added to the
920
            # __lockdict yet so no other threads should be pending on it. This
921
            # release is just a safety measure.
922
            lock.release()
923
            raise
924

    
925
        self.__lockdict[lockname] = lock
926

    
927
    finally:
928
      # Only release __lock if we were not holding it previously.
929
      if release_lock:
930
        self.__lock.release()
931

    
932
    return True
933

    
934
  def remove(self, names):
935
    """Remove elements from the lock set.
936

937
    You can either not hold anything in the lockset or already hold a superset
938
    of the elements you want to delete, exclusively.
939

940
    @param names: names of the resource to remove.
941

942
    @return:: a list of locks which we removed; the list is always
943
        equal to the names list if we were holding all the locks
944
        exclusively
945

946
    """
947
    # Support passing in a single resource to remove rather than many
948
    if isinstance(names, basestring):
949
      names = [names]
950

    
951
    # If we own any subset of this lock it must be a superset of what we want
952
    # to delete. The ownership must also be exclusive, but that will be checked
953
    # by the lock itself.
954
    assert not self._is_owned() or self._list_owned().issuperset(names), (
955
      "remove() on acquired lockset while not owning all elements")
956

    
957
    removed = []
958

    
959
    for lname in names:
960
      # Calling delete() acquires the lock exclusively if we don't already own
961
      # it, and causes all pending and subsequent lock acquires to fail. It's
962
      # fine to call it out of order because delete() also implies release(),
963
      # and the assertion above guarantees that if we either already hold
964
      # everything we want to delete, or we hold none.
965
      try:
966
        self.__lockdict[lname].delete()
967
        removed.append(lname)
968
      except (KeyError, errors.LockError):
969
        # This cannot happen if we were already holding it, verify:
970
        assert not self._is_owned(), "remove failed while holding lockset"
971
      else:
972
        # If no LockError was raised we are the ones who deleted the lock.
973
        # This means we can safely remove it from lockdict, as any further or
974
        # pending delete() or acquire() will fail (and nobody can have the lock
975
        # since before our call to delete()).
976
        #
977
        # This is done in an else clause because if the exception was thrown
978
        # it's the job of the one who actually deleted it.
979
        del self.__lockdict[lname]
980
        # And let's remove it from our private list if we owned it.
981
        if self._is_owned():
982
          self._del_owned(name=lname)
983

    
984
    return removed
985

    
986

    
987
# Locking levels, must be acquired in increasing order.
988
# Current rules are:
989
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
990
#   acquired before performing any operation, either in shared or in exclusive
991
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
992
#   avoided.
993
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
994
#   If you need more than one node, or more than one instance, acquire them at
995
#   the same time.
996
LEVEL_CLUSTER = 0
997
LEVEL_INSTANCE = 1
998
LEVEL_NODE = 2
999

    
1000
LEVELS = [LEVEL_CLUSTER,
1001
          LEVEL_INSTANCE,
1002
          LEVEL_NODE]
1003

    
1004
# Lock levels which are modifiable
1005
LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1006

    
1007
LEVEL_NAMES = {
1008
  LEVEL_CLUSTER: "cluster",
1009
  LEVEL_INSTANCE: "instance",
1010
  LEVEL_NODE: "node",
1011
  }
1012

    
1013
# Constant for the big ganeti lock
1014
BGL = 'BGL'
1015

    
1016

    
1017
class GanetiLockManager:
1018
  """The Ganeti Locking Library
1019

1020
  The purpose of this small library is to manage locking for ganeti clusters
1021
  in a central place, while at the same time doing dynamic checks against
1022
  possible deadlocks. It will also make it easier to transition to a different
1023
  lock type should we migrate away from python threads.
1024

1025
  """
1026
  _instance = None
1027

    
1028
  def __init__(self, nodes=None, instances=None):
1029
    """Constructs a new GanetiLockManager object.
1030

1031
    There should be only a GanetiLockManager object at any time, so this
1032
    function raises an error if this is not the case.
1033

1034
    @param nodes: list of node names
1035
    @param instances: list of instance names
1036

1037
    """
1038
    assert self.__class__._instance is None, \
1039
           "double GanetiLockManager instance"
1040

    
1041
    self.__class__._instance = self
1042

    
1043
    # The keyring contains all the locks, at their level and in the correct
1044
    # locking order.
1045
    self.__keyring = {
1046
      LEVEL_CLUSTER: LockSet([BGL]),
1047
      LEVEL_NODE: LockSet(nodes),
1048
      LEVEL_INSTANCE: LockSet(instances),
1049
    }
1050

    
1051
  def _names(self, level):
1052
    """List the lock names at the given level.
1053

1054
    This can be used for debugging/testing purposes.
1055

1056
    @param level: the level whose list of locks to get
1057

1058
    """
1059
    assert level in LEVELS, "Invalid locking level %s" % level
1060
    return self.__keyring[level]._names()
1061

    
1062
  def _is_owned(self, level):
1063
    """Check whether we are owning locks at the given level
1064

1065
    """
1066
    return self.__keyring[level]._is_owned()
1067

    
1068
  is_owned = _is_owned
1069

    
1070
  def _list_owned(self, level):
1071
    """Get the set of owned locks at the given level
1072

1073
    """
1074
    return self.__keyring[level]._list_owned()
1075

    
1076
  def _upper_owned(self, level):
1077
    """Check that we don't own any lock at a level greater than the given one.
1078

1079
    """
1080
    # This way of checking only works if LEVELS[i] = i, which we check for in
1081
    # the test cases.
1082
    return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1083

    
1084
  def _BGL_owned(self):
1085
    """Check if the current thread owns the BGL.
1086

1087
    Both an exclusive or a shared acquisition work.
1088

1089
    """
1090
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1091

    
1092
  def _contains_BGL(self, level, names):
1093
    """Check if the level contains the BGL.
1094

1095
    Check if acting on the given level and set of names will change
1096
    the status of the Big Ganeti Lock.
1097

1098
    """
1099
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1100

    
1101
  def acquire(self, level, names, timeout=None, shared=0):
1102
    """Acquire a set of resource locks, at the same level.
1103

1104
    @param level: the level at which the locks shall be acquired;
1105
        it must be a member of LEVELS.
1106
    @param names: the names of the locks which shall be acquired
1107
        (special lock names, or instance/node names)
1108
    @param shared: whether to acquire in shared mode; by default
1109
        an exclusive lock will be acquired
1110
    @type timeout: float
1111
    @param timeout: Maximum time to acquire all locks
1112

1113
    """
1114
    assert level in LEVELS, "Invalid locking level %s" % level
1115

    
1116
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1117
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1118
    # so even if we've migrated we need to at least share the BGL to be
1119
    # compatible with them. Of course if we own the BGL exclusively there's no
1120
    # point in acquiring any other lock, unless perhaps we are half way through
1121
    # the migration of the current opcode.
1122
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1123
            "You must own the Big Ganeti Lock before acquiring any other")
1124

    
1125
    # Check we don't own locks at the same or upper levels.
1126
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1127
           " while owning some at a greater one")
1128

    
1129
    # Acquire the locks in the set.
1130
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
1131

    
1132
  def release(self, level, names=None):
1133
    """Release a set of resource locks, at the same level.
1134

1135
    You must have acquired the locks, either in shared or in exclusive
1136
    mode, before releasing them.
1137

1138
    @param level: the level at which the locks shall be released;
1139
        it must be a member of LEVELS
1140
    @param names: the names of the locks which shall be released
1141
        (defaults to all the locks acquired at that level)
1142

1143
    """
1144
    assert level in LEVELS, "Invalid locking level %s" % level
1145
    assert (not self._contains_BGL(level, names) or
1146
            not self._upper_owned(LEVEL_CLUSTER)), (
1147
            "Cannot release the Big Ganeti Lock while holding something"
1148
            " at upper levels")
1149

    
1150
    # Release will complain if we don't own the locks already
1151
    return self.__keyring[level].release(names)
1152

    
1153
  def add(self, level, names, acquired=0, shared=0):
1154
    """Add locks at the specified level.
1155

1156
    @param level: the level at which the locks shall be added;
1157
        it must be a member of LEVELS_MOD.
1158
    @param names: names of the locks to acquire
1159
    @param acquired: whether to acquire the newly added locks
1160
    @param shared: whether the acquisition will be shared
1161

1162
    """
1163
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1164
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1165
           " operations")
1166
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1167
           " while owning some at a greater one")
1168
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1169

    
1170
  def remove(self, level, names):
1171
    """Remove locks from the specified level.
1172

1173
    You must either already own the locks you are trying to remove
1174
    exclusively or not own any lock at an upper level.
1175

1176
    @param level: the level at which the locks shall be removed;
1177
        it must be a member of LEVELS_MOD
1178
    @param names: the names of the locks which shall be removed
1179
        (special lock names, or instance/node names)
1180

1181
    """
1182
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1183
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1184
           " operations")
1185
    # Check we either own the level or don't own anything from here
1186
    # up. LockSet.remove() will check the case in which we don't own
1187
    # all the needed resources, or we have a shared ownership.
1188
    assert self._is_owned(level) or not self._upper_owned(level), (
1189
           "Cannot remove locks at a level while not owning it or"
1190
           " owning some at a greater one")
1191
    return self.__keyring[level].remove(names)