locking: Factorize LockSet.acquire
[ganeti-local] / lib / locking.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 # Disable "Invalid name ..." message
22 # pylint: disable-msg=C0103
23
24 """Module implementing the Ganeti locking code."""
25
26 import os
27 import select
28 import threading
29 import time
30 import errno
31
32 from ganeti import errors
33 from ganeti import utils
34
35
36 def ssynchronized(lock, shared=0):
37   """Shared Synchronization decorator.
38
39   Calls the function holding the given lock, either in exclusive or shared
40   mode. It requires the passed lock to be a SharedLock (or support its
41   semantics).
42
43   """
44   def wrap(fn):
45     def sync_function(*args, **kwargs):
46       lock.acquire(shared=shared)
47       try:
48         return fn(*args, **kwargs)
49       finally:
50         lock.release()
51     return sync_function
52   return wrap
53
54
55 class _SingleNotifyPipeConditionWaiter(object):
56   """Helper class for SingleNotifyPipeCondition
57
58   """
59   __slots__ = [
60     "_fd",
61     "_poller",
62     ]
63
64   def __init__(self, poller, fd):
65     """Constructor for _SingleNotifyPipeConditionWaiter
66
67     @type poller: select.poll
68     @param poller: Poller object
69     @type fd: int
70     @param fd: File descriptor to wait for
71
72     """
73     object.__init__(self)
74     self._poller = poller
75     self._fd = fd
76
77   def __call__(self, timeout):
78     """Wait for something to happen on the pipe.
79
80     @type timeout: float or None
81     @param timeout: Timeout for waiting (can be None)
82
83     """
84     start_time = time.time()
85     remaining_time = timeout
86
87     while timeout is None or remaining_time > 0:
88       try:
89         result = self._poller.poll(remaining_time)
90       except EnvironmentError, err:
91         if err.errno != errno.EINTR:
92           raise
93         result = None
94
95       # Check whether we were notified
96       if result and result[0][0] == self._fd:
97         break
98
99       # Re-calculate timeout if necessary
100       if timeout is not None:
101         remaining_time = start_time + timeout - time.time()
102
103
104 class _BaseCondition(object):
105   """Base class containing common code for conditions.
106
107   Some of this code is taken from python's threading module.
108
109   """
110   __slots__ = [
111     "_lock",
112     "acquire",
113     "release",
114     ]
115
116   def __init__(self, lock):
117     """Constructor for _BaseCondition.
118
119     @type lock: threading.Lock
120     @param lock: condition base lock
121
122     """
123     object.__init__(self)
124
125     # Recursive locks are not supported
126     assert not hasattr(lock, "_acquire_restore")
127     assert not hasattr(lock, "_release_save")
128
129     self._lock = lock
130
131     # Export the lock's acquire() and release() methods
132     self.acquire = lock.acquire
133     self.release = lock.release
134
135   def _is_owned(self):
136     """Check whether lock is owned by current thread.
137
138     """
139     if self._lock.acquire(0):
140       self._lock.release()
141       return False
142
143     return True
144
145   def _check_owned(self):
146     """Raise an exception if the current thread doesn't own the lock.
147
148     """
149     if not self._is_owned():
150       raise RuntimeError("cannot work with un-aquired lock")
151
152
153 class SingleNotifyPipeCondition(_BaseCondition):
154   """Condition which can only be notified once.
155
156   This condition class uses pipes and poll, internally, to be able to wait for
157   notification with a timeout, without resorting to polling. It is almost
158   compatible with Python's threading.Condition, with the following differences:
159     - notifyAll can only be called once, and no wait can happen after that
160     - notify is not supported, only notifyAll
161
162   """
163
164   __slots__ = _BaseCondition.__slots__ + [
165     "_poller",
166     "_read_fd",
167     "_write_fd",
168     "_nwaiters",
169     "_notified",
170     ]
171
172   _waiter_class = _SingleNotifyPipeConditionWaiter
173
174   def __init__(self, lock):
175     """Constructor for SingleNotifyPipeCondition
176
177     """
178     _BaseCondition.__init__(self, lock)
179     self._nwaiters = 0
180     self._notified = False
181     self._read_fd = None
182     self._write_fd = None
183     self._poller = None
184
185   def _check_unnotified(self):
186     """Throws an exception if already notified.
187
188     """
189     if self._notified:
190       raise RuntimeError("cannot use already notified condition")
191
192   def _Cleanup(self):
193     """Cleanup open file descriptors, if any.
194
195     """
196     if self._read_fd is not None:
197       os.close(self._read_fd)
198       self._read_fd = None
199
200     if self._write_fd is not None:
201       os.close(self._write_fd)
202       self._write_fd = None
203     self._poller = None
204
205   def wait(self, timeout=None):
206     """Wait for a notification.
207
208     @type timeout: float or None
209     @param timeout: Waiting timeout (can be None)
210
211     """
212     self._check_owned()
213     self._check_unnotified()
214
215     self._nwaiters += 1
216     try:
217       if self._poller is None:
218         (self._read_fd, self._write_fd) = os.pipe()
219         self._poller = select.poll()
220         self._poller.register(self._read_fd, select.POLLHUP)
221
222       wait_fn = self._waiter_class(self._poller, self._read_fd)
223       self.release()
224       try:
225         # Wait for notification
226         wait_fn(timeout)
227       finally:
228         # Re-acquire lock
229         self.acquire()
230     finally:
231       self._nwaiters -= 1
232       if self._nwaiters == 0:
233         self._Cleanup()
234
235   def notifyAll(self):
236     """Close the writing side of the pipe to notify all waiters.
237
238     """
239     self._check_owned()
240     self._check_unnotified()
241     self._notified = True
242     if self._write_fd is not None:
243       os.close(self._write_fd)
244       self._write_fd = None
245
246
247 class PipeCondition(_BaseCondition):
248   """Group-only non-polling condition with counters.
249
250   This condition class uses pipes and poll, internally, to be able to wait for
251   notification with a timeout, without resorting to polling. It is almost
252   compatible with Python's threading.Condition, but only supports notifyAll and
253   non-recursive locks. As an additional features it's able to report whether
254   there are any waiting threads.
255
256   """
257   __slots__ = _BaseCondition.__slots__ + [
258     "_nwaiters",
259     "_single_condition",
260     ]
261
262   _single_condition_class = SingleNotifyPipeCondition
263
264   def __init__(self, lock):
265     """Initializes this class.
266
267     """
268     _BaseCondition.__init__(self, lock)
269     self._nwaiters = 0
270     self._single_condition = self._single_condition_class(self._lock)
271
272   def wait(self, timeout=None):
273     """Wait for a notification.
274
275     @type timeout: float or None
276     @param timeout: Waiting timeout (can be None)
277
278     """
279     self._check_owned()
280
281     # Keep local reference to the pipe. It could be replaced by another thread
282     # notifying while we're waiting.
283     my_condition = self._single_condition
284
285     assert self._nwaiters >= 0
286     self._nwaiters += 1
287     try:
288       my_condition.wait(timeout)
289     finally:
290       assert self._nwaiters > 0
291       self._nwaiters -= 1
292
293   def notifyAll(self):
294     """Notify all currently waiting threads.
295
296     """
297     self._check_owned()
298     self._single_condition.notifyAll()
299     self._single_condition = self._single_condition_class(self._lock)
300
301   def has_waiting(self):
302     """Returns whether there are active waiters.
303
304     """
305     self._check_owned()
306
307     return bool(self._nwaiters)
308
309
310 class _CountingCondition(object):
311   """Wrapper for Python's built-in threading.Condition class.
312
313   This wrapper keeps a count of active waiters. We can't access the internal
314   "__waiters" attribute of threading.Condition because it's not thread-safe.
315
316   """
317   __slots__ = [
318     "_cond",
319     "_nwaiters",
320     ]
321
322   def __init__(self, lock):
323     """Initializes this class.
324
325     """
326     object.__init__(self)
327     self._cond = threading.Condition(lock=lock)
328     self._nwaiters = 0
329
330   def notifyAll(self):
331     """Notifies the condition.
332
333     """
334     return self._cond.notifyAll()
335
336   def wait(self, timeout=None):
337     """Waits for the condition to be notified.
338
339     @type timeout: float or None
340     @param timeout: Waiting timeout (can be None)
341
342     """
343     assert self._nwaiters >= 0
344
345     self._nwaiters += 1
346     try:
347       return self._cond.wait(timeout=timeout)
348     finally:
349       self._nwaiters -= 1
350
351   def has_waiting(self):
352     """Returns whether there are active waiters.
353
354     """
355     return bool(self._nwaiters)
356
357
358 class SharedLock(object):
359   """Implements a shared lock.
360
361   Multiple threads can acquire the lock in a shared way, calling
362   acquire_shared().  In order to acquire the lock in an exclusive way threads
363   can call acquire_exclusive().
364
365   The lock prevents starvation but does not guarantee that threads will acquire
366   the shared lock in the order they queued for it, just that they will
367   eventually do so.
368
369   """
370   __slots__ = [
371     "__active_shr_c",
372     "__inactive_shr_c",
373     "__deleted",
374     "__exc",
375     "__lock",
376     "__pending",
377     "__shr",
378     ]
379
380   __condition_class = PipeCondition
381
382   def __init__(self):
383     """Construct a new SharedLock.
384
385     """
386     object.__init__(self)
387
388     # Internal lock
389     self.__lock = threading.Lock()
390
391     # Queue containing waiting acquires
392     self.__pending = []
393
394     # Active and inactive conditions for shared locks
395     self.__active_shr_c = self.__condition_class(self.__lock)
396     self.__inactive_shr_c = self.__condition_class(self.__lock)
397
398     # Current lock holders
399     self.__shr = set()
400     self.__exc = None
401
402     # is this lock in the deleted state?
403     self.__deleted = False
404
405   def __check_deleted(self):
406     """Raises an exception if the lock has been deleted.
407
408     """
409     if self.__deleted:
410       raise errors.LockError("Deleted lock")
411
412   def __is_sharer(self):
413     """Is the current thread sharing the lock at this time?
414
415     """
416     return threading.currentThread() in self.__shr
417
418   def __is_exclusive(self):
419     """Is the current thread holding the lock exclusively at this time?
420
421     """
422     return threading.currentThread() == self.__exc
423
424   def __is_owned(self, shared=-1):
425     """Is the current thread somehow owning the lock at this time?
426
427     This is a private version of the function, which presumes you're holding
428     the internal lock.
429
430     """
431     if shared < 0:
432       return self.__is_sharer() or self.__is_exclusive()
433     elif shared:
434       return self.__is_sharer()
435     else:
436       return self.__is_exclusive()
437
438   def _is_owned(self, shared=-1):
439     """Is the current thread somehow owning the lock at this time?
440
441     @param shared:
442         - < 0: check for any type of ownership (default)
443         - 0: check for exclusive ownership
444         - > 0: check for shared ownership
445
446     """
447     self.__lock.acquire()
448     try:
449       return self.__is_owned(shared=shared)
450     finally:
451       self.__lock.release()
452
453   def _count_pending(self):
454     """Returns the number of pending acquires.
455
456     @rtype: int
457
458     """
459     self.__lock.acquire()
460     try:
461       return len(self.__pending)
462     finally:
463       self.__lock.release()
464
465   def __do_acquire(self, shared):
466     """Actually acquire the lock.
467
468     """
469     if shared:
470       self.__shr.add(threading.currentThread())
471     else:
472       self.__exc = threading.currentThread()
473
474   def __can_acquire(self, shared):
475     """Determine whether lock can be acquired.
476
477     """
478     if shared:
479       return self.__exc is None
480     else:
481       return len(self.__shr) == 0 and self.__exc is None
482
483   def __is_on_top(self, cond):
484     """Checks whether the passed condition is on top of the queue.
485
486     The caller must make sure the queue isn't empty.
487
488     """
489     return self.__pending[0] == cond
490
491   def __acquire_unlocked(self, shared, timeout):
492     """Acquire a shared lock.
493
494     @param shared: whether to acquire in shared mode; by default an
495         exclusive lock will be acquired
496     @param timeout: maximum waiting time before giving up
497
498     """
499     self.__check_deleted()
500
501     # We cannot acquire the lock if we already have it
502     assert not self.__is_owned(), "double acquire() on a non-recursive lock"
503
504     # Check whether someone else holds the lock or there are pending acquires.
505     if not self.__pending and self.__can_acquire(shared):
506       # Apparently not, can acquire lock directly.
507       self.__do_acquire(shared)
508       return True
509
510     if shared:
511       wait_condition = self.__active_shr_c
512
513       # Check if we're not yet in the queue
514       if wait_condition not in self.__pending:
515         self.__pending.append(wait_condition)
516     else:
517       wait_condition = self.__condition_class(self.__lock)
518       # Always add to queue
519       self.__pending.append(wait_condition)
520
521     try:
522       # Wait until we become the topmost acquire in the queue or the timeout
523       # expires.
524       while not (self.__is_on_top(wait_condition) and
525                  self.__can_acquire(shared)):
526         # Wait for notification
527         wait_condition.wait(timeout)
528         self.__check_deleted()
529
530         # A lot of code assumes blocking acquires always succeed. Loop
531         # internally for that case.
532         if timeout is not None:
533           break
534
535       if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
536         self.__do_acquire(shared)
537         return True
538     finally:
539       # Remove condition from queue if there are no more waiters
540       if not wait_condition.has_waiting() and not self.__deleted:
541         self.__pending.remove(wait_condition)
542
543     return False
544
545   def acquire(self, shared=0, timeout=None, test_notify=None):
546     """Acquire a shared lock.
547
548     @type shared: int
549     @param shared: whether to acquire in shared mode; by default an
550         exclusive lock will be acquired
551     @type timeout: float
552     @param timeout: maximum waiting time before giving up
553     @type test_notify: callable or None
554     @param test_notify: Special callback function for unittesting
555
556     """
557     self.__lock.acquire()
558     try:
559       # We already got the lock, notify now
560       if __debug__ and callable(test_notify):
561         test_notify()
562
563       return self.__acquire_unlocked(shared, timeout)
564     finally:
565       self.__lock.release()
566
567   def release(self):
568     """Release a Shared Lock.
569
570     You must have acquired the lock, either in shared or in exclusive mode,
571     before calling this function.
572
573     """
574     self.__lock.acquire()
575     try:
576       assert self.__is_exclusive() or self.__is_sharer(), \
577         "Cannot release non-owned lock"
578
579       # Autodetect release type
580       if self.__is_exclusive():
581         self.__exc = None
582       else:
583         self.__shr.remove(threading.currentThread())
584
585       # Notify topmost condition in queue
586       if self.__pending:
587         first_condition = self.__pending[0]
588         first_condition.notifyAll()
589
590         if first_condition == self.__active_shr_c:
591           self.__active_shr_c = self.__inactive_shr_c
592           self.__inactive_shr_c = first_condition
593
594     finally:
595       self.__lock.release()
596
597   def delete(self, timeout=None):
598     """Delete a Shared Lock.
599
600     This operation will declare the lock for removal. First the lock will be
601     acquired in exclusive mode if you don't already own it, then the lock
602     will be put in a state where any future and pending acquire() fail.
603
604     @type timeout: float
605     @param timeout: maximum waiting time before giving up
606
607     """
608     self.__lock.acquire()
609     try:
610       assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
611
612       self.__check_deleted()
613
614       # The caller is allowed to hold the lock exclusively already.
615       acquired = self.__is_exclusive()
616
617       if not acquired:
618         acquired = self.__acquire_unlocked(0, timeout)
619
620         assert self.__is_exclusive() and not self.__is_sharer(), \
621           "Lock wasn't acquired in exclusive mode"
622
623       if acquired:
624         self.__deleted = True
625         self.__exc = None
626
627         # Notify all acquires. They'll throw an error.
628         while self.__pending:
629           self.__pending.pop().notifyAll()
630
631       return acquired
632     finally:
633       self.__lock.release()
634
635
636 # Whenever we want to acquire a full LockSet we pass None as the value
637 # to acquire.  Hide this behind this nicely named constant.
638 ALL_SET = None
639
640
641 class _AcquireTimeout(Exception):
642   """Internal exception to abort an acquire on a timeout.
643
644   """
645
646
647 class LockSet:
648   """Implements a set of locks.
649
650   This abstraction implements a set of shared locks for the same resource type,
651   distinguished by name. The user can lock a subset of the resources and the
652   LockSet will take care of acquiring the locks always in the same order, thus
653   preventing deadlock.
654
655   All the locks needed in the same set must be acquired together, though.
656
657   """
658   def __init__(self, members=None):
659     """Constructs a new LockSet.
660
661     @param members: initial members of the set
662
663     """
664     # Used internally to guarantee coherency.
665     self.__lock = SharedLock()
666
667     # The lockdict indexes the relationship name -> lock
668     # The order-of-locking is implied by the alphabetical order of names
669     self.__lockdict = {}
670
671     if members is not None:
672       for name in members:
673         self.__lockdict[name] = SharedLock()
674
675     # The owner dict contains the set of locks each thread owns. For
676     # performance each thread can access its own key without a global lock on
677     # this structure. It is paramount though that *no* other type of access is
678     # done to this structure (eg. no looping over its keys). *_owner helper
679     # function are defined to guarantee access is correct, but in general never
680     # do anything different than __owners[threading.currentThread()], or there
681     # will be trouble.
682     self.__owners = {}
683
684   def _is_owned(self):
685     """Is the current thread a current level owner?"""
686     return threading.currentThread() in self.__owners
687
688   def _add_owned(self, name=None):
689     """Note the current thread owns the given lock"""
690     if name is None:
691       if not self._is_owned():
692         self.__owners[threading.currentThread()] = set()
693     else:
694       if self._is_owned():
695         self.__owners[threading.currentThread()].add(name)
696       else:
697         self.__owners[threading.currentThread()] = set([name])
698
699   def _del_owned(self, name=None):
700     """Note the current thread owns the given lock"""
701
702     if name is not None:
703       self.__owners[threading.currentThread()].remove(name)
704
705     # Only remove the key if we don't hold the set-lock as well
706     if (not self.__lock._is_owned() and
707         not self.__owners[threading.currentThread()]):
708       del self.__owners[threading.currentThread()]
709
710   def _list_owned(self):
711     """Get the set of resource names owned by the current thread"""
712     if self._is_owned():
713       return self.__owners[threading.currentThread()].copy()
714     else:
715       return set()
716
717   def _release_and_delete_owned(self):
718     """Release and delete all resources owned by the current thread"""
719     for lname in self._list_owned():
720       self.__lockdict[lname].release()
721       self._del_owned(name=lname)
722
723   def __names(self):
724     """Return the current set of names.
725
726     Only call this function while holding __lock and don't iterate on the
727     result after releasing the lock.
728
729     """
730     return self.__lockdict.keys()
731
732   def _names(self):
733     """Return a copy of the current set of elements.
734
735     Used only for debugging purposes.
736
737     """
738     # If we don't already own the set-level lock acquired
739     # we'll get it and note we need to release it later.
740     release_lock = False
741     if not self.__lock._is_owned():
742       release_lock = True
743       self.__lock.acquire(shared=1)
744     try:
745       result = self.__names()
746     finally:
747       if release_lock:
748         self.__lock.release()
749     return set(result)
750
751   def acquire(self, names, timeout=None, shared=0, test_notify=None):
752     """Acquire a set of resource locks.
753
754     @param names: the names of the locks which shall be acquired
755         (special lock names, or instance/node names)
756     @param shared: whether to acquire in shared mode; by default an
757         exclusive lock will be acquired
758     @type timeout: float or None
759     @param timeout: Maximum time to acquire all locks
760     @type test_notify: callable or None
761     @param test_notify: Special callback function for unittesting
762
763     @return: Set of all locks successfully acquired or None in case of timeout
764
765     @raise errors.LockError: when any lock we try to acquire has
766         been deleted before we succeed. In this case none of the
767         locks requested will be acquired.
768
769     """
770     assert timeout is None or timeout >= 0.0
771
772     # Check we don't already own locks at this level
773     assert not self._is_owned(), "Cannot acquire locks in the same set twice"
774
775     # We need to keep track of how long we spent waiting for a lock. The
776     # timeout passed to this function is over all lock acquires.
777     remaining_timeout = timeout
778     if timeout is None:
779       start = None
780       calc_remaining_timeout = lambda: None
781     else:
782       start = time.time()
783       calc_remaining_timeout = lambda: (start + timeout) - time.time()
784
785     try:
786       if names is not None:
787         # Support passing in a single resource to acquire rather than many
788         if isinstance(names, basestring):
789           names = [names]
790         else:
791           names = sorted(names)
792
793         return self.__acquire_inner(names, False, shared,
794                                     calc_remaining_timeout, test_notify)
795
796       else:
797         # If no names are given acquire the whole set by not letting new names
798         # being added before we release, and getting the current list of names.
799         # Some of them may then be deleted later, but we'll cope with this.
800         #
801         # We'd like to acquire this lock in a shared way, as it's nice if
802         # everybody else can use the instances at the same time. If are
803         # acquiring them exclusively though they won't be able to do this
804         # anyway, though, so we'll get the list lock exclusively as well in
805         # order to be able to do add() on the set while owning it.
806         if not self.__lock.acquire(shared=shared,
807                                    timeout=calc_remaining_timeout()):
808           raise _AcquireTimeout()
809         try:
810           # note we own the set-lock
811           self._add_owned()
812
813           return self.__acquire_inner(self.__names(), True, shared,
814                                       calc_remaining_timeout, test_notify)
815         except:
816           # We shouldn't have problems adding the lock to the owners list, but
817           # if we did we'll try to release this lock and re-raise exception.
818           # Of course something is going to be really wrong, after this.
819           self.__lock.release()
820           self._del_owned()
821           raise
822
823     except _AcquireTimeout:
824       return None
825
826   def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
827     """
828
829     """
830     acquire_list = []
831
832     # First we look the locks up on __lockdict. We have no way of being sure
833     # they will still be there after, but this makes it a lot faster should
834     # just one of them be the already wrong
835     for lname in utils.UniqueSequence(names):
836       try:
837         lock = self.__lockdict[lname] # raises KeyError if lock is not there
838         acquire_list.append((lname, lock))
839       except KeyError:
840         if want_all:
841           # We are acquiring all the set, it doesn't matter if this particular
842           # element is not there anymore.
843           continue
844
845         raise errors.LockError("Non-existing lock in set (%s)" % lname)
846
847     # This will hold the locknames we effectively acquired.
848     acquired = set()
849
850     try:
851       # Now acquire_list contains a sorted list of resources and locks we
852       # want.  In order to get them we loop on this (private) list and
853       # acquire() them.  We gave no real guarantee they will still exist till
854       # this is done but .acquire() itself is safe and will alert us if the
855       # lock gets deleted.
856       for (lname, lock) in acquire_list:
857         if __debug__ and callable(test_notify):
858           test_notify_fn = lambda: test_notify(lname)
859         else:
860           test_notify_fn = None
861
862         timeout = timeout_fn()
863         if timeout is not None and timeout < 0:
864           raise _AcquireTimeout()
865
866         try:
867           # raises LockError if the lock was deleted
868           acq_success = lock.acquire(shared=shared, timeout=timeout,
869                                      test_notify=test_notify_fn)
870         except errors.LockError:
871           if want_all:
872             # We are acquiring all the set, it doesn't matter if this
873             # particular element is not there anymore.
874             continue
875
876           raise errors.LockError("Non-existing lock in set (%s)" % lname)
877
878         if not acq_success:
879           # Couldn't get lock or timeout occurred
880           if timeout is None:
881             # This shouldn't happen as SharedLock.acquire(timeout=None) is
882             # blocking.
883             raise errors.LockError("Failed to get lock %s" % lname)
884
885           raise _AcquireTimeout()
886
887         try:
888           # now the lock cannot be deleted, we have it!
889           self._add_owned(name=lname)
890           acquired.add(lname)
891
892         except:
893           # We shouldn't have problems adding the lock to the owners list, but
894           # if we did we'll try to release this lock and re-raise exception.
895           # Of course something is going to be really wrong after this.
896           if lock._is_owned():
897             lock.release()
898           raise
899
900     except:
901       # Release all owned locks
902       self._release_and_delete_owned()
903       raise
904
905     return acquired
906
907   def release(self, names=None):
908     """Release a set of resource locks, at the same level.
909
910     You must have acquired the locks, either in shared or in exclusive mode,
911     before releasing them.
912
913     @param names: the names of the locks which shall be released
914         (defaults to all the locks acquired at that level).
915
916     """
917     assert self._is_owned(), "release() on lock set while not owner"
918
919     # Support passing in a single resource to release rather than many
920     if isinstance(names, basestring):
921       names = [names]
922
923     if names is None:
924       names = self._list_owned()
925     else:
926       names = set(names)
927       assert self._list_owned().issuperset(names), (
928                "release() on unheld resources %s" %
929                names.difference(self._list_owned()))
930
931     # First of all let's release the "all elements" lock, if set.
932     # After this 'add' can work again
933     if self.__lock._is_owned():
934       self.__lock.release()
935       self._del_owned()
936
937     for lockname in names:
938       # If we are sure the lock doesn't leave __lockdict without being
939       # exclusively held we can do this...
940       self.__lockdict[lockname].release()
941       self._del_owned(name=lockname)
942
943   def add(self, names, acquired=0, shared=0):
944     """Add a new set of elements to the set
945
946     @param names: names of the new elements to add
947     @param acquired: pre-acquire the new resource?
948     @param shared: is the pre-acquisition shared?
949
950     """
951     # Check we don't already own locks at this level
952     assert not self._is_owned() or self.__lock._is_owned(shared=0), \
953       "Cannot add locks if the set is only partially owned, or shared"
954
955     # Support passing in a single resource to add rather than many
956     if isinstance(names, basestring):
957       names = [names]
958
959     # If we don't already own the set-level lock acquired in an exclusive way
960     # we'll get it and note we need to release it later.
961     release_lock = False
962     if not self.__lock._is_owned():
963       release_lock = True
964       self.__lock.acquire()
965
966     try:
967       invalid_names = set(self.__names()).intersection(names)
968       if invalid_names:
969         # This must be an explicit raise, not an assert, because assert is
970         # turned off when using optimization, and this can happen because of
971         # concurrency even if the user doesn't want it.
972         raise errors.LockError("duplicate add() (%s)" % invalid_names)
973
974       for lockname in names:
975         lock = SharedLock()
976
977         if acquired:
978           lock.acquire(shared=shared)
979           # now the lock cannot be deleted, we have it!
980           try:
981             self._add_owned(name=lockname)
982           except:
983             # We shouldn't have problems adding the lock to the owners list,
984             # but if we did we'll try to release this lock and re-raise
985             # exception.  Of course something is going to be really wrong,
986             # after this.  On the other hand the lock hasn't been added to the
987             # __lockdict yet so no other threads should be pending on it. This
988             # release is just a safety measure.
989             lock.release()
990             raise
991
992         self.__lockdict[lockname] = lock
993
994     finally:
995       # Only release __lock if we were not holding it previously.
996       if release_lock:
997         self.__lock.release()
998
999     return True
1000
1001   def remove(self, names):
1002     """Remove elements from the lock set.
1003
1004     You can either not hold anything in the lockset or already hold a superset
1005     of the elements you want to delete, exclusively.
1006
1007     @param names: names of the resource to remove.
1008
1009     @return: a list of locks which we removed; the list is always
1010         equal to the names list if we were holding all the locks
1011         exclusively
1012
1013     """
1014     # Support passing in a single resource to remove rather than many
1015     if isinstance(names, basestring):
1016       names = [names]
1017
1018     # If we own any subset of this lock it must be a superset of what we want
1019     # to delete. The ownership must also be exclusive, but that will be checked
1020     # by the lock itself.
1021     assert not self._is_owned() or self._list_owned().issuperset(names), (
1022       "remove() on acquired lockset while not owning all elements")
1023
1024     removed = []
1025
1026     for lname in names:
1027       # Calling delete() acquires the lock exclusively if we don't already own
1028       # it, and causes all pending and subsequent lock acquires to fail. It's
1029       # fine to call it out of order because delete() also implies release(),
1030       # and the assertion above guarantees that if we either already hold
1031       # everything we want to delete, or we hold none.
1032       try:
1033         self.__lockdict[lname].delete()
1034         removed.append(lname)
1035       except (KeyError, errors.LockError):
1036         # This cannot happen if we were already holding it, verify:
1037         assert not self._is_owned(), "remove failed while holding lockset"
1038       else:
1039         # If no LockError was raised we are the ones who deleted the lock.
1040         # This means we can safely remove it from lockdict, as any further or
1041         # pending delete() or acquire() will fail (and nobody can have the lock
1042         # since before our call to delete()).
1043         #
1044         # This is done in an else clause because if the exception was thrown
1045         # it's the job of the one who actually deleted it.
1046         del self.__lockdict[lname]
1047         # And let's remove it from our private list if we owned it.
1048         if self._is_owned():
1049           self._del_owned(name=lname)
1050
1051     return removed
1052
1053
1054 # Locking levels, must be acquired in increasing order.
1055 # Current rules are:
1056 #   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1057 #   acquired before performing any operation, either in shared or in exclusive
1058 #   mode. acquiring the BGL in exclusive mode is discouraged and should be
1059 #   avoided.
1060 #   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1061 #   If you need more than one node, or more than one instance, acquire them at
1062 #   the same time.
1063 LEVEL_CLUSTER = 0
1064 LEVEL_INSTANCE = 1
1065 LEVEL_NODE = 2
1066
1067 LEVELS = [LEVEL_CLUSTER,
1068           LEVEL_INSTANCE,
1069           LEVEL_NODE]
1070
1071 # Lock levels which are modifiable
1072 LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1073
1074 LEVEL_NAMES = {
1075   LEVEL_CLUSTER: "cluster",
1076   LEVEL_INSTANCE: "instance",
1077   LEVEL_NODE: "node",
1078   }
1079
1080 # Constant for the big ganeti lock
1081 BGL = 'BGL'
1082
1083
1084 class GanetiLockManager:
1085   """The Ganeti Locking Library
1086
1087   The purpose of this small library is to manage locking for ganeti clusters
1088   in a central place, while at the same time doing dynamic checks against
1089   possible deadlocks. It will also make it easier to transition to a different
1090   lock type should we migrate away from python threads.
1091
1092   """
1093   _instance = None
1094
1095   def __init__(self, nodes=None, instances=None):
1096     """Constructs a new GanetiLockManager object.
1097
1098     There should be only a GanetiLockManager object at any time, so this
1099     function raises an error if this is not the case.
1100
1101     @param nodes: list of node names
1102     @param instances: list of instance names
1103
1104     """
1105     assert self.__class__._instance is None, \
1106            "double GanetiLockManager instance"
1107
1108     self.__class__._instance = self
1109
1110     # The keyring contains all the locks, at their level and in the correct
1111     # locking order.
1112     self.__keyring = {
1113       LEVEL_CLUSTER: LockSet([BGL]),
1114       LEVEL_NODE: LockSet(nodes),
1115       LEVEL_INSTANCE: LockSet(instances),
1116     }
1117
1118   def _names(self, level):
1119     """List the lock names at the given level.
1120
1121     This can be used for debugging/testing purposes.
1122
1123     @param level: the level whose list of locks to get
1124
1125     """
1126     assert level in LEVELS, "Invalid locking level %s" % level
1127     return self.__keyring[level]._names()
1128
1129   def _is_owned(self, level):
1130     """Check whether we are owning locks at the given level
1131
1132     """
1133     return self.__keyring[level]._is_owned()
1134
1135   is_owned = _is_owned
1136
1137   def _list_owned(self, level):
1138     """Get the set of owned locks at the given level
1139
1140     """
1141     return self.__keyring[level]._list_owned()
1142
1143   def _upper_owned(self, level):
1144     """Check that we don't own any lock at a level greater than the given one.
1145
1146     """
1147     # This way of checking only works if LEVELS[i] = i, which we check for in
1148     # the test cases.
1149     return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1150
1151   def _BGL_owned(self):
1152     """Check if the current thread owns the BGL.
1153
1154     Both an exclusive or a shared acquisition work.
1155
1156     """
1157     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1158
1159   def _contains_BGL(self, level, names):
1160     """Check if the level contains the BGL.
1161
1162     Check if acting on the given level and set of names will change
1163     the status of the Big Ganeti Lock.
1164
1165     """
1166     return level == LEVEL_CLUSTER and (names is None or BGL in names)
1167
1168   def acquire(self, level, names, timeout=None, shared=0):
1169     """Acquire a set of resource locks, at the same level.
1170
1171     @param level: the level at which the locks shall be acquired;
1172         it must be a member of LEVELS.
1173     @param names: the names of the locks which shall be acquired
1174         (special lock names, or instance/node names)
1175     @param shared: whether to acquire in shared mode; by default
1176         an exclusive lock will be acquired
1177     @type timeout: float
1178     @param timeout: Maximum time to acquire all locks
1179
1180     """
1181     assert level in LEVELS, "Invalid locking level %s" % level
1182
1183     # Check that we are either acquiring the Big Ganeti Lock or we already own
1184     # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1185     # so even if we've migrated we need to at least share the BGL to be
1186     # compatible with them. Of course if we own the BGL exclusively there's no
1187     # point in acquiring any other lock, unless perhaps we are half way through
1188     # the migration of the current opcode.
1189     assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1190             "You must own the Big Ganeti Lock before acquiring any other")
1191
1192     # Check we don't own locks at the same or upper levels.
1193     assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1194            " while owning some at a greater one")
1195
1196     # Acquire the locks in the set.
1197     return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
1198
1199   def release(self, level, names=None):
1200     """Release a set of resource locks, at the same level.
1201
1202     You must have acquired the locks, either in shared or in exclusive
1203     mode, before releasing them.
1204
1205     @param level: the level at which the locks shall be released;
1206         it must be a member of LEVELS
1207     @param names: the names of the locks which shall be released
1208         (defaults to all the locks acquired at that level)
1209
1210     """
1211     assert level in LEVELS, "Invalid locking level %s" % level
1212     assert (not self._contains_BGL(level, names) or
1213             not self._upper_owned(LEVEL_CLUSTER)), (
1214             "Cannot release the Big Ganeti Lock while holding something"
1215             " at upper levels")
1216
1217     # Release will complain if we don't own the locks already
1218     return self.__keyring[level].release(names)
1219
1220   def add(self, level, names, acquired=0, shared=0):
1221     """Add locks at the specified level.
1222
1223     @param level: the level at which the locks shall be added;
1224         it must be a member of LEVELS_MOD.
1225     @param names: names of the locks to acquire
1226     @param acquired: whether to acquire the newly added locks
1227     @param shared: whether the acquisition will be shared
1228
1229     """
1230     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1231     assert self._BGL_owned(), ("You must own the BGL before performing other"
1232            " operations")
1233     assert not self._upper_owned(level), ("Cannot add locks at a level"
1234            " while owning some at a greater one")
1235     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1236
1237   def remove(self, level, names):
1238     """Remove locks from the specified level.
1239
1240     You must either already own the locks you are trying to remove
1241     exclusively or not own any lock at an upper level.
1242
1243     @param level: the level at which the locks shall be removed;
1244         it must be a member of LEVELS_MOD
1245     @param names: the names of the locks which shall be removed
1246         (special lock names, or instance/node names)
1247
1248     """
1249     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1250     assert self._BGL_owned(), ("You must own the BGL before performing other"
1251            " operations")
1252     # Check we either own the level or don't own anything from here
1253     # up. LockSet.remove() will check the case in which we don't own
1254     # all the needed resources, or we have a shared ownership.
1255     assert self._is_owned(level) or not self._upper_owned(level), (
1256            "Cannot remove locks at a level while not owning it or"
1257            " owning some at a greater one")
1258     return self.__keyring[level].remove(names)