Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 34cb5617

History | View | Annotate | Download (35.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
import os
24
import select
25
import threading
26
import time
27
import errno
28

    
29
from ganeti import errors
30
from ganeti import utils
31

    
32

    
33
def ssynchronized(lock, shared=0):
34
  """Shared Synchronization decorator.
35

36
  Calls the function holding the given lock, either in exclusive or shared
37
  mode. It requires the passed lock to be a SharedLock (or support its
38
  semantics).
39

40
  """
41
  def wrap(fn):
42
    def sync_function(*args, **kwargs):
43
      lock.acquire(shared=shared)
44
      try:
45
        return fn(*args, **kwargs)
46
      finally:
47
        lock.release()
48
    return sync_function
49
  return wrap
50

    
51

    
52
class _SingleNotifyPipeConditionWaiter(object):
53
  """Helper class for SingleNotifyPipeCondition
54

55
  """
56
  __slots__ = [
57
    "_fd",
58
    "_poller",
59
    ]
60

    
61
  def __init__(self, poller, fd):
62
    """Constructor for _SingleNotifyPipeConditionWaiter
63

64
    @type poller: select.poll
65
    @param poller: Poller object
66
    @type fd: int
67
    @param fd: File descriptor to wait for
68

69
    """
70
    object.__init__(self)
71
    self._poller = poller
72
    self._fd = fd
73

    
74
  def __call__(self, timeout):
75
    """Wait for something to happen on the pipe.
76

77
    @type timeout: float or None
78
    @param timeout: Timeout for waiting (can be None)
79

80
    """
81
    start_time = time.time()
82
    remaining_time = timeout
83

    
84
    while timeout is None or remaining_time > 0:
85
      try:
86
        result = self._poller.poll(remaining_time)
87
      except EnvironmentError, err:
88
        if err.errno != errno.EINTR:
89
          raise
90
        result = None
91

    
92
      # Check whether we were notified
93
      if result and result[0][0] == self._fd:
94
        break
95

    
96
      # Re-calculate timeout if necessary
97
      if timeout is not None:
98
        remaining_time = start_time + timeout - time.time()
99

    
100

    
101
class _BaseCondition(object):
102
  """Base class containing common code for conditions.
103

104
  Some of this code is taken from python's threading module.
105

106
  """
107
  __slots__ = [
108
    "_lock",
109
    "acquire",
110
    "release",
111
    ]
112

    
113
  def __init__(self, lock):
114
    """Constructor for _BaseCondition.
115

116
    @type lock: L{threading.Lock}
117
    @param lock: condition base lock
118

119
    """
120
    object.__init__(self)
121

    
122
    # Recursive locks are not supported
123
    assert not hasattr(lock, "_acquire_restore")
124
    assert not hasattr(lock, "_release_save")
125

    
126
    self._lock = lock
127

    
128
    # Export the lock's acquire() and release() methods
129
    self.acquire = lock.acquire
130
    self.release = lock.release
131

    
132
  def _is_owned(self):
133
    """Check whether lock is owned by current thread.
134

135
    """
136
    if self._lock.acquire(0):
137
      self._lock.release()
138
      return False
139

    
140
    return True
141

    
142
  def _check_owned(self):
143
    """Raise an exception if the current thread doesn't own the lock.
144

145
    """
146
    if not self._is_owned():
147
      raise RuntimeError("cannot work with un-aquired lock")
148

    
149

    
150
class SingleNotifyPipeCondition(_BaseCondition):
151
  """Condition which can only be notified once.
152

153
  This condition class uses pipes and poll, internally, to be able to wait for
154
  notification with a timeout, without resorting to polling. It is almost
155
  compatible with Python's threading.Condition, with the following differences:
156
    - notifyAll can only be called once, and no wait can happen after that
157
    - notify is not supported, only notifyAll
158

159
  """
160

    
161
  __slots__ = _BaseCondition.__slots__ + [
162
    "_poller",
163
    "_read_fd",
164
    "_write_fd",
165
    "_nwaiters",
166
    "_notified",
167
    ]
168

    
169
  _waiter_class = _SingleNotifyPipeConditionWaiter
170

    
171
  def __init__(self, lock):
172
    """Constructor for SingleNotifyPipeCondition
173

174
    """
175
    _BaseCondition.__init__(self, lock)
176
    self._nwaiters = 0
177
    self._notified = False
178
    self._read_fd = None
179
    self._write_fd = None
180
    self._poller = None
181

    
182
  def _check_unnotified(self):
183
    if self._notified:
184
      raise RuntimeError("cannot use already notified condition")
185

    
186
  def _Cleanup(self):
187
    """Cleanup open file descriptors, if any.
188

189
    """
190
    if self._read_fd is not None:
191
      os.close(self._read_fd)
192
      self._read_fd = None
193

    
194
    if self._write_fd is not None:
195
      os.close(self._write_fd)
196
      self._write_fd = None
197
    self._poller = None
198

    
199
  def wait(self, timeout=None):
200
    """Wait for a notification.
201

202
    @type timeout: float or None
203
    @param timeout: Waiting timeout (can be None)
204

205
    """
206
    self._check_owned()
207
    self._check_unnotified()
208

    
209
    self._nwaiters += 1
210
    try:
211
      if self._poller is None:
212
        (self._read_fd, self._write_fd) = os.pipe()
213
        self._poller = select.poll()
214
        self._poller.register(self._read_fd, select.POLLHUP)
215

    
216
      wait_fn = self._waiter_class(self._poller, self._read_fd)
217
      self.release()
218
      try:
219
        # Wait for notification
220
        wait_fn(timeout)
221
      finally:
222
        # Re-acquire lock
223
        self.acquire()
224
    finally:
225
      self._nwaiters -= 1
226
      if self._nwaiters == 0:
227
        self._Cleanup()
228

    
229
  def notifyAll(self):
230
    """Close the writing side of the pipe to notify all waiters.
231

232
    """
233
    self._check_owned()
234
    self._check_unnotified()
235
    self._notified = True
236
    if self._write_fd is not None:
237
      os.close(self._write_fd)
238
      self._write_fd = None
239

    
240

    
241
class PipeCondition(_BaseCondition):
242
  """Group-only non-polling condition with counters.
243

244
  This condition class uses pipes and poll, internally, to be able to wait for
245
  notification with a timeout, without resorting to polling. It is almost
246
  compatible with Python's threading.Condition, but only supports notifyAll and
247
  non-recursive locks. As an additional features it's able to report whether
248
  there are any waiting threads.
249

250
  """
251
  __slots__ = _BaseCondition.__slots__ + [
252
    "_nwaiters",
253
    "_single_condition",
254
    ]
255

    
256
  _single_condition_class = SingleNotifyPipeCondition
257

    
258
  def __init__(self, lock):
259
    """Initializes this class.
260

261
    """
262
    _BaseCondition.__init__(self, lock)
263
    self._nwaiters = 0
264
    self._single_condition = self._single_condition_class(self._lock)
265

    
266
  def wait(self, timeout=None):
267
    """Wait for a notification.
268

269
    @type timeout: float or None
270
    @param timeout: Waiting timeout (can be None)
271

272
    """
273
    self._check_owned()
274

    
275
    # Keep local reference to the pipe. It could be replaced by another thread
276
    # notifying while we're waiting.
277
    my_condition = self._single_condition
278

    
279
    assert self._nwaiters >= 0
280
    self._nwaiters += 1
281
    try:
282
      my_condition.wait(timeout)
283
    finally:
284
      assert self._nwaiters > 0
285
      self._nwaiters -= 1
286

    
287
  def notifyAll(self):
288
    """Notify all currently waiting threads.
289

290
    """
291
    self._check_owned()
292
    self._single_condition.notifyAll()
293
    self._single_condition = self._single_condition_class(self._lock)
294

    
295
  def has_waiting(self):
296
    """Returns whether there are active waiters.
297

298
    """
299
    self._check_owned()
300

    
301
    return bool(self._nwaiters)
302

    
303

    
304
class _CountingCondition(object):
305
  """Wrapper for Python's built-in threading.Condition class.
306

307
  This wrapper keeps a count of active waiters. We can't access the internal
308
  "__waiters" attribute of threading.Condition because it's not thread-safe.
309

310
  """
311
  __slots__ = [
312
    "_cond",
313
    "_nwaiters",
314
    ]
315

    
316
  def __init__(self, lock):
317
    """Initializes this class.
318

319
    """
320
    object.__init__(self)
321
    self._cond = threading.Condition(lock=lock)
322
    self._nwaiters = 0
323

    
324
  def notifyAll(self):
325
    """Notifies the condition.
326

327
    """
328
    return self._cond.notifyAll()
329

    
330
  def wait(self, timeout=None):
331
    """Waits for the condition to be notified.
332

333
    @type timeout: float or None
334
    @param timeout: Waiting timeout (can be None)
335

336
    """
337
    assert self._nwaiters >= 0
338

    
339
    self._nwaiters += 1
340
    try:
341
      return self._cond.wait(timeout=timeout)
342
    finally:
343
      self._nwaiters -= 1
344

    
345
  def has_waiting(self):
346
    """Returns whether there are active waiters.
347

348
    """
349
    return bool(self._nwaiters)
350

    
351

    
352
class SharedLock(object):
353
  """Implements a shared lock.
354

355
  Multiple threads can acquire the lock in a shared way, calling
356
  acquire_shared().  In order to acquire the lock in an exclusive way threads
357
  can call acquire_exclusive().
358

359
  The lock prevents starvation but does not guarantee that threads will acquire
360
  the shared lock in the order they queued for it, just that they will
361
  eventually do so.
362

363
  """
364
  __slots__ = [
365
    "__active_shr_c",
366
    "__inactive_shr_c",
367
    "__deleted",
368
    "__exc",
369
    "__lock",
370
    "__pending",
371
    "__shr",
372
    ]
373

    
374
  __condition_class = PipeCondition
375

    
376
  def __init__(self):
377
    """Construct a new SharedLock.
378

379
    """
380
    object.__init__(self)
381

    
382
    # Internal lock
383
    self.__lock = threading.Lock()
384

    
385
    # Queue containing waiting acquires
386
    self.__pending = []
387

    
388
    # Active and inactive conditions for shared locks
389
    self.__active_shr_c = self.__condition_class(self.__lock)
390
    self.__inactive_shr_c = self.__condition_class(self.__lock)
391

    
392
    # Current lock holders
393
    self.__shr = set()
394
    self.__exc = None
395

    
396
    # is this lock in the deleted state?
397
    self.__deleted = False
398

    
399
  def __check_deleted(self):
400
    """Raises an exception if the lock has been deleted.
401

402
    """
403
    if self.__deleted:
404
      raise errors.LockError("Deleted lock")
405

    
406
  def __is_sharer(self):
407
    """Is the current thread sharing the lock at this time?
408

409
    """
410
    return threading.currentThread() in self.__shr
411

    
412
  def __is_exclusive(self):
413
    """Is the current thread holding the lock exclusively at this time?
414

415
    """
416
    return threading.currentThread() == self.__exc
417

    
418
  def __is_owned(self, shared=-1):
419
    """Is the current thread somehow owning the lock at this time?
420

421
    This is a private version of the function, which presumes you're holding
422
    the internal lock.
423

424
    """
425
    if shared < 0:
426
      return self.__is_sharer() or self.__is_exclusive()
427
    elif shared:
428
      return self.__is_sharer()
429
    else:
430
      return self.__is_exclusive()
431

    
432
  def _is_owned(self, shared=-1):
433
    """Is the current thread somehow owning the lock at this time?
434

435
    @param shared:
436
        - < 0: check for any type of ownership (default)
437
        - 0: check for exclusive ownership
438
        - > 0: check for shared ownership
439

440
    """
441
    self.__lock.acquire()
442
    try:
443
      return self.__is_owned(shared=shared)
444
    finally:
445
      self.__lock.release()
446

    
447
  def _count_pending(self):
448
    """Returns the number of pending acquires.
449

450
    @rtype: int
451

452
    """
453
    self.__lock.acquire()
454
    try:
455
      return len(self.__pending)
456
    finally:
457
      self.__lock.release()
458

    
459
  def __do_acquire(self, shared):
460
    """Actually acquire the lock.
461

462
    """
463
    if shared:
464
      self.__shr.add(threading.currentThread())
465
    else:
466
      self.__exc = threading.currentThread()
467

    
468
  def __can_acquire(self, shared):
469
    """Determine whether lock can be acquired.
470

471
    """
472
    if shared:
473
      return self.__exc is None
474
    else:
475
      return len(self.__shr) == 0 and self.__exc is None
476

    
477
  def __is_on_top(self, cond):
478
    """Checks whether the passed condition is on top of the queue.
479

480
    The caller must make sure the queue isn't empty.
481

482
    """
483
    return self.__pending[0] == cond
484

    
485
  def __acquire_unlocked(self, shared, timeout):
486
    """Acquire a shared lock.
487

488
    @param shared: whether to acquire in shared mode; by default an
489
        exclusive lock will be acquired
490
    @param timeout: maximum waiting time before giving up
491

492
    """
493
    self.__check_deleted()
494

    
495
    # We cannot acquire the lock if we already have it
496
    assert not self.__is_owned(), "double acquire() on a non-recursive lock"
497

    
498
    # Check whether someone else holds the lock or there are pending acquires.
499
    if not self.__pending and self.__can_acquire(shared):
500
      # Apparently not, can acquire lock directly.
501
      self.__do_acquire(shared)
502
      return True
503

    
504
    if shared:
505
      wait_condition = self.__active_shr_c
506

    
507
      # Check if we're not yet in the queue
508
      if wait_condition not in self.__pending:
509
        self.__pending.append(wait_condition)
510
    else:
511
      wait_condition = self.__condition_class(self.__lock)
512
      # Always add to queue
513
      self.__pending.append(wait_condition)
514

    
515
    try:
516
      # Wait until we become the topmost acquire in the queue or the timeout
517
      # expires.
518
      while not (self.__is_on_top(wait_condition) and
519
                 self.__can_acquire(shared)):
520
        # Wait for notification
521
        wait_condition.wait(timeout)
522
        self.__check_deleted()
523

    
524
        # A lot of code assumes blocking acquires always succeed. Loop
525
        # internally for that case.
526
        if timeout is not None:
527
          break
528

    
529
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
530
        self.__do_acquire(shared)
531
        return True
532
    finally:
533
      # Remove condition from queue if there are no more waiters
534
      if not wait_condition.has_waiting() and not self.__deleted:
535
        self.__pending.remove(wait_condition)
536

    
537
    return False
538

    
539
  def acquire(self, shared=0, timeout=None):
540
    """Acquire a shared lock.
541

542
    @type shared: int
543
    @param shared: whether to acquire in shared mode; by default an
544
        exclusive lock will be acquired
545
    @type timeout: float
546
    @param timeout: maximum waiting time before giving up
547

548
    """
549
    self.__lock.acquire()
550
    try:
551
      return self.__acquire_unlocked(shared, timeout)
552
    finally:
553
      self.__lock.release()
554

    
555
  def release(self):
556
    """Release a Shared Lock.
557

558
    You must have acquired the lock, either in shared or in exclusive mode,
559
    before calling this function.
560

561
    """
562
    self.__lock.acquire()
563
    try:
564
      assert self.__is_exclusive() or self.__is_sharer(), \
565
        "Cannot release non-owned lock"
566

    
567
      # Autodetect release type
568
      if self.__is_exclusive():
569
        self.__exc = None
570
      else:
571
        self.__shr.remove(threading.currentThread())
572

    
573
      # Notify topmost condition in queue
574
      if self.__pending:
575
        first_condition = self.__pending[0]
576
        first_condition.notifyAll()
577

    
578
        if first_condition == self.__active_shr_c:
579
          self.__active_shr_c = self.__inactive_shr_c
580
          self.__inactive_shr_c = first_condition
581

    
582
    finally:
583
      self.__lock.release()
584

    
585
  def delete(self, timeout=None):
586
    """Delete a Shared Lock.
587

588
    This operation will declare the lock for removal. First the lock will be
589
    acquired in exclusive mode if you don't already own it, then the lock
590
    will be put in a state where any future and pending acquire() fail.
591

592
    @type timeout: float
593
    @param timeout: maximum waiting time before giving up
594

595
    """
596
    self.__lock.acquire()
597
    try:
598
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
599

    
600
      self.__check_deleted()
601

    
602
      # The caller is allowed to hold the lock exclusively already.
603
      acquired = self.__is_exclusive()
604

    
605
      if not acquired:
606
        acquired = self.__acquire_unlocked(0, timeout)
607

    
608
        assert self.__is_exclusive() and not self.__is_sharer(), \
609
          "Lock wasn't acquired in exclusive mode"
610

    
611
      if acquired:
612
        self.__deleted = True
613
        self.__exc = None
614

    
615
        # Notify all acquires. They'll throw an error.
616
        while self.__pending:
617
          self.__pending.pop().notifyAll()
618

    
619
      return acquired
620
    finally:
621
      self.__lock.release()
622

    
623

    
624
# Whenever we want to acquire a full LockSet we pass None as the value
625
# to acquire.  Hide this behind this nicely named constant.
626
ALL_SET = None
627

    
628

    
629
class LockSet:
630
  """Implements a set of locks.
631

632
  This abstraction implements a set of shared locks for the same resource type,
633
  distinguished by name. The user can lock a subset of the resources and the
634
  LockSet will take care of acquiring the locks always in the same order, thus
635
  preventing deadlock.
636

637
  All the locks needed in the same set must be acquired together, though.
638

639
  """
640
  def __init__(self, members=None):
641
    """Constructs a new LockSet.
642

643
    @param members: initial members of the set
644

645
    """
646
    # Used internally to guarantee coherency.
647
    self.__lock = SharedLock()
648

    
649
    # The lockdict indexes the relationship name -> lock
650
    # The order-of-locking is implied by the alphabetical order of names
651
    self.__lockdict = {}
652

    
653
    if members is not None:
654
      for name in members:
655
        self.__lockdict[name] = SharedLock()
656

    
657
    # The owner dict contains the set of locks each thread owns. For
658
    # performance each thread can access its own key without a global lock on
659
    # this structure. It is paramount though that *no* other type of access is
660
    # done to this structure (eg. no looping over its keys). *_owner helper
661
    # function are defined to guarantee access is correct, but in general never
662
    # do anything different than __owners[threading.currentThread()], or there
663
    # will be trouble.
664
    self.__owners = {}
665

    
666
  def _is_owned(self):
667
    """Is the current thread a current level owner?"""
668
    return threading.currentThread() in self.__owners
669

    
670
  def _add_owned(self, name=None):
671
    """Note the current thread owns the given lock"""
672
    if name is None:
673
      if not self._is_owned():
674
        self.__owners[threading.currentThread()] = set()
675
    else:
676
      if self._is_owned():
677
        self.__owners[threading.currentThread()].add(name)
678
      else:
679
        self.__owners[threading.currentThread()] = set([name])
680

    
681
  def _del_owned(self, name=None):
682
    """Note the current thread owns the given lock"""
683

    
684
    if name is not None:
685
      self.__owners[threading.currentThread()].remove(name)
686

    
687
    # Only remove the key if we don't hold the set-lock as well
688
    if (not self.__lock._is_owned() and
689
        not self.__owners[threading.currentThread()]):
690
      del self.__owners[threading.currentThread()]
691

    
692
  def _list_owned(self):
693
    """Get the set of resource names owned by the current thread"""
694
    if self._is_owned():
695
      return self.__owners[threading.currentThread()].copy()
696
    else:
697
      return set()
698

    
699
  def __names(self):
700
    """Return the current set of names.
701

702
    Only call this function while holding __lock and don't iterate on the
703
    result after releasing the lock.
704

705
    """
706
    return self.__lockdict.keys()
707

    
708
  def _names(self):
709
    """Return a copy of the current set of elements.
710

711
    Used only for debugging purposes.
712

713
    """
714
    # If we don't already own the set-level lock acquired
715
    # we'll get it and note we need to release it later.
716
    release_lock = False
717
    if not self.__lock._is_owned():
718
      release_lock = True
719
      self.__lock.acquire(shared=1)
720
    try:
721
      result = self.__names()
722
    finally:
723
      if release_lock:
724
        self.__lock.release()
725
    return set(result)
726

    
727
  def acquire(self, names, timeout=None, shared=0):
728
    """Acquire a set of resource locks.
729

730
    @param names: the names of the locks which shall be acquired
731
        (special lock names, or instance/node names)
732
    @param shared: whether to acquire in shared mode; by default an
733
        exclusive lock will be acquired
734
    @type timeout: float
735
    @param timeout: Maximum time to acquire all locks
736

737
    @return: True when all the locks are successfully acquired
738

739
    @raise errors.LockError: when any lock we try to acquire has
740
        been deleted before we succeed. In this case none of the
741
        locks requested will be acquired.
742

743
    """
744
    if timeout is not None:
745
      raise NotImplementedError
746

    
747
    # Check we don't already own locks at this level
748
    assert not self._is_owned(), "Cannot acquire locks in the same set twice"
749

    
750
    if names is None:
751
      # If no names are given acquire the whole set by not letting new names
752
      # being added before we release, and getting the current list of names.
753
      # Some of them may then be deleted later, but we'll cope with this.
754
      #
755
      # We'd like to acquire this lock in a shared way, as it's nice if
756
      # everybody else can use the instances at the same time. If are acquiring
757
      # them exclusively though they won't be able to do this anyway, though,
758
      # so we'll get the list lock exclusively as well in order to be able to
759
      # do add() on the set while owning it.
760
      self.__lock.acquire(shared=shared)
761
      try:
762
        # note we own the set-lock
763
        self._add_owned()
764
        names = self.__names()
765
      except:
766
        # We shouldn't have problems adding the lock to the owners list, but
767
        # if we did we'll try to release this lock and re-raise exception.
768
        # Of course something is going to be really wrong, after this.
769
        self.__lock.release()
770
        raise
771

    
772
    try:
773
      # Support passing in a single resource to acquire rather than many
774
      if isinstance(names, basestring):
775
        names = [names]
776
      else:
777
        names = sorted(names)
778

    
779
      acquire_list = []
780
      # First we look the locks up on __lockdict. We have no way of being sure
781
      # they will still be there after, but this makes it a lot faster should
782
      # just one of them be the already wrong
783
      for lname in utils.UniqueSequence(names):
784
        try:
785
          lock = self.__lockdict[lname] # raises KeyError if lock is not there
786
          acquire_list.append((lname, lock))
787
        except (KeyError):
788
          if self.__lock._is_owned():
789
            # We are acquiring all the set, it doesn't matter if this
790
            # particular element is not there anymore.
791
            continue
792
          else:
793
            raise errors.LockError('non-existing lock in set (%s)' % lname)
794

    
795
      # This will hold the locknames we effectively acquired.
796
      acquired = set()
797
      # Now acquire_list contains a sorted list of resources and locks we want.
798
      # In order to get them we loop on this (private) list and acquire() them.
799
      # We gave no real guarantee they will still exist till this is done but
800
      # .acquire() itself is safe and will alert us if the lock gets deleted.
801
      for (lname, lock) in acquire_list:
802
        try:
803
          lock.acquire(shared=shared) # raises LockError if the lock is deleted
804
          # now the lock cannot be deleted, we have it!
805
          self._add_owned(name=lname)
806
          acquired.add(lname)
807
        except (errors.LockError):
808
          if self.__lock._is_owned():
809
            # We are acquiring all the set, it doesn't matter if this
810
            # particular element is not there anymore.
811
            continue
812
          else:
813
            name_fail = lname
814
            for lname in self._list_owned():
815
              self.__lockdict[lname].release()
816
              self._del_owned(name=lname)
817
            raise errors.LockError('non-existing lock in set (%s)' % name_fail)
818
        except:
819
          # We shouldn't have problems adding the lock to the owners list, but
820
          # if we did we'll try to release this lock and re-raise exception.
821
          # Of course something is going to be really wrong, after this.
822
          if lock._is_owned():
823
            lock.release()
824
          raise
825

    
826
    except:
827
      # If something went wrong and we had the set-lock let's release it...
828
      if self.__lock._is_owned():
829
        self.__lock.release()
830
      raise
831

    
832
    return acquired
833

    
834
  def release(self, names=None):
835
    """Release a set of resource locks, at the same level.
836

837
    You must have acquired the locks, either in shared or in exclusive mode,
838
    before releasing them.
839

840
    @param names: the names of the locks which shall be released
841
        (defaults to all the locks acquired at that level).
842

843
    """
844
    assert self._is_owned(), "release() on lock set while not owner"
845

    
846
    # Support passing in a single resource to release rather than many
847
    if isinstance(names, basestring):
848
      names = [names]
849

    
850
    if names is None:
851
      names = self._list_owned()
852
    else:
853
      names = set(names)
854
      assert self._list_owned().issuperset(names), (
855
               "release() on unheld resources %s" %
856
               names.difference(self._list_owned()))
857

    
858
    # First of all let's release the "all elements" lock, if set.
859
    # After this 'add' can work again
860
    if self.__lock._is_owned():
861
      self.__lock.release()
862
      self._del_owned()
863

    
864
    for lockname in names:
865
      # If we are sure the lock doesn't leave __lockdict without being
866
      # exclusively held we can do this...
867
      self.__lockdict[lockname].release()
868
      self._del_owned(name=lockname)
869

    
870
  def add(self, names, acquired=0, shared=0):
871
    """Add a new set of elements to the set
872

873
    @param names: names of the new elements to add
874
    @param acquired: pre-acquire the new resource?
875
    @param shared: is the pre-acquisition shared?
876

877
    """
878
    # Check we don't already own locks at this level
879
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
880
      "Cannot add locks if the set is only partially owned, or shared"
881

    
882
    # Support passing in a single resource to add rather than many
883
    if isinstance(names, basestring):
884
      names = [names]
885

    
886
    # If we don't already own the set-level lock acquired in an exclusive way
887
    # we'll get it and note we need to release it later.
888
    release_lock = False
889
    if not self.__lock._is_owned():
890
      release_lock = True
891
      self.__lock.acquire()
892

    
893
    try:
894
      invalid_names = set(self.__names()).intersection(names)
895
      if invalid_names:
896
        # This must be an explicit raise, not an assert, because assert is
897
        # turned off when using optimization, and this can happen because of
898
        # concurrency even if the user doesn't want it.
899
        raise errors.LockError("duplicate add() (%s)" % invalid_names)
900

    
901
      for lockname in names:
902
        lock = SharedLock()
903

    
904
        if acquired:
905
          lock.acquire(shared=shared)
906
          # now the lock cannot be deleted, we have it!
907
          try:
908
            self._add_owned(name=lockname)
909
          except:
910
            # We shouldn't have problems adding the lock to the owners list,
911
            # but if we did we'll try to release this lock and re-raise
912
            # exception.  Of course something is going to be really wrong,
913
            # after this.  On the other hand the lock hasn't been added to the
914
            # __lockdict yet so no other threads should be pending on it. This
915
            # release is just a safety measure.
916
            lock.release()
917
            raise
918

    
919
        self.__lockdict[lockname] = lock
920

    
921
    finally:
922
      # Only release __lock if we were not holding it previously.
923
      if release_lock:
924
        self.__lock.release()
925

    
926
    return True
927

    
928
  def remove(self, names):
929
    """Remove elements from the lock set.
930

931
    You can either not hold anything in the lockset or already hold a superset
932
    of the elements you want to delete, exclusively.
933

934
    @param names: names of the resource to remove.
935

936
    @return:: a list of locks which we removed; the list is always
937
        equal to the names list if we were holding all the locks
938
        exclusively
939

940
    """
941
    # Support passing in a single resource to remove rather than many
942
    if isinstance(names, basestring):
943
      names = [names]
944

    
945
    # If we own any subset of this lock it must be a superset of what we want
946
    # to delete. The ownership must also be exclusive, but that will be checked
947
    # by the lock itself.
948
    assert not self._is_owned() or self._list_owned().issuperset(names), (
949
      "remove() on acquired lockset while not owning all elements")
950

    
951
    removed = []
952

    
953
    for lname in names:
954
      # Calling delete() acquires the lock exclusively if we don't already own
955
      # it, and causes all pending and subsequent lock acquires to fail. It's
956
      # fine to call it out of order because delete() also implies release(),
957
      # and the assertion above guarantees that if we either already hold
958
      # everything we want to delete, or we hold none.
959
      try:
960
        self.__lockdict[lname].delete()
961
        removed.append(lname)
962
      except (KeyError, errors.LockError):
963
        # This cannot happen if we were already holding it, verify:
964
        assert not self._is_owned(), "remove failed while holding lockset"
965
      else:
966
        # If no LockError was raised we are the ones who deleted the lock.
967
        # This means we can safely remove it from lockdict, as any further or
968
        # pending delete() or acquire() will fail (and nobody can have the lock
969
        # since before our call to delete()).
970
        #
971
        # This is done in an else clause because if the exception was thrown
972
        # it's the job of the one who actually deleted it.
973
        del self.__lockdict[lname]
974
        # And let's remove it from our private list if we owned it.
975
        if self._is_owned():
976
          self._del_owned(name=lname)
977

    
978
    return removed
979

    
980

    
981
# Locking levels, must be acquired in increasing order.
982
# Current rules are:
983
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
984
#   acquired before performing any operation, either in shared or in exclusive
985
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
986
#   avoided.
987
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
988
#   If you need more than one node, or more than one instance, acquire them at
989
#   the same time.
990
LEVEL_CLUSTER = 0
991
LEVEL_INSTANCE = 1
992
LEVEL_NODE = 2
993

    
994
LEVELS = [LEVEL_CLUSTER,
995
          LEVEL_INSTANCE,
996
          LEVEL_NODE]
997

    
998
# Lock levels which are modifiable
999
LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1000

    
1001
LEVEL_NAMES = {
1002
  LEVEL_CLUSTER: "cluster",
1003
  LEVEL_INSTANCE: "instance",
1004
  LEVEL_NODE: "node",
1005
  }
1006

    
1007
# Constant for the big ganeti lock
1008
BGL = 'BGL'
1009

    
1010

    
1011
class GanetiLockManager:
1012
  """The Ganeti Locking Library
1013

1014
  The purpose of this small library is to manage locking for ganeti clusters
1015
  in a central place, while at the same time doing dynamic checks against
1016
  possible deadlocks. It will also make it easier to transition to a different
1017
  lock type should we migrate away from python threads.
1018

1019
  """
1020
  _instance = None
1021

    
1022
  def __init__(self, nodes=None, instances=None):
1023
    """Constructs a new GanetiLockManager object.
1024

1025
    There should be only a GanetiLockManager object at any time, so this
1026
    function raises an error if this is not the case.
1027

1028
    @param nodes: list of node names
1029
    @param instances: list of instance names
1030

1031
    """
1032
    assert self.__class__._instance is None, \
1033
           "double GanetiLockManager instance"
1034

    
1035
    self.__class__._instance = self
1036

    
1037
    # The keyring contains all the locks, at their level and in the correct
1038
    # locking order.
1039
    self.__keyring = {
1040
      LEVEL_CLUSTER: LockSet([BGL]),
1041
      LEVEL_NODE: LockSet(nodes),
1042
      LEVEL_INSTANCE: LockSet(instances),
1043
    }
1044

    
1045
  def _names(self, level):
1046
    """List the lock names at the given level.
1047

1048
    This can be used for debugging/testing purposes.
1049

1050
    @param level: the level whose list of locks to get
1051

1052
    """
1053
    assert level in LEVELS, "Invalid locking level %s" % level
1054
    return self.__keyring[level]._names()
1055

    
1056
  def _is_owned(self, level):
1057
    """Check whether we are owning locks at the given level
1058

1059
    """
1060
    return self.__keyring[level]._is_owned()
1061

    
1062
  is_owned = _is_owned
1063

    
1064
  def _list_owned(self, level):
1065
    """Get the set of owned locks at the given level
1066

1067
    """
1068
    return self.__keyring[level]._list_owned()
1069

    
1070
  def _upper_owned(self, level):
1071
    """Check that we don't own any lock at a level greater than the given one.
1072

1073
    """
1074
    # This way of checking only works if LEVELS[i] = i, which we check for in
1075
    # the test cases.
1076
    return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1077

    
1078
  def _BGL_owned(self):
1079
    """Check if the current thread owns the BGL.
1080

1081
    Both an exclusive or a shared acquisition work.
1082

1083
    """
1084
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1085

    
1086
  def _contains_BGL(self, level, names):
1087
    """Check if the level contains the BGL.
1088

1089
    Check if acting on the given level and set of names will change
1090
    the status of the Big Ganeti Lock.
1091

1092
    """
1093
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1094

    
1095
  def acquire(self, level, names, timeout=None, shared=0):
1096
    """Acquire a set of resource locks, at the same level.
1097

1098
    @param level: the level at which the locks shall be acquired;
1099
        it must be a member of LEVELS.
1100
    @param names: the names of the locks which shall be acquired
1101
        (special lock names, or instance/node names)
1102
    @param shared: whether to acquire in shared mode; by default
1103
        an exclusive lock will be acquired
1104
    @type timeout: float
1105
    @param timeout: Maximum time to acquire all locks
1106

1107
    """
1108
    assert level in LEVELS, "Invalid locking level %s" % level
1109

    
1110
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1111
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1112
    # so even if we've migrated we need to at least share the BGL to be
1113
    # compatible with them. Of course if we own the BGL exclusively there's no
1114
    # point in acquiring any other lock, unless perhaps we are half way through
1115
    # the migration of the current opcode.
1116
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1117
            "You must own the Big Ganeti Lock before acquiring any other")
