mcpu: Change lock attempt timeout calculation
[ganeti-local] / lib / locking.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 # Disable "Invalid name ..." message
22 # pylint: disable-msg=C0103
23
24 """Module implementing the Ganeti locking code."""
25
26 import os
27 import select
28 import threading
29 import time
30 import errno
31
32 from ganeti import errors
33 from ganeti import utils
34
35
36 def ssynchronized(lock, shared=0):
37   """Shared Synchronization decorator.
38
39   Calls the function holding the given lock, either in exclusive or shared
40   mode. It requires the passed lock to be a SharedLock (or support its
41   semantics).
42
43   """
44   def wrap(fn):
45     def sync_function(*args, **kwargs):
46       lock.acquire(shared=shared)
47       try:
48         return fn(*args, **kwargs)
49       finally:
50         lock.release()
51     return sync_function
52   return wrap
53
54
55 class _SingleNotifyPipeConditionWaiter(object):
56   """Helper class for SingleNotifyPipeCondition
57
58   """
59   __slots__ = [
60     "_fd",
61     "_poller",
62     ]
63
64   def __init__(self, poller, fd):
65     """Constructor for _SingleNotifyPipeConditionWaiter
66
67     @type poller: select.poll
68     @param poller: Poller object
69     @type fd: int
70     @param fd: File descriptor to wait for
71
72     """
73     object.__init__(self)
74     self._poller = poller
75     self._fd = fd
76
77   def __call__(self, timeout):
78     """Wait for something to happen on the pipe.
79
80     @type timeout: float or None
81     @param timeout: Timeout for waiting (can be None)
82
83     """
84     start_time = time.time()
85     remaining_time = timeout
86
87     while timeout is None or remaining_time > 0:
88       try:
89         result = self._poller.poll(remaining_time)
90       except EnvironmentError, err:
91         if err.errno != errno.EINTR:
92           raise
93         result = None
94
95       # Check whether we were notified
96       if result and result[0][0] == self._fd:
97         break
98
99       # Re-calculate timeout if necessary
100       if timeout is not None:
101         remaining_time = start_time + timeout - time.time()
102
103
104 class _BaseCondition(object):
105   """Base class containing common code for conditions.
106
107   Some of this code is taken from python's threading module.
108
109   """
110   __slots__ = [
111     "_lock",
112     "acquire",
113     "release",
114     ]
115
116   def __init__(self, lock):
117     """Constructor for _BaseCondition.
118
119     @type lock: threading.Lock
120     @param lock: condition base lock
121
122     """
123     object.__init__(self)
124
125     # Recursive locks are not supported
126     assert not hasattr(lock, "_acquire_restore")
127     assert not hasattr(lock, "_release_save")
128
129     self._lock = lock
130
131     # Export the lock's acquire() and release() methods
132     self.acquire = lock.acquire
133     self.release = lock.release
134
135   def _is_owned(self):
136     """Check whether lock is owned by current thread.
137
138     """
139     if self._lock.acquire(0):
140       self._lock.release()
141       return False
142
143     return True
144
145   def _check_owned(self):
146     """Raise an exception if the current thread doesn't own the lock.
147
148     """
149     if not self._is_owned():
150       raise RuntimeError("cannot work with un-aquired lock")
151
152
153 class SingleNotifyPipeCondition(_BaseCondition):
154   """Condition which can only be notified once.
155
156   This condition class uses pipes and poll, internally, to be able to wait for
157   notification with a timeout, without resorting to polling. It is almost
158   compatible with Python's threading.Condition, with the following differences:
159     - notifyAll can only be called once, and no wait can happen after that
160     - notify is not supported, only notifyAll
161
162   """
163
164   __slots__ = _BaseCondition.__slots__ + [
165     "_poller",
166     "_read_fd",
167     "_write_fd",
168     "_nwaiters",
169     "_notified",
170     ]
171
172   _waiter_class = _SingleNotifyPipeConditionWaiter
173
174   def __init__(self, lock):
175     """Constructor for SingleNotifyPipeCondition
176
177     """
178     _BaseCondition.__init__(self, lock)
179     self._nwaiters = 0
180     self._notified = False
181     self._read_fd = None
182     self._write_fd = None
183     self._poller = None
184
185   def _check_unnotified(self):
186     """Throws an exception if already notified.
187
188     """
189     if self._notified:
190       raise RuntimeError("cannot use already notified condition")
191
192   def _Cleanup(self):
193     """Cleanup open file descriptors, if any.
194
195     """
196     if self._read_fd is not None:
197       os.close(self._read_fd)
198       self._read_fd = None
199
200     if self._write_fd is not None:
201       os.close(self._write_fd)
202       self._write_fd = None
203     self._poller = None
204
205   def wait(self, timeout=None):
206     """Wait for a notification.
207
208     @type timeout: float or None
209     @param timeout: Waiting timeout (can be None)
210
211     """
212     self._check_owned()
213     self._check_unnotified()
214
215     self._nwaiters += 1
216     try:
217       if self._poller is None:
218         (self._read_fd, self._write_fd) = os.pipe()
219         self._poller = select.poll()
220         self._poller.register(self._read_fd, select.POLLHUP)
221
222       wait_fn = self._waiter_class(self._poller, self._read_fd)
223       self.release()
224       try:
225         # Wait for notification
226         wait_fn(timeout)
227       finally:
228         # Re-acquire lock
229         self.acquire()
230     finally:
231       self._nwaiters -= 1
232       if self._nwaiters == 0:
233         self._Cleanup()
234
235   def notifyAll(self):
236     """Close the writing side of the pipe to notify all waiters.
237
238     """
239     self._check_owned()
240     self._check_unnotified()
241     self._notified = True
242     if self._write_fd is not None:
243       os.close(self._write_fd)
244       self._write_fd = None
245
246
247 class PipeCondition(_BaseCondition):
248   """Group-only non-polling condition with counters.
249
250   This condition class uses pipes and poll, internally, to be able to wait for
251   notification with a timeout, without resorting to polling. It is almost
252   compatible with Python's threading.Condition, but only supports notifyAll and
253   non-recursive locks. As an additional features it's able to report whether
254   there are any waiting threads.
255
256   """
257   __slots__ = _BaseCondition.__slots__ + [
258     "_nwaiters",
259     "_single_condition",
260     ]
261
262   _single_condition_class = SingleNotifyPipeCondition
263
264   def __init__(self, lock):
265     """Initializes this class.
266
267     """
268     _BaseCondition.__init__(self, lock)
269     self._nwaiters = 0
270     self._single_condition = self._single_condition_class(self._lock)
271
272   def wait(self, timeout=None):
273     """Wait for a notification.
274
275     @type timeout: float or None
276     @param timeout: Waiting timeout (can be None)
277
278     """
279     self._check_owned()
280
281     # Keep local reference to the pipe. It could be replaced by another thread
282     # notifying while we're waiting.
283     my_condition = self._single_condition
284
285     assert self._nwaiters >= 0
286     self._nwaiters += 1
287     try:
288       my_condition.wait(timeout)
289     finally:
290       assert self._nwaiters > 0
291       self._nwaiters -= 1
292
293   def notifyAll(self):
294     """Notify all currently waiting threads.
295
296     """
297     self._check_owned()
298     self._single_condition.notifyAll()
299     self._single_condition = self._single_condition_class(self._lock)
300
301   def has_waiting(self):
302     """Returns whether there are active waiters.
303
304     """
305     self._check_owned()
306
307     return bool(self._nwaiters)
308
309
310 class _CountingCondition(object):
311   """Wrapper for Python's built-in threading.Condition class.
312
313   This wrapper keeps a count of active waiters. We can't access the internal
314   "__waiters" attribute of threading.Condition because it's not thread-safe.
315
316   """
317   __slots__ = [
318     "_cond",
319     "_nwaiters",
320     ]
321
322   def __init__(self, lock):
323     """Initializes this class.
324
325     """
326     object.__init__(self)
327     self._cond = threading.Condition(lock=lock)
328     self._nwaiters = 0
329
330   def notifyAll(self):
331     """Notifies the condition.
332
333     """
334     return self._cond.notifyAll()
335
336   def wait(self, timeout=None):
337     """Waits for the condition to be notified.
338
339     @type timeout: float or None
340     @param timeout: Waiting timeout (can be None)
341
342     """
343     assert self._nwaiters >= 0
344
345     self._nwaiters += 1
346     try:
347       return self._cond.wait(timeout=timeout)
348     finally:
349       self._nwaiters -= 1
350
351   def has_waiting(self):
352     """Returns whether there are active waiters.
353
354     """
355     return bool(self._nwaiters)
356
357
358 class SharedLock(object):
359   """Implements a shared lock.
360
361   Multiple threads can acquire the lock in a shared way, calling
362   acquire_shared().  In order to acquire the lock in an exclusive way threads
363   can call acquire_exclusive().
364
365   The lock prevents starvation but does not guarantee that threads will acquire
366   the shared lock in the order they queued for it, just that they will
367   eventually do so.
368
369   """
370   __slots__ = [
371     "__active_shr_c",
372     "__inactive_shr_c",
373     "__deleted",
374     "__exc",
375     "__lock",
376     "__pending",
377     "__shr",
378     ]
379
380   __condition_class = PipeCondition
381
382   def __init__(self):
383     """Construct a new SharedLock.
384
385     """
386     object.__init__(self)
387
388     # Internal lock
389     self.__lock = threading.Lock()
390
391     # Queue containing waiting acquires
392     self.__pending = []
393
394     # Active and inactive conditions for shared locks
395     self.__active_shr_c = self.__condition_class(self.__lock)
396     self.__inactive_shr_c = self.__condition_class(self.__lock)
397
398     # Current lock holders
399     self.__shr = set()
400     self.__exc = None
401
402     # is this lock in the deleted state?
403     self.__deleted = False
404
405   def __check_deleted(self):
406     """Raises an exception if the lock has been deleted.
407
408     """
409     if self.__deleted:
410       raise errors.LockError("Deleted lock")
411
412   def __is_sharer(self):
413     """Is the current thread sharing the lock at this time?
414
415     """
416     return threading.currentThread() in self.__shr
417
418   def __is_exclusive(self):
419     """Is the current thread holding the lock exclusively at this time?
420
421     """
422     return threading.currentThread() == self.__exc
423
424   def __is_owned(self, shared=-1):
425     """Is the current thread somehow owning the lock at this time?
426
427     This is a private version of the function, which presumes you're holding
428     the internal lock.
429
430     """
431     if shared < 0:
432       return self.__is_sharer() or self.__is_exclusive()
433     elif shared:
434       return self.__is_sharer()
435     else:
436       return self.__is_exclusive()
437
438   def _is_owned(self, shared=-1):
439     """Is the current thread somehow owning the lock at this time?
440
441     @param shared:
442         - < 0: check for any type of ownership (default)
443         - 0: check for exclusive ownership
444         - > 0: check for shared ownership
445
446     """
447     self.__lock.acquire()
448     try:
449       return self.__is_owned(shared=shared)
450     finally:
451       self.__lock.release()
452
453   def _count_pending(self):
454     """Returns the number of pending acquires.
455
456     @rtype: int
457
458     """
459     self.__lock.acquire()
460     try:
461       return len(self.__pending)
462     finally:
463       self.__lock.release()
464
465   def __do_acquire(self, shared):
466     """Actually acquire the lock.
467
468     """
469     if shared:
470       self.__shr.add(threading.currentThread())
471     else:
472       self.__exc = threading.currentThread()
473
474   def __can_acquire(self, shared):
475     """Determine whether lock can be acquired.
476
477     """
478     if shared:
479       return self.__exc is None
480     else:
481       return len(self.__shr) == 0 and self.__exc is None
482
483   def __is_on_top(self, cond):
484     """Checks whether the passed condition is on top of the queue.
485
486     The caller must make sure the queue isn't empty.
487
488     """
489     return self.__pending[0] == cond
490
491   def __acquire_unlocked(self, shared, timeout):
492     """Acquire a shared lock.
493
494     @param shared: whether to acquire in shared mode; by default an
495         exclusive lock will be acquired
496     @param timeout: maximum waiting time before giving up
497
498     """
499     self.__check_deleted()
500
501     # We cannot acquire the lock if we already have it
502     assert not self.__is_owned(), "double acquire() on a non-recursive lock"
503
504     # Check whether someone else holds the lock or there are pending acquires.
505     if not self.__pending and self.__can_acquire(shared):
506       # Apparently not, can acquire lock directly.
507       self.__do_acquire(shared)
508       return True
509
510     if shared:
511       wait_condition = self.__active_shr_c
512
513       # Check if we're not yet in the queue
514       if wait_condition not in self.__pending:
515         self.__pending.append(wait_condition)
516     else:
517       wait_condition = self.__condition_class(self.__lock)
518       # Always add to queue
519       self.__pending.append(wait_condition)
520
521     try:
522       # Wait until we become the topmost acquire in the queue or the timeout
523       # expires.
524       while not (self.__is_on_top(wait_condition) and
525                  self.__can_acquire(shared)):
526         # Wait for notification
527         wait_condition.wait(timeout)
528         self.__check_deleted()
529
530         # A lot of code assumes blocking acquires always succeed. Loop
531         # internally for that case.
532         if timeout is not None:
533           break
534
535       if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
536         self.__do_acquire(shared)
537         return True
538     finally:
539       # Remove condition from queue if there are no more waiters
540       if not wait_condition.has_waiting() and not self.__deleted:
541         self.__pending.remove(wait_condition)
542
543     return False
544
545   def acquire(self, shared=0, timeout=None, test_notify=None):
546     """Acquire a shared lock.
547
548     @type shared: int
549     @param shared: whether to acquire in shared mode; by default an
550         exclusive lock will be acquired
551     @type timeout: float
552     @param timeout: maximum waiting time before giving up
553     @type test_notify: callable or None
554     @param test_notify: Special callback function for unittesting
555
556     """
557     self.__lock.acquire()
558     try:
559       # We already got the lock, notify now
560       if __debug__ and callable(test_notify):
561         test_notify()
562
563       return self.__acquire_unlocked(shared, timeout)
564     finally:
565       self.__lock.release()
566
567   def release(self):
568     """Release a Shared Lock.
569
570     You must have acquired the lock, either in shared or in exclusive mode,
571     before calling this function.
572
573     """
574     self.__lock.acquire()
575     try:
576       assert self.__is_exclusive() or self.__is_sharer(), \
577         "Cannot release non-owned lock"
578
579       # Autodetect release type
580       if self.__is_exclusive():
581         self.__exc = None
582       else:
583         self.__shr.remove(threading.currentThread())
584
585       # Notify topmost condition in queue
586       if self.__pending:
587         first_condition = self.__pending[0]
588         first_condition.notifyAll()
589
590         if first_condition == self.__active_shr_c:
591           self.__active_shr_c = self.__inactive_shr_c
592           self.__inactive_shr_c = first_condition
593
594     finally:
595       self.__lock.release()
596
597   def delete(self, timeout=None):
598     """Delete a Shared Lock.
599
600     This operation will declare the lock for removal. First the lock will be
601     acquired in exclusive mode if you don't already own it, then the lock
602     will be put in a state where any future and pending acquire() fail.
603
604     @type timeout: float
605     @param timeout: maximum waiting time before giving up
606
607     """
608     self.__lock.acquire()
609     try:
610       assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
611
612       self.__check_deleted()
613
614       # The caller is allowed to hold the lock exclusively already.
615       acquired = self.__is_exclusive()
616
617       if not acquired:
618         acquired = self.__acquire_unlocked(0, timeout)
619
620         assert self.__is_exclusive() and not self.__is_sharer(), \
621           "Lock wasn't acquired in exclusive mode"
622
623       if acquired:
624         self.__deleted = True
625         self.__exc = None
626
627         # Notify all acquires. They'll throw an error.
628         while self.__pending:
629           self.__pending.pop().notifyAll()
630
631       return acquired
632     finally:
633       self.__lock.release()
634
635
636 # Whenever we want to acquire a full LockSet we pass None as the value
637 # to acquire.  Hide this behind this nicely named constant.
638 ALL_SET = None
639
640
641 class _AcquireTimeout(Exception):
642   """Internal exception to abort an acquire on a timeout.
643
644   """
645
646
647 class LockSet:
648   """Implements a set of locks.
649
650   This abstraction implements a set of shared locks for the same resource type,
651   distinguished by name. The user can lock a subset of the resources and the
652   LockSet will take care of acquiring the locks always in the same order, thus
653   preventing deadlock.
654
655   All the locks needed in the same set must be acquired together, though.
656
657   """
658   def __init__(self, members=None):
659     """Constructs a new LockSet.
660
661     @param members: initial members of the set
662
663     """
664     # Used internally to guarantee coherency.
665     self.__lock = SharedLock()
666
667     # The lockdict indexes the relationship name -> lock
668     # The order-of-locking is implied by the alphabetical order of names
669     self.__lockdict = {}
670
671     if members is not None:
672       for name in members:
673         self.__lockdict[name] = SharedLock()
674
675     # The owner dict contains the set of locks each thread owns. For
676     # performance each thread can access its own key without a global lock on
677     # this structure. It is paramount though that *no* other type of access is
678     # done to this structure (eg. no looping over its keys). *_owner helper
679     # function are defined to guarantee access is correct, but in general never
680     # do anything different than __owners[threading.currentThread()], or there
681     # will be trouble.
682     self.__owners = {}
683
684   def _is_owned(self):
685     """Is the current thread a current level owner?"""
686     return threading.currentThread() in self.__owners
687
688   def _add_owned(self, name=None):
689     """Note the current thread owns the given lock"""
690     if name is None:
691       if not self._is_owned():
692         self.__owners[threading.currentThread()] = set()
693     else:
694       if self._is_owned():
695         self.__owners[threading.currentThread()].add(name)
696       else:
697         self.__owners[threading.currentThread()] = set([name])
698
699   def _del_owned(self, name=None):
700     """Note the current thread owns the given lock"""
701
702     if name is not None:
703       self.__owners[threading.currentThread()].remove(name)
704
705     # Only remove the key if we don't hold the set-lock as well
706     if (not self.__lock._is_owned() and
707         not self.__owners[threading.currentThread()]):
708       del self.__owners[threading.currentThread()]
709
710   def _list_owned(self):
711     """Get the set of resource names owned by the current thread"""
712     if self._is_owned():
713       return self.__owners[threading.currentThread()].copy()
714     else:
715       return set()
716
717   def _release_and_delete_owned(self):
718     """Release and delete all resources owned by the current thread"""
719     for lname in self._list_owned():
720       self.__lockdict[lname].release()
721       self._del_owned(name=lname)
722
723   def __names(self):
724     """Return the current set of names.
725
726     Only call this function while holding __lock and don't iterate on the
727     result after releasing the lock.
728
729     """
730     return self.__lockdict.keys()
731
732   def _names(self):
733     """Return a copy of the current set of elements.
734
735     Used only for debugging purposes.
736
737     """
738     # If we don't already own the set-level lock acquired
739     # we'll get it and note we need to release it later.
740     release_lock = False
741     if not self.__lock._is_owned():
742       release_lock = True
743       self.__lock.acquire(shared=1)
744     try:
745       result = self.__names()
746     finally:
747       if release_lock:
748         self.__lock.release()
749     return set(result)
750
751   def acquire(self, names, timeout=None, shared=0, test_notify=None):
752     """Acquire a set of resource locks.
753
754     @param names: the names of the locks which shall be acquired
755         (special lock names, or instance/node names)
756     @param shared: whether to acquire in shared mode; by default an
757         exclusive lock will be acquired
758     @type timeout: float or None
759     @param timeout: Maximum time to acquire all locks
760     @type test_notify: callable or None
761     @param test_notify: Special callback function for unittesting
762
763     @return: Set of all locks successfully acquired or None in case of timeout
764
765     @raise errors.LockError: when any lock we try to acquire has
766         been deleted before we succeed. In this case none of the
767         locks requested will be acquired.
768
769     """
770     assert timeout is None or timeout >= 0.0
771
772     # Check we don't already own locks at this level
773     assert not self._is_owned(), "Cannot acquire locks in the same set twice"
774
775     # We need to keep track of how long we spent waiting for a lock. The
776     # timeout passed to this function is over all lock acquires.
777     remaining_timeout = timeout
778     if timeout is None:
779       start = None
780       calc_remaining_timeout = lambda: None
781     else:
782       start = time.time()
783       calc_remaining_timeout = lambda: (start + timeout) - time.time()
784
785     want_all = names is None
786
787     if want_all:
788       # If no names are given acquire the whole set by not letting new names
789       # being added before we release, and getting the current list of names.
790       # Some of them may then be deleted later, but we'll cope with this.
791       #
792       # We'd like to acquire this lock in a shared way, as it's nice if
793       # everybody else can use the instances at the same time. If are acquiring
794       # them exclusively though they won't be able to do this anyway, though,
795       # so we'll get the list lock exclusively as well in order to be able to
796       # do add() on the set while owning it.
797       self.__lock.acquire(shared=shared, timeout=remaining_timeout)
798       try:
799         # note we own the set-lock
800         self._add_owned()
801         names = self.__names()
802       except:
803         # We shouldn't have problems adding the lock to the owners list, but
804         # if we did we'll try to release this lock and re-raise exception.
805         # Of course something is going to be really wrong, after this.
806         self.__lock.release()
807         raise
808
809       # Re-calculate timeout
810       remaining_timeout = calc_remaining_timeout()
811
812     try:
813       try:
814         # Support passing in a single resource to acquire rather than many
815         if isinstance(names, basestring):
816           names = [names]
817         else:
818           names = sorted(names)
819
820         acquire_list = []
821         # First we look the locks up on __lockdict. We have no way of being sure
822         # they will still be there after, but this makes it a lot faster should
823         # just one of them be the already wrong
824         for lname in utils.UniqueSequence(names):
825           try:
826             lock = self.__lockdict[lname] # raises KeyError if lock is not there
827             acquire_list.append((lname, lock))
828           except KeyError:
829             if want_all:
830               # We are acquiring all the set, it doesn't matter if this
831               # particular element is not there anymore.
832               continue
833             else:
834               raise errors.LockError("Non-existing lock in set (%s)" % lname)
835
836         # This will hold the locknames we effectively acquired.
837         acquired = set()
838
839         # Now acquire_list contains a sorted list of resources and locks we
840         # want.  In order to get them we loop on this (private) list and
841         # acquire() them.  We gave no real guarantee they will still exist till
842         # this is done but .acquire() itself is safe and will alert us if the
843         # lock gets deleted.
844         for (lname, lock) in acquire_list:
845           if __debug__ and callable(test_notify):
846             test_notify_fn = lambda: test_notify(lname)
847           else:
848             test_notify_fn = None
849
850           try:
851             if timeout is not None and remaining_timeout < 0:
852               raise _AcquireTimeout()
853
854             # raises LockError if the lock was deleted
855             if not lock.acquire(shared=shared, timeout=remaining_timeout,
856                                 test_notify=test_notify_fn):
857               # Couldn't get lock or timeout occurred
858               if timeout is None:
859                 # This shouldn't happen as SharedLock.acquire(timeout=None) is
860                 # blocking.
861                 raise errors.LockError("Failed to get lock %s" % lname)
862
863               raise _AcquireTimeout()
864
865             # Re-calculate timeout
866             remaining_timeout = calc_remaining_timeout()
867
868             # now the lock cannot be deleted, we have it!
869             self._add_owned(name=lname)
870             acquired.add(lname)
871
872           except _AcquireTimeout:
873             # Release all acquired locks
874             self._release_and_delete_owned()
875             raise
876
877           except errors.LockError:
878             if want_all:
879               # We are acquiring all the set, it doesn't matter if this
880               # particular element is not there anymore.
881               continue
882
883             self._release_and_delete_owned()
884
885             raise errors.LockError("Non-existing lock in set (%s)" % lname)
886
887           except:
888             # We shouldn't have problems adding the lock to the owners list, but
889             # if we did we'll try to release this lock and re-raise exception.
890             # Of course something is going to be really wrong, after this.
891             if lock._is_owned():
892               lock.release()
893             raise
894
895       except:
896         # If something went wrong and we had the set-lock let's release it...
897         if want_all:
898           self.__lock.release()
899         raise
900
901     except _AcquireTimeout:
902       if want_all:
903         self._del_owned()
904
905       return None
906
907     return acquired
908
909   def release(self, names=None):
910     """Release a set of resource locks, at the same level.
911
912     You must have acquired the locks, either in shared or in exclusive mode,
913     before releasing them.
914
915     @param names: the names of the locks which shall be released
916         (defaults to all the locks acquired at that level).
917
918     """
919     assert self._is_owned(), "release() on lock set while not owner"
920
921     # Support passing in a single resource to release rather than many
922     if isinstance(names, basestring):
923       names = [names]
924
925     if names is None:
926       names = self._list_owned()
927     else:
928       names = set(names)
929       assert self._list_owned().issuperset(names), (
930                "release() on unheld resources %s" %
931                names.difference(self._list_owned()))
932
933     # First of all let's release the "all elements" lock, if set.
934     # After this 'add' can work again
935     if self.__lock._is_owned():
936       self.__lock.release()
937       self._del_owned()
938
939     for lockname in names:
940       # If we are sure the lock doesn't leave __lockdict without being
941       # exclusively held we can do this...
942       self.__lockdict[lockname].release()
943       self._del_owned(name=lockname)
944
945   def add(self, names, acquired=0, shared=0):
946     """Add a new set of elements to the set
947
948     @param names: names of the new elements to add
949     @param acquired: pre-acquire the new resource?
950     @param shared: is the pre-acquisition shared?
951
952     """
953     # Check we don't already own locks at this level
954     assert not self._is_owned() or self.__lock._is_owned(shared=0), \
955       "Cannot add locks if the set is only partially owned, or shared"
956
957     # Support passing in a single resource to add rather than many
958     if isinstance(names, basestring):
959       names = [names]
960
961     # If we don't already own the set-level lock acquired in an exclusive way
962     # we'll get it and note we need to release it later.
963     release_lock = False
964     if not self.__lock._is_owned():
965       release_lock = True
966       self.__lock.acquire()
967
968     try:
969       invalid_names = set(self.__names()).intersection(names)
970       if invalid_names:
971         # This must be an explicit raise, not an assert, because assert is
972         # turned off when using optimization, and this can happen because of
973         # concurrency even if the user doesn't want it.
974         raise errors.LockError("duplicate add() (%s)" % invalid_names)
975
976       for lockname in names:
977         lock = SharedLock()
978
979         if acquired:
980           lock.acquire(shared=shared)
981           # now the lock cannot be deleted, we have it!
982           try:
983             self._add_owned(name=lockname)
984           except:
985             # We shouldn't have problems adding the lock to the owners list,
986             # but if we did we'll try to release this lock and re-raise
987             # exception.  Of course something is going to be really wrong,
988             # after this.  On the other hand the lock hasn't been added to the
989             # __lockdict yet so no other threads should be pending on it. This
990             # release is just a safety measure.
991             lock.release()
992             raise
993
994         self.__lockdict[lockname] = lock
995
996     finally:
997       # Only release __lock if we were not holding it previously.
998       if release_lock:
999         self.__lock.release()
1000
1001     return True
1002
1003   def remove(self, names):
1004     """Remove elements from the lock set.
1005
1006     You can either not hold anything in the lockset or already hold a superset
1007     of the elements you want to delete, exclusively.
1008
1009     @param names: names of the resource to remove.
1010
1011     @return: a list of locks which we removed; the list is always
1012         equal to the names list if we were holding all the locks
1013         exclusively
1014
1015     """
1016     # Support passing in a single resource to remove rather than many
1017     if isinstance(names, basestring):
1018       names = [names]
1019
1020     # If we own any subset of this lock it must be a superset of what we want
1021     # to delete. The ownership must also be exclusive, but that will be checked
1022     # by the lock itself.
1023     assert not self._is_owned() or self._list_owned().issuperset(names), (
1024       "remove() on acquired lockset while not owning all elements")
1025
1026     removed = []
1027
1028     for lname in names:
1029       # Calling delete() acquires the lock exclusively if we don't already own
1030       # it, and causes all pending and subsequent lock acquires to fail. It's
1031       # fine to call it out of order because delete() also implies release(),
1032       # and the assertion above guarantees that if we either already hold
1033       # everything we want to delete, or we hold none.
1034       try:
1035         self.__lockdict[lname].delete()
1036         removed.append(lname)
1037       except (KeyError, errors.LockError):
1038         # This cannot happen if we were already holding it, verify:
1039         assert not self._is_owned(), "remove failed while holding lockset"
1040       else:
1041         # If no LockError was raised we are the ones who deleted the lock.
1042         # This means we can safely remove it from lockdict, as any further or
1043         # pending delete() or acquire() will fail (and nobody can have the lock
1044         # since before our call to delete()).
1045         #
1046         # This is done in an else clause because if the exception was thrown
1047         # it's the job of the one who actually deleted it.
1048         del self.__lockdict[lname]
1049         # And let's remove it from our private list if we owned it.
1050         if self._is_owned():
1051           self._del_owned(name=lname)
1052
1053     return removed
1054
1055
1056 # Locking levels, must be acquired in increasing order.
1057 # Current rules are:
1058 #   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1059 #   acquired before performing any operation, either in shared or in exclusive
1060 #   mode. acquiring the BGL in exclusive mode is discouraged and should be
1061 #   avoided.
1062 #   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1063 #   If you need more than one node, or more than one instance, acquire them at
1064 #   the same time.
1065 LEVEL_CLUSTER = 0
1066 LEVEL_INSTANCE = 1
1067 LEVEL_NODE = 2
1068
1069 LEVELS = [LEVEL_CLUSTER,
1070           LEVEL_INSTANCE,
1071           LEVEL_NODE]
1072
1073 # Lock levels which are modifiable
1074 LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1075
1076 LEVEL_NAMES = {
1077   LEVEL_CLUSTER: "cluster",
1078   LEVEL_INSTANCE: "instance",
1079   LEVEL_NODE: "node",
1080   }
1081
1082 # Constant for the big ganeti lock
1083 BGL = 'BGL'
1084
1085
1086 class GanetiLockManager:
1087   """The Ganeti Locking Library
1088
1089   The purpose of this small library is to manage locking for ganeti clusters
1090   in a central place, while at the same time doing dynamic checks against
1091   possible deadlocks. It will also make it easier to transition to a different
1092   lock type should we migrate away from python threads.
1093
1094   """
1095   _instance = None
1096
1097   def __init__(self, nodes=None, instances=None):
1098     """Constructs a new GanetiLockManager object.
1099
1100     There should be only a GanetiLockManager object at any time, so this
1101     function raises an error if this is not the case.
1102
1103     @param nodes: list of node names
1104     @param instances: list of instance names
1105
1106     """
1107     assert self.__class__._instance is None, \
1108            "double GanetiLockManager instance"
1109
1110     self.__class__._instance = self
1111
1112     # The keyring contains all the locks, at their level and in the correct
1113     # locking order.
1114     self.__keyring = {
1115       LEVEL_CLUSTER: LockSet([BGL]),
1116       LEVEL_NODE: LockSet(nodes),
1117       LEVEL_INSTANCE: LockSet(instances),
1118     }
1119
1120   def _names(self, level):
1121     """List the lock names at the given level.
1122
1123     This can be used for debugging/testing purposes.
1124
1125     @param level: the level whose list of locks to get
1126
1127     """
1128     assert level in LEVELS, "Invalid locking level %s" % level
1129     return self.__keyring[level]._names()
1130
1131   def _is_owned(self, level):
1132     """Check whether we are owning locks at the given level
1133
1134     """
1135     return self.__keyring[level]._is_owned()
1136
1137   is_owned = _is_owned
1138
1139   def _list_owned(self, level):
1140     """Get the set of owned locks at the given level
1141
1142     """
1143     return self.__keyring[level]._list_owned()
1144
1145   def _upper_owned(self, level):
1146     """Check that we don't own any lock at a level greater than the given one.
1147
1148     """
1149     # This way of checking only works if LEVELS[i] = i, which we check for in
1150     # the test cases.
1151     return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1152
1153   def _BGL_owned(self):
1154     """Check if the current thread owns the BGL.
1155
1156     Both an exclusive or a shared acquisition work.
1157
1158     """
1159     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1160
1161   def _contains_BGL(self, level, names):
1162     """Check if the level contains the BGL.
1163
1164     Check if acting on the given level and set of names will change
1165     the status of the Big Ganeti Lock.
1166
1167     """
1168     return level == LEVEL_CLUSTER and (names is None or BGL in names)
1169
1170   def acquire(self, level, names, timeout=None, shared=0):
1171     """Acquire a set of resource locks, at the same level.
1172
1173     @param level: the level at which the locks shall be acquired;
1174         it must be a member of LEVELS.
1175     @param names: the names of the locks which shall be acquired
1176         (special lock names, or instance/node names)
1177     @param shared: whether to acquire in shared mode; by default
1178         an exclusive lock will be acquired
1179     @type timeout: float
1180     @param timeout: Maximum time to acquire all locks
1181
1182     """
1183     assert level in LEVELS, "Invalid locking level %s" % level
1184
1185     # Check that we are either acquiring the Big Ganeti Lock or we already own
1186     # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1187     # so even if we've migrated we need to at least share the BGL to be
1188     # compatible with them. Of course if we own the BGL exclusively there's no
1189     # point in acquiring any other lock, unless perhaps we are half way through
1190     # the migration of the current opcode.
1191     assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1192             "You must own the Big Ganeti Lock before acquiring any other")
1193
1194     # Check we don't own locks at the same or upper levels.
1195     assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1196            " while owning some at a greater one")
1197
1198     # Acquire the locks in the set.
1199     return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
1200
1201   def release(self, level, names=None):
1202     """Release a set of resource locks, at the same level.
1203
1204     You must have acquired the locks, either in shared or in exclusive
1205     mode, before releasing them.
1206
1207     @param level: the level at which the locks shall be released;
1208         it must be a member of LEVELS
1209     @param names: the names of the locks which shall be released
1210         (defaults to all the locks acquired at that level)
1211
1212     """
1213     assert level in LEVELS, "Invalid locking level %s" % level
1214     assert (not self._contains_BGL(level, names) or
1215             not self._upper_owned(LEVEL_CLUSTER)), (
1216             "Cannot release the Big Ganeti Lock while holding something"
1217             " at upper levels")
1218
1219     # Release will complain if we don't own the locks already
1220     return self.__keyring[level].release(names)
1221
1222   def add(self, level, names, acquired=0, shared=0):
1223     """Add locks at the specified level.
1224
1225     @param level: the level at which the locks shall be added;
1226         it must be a member of LEVELS_MOD.
1227     @param names: names of the locks to acquire
1228     @param acquired: whether to acquire the newly added locks
1229     @param shared: whether the acquisition will be shared
1230
1231     """
1232     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1233     assert self._BGL_owned(), ("You must own the BGL before performing other"
1234            " operations")
1235     assert not self._upper_owned(level), ("Cannot add locks at a level"
1236            " while owning some at a greater one")
1237     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1238
1239   def remove(self, level, names):
1240     """Remove locks from the specified level.
1241
1242     You must either already own the locks you are trying to remove
1243     exclusively or not own any lock at an upper level.
1244
1245     @param level: the level at which the locks shall be removed;
1246         it must be a member of LEVELS_MOD
1247     @param names: the names of the locks which shall be removed
1248         (special lock names, or instance/node names)
1249
1250     """
1251     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1252     assert self._BGL_owned(), ("You must own the BGL before performing other"
1253            " operations")
1254     # Check we either own the level or don't own anything from here
1255     # up. LockSet.remove() will check the case in which we don't own
1256     # all the needed resources, or we have a shared ownership.
1257     assert self._is_owned(level) or not self._upper_owned(level), (
1258            "Cannot remove locks at a level while not owning it or"
1259            " owning some at a greater one")
1260     return self.__keyring[level].remove(names)