install.rst: note about the default parameters
[ganeti-local] / lib / locking.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 # Disable "Invalid name ..." message
22 # pylint: disable-msg=C0103
23
24 """Module implementing the Ganeti locking code."""
25
26 import os
27 import select
28 import threading
29 import time
30 import errno
31
32 from ganeti import errors
33 from ganeti import utils
34
35
36 def ssynchronized(lock, shared=0):
37   """Shared Synchronization decorator.
38
39   Calls the function holding the given lock, either in exclusive or shared
40   mode. It requires the passed lock to be a SharedLock (or support its
41   semantics).
42
43   """
44   def wrap(fn):
45     def sync_function(*args, **kwargs):
46       lock.acquire(shared=shared)
47       try:
48         return fn(*args, **kwargs)
49       finally:
50         lock.release()
51     return sync_function
52   return wrap
53
54
55 class RunningTimeout(object):
56   """Class to calculate remaining timeout when doing several operations.
57
58   """
59   __slots__ = [
60     "_allow_negative",
61     "_start_time",
62     "_time_fn",
63     "_timeout",
64     ]
65
66   def __init__(self, timeout, allow_negative, _time_fn=time.time):
67     """Initializes this class.
68
69     @type timeout: float
70     @param timeout: Timeout duration
71     @type allow_negative: bool
72     @param allow_negative: Whether to return values below zero
73     @param _time_fn: Time function for unittests
74
75     """
76     object.__init__(self)
77
78     if timeout is not None and timeout < 0.0:
79       raise ValueError("Timeout must not be negative")
80
81     self._timeout = timeout
82     self._allow_negative = allow_negative
83     self._time_fn = _time_fn
84
85     self._start_time = None
86
87   def Remaining(self):
88     """Returns the remaining timeout.
89
90     """
91     if self._timeout is None:
92       return None
93
94     # Get start time on first calculation
95     if self._start_time is None:
96       self._start_time = self._time_fn()
97
98     # Calculate remaining time
99     remaining_timeout = self._start_time + self._timeout - self._time_fn()
100
101     if not self._allow_negative:
102       # Ensure timeout is always >= 0
103       return max(0.0, remaining_timeout)
104
105     return remaining_timeout
106
107
108 class _SingleNotifyPipeConditionWaiter(object):
109   """Helper class for SingleNotifyPipeCondition
110
111   """
112   __slots__ = [
113     "_fd",
114     "_poller",
115     ]
116
117   def __init__(self, poller, fd):
118     """Constructor for _SingleNotifyPipeConditionWaiter
119
120     @type poller: select.poll
121     @param poller: Poller object
122     @type fd: int
123     @param fd: File descriptor to wait for
124
125     """
126     object.__init__(self)
127     self._poller = poller
128     self._fd = fd
129
130   def __call__(self, timeout):
131     """Wait for something to happen on the pipe.
132
133     @type timeout: float or None
134     @param timeout: Timeout for waiting (can be None)
135
136     """
137     running_timeout = RunningTimeout(timeout, True)
138
139     while True:
140       remaining_time = running_timeout.Remaining()
141
142       if remaining_time is not None and remaining_time < 0.0:
143         break
144
145       try:
146         result = self._poller.poll(remaining_time)
147       except EnvironmentError, err:
148         if err.errno != errno.EINTR:
149           raise
150         result = None
151
152       # Check whether we were notified
153       if result and result[0][0] == self._fd:
154         break
155
156
157 class _BaseCondition(object):
158   """Base class containing common code for conditions.
159
160   Some of this code is taken from python's threading module.
161
162   """
163   __slots__ = [
164     "_lock",
165     "acquire",
166     "release",
167     ]
168
169   def __init__(self, lock):
170     """Constructor for _BaseCondition.
171
172     @type lock: threading.Lock
173     @param lock: condition base lock
174
175     """
176     object.__init__(self)
177
178     # Recursive locks are not supported
179     assert not hasattr(lock, "_acquire_restore")
180     assert not hasattr(lock, "_release_save")
181
182     self._lock = lock
183
184     # Export the lock's acquire() and release() methods
185     self.acquire = lock.acquire
186     self.release = lock.release
187
188   def _is_owned(self):
189     """Check whether lock is owned by current thread.
190
191     """
192     if self._lock.acquire(0):
193       self._lock.release()
194       return False
195
196     return True
197
198   def _check_owned(self):
199     """Raise an exception if the current thread doesn't own the lock.
200
201     """
202     if not self._is_owned():
203       raise RuntimeError("cannot work with un-aquired lock")
204
205
206 class SingleNotifyPipeCondition(_BaseCondition):
207   """Condition which can only be notified once.
208
209   This condition class uses pipes and poll, internally, to be able to wait for
210   notification with a timeout, without resorting to polling. It is almost
211   compatible with Python's threading.Condition, with the following differences:
212     - notifyAll can only be called once, and no wait can happen after that
213     - notify is not supported, only notifyAll
214
215   """
216
217   __slots__ = _BaseCondition.__slots__ + [
218     "_poller",
219     "_read_fd",
220     "_write_fd",
221     "_nwaiters",
222     "_notified",
223     ]
224
225   _waiter_class = _SingleNotifyPipeConditionWaiter
226
227   def __init__(self, lock):
228     """Constructor for SingleNotifyPipeCondition
229
230     """
231     _BaseCondition.__init__(self, lock)
232     self._nwaiters = 0
233     self._notified = False
234     self._read_fd = None
235     self._write_fd = None
236     self._poller = None
237
238   def _check_unnotified(self):
239     """Throws an exception if already notified.
240
241     """
242     if self._notified:
243       raise RuntimeError("cannot use already notified condition")
244
245   def _Cleanup(self):
246     """Cleanup open file descriptors, if any.
247
248     """
249     if self._read_fd is not None:
250       os.close(self._read_fd)
251       self._read_fd = None
252
253     if self._write_fd is not None:
254       os.close(self._write_fd)
255       self._write_fd = None
256     self._poller = None
257
258   def wait(self, timeout=None):
259     """Wait for a notification.
260
261     @type timeout: float or None
262     @param timeout: Waiting timeout (can be None)
263
264     """
265     self._check_owned()
266     self._check_unnotified()
267
268     self._nwaiters += 1
269     try:
270       if self._poller is None:
271         (self._read_fd, self._write_fd) = os.pipe()
272         self._poller = select.poll()
273         self._poller.register(self._read_fd, select.POLLHUP)
274
275       wait_fn = self._waiter_class(self._poller, self._read_fd)
276       self.release()
277       try:
278         # Wait for notification
279         wait_fn(timeout)
280       finally:
281         # Re-acquire lock
282         self.acquire()
283     finally:
284       self._nwaiters -= 1
285       if self._nwaiters == 0:
286         self._Cleanup()
287
288   def notifyAll(self):
289     """Close the writing side of the pipe to notify all waiters.
290
291     """
292     self._check_owned()
293     self._check_unnotified()
294     self._notified = True
295     if self._write_fd is not None:
296       os.close(self._write_fd)
297       self._write_fd = None
298
299
300 class PipeCondition(_BaseCondition):
301   """Group-only non-polling condition with counters.
302
303   This condition class uses pipes and poll, internally, to be able to wait for
304   notification with a timeout, without resorting to polling. It is almost
305   compatible with Python's threading.Condition, but only supports notifyAll and
306   non-recursive locks. As an additional features it's able to report whether
307   there are any waiting threads.
308
309   """
310   __slots__ = _BaseCondition.__slots__ + [
311     "_nwaiters",
312     "_single_condition",
313     ]
314
315   _single_condition_class = SingleNotifyPipeCondition
316
317   def __init__(self, lock):
318     """Initializes this class.
319
320     """
321     _BaseCondition.__init__(self, lock)
322     self._nwaiters = 0
323     self._single_condition = self._single_condition_class(self._lock)
324
325   def wait(self, timeout=None):
326     """Wait for a notification.
327
328     @type timeout: float or None
329     @param timeout: Waiting timeout (can be None)
330
331     """
332     self._check_owned()
333
334     # Keep local reference to the pipe. It could be replaced by another thread
335     # notifying while we're waiting.
336     my_condition = self._single_condition
337
338     assert self._nwaiters >= 0
339     self._nwaiters += 1
340     try:
341       my_condition.wait(timeout)
342     finally:
343       assert self._nwaiters > 0
344       self._nwaiters -= 1
345
346   def notifyAll(self):
347     """Notify all currently waiting threads.
348
349     """
350     self._check_owned()
351     self._single_condition.notifyAll()
352     self._single_condition = self._single_condition_class(self._lock)
353
354   def has_waiting(self):
355     """Returns whether there are active waiters.
356
357     """
358     self._check_owned()
359
360     return bool(self._nwaiters)
361
362
363 class _CountingCondition(object):
364   """Wrapper for Python's built-in threading.Condition class.
365
366   This wrapper keeps a count of active waiters. We can't access the internal
367   "__waiters" attribute of threading.Condition because it's not thread-safe.
368
369   """
370   __slots__ = [
371     "_cond",
372     "_nwaiters",
373     ]
374
375   def __init__(self, lock):
376     """Initializes this class.
377
378     """
379     object.__init__(self)
380     self._cond = threading.Condition(lock=lock)
381     self._nwaiters = 0
382
383   def notifyAll(self):
384     """Notifies the condition.
385
386     """
387     return self._cond.notifyAll()
388
389   def wait(self, timeout=None):
390     """Waits for the condition to be notified.
391
392     @type timeout: float or None
393     @param timeout: Waiting timeout (can be None)
394
395     """
396     assert self._nwaiters >= 0
397
398     self._nwaiters += 1
399     try:
400       return self._cond.wait(timeout=timeout)
401     finally:
402       self._nwaiters -= 1
403
404   def has_waiting(self):
405     """Returns whether there are active waiters.
406
407     """
408     return bool(self._nwaiters)
409
410
411 class SharedLock(object):
412   """Implements a shared lock.
413
414   Multiple threads can acquire the lock in a shared way, calling
415   acquire_shared().  In order to acquire the lock in an exclusive way threads
416   can call acquire_exclusive().
417
418   The lock prevents starvation but does not guarantee that threads will acquire
419   the shared lock in the order they queued for it, just that they will
420   eventually do so.
421
422   """
423   __slots__ = [
424     "__active_shr_c",
425     "__inactive_shr_c",
426     "__deleted",
427     "__exc",
428     "__lock",
429     "__pending",
430     "__shr",
431     ]
432
433   __condition_class = PipeCondition
434
435   def __init__(self):
436     """Construct a new SharedLock.
437
438     """
439     object.__init__(self)
440
441     # Internal lock
442     self.__lock = threading.Lock()
443
444     # Queue containing waiting acquires
445     self.__pending = []
446
447     # Active and inactive conditions for shared locks
448     self.__active_shr_c = self.__condition_class(self.__lock)
449     self.__inactive_shr_c = self.__condition_class(self.__lock)
450
451     # Current lock holders
452     self.__shr = set()
453     self.__exc = None
454
455     # is this lock in the deleted state?
456     self.__deleted = False
457
458   def __check_deleted(self):
459     """Raises an exception if the lock has been deleted.
460
461     """
462     if self.__deleted:
463       raise errors.LockError("Deleted lock")
464
465   def __is_sharer(self):
466     """Is the current thread sharing the lock at this time?
467
468     """
469     return threading.currentThread() in self.__shr
470
471   def __is_exclusive(self):
472     """Is the current thread holding the lock exclusively at this time?
473
474     """
475     return threading.currentThread() == self.__exc
476
477   def __is_owned(self, shared=-1):
478     """Is the current thread somehow owning the lock at this time?
479
480     This is a private version of the function, which presumes you're holding
481     the internal lock.
482
483     """
484     if shared < 0:
485       return self.__is_sharer() or self.__is_exclusive()
486     elif shared:
487       return self.__is_sharer()
488     else:
489       return self.__is_exclusive()
490
491   def _is_owned(self, shared=-1):
492     """Is the current thread somehow owning the lock at this time?
493
494     @param shared:
495         - < 0: check for any type of ownership (default)
496         - 0: check for exclusive ownership
497         - > 0: check for shared ownership
498
499     """
500     self.__lock.acquire()
501     try:
502       return self.__is_owned(shared=shared)
503     finally:
504       self.__lock.release()
505
506   def _count_pending(self):
507     """Returns the number of pending acquires.
508
509     @rtype: int
510
511     """
512     self.__lock.acquire()
513     try:
514       return len(self.__pending)
515     finally:
516       self.__lock.release()
517
518   def __do_acquire(self, shared):
519     """Actually acquire the lock.
520
521     """
522     if shared:
523       self.__shr.add(threading.currentThread())
524     else:
525       self.__exc = threading.currentThread()
526
527   def __can_acquire(self, shared):
528     """Determine whether lock can be acquired.
529
530     """
531     if shared:
532       return self.__exc is None
533     else:
534       return len(self.__shr) == 0 and self.__exc is None
535
536   def __is_on_top(self, cond):
537     """Checks whether the passed condition is on top of the queue.
538
539     The caller must make sure the queue isn't empty.
540
541     """
542     return self.__pending[0] == cond
543
544   def __acquire_unlocked(self, shared, timeout):
545     """Acquire a shared lock.
546
547     @param shared: whether to acquire in shared mode; by default an
548         exclusive lock will be acquired
549     @param timeout: maximum waiting time before giving up
550
551     """
552     self.__check_deleted()
553
554     # We cannot acquire the lock if we already have it
555     assert not self.__is_owned(), "double acquire() on a non-recursive lock"
556
557     # Check whether someone else holds the lock or there are pending acquires.
558     if not self.__pending and self.__can_acquire(shared):
559       # Apparently not, can acquire lock directly.
560       self.__do_acquire(shared)
561       return True
562
563     if shared:
564       wait_condition = self.__active_shr_c
565
566       # Check if we're not yet in the queue
567       if wait_condition not in self.__pending:
568         self.__pending.append(wait_condition)
569     else:
570       wait_condition = self.__condition_class(self.__lock)
571       # Always add to queue
572       self.__pending.append(wait_condition)
573
574     try:
575       # Wait until we become the topmost acquire in the queue or the timeout
576       # expires.
577       while not (self.__is_on_top(wait_condition) and
578                  self.__can_acquire(shared)):
579         # Wait for notification
580         wait_condition.wait(timeout)
581         self.__check_deleted()
582
583         # A lot of code assumes blocking acquires always succeed. Loop
584         # internally for that case.
585         if timeout is not None:
586           break
587
588       if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
589         self.__do_acquire(shared)
590         return True
591     finally:
592       # Remove condition from queue if there are no more waiters
593       if not wait_condition.has_waiting() and not self.__deleted:
594         self.__pending.remove(wait_condition)
595
596     return False
597
598   def acquire(self, shared=0, timeout=None, test_notify=None):
599     """Acquire a shared lock.
600
601     @type shared: int
602     @param shared: whether to acquire in shared mode; by default an
603         exclusive lock will be acquired
604     @type timeout: float
605     @param timeout: maximum waiting time before giving up
606     @type test_notify: callable or None
607     @param test_notify: Special callback function for unittesting
608
609     """
610     self.__lock.acquire()
611     try:
612       # We already got the lock, notify now
613       if __debug__ and callable(test_notify):
614         test_notify()
615
616       return self.__acquire_unlocked(shared, timeout)
617     finally:
618       self.__lock.release()
619
620   def release(self):
621     """Release a Shared Lock.
622
623     You must have acquired the lock, either in shared or in exclusive mode,
624     before calling this function.
625
626     """
627     self.__lock.acquire()
628     try:
629       assert self.__is_exclusive() or self.__is_sharer(), \
630         "Cannot release non-owned lock"
631
632       # Autodetect release type
633       if self.__is_exclusive():
634         self.__exc = None
635       else:
636         self.__shr.remove(threading.currentThread())
637
638       # Notify topmost condition in queue
639       if self.__pending:
640         first_condition = self.__pending[0]
641         first_condition.notifyAll()
642
643         if first_condition == self.__active_shr_c:
644           self.__active_shr_c = self.__inactive_shr_c
645           self.__inactive_shr_c = first_condition
646
647     finally:
648       self.__lock.release()
649
650   def delete(self, timeout=None):
651     """Delete a Shared Lock.
652
653     This operation will declare the lock for removal. First the lock will be
654     acquired in exclusive mode if you don't already own it, then the lock
655     will be put in a state where any future and pending acquire() fail.
656
657     @type timeout: float
658     @param timeout: maximum waiting time before giving up
659
660     """
661     self.__lock.acquire()
662     try:
663       assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
664
665       self.__check_deleted()
666
667       # The caller is allowed to hold the lock exclusively already.
668       acquired = self.__is_exclusive()
669
670       if not acquired:
671         acquired = self.__acquire_unlocked(0, timeout)
672
673         assert self.__is_exclusive() and not self.__is_sharer(), \
674           "Lock wasn't acquired in exclusive mode"
675
676       if acquired:
677         self.__deleted = True
678         self.__exc = None
679
680         # Notify all acquires. They'll throw an error.
681         while self.__pending:
682           self.__pending.pop().notifyAll()
683
684       return acquired
685     finally:
686       self.__lock.release()
687
688
689 # Whenever we want to acquire a full LockSet we pass None as the value
690 # to acquire.  Hide this behind this nicely named constant.
691 ALL_SET = None
692
693
694 class _AcquireTimeout(Exception):
695   """Internal exception to abort an acquire on a timeout.
696
697   """
698
699
700 class LockSet:
701   """Implements a set of locks.
702
703   This abstraction implements a set of shared locks for the same resource type,
704   distinguished by name. The user can lock a subset of the resources and the
705   LockSet will take care of acquiring the locks always in the same order, thus
706   preventing deadlock.
707
708   All the locks needed in the same set must be acquired together, though.
709
710   """
711   def __init__(self, members=None):
712     """Constructs a new LockSet.
713
714     @param members: initial members of the set
715
716     """
717     # Used internally to guarantee coherency.
718     self.__lock = SharedLock()
719
720     # The lockdict indexes the relationship name -> lock
721     # The order-of-locking is implied by the alphabetical order of names
722     self.__lockdict = {}
723
724     if members is not None:
725       for name in members:
726         self.__lockdict[name] = SharedLock()
727
728     # The owner dict contains the set of locks each thread owns. For
729     # performance each thread can access its own key without a global lock on
730     # this structure. It is paramount though that *no* other type of access is
731     # done to this structure (eg. no looping over its keys). *_owner helper
732     # function are defined to guarantee access is correct, but in general never
733     # do anything different than __owners[threading.currentThread()], or there
734     # will be trouble.
735     self.__owners = {}
736
737   def _is_owned(self):
738     """Is the current thread a current level owner?"""
739     return threading.currentThread() in self.__owners
740
741   def _add_owned(self, name=None):
742     """Note the current thread owns the given lock"""
743     if name is None:
744       if not self._is_owned():
745         self.__owners[threading.currentThread()] = set()
746     else:
747       if self._is_owned():
748         self.__owners[threading.currentThread()].add(name)
749       else:
750         self.__owners[threading.currentThread()] = set([name])
751
752   def _del_owned(self, name=None):
753     """Note the current thread owns the given lock"""
754
755     assert not (name is None and self.__lock._is_owned()), \
756            "Cannot hold internal lock when deleting owner status"
757
758     if name is not None:
759       self.__owners[threading.currentThread()].remove(name)
760
761     # Only remove the key if we don't hold the set-lock as well
762     if (not self.__lock._is_owned() and
763         not self.__owners[threading.currentThread()]):
764       del self.__owners[threading.currentThread()]
765
766   def _list_owned(self):
767     """Get the set of resource names owned by the current thread"""
768     if self._is_owned():
769       return self.__owners[threading.currentThread()].copy()
770     else:
771       return set()
772
773   def _release_and_delete_owned(self):
774     """Release and delete all resources owned by the current thread"""
775     for lname in self._list_owned():
776       self.__lockdict[lname].release()
777       self._del_owned(name=lname)
778
779   def __names(self):
780     """Return the current set of names.
781
782     Only call this function while holding __lock and don't iterate on the
783     result after releasing the lock.
784
785     """
786     return self.__lockdict.keys()
787
788   def _names(self):
789     """Return a copy of the current set of elements.
790
791     Used only for debugging purposes.
792
793     """
794     # If we don't already own the set-level lock acquired
795     # we'll get it and note we need to release it later.
796     release_lock = False
797     if not self.__lock._is_owned():
798       release_lock = True
799       self.__lock.acquire(shared=1)
800     try:
801       result = self.__names()
802     finally:
803       if release_lock:
804         self.__lock.release()
805     return set(result)
806
807   def acquire(self, names, timeout=None, shared=0, test_notify=None):
808     """Acquire a set of resource locks.
809
810     @param names: the names of the locks which shall be acquired
811         (special lock names, or instance/node names)
812     @param shared: whether to acquire in shared mode; by default an
813         exclusive lock will be acquired
814     @type timeout: float or None
815     @param timeout: Maximum time to acquire all locks
816     @type test_notify: callable or None
817     @param test_notify: Special callback function for unittesting
818
819     @return: Set of all locks successfully acquired or None in case of timeout
820
821     @raise errors.LockError: when any lock we try to acquire has
822         been deleted before we succeed. In this case none of the
823         locks requested will be acquired.
824
825     """
826     assert timeout is None or timeout >= 0.0
827
828     # Check we don't already own locks at this level
829     assert not self._is_owned(), "Cannot acquire locks in the same set twice"
830
831     # We need to keep track of how long we spent waiting for a lock. The
832     # timeout passed to this function is over all lock acquires.
833     running_timeout = RunningTimeout(timeout, False)
834
835     try:
836       if names is not None:
837         # Support passing in a single resource to acquire rather than many
838         if isinstance(names, basestring):
839           names = [names]
840         else:
841           names = sorted(names)
842
843         return self.__acquire_inner(names, False, shared,
844                                     running_timeout.Remaining, test_notify)
845
846       else:
847         # If no names are given acquire the whole set by not letting new names
848         # being added before we release, and getting the current list of names.
849         # Some of them may then be deleted later, but we'll cope with this.
850         #
851         # We'd like to acquire this lock in a shared way, as it's nice if
852         # everybody else can use the instances at the same time. If are
853         # acquiring them exclusively though they won't be able to do this
854         # anyway, though, so we'll get the list lock exclusively as well in
855         # order to be able to do add() on the set while owning it.
856         if not self.__lock.acquire(shared=shared,
857                                    timeout=running_timeout.Remaining()):
858           raise _AcquireTimeout()
859         try:
860           # note we own the set-lock
861           self._add_owned()
862
863           return self.__acquire_inner(self.__names(), True, shared,
864                                       running_timeout.Remaining, test_notify)
865         except:
866           # We shouldn't have problems adding the lock to the owners list, but
867           # if we did we'll try to release this lock and re-raise exception.
868           # Of course something is going to be really wrong, after this.
869           self.__lock.release()
870           self._del_owned()
871           raise
872
873     except _AcquireTimeout:
874       return None
875
876   def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
877     """Inner logic for acquiring a number of locks.
878
879     @param names: Names of the locks to be acquired
880     @param want_all: Whether all locks in the set should be acquired
881     @param shared: Whether to acquire in shared mode
882     @param timeout_fn: Function returning remaining timeout
883     @param test_notify: Special callback function for unittesting
884
885     """
886     acquire_list = []
887
888     # First we look the locks up on __lockdict. We have no way of being sure
889     # they will still be there after, but this makes it a lot faster should
890     # just one of them be the already wrong
891     for lname in utils.UniqueSequence(names):
892       try:
893         lock = self.__lockdict[lname] # raises KeyError if lock is not there
894         acquire_list.append((lname, lock))
895       except KeyError:
896         if want_all:
897           # We are acquiring all the set, it doesn't matter if this particular
898           # element is not there anymore.
899           continue
900
901         raise errors.LockError("Non-existing lock in set (%s)" % lname)
902
903     # This will hold the locknames we effectively acquired.
904     acquired = set()
905
906     try:
907       # Now acquire_list contains a sorted list of resources and locks we
908       # want.  In order to get them we loop on this (private) list and
909       # acquire() them.  We gave no real guarantee they will still exist till
910       # this is done but .acquire() itself is safe and will alert us if the
911       # lock gets deleted.
912       for (lname, lock) in acquire_list:
913         if __debug__ and callable(test_notify):
914           test_notify_fn = lambda: test_notify(lname)
915         else:
916           test_notify_fn = None
917
918         timeout = timeout_fn()
919
920         try:
921           # raises LockError if the lock was deleted
922           acq_success = lock.acquire(shared=shared, timeout=timeout,
923                                      test_notify=test_notify_fn)
924         except errors.LockError:
925           if want_all:
926             # We are acquiring all the set, it doesn't matter if this
927             # particular element is not there anymore.
928             continue
929
930           raise errors.LockError("Non-existing lock in set (%s)" % lname)
931
932         if not acq_success:
933           # Couldn't get lock or timeout occurred
934           if timeout is None:
935             # This shouldn't happen as SharedLock.acquire(timeout=None) is
936             # blocking.
937             raise errors.LockError("Failed to get lock %s" % lname)
938
939           raise _AcquireTimeout()
940
941         try:
942           # now the lock cannot be deleted, we have it!
943           self._add_owned(name=lname)
944           acquired.add(lname)
945
946         except:
947           # We shouldn't have problems adding the lock to the owners list, but
948           # if we did we'll try to release this lock and re-raise exception.
949           # Of course something is going to be really wrong after this.
950           if lock._is_owned():
951             lock.release()
952           raise
953
954     except:
955       # Release all owned locks
956       self._release_and_delete_owned()
957       raise
958
959     return acquired
960
961   def release(self, names=None):
962     """Release a set of resource locks, at the same level.
963
964     You must have acquired the locks, either in shared or in exclusive mode,
965     before releasing them.
966
967     @param names: the names of the locks which shall be released
968         (defaults to all the locks acquired at that level).
969
970     """
971     assert self._is_owned(), "release() on lock set while not owner"
972
973     # Support passing in a single resource to release rather than many
974     if isinstance(names, basestring):
975       names = [names]
976
977     if names is None:
978       names = self._list_owned()
979     else:
980       names = set(names)
981       assert self._list_owned().issuperset(names), (
982                "release() on unheld resources %s" %
983                names.difference(self._list_owned()))
984
985     # First of all let's release the "all elements" lock, if set.
986     # After this 'add' can work again
987     if self.__lock._is_owned():
988       self.__lock.release()
989       self._del_owned()
990
991     for lockname in names:
992       # If we are sure the lock doesn't leave __lockdict without being
993       # exclusively held we can do this...
994       self.__lockdict[lockname].release()
995       self._del_owned(name=lockname)
996
997   def add(self, names, acquired=0, shared=0):
998     """Add a new set of elements to the set
999
1000     @param names: names of the new elements to add
1001     @param acquired: pre-acquire the new resource?
1002     @param shared: is the pre-acquisition shared?
1003
1004     """
1005     # Check we don't already own locks at this level
1006     assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1007       "Cannot add locks if the set is only partially owned, or shared"
1008
1009     # Support passing in a single resource to add rather than many
1010     if isinstance(names, basestring):
1011       names = [names]
1012
1013     # If we don't already own the set-level lock acquired in an exclusive way
1014     # we'll get it and note we need to release it later.
1015     release_lock = False
1016     if not self.__lock._is_owned():
1017       release_lock = True
1018       self.__lock.acquire()
1019
1020     try:
1021       invalid_names = set(self.__names()).intersection(names)
1022       if invalid_names:
1023         # This must be an explicit raise, not an assert, because assert is
1024         # turned off when using optimization, and this can happen because of
1025         # concurrency even if the user doesn't want it.
1026         raise errors.LockError("duplicate add() (%s)" % invalid_names)
1027
1028       for lockname in names:
1029         lock = SharedLock()
1030
1031         if acquired:
1032           lock.acquire(shared=shared)
1033           # now the lock cannot be deleted, we have it!
1034           try:
1035             self._add_owned(name=lockname)
1036           except:
1037             # We shouldn't have problems adding the lock to the owners list,
1038             # but if we did we'll try to release this lock and re-raise
1039             # exception.  Of course something is going to be really wrong,
1040             # after this.  On the other hand the lock hasn't been added to the
1041             # __lockdict yet so no other threads should be pending on it. This
1042             # release is just a safety measure.
1043             lock.release()
1044             raise
1045
1046         self.__lockdict[lockname] = lock
1047
1048     finally:
1049       # Only release __lock if we were not holding it previously.
1050       if release_lock:
1051         self.__lock.release()
1052
1053     return True
1054
1055   def remove(self, names):
1056     """Remove elements from the lock set.
1057
1058     You can either not hold anything in the lockset or already hold a superset
1059     of the elements you want to delete, exclusively.
1060
1061     @param names: names of the resource to remove.
1062
1063     @return: a list of locks which we removed; the list is always
1064         equal to the names list if we were holding all the locks
1065         exclusively
1066
1067     """
1068     # Support passing in a single resource to remove rather than many
1069     if isinstance(names, basestring):
1070       names = [names]
1071
1072     # If we own any subset of this lock it must be a superset of what we want
1073     # to delete. The ownership must also be exclusive, but that will be checked
1074     # by the lock itself.
1075     assert not self._is_owned() or self._list_owned().issuperset(names), (
1076       "remove() on acquired lockset while not owning all elements")
1077
1078     removed = []
1079
1080     for lname in names:
1081       # Calling delete() acquires the lock exclusively if we don't already own
1082       # it, and causes all pending and subsequent lock acquires to fail. It's
1083       # fine to call it out of order because delete() also implies release(),
1084       # and the assertion above guarantees that if we either already hold
1085       # everything we want to delete, or we hold none.
1086       try:
1087         self.__lockdict[lname].delete()
1088         removed.append(lname)
1089       except (KeyError, errors.LockError):
1090         # This cannot happen if we were already holding it, verify:
1091         assert not self._is_owned(), "remove failed while holding lockset"
1092       else:
1093         # If no LockError was raised we are the ones who deleted the lock.
1094         # This means we can safely remove it from lockdict, as any further or
1095         # pending delete() or acquire() will fail (and nobody can have the lock
1096         # since before our call to delete()).
1097         #
1098         # This is done in an else clause because if the exception was thrown
1099         # it's the job of the one who actually deleted it.
1100         del self.__lockdict[lname]
1101         # And let's remove it from our private list if we owned it.
1102         if self._is_owned():
1103           self._del_owned(name=lname)
1104
1105     return removed
1106
1107
1108 # Locking levels, must be acquired in increasing order.
1109 # Current rules are:
1110 #   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1111 #   acquired before performing any operation, either in shared or in exclusive
1112 #   mode. acquiring the BGL in exclusive mode is discouraged and should be
1113 #   avoided.
1114 #   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1115 #   If you need more than one node, or more than one instance, acquire them at
1116 #   the same time.
1117 LEVEL_CLUSTER = 0
1118 LEVEL_INSTANCE = 1
1119 LEVEL_NODE = 2
1120
1121 LEVELS = [LEVEL_CLUSTER,
1122           LEVEL_INSTANCE,
1123           LEVEL_NODE]
1124
1125 # Lock levels which are modifiable
1126 LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1127
1128 LEVEL_NAMES = {
1129   LEVEL_CLUSTER: "cluster",
1130   LEVEL_INSTANCE: "instance",
1131   LEVEL_NODE: "node",
1132   }
1133
1134 # Constant for the big ganeti lock
1135 BGL = 'BGL'
1136
1137
1138 class GanetiLockManager:
1139   """The Ganeti Locking Library
1140
1141   The purpose of this small library is to manage locking for ganeti clusters
1142   in a central place, while at the same time doing dynamic checks against
1143   possible deadlocks. It will also make it easier to transition to a different
1144   lock type should we migrate away from python threads.
1145
1146   """
1147   _instance = None
1148
1149   def __init__(self, nodes=None, instances=None):
1150     """Constructs a new GanetiLockManager object.
1151
1152     There should be only a GanetiLockManager object at any time, so this
1153     function raises an error if this is not the case.
1154
1155     @param nodes: list of node names
1156     @param instances: list of instance names
1157
1158     """
1159     assert self.__class__._instance is None, \
1160            "double GanetiLockManager instance"
1161
1162     self.__class__._instance = self
1163
1164     # The keyring contains all the locks, at their level and in the correct
1165     # locking order.
1166     self.__keyring = {
1167       LEVEL_CLUSTER: LockSet([BGL]),
1168       LEVEL_NODE: LockSet(nodes),
1169       LEVEL_INSTANCE: LockSet(instances),
1170     }
1171
1172   def _names(self, level):
1173     """List the lock names at the given level.
1174
1175     This can be used for debugging/testing purposes.
1176
1177     @param level: the level whose list of locks to get
1178
1179     """
1180     assert level in LEVELS, "Invalid locking level %s" % level
1181     return self.__keyring[level]._names()
1182
1183   def _is_owned(self, level):
1184     """Check whether we are owning locks at the given level
1185
1186     """
1187     return self.__keyring[level]._is_owned()
1188
1189   is_owned = _is_owned
1190
1191   def _list_owned(self, level):
1192     """Get the set of owned locks at the given level
1193
1194     """
1195     return self.__keyring[level]._list_owned()
1196
1197   def _upper_owned(self, level):
1198     """Check that we don't own any lock at a level greater than the given one.
1199
1200     """
1201     # This way of checking only works if LEVELS[i] = i, which we check for in
1202     # the test cases.
1203     return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1204
1205   def _BGL_owned(self):
1206     """Check if the current thread owns the BGL.
1207
1208     Both an exclusive or a shared acquisition work.
1209
1210     """
1211     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1212
1213   def _contains_BGL(self, level, names):
1214     """Check if the level contains the BGL.
1215
1216     Check if acting on the given level and set of names will change
1217     the status of the Big Ganeti Lock.
1218
1219     """
1220     return level == LEVEL_CLUSTER and (names is None or BGL in names)
1221
1222   def acquire(self, level, names, timeout=None, shared=0):
1223     """Acquire a set of resource locks, at the same level.
1224
1225     @param level: the level at which the locks shall be acquired;
1226         it must be a member of LEVELS.
1227     @param names: the names of the locks which shall be acquired
1228         (special lock names, or instance/node names)
1229     @param shared: whether to acquire in shared mode; by default
1230         an exclusive lock will be acquired
1231     @type timeout: float
1232     @param timeout: Maximum time to acquire all locks
1233
1234     """
1235     assert level in LEVELS, "Invalid locking level %s" % level
1236
1237     # Check that we are either acquiring the Big Ganeti Lock or we already own
1238     # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1239     # so even if we've migrated we need to at least share the BGL to be
1240     # compatible with them. Of course if we own the BGL exclusively there's no
1241     # point in acquiring any other lock, unless perhaps we are half way through
1242     # the migration of the current opcode.
1243     assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1244             "You must own the Big Ganeti Lock before acquiring any other")
1245
1246     # Check we don't own locks at the same or upper levels.
1247     assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1248            " while owning some at a greater one")
1249
1250     # Acquire the locks in the set.
1251     return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
1252
1253   def release(self, level, names=None):
1254     """Release a set of resource locks, at the same level.
1255
1256     You must have acquired the locks, either in shared or in exclusive
1257     mode, before releasing them.
1258
1259     @param level: the level at which the locks shall be released;
1260         it must be a member of LEVELS
1261     @param names: the names of the locks which shall be released
1262         (defaults to all the locks acquired at that level)
1263
1264     """
1265     assert level in LEVELS, "Invalid locking level %s" % level
1266     assert (not self._contains_BGL(level, names) or
1267             not self._upper_owned(LEVEL_CLUSTER)), (
1268             "Cannot release the Big Ganeti Lock while holding something"
1269             " at upper levels (%r)" %
1270             (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
1271                               for i in self.__keyring.keys()]), ))
1272
1273     # Release will complain if we don't own the locks already
1274     return self.__keyring[level].release(names)
1275
1276   def add(self, level, names, acquired=0, shared=0):
1277     """Add locks at the specified level.
1278
1279     @param level: the level at which the locks shall be added;
1280         it must be a member of LEVELS_MOD.
1281     @param names: names of the locks to acquire
1282     @param acquired: whether to acquire the newly added locks
1283     @param shared: whether the acquisition will be shared
1284
1285     """
1286     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1287     assert self._BGL_owned(), ("You must own the BGL before performing other"
1288            " operations")
1289     assert not self._upper_owned(level), ("Cannot add locks at a level"
1290            " while owning some at a greater one")
1291     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1292
1293   def remove(self, level, names):
1294     """Remove locks from the specified level.
1295
1296     You must either already own the locks you are trying to remove
1297     exclusively or not own any lock at an upper level.
1298
1299     @param level: the level at which the locks shall be removed;
1300         it must be a member of LEVELS_MOD
1301     @param names: the names of the locks which shall be removed
1302         (special lock names, or instance/node names)
1303
1304     """
1305     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1306     assert self._BGL_owned(), ("You must own the BGL before performing other"
1307            " operations")
1308     # Check we either own the level or don't own anything from here
1309     # up. LockSet.remove() will check the case in which we don't own
1310     # all the needed resources, or we have a shared ownership.
1311     assert self._is_owned(level) or not self._upper_owned(level), (
1312            "Cannot remove locks at a level while not owning it or"
1313            " owning some at a greater one")
1314     return self.__keyring[level].remove(names)