1118

    
1119
    # Check we don't own locks at the same or upper levels.
1120
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1121
           " while owning some at a greater one")
1122

    
1123
    # Acquire the locks in the set.
1124
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
1125

    
1126
  def release(self, level, names=None):
1127
    """Release a set of resource locks, at the same level.
1128

1129
    You must have acquired the locks, either in shared or in exclusive
1130
    mode, before releasing them.
1131

1132
    @param level: the level at which the locks shall be released;
1133
        it must be a member of LEVELS
1134
    @param names: the names of the locks which shall be released
1135
        (defaults to all the locks acquired at that level)
1136

1137
    """
1138
    assert level in LEVELS, "Invalid locking level %s" % level
1139
    assert (not self._contains_BGL(level, names) or
1140
            not self._upper_owned(LEVEL_CLUSTER)), (
1141
            "Cannot release the Big Ganeti Lock while holding something"
1142
            " at upper levels")
1143

    
1144
    # Release will complain if we don't own the locks already
1145
    return self.__keyring[level].release(names)
1146

    
1147
  def add(self, level, names, acquired=0, shared=0):
1148
    """Add locks at the specified level.
1149

1150
    @param level: the level at which the locks shall be added;
1151
        it must be a member of LEVELS_MOD.
1152
    @param names: names of the locks to acquire
1153
    @param acquired: whether to acquire the newly added locks
1154
    @param shared: whether the acquisition will be shared
1155

1156
    """
1157
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1158
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1159
           " operations")
1160
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1161
           " while owning some at a greater one")
1162
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1163

    
1164
  def remove(self, level, names):
1165
    """Remove locks from the specified level.
1166

1167
    You must either already own the locks you are trying to remove
1168
    exclusively or not own any lock at an upper level.
1169

1170
    @param level: the level at which the locks shall be removed;
1171
        it must be a member of LEVELS_MOD
1172
    @param names: the names of the locks which shall be removed
1173
        (special lock names, or instance/node names)
1174

1175
    """
1176
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1177
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1178
           " operations")
1179
    # Check we either own the level or don't own anything from here
1180
    # up. LockSet.remove() will check the case in which we don't own
1181
    # all the needed resources, or we have a shared ownership.
1182
    assert self._is_owned(level) or not self._upper_owned(level), (
1183
           "Cannot remove locks at a level while not owning it or"
1184
           " owning some at a greater one")
1185
    return self.__keyring[level].remove(names)