Add the options attribute to cli.JobExecutor
[ganeti-local] / lib / locking.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Module implementing the Ganeti locking code."""
22
23 # pylint: disable-msg=W0212
24
25 # W0212 since e.g. LockSet methods use (a lot) the internals of
26 # SharedLock
27
28 import os
29 import select
30 import threading
31 import time
32 import errno
33
34 from ganeti import errors
35 from ganeti import utils
36
37
38 def ssynchronized(lock, shared=0):
39   """Shared Synchronization decorator.
40
41   Calls the function holding the given lock, either in exclusive or shared
42   mode. It requires the passed lock to be a SharedLock (or support its
43   semantics).
44
45   """
46   def wrap(fn):
47     def sync_function(*args, **kwargs):
48       lock.acquire(shared=shared)
49       try:
50         return fn(*args, **kwargs)
51       finally:
52         lock.release()
53     return sync_function
54   return wrap
55
56
57 class RunningTimeout(object):
58   """Class to calculate remaining timeout when doing several operations.
59
60   """
61   __slots__ = [
62     "_allow_negative",
63     "_start_time",
64     "_time_fn",
65     "_timeout",
66     ]
67
68   def __init__(self, timeout, allow_negative, _time_fn=time.time):
69     """Initializes this class.
70
71     @type timeout: float
72     @param timeout: Timeout duration
73     @type allow_negative: bool
74     @param allow_negative: Whether to return values below zero
75     @param _time_fn: Time function for unittests
76
77     """
78     object.__init__(self)
79
80     if timeout is not None and timeout < 0.0:
81       raise ValueError("Timeout must not be negative")
82
83     self._timeout = timeout
84     self._allow_negative = allow_negative
85     self._time_fn = _time_fn
86
87     self._start_time = None
88
89   def Remaining(self):
90     """Returns the remaining timeout.
91
92     """
93     if self._timeout is None:
94       return None
95
96     # Get start time on first calculation
97     if self._start_time is None:
98       self._start_time = self._time_fn()
99
100     # Calculate remaining time
101     remaining_timeout = self._start_time + self._timeout - self._time_fn()
102
103     if not self._allow_negative:
104       # Ensure timeout is always >= 0
105       return max(0.0, remaining_timeout)
106
107     return remaining_timeout
108
109
110 class _SingleNotifyPipeConditionWaiter(object):
111   """Helper class for SingleNotifyPipeCondition
112
113   """
114   __slots__ = [
115     "_fd",
116     "_poller",
117     ]
118
119   def __init__(self, poller, fd):
120     """Constructor for _SingleNotifyPipeConditionWaiter
121
122     @type poller: select.poll
123     @param poller: Poller object
124     @type fd: int
125     @param fd: File descriptor to wait for
126
127     """
128     object.__init__(self)
129     self._poller = poller
130     self._fd = fd
131
132   def __call__(self, timeout):
133     """Wait for something to happen on the pipe.
134
135     @type timeout: float or None
136     @param timeout: Timeout for waiting (can be None)
137
138     """
139     running_timeout = RunningTimeout(timeout, True)
140
141     while True:
142       remaining_time = running_timeout.Remaining()
143
144       if remaining_time is not None and remaining_time < 0.0:
145         break
146
147       try:
148         result = self._poller.poll(remaining_time)
149       except EnvironmentError, err:
150         if err.errno != errno.EINTR:
151           raise
152         result = None
153
154       # Check whether we were notified
155       if result and result[0][0] == self._fd:
156         break
157
158
159 class _BaseCondition(object):
160   """Base class containing common code for conditions.
161
162   Some of this code is taken from python's threading module.
163
164   """
165   __slots__ = [
166     "_lock",
167     "acquire",
168     "release",
169     ]
170
171   def __init__(self, lock):
172     """Constructor for _BaseCondition.
173
174     @type lock: threading.Lock
175     @param lock: condition base lock
176
177     """
178     object.__init__(self)
179
180     # Recursive locks are not supported
181     assert not hasattr(lock, "_acquire_restore")
182     assert not hasattr(lock, "_release_save")
183
184     self._lock = lock
185
186     # Export the lock's acquire() and release() methods
187     self.acquire = lock.acquire
188     self.release = lock.release
189
190   def _is_owned(self):
191     """Check whether lock is owned by current thread.
192
193     """
194     if self._lock.acquire(0):
195       self._lock.release()
196       return False
197
198     return True
199
200   def _check_owned(self):
201     """Raise an exception if the current thread doesn't own the lock.
202
203     """
204     if not self._is_owned():
205       raise RuntimeError("cannot work with un-aquired lock")
206
207
208 class SingleNotifyPipeCondition(_BaseCondition):
209   """Condition which can only be notified once.
210
211   This condition class uses pipes and poll, internally, to be able to wait for
212   notification with a timeout, without resorting to polling. It is almost
213   compatible with Python's threading.Condition, with the following differences:
214     - notifyAll can only be called once, and no wait can happen after that
215     - notify is not supported, only notifyAll
216
217   """
218
219   __slots__ = [
220     "_poller",
221     "_read_fd",
222     "_write_fd",
223     "_nwaiters",
224     "_notified",
225     ]
226
227   _waiter_class = _SingleNotifyPipeConditionWaiter
228
229   def __init__(self, lock):
230     """Constructor for SingleNotifyPipeCondition
231
232     """
233     _BaseCondition.__init__(self, lock)
234     self._nwaiters = 0
235     self._notified = False
236     self._read_fd = None
237     self._write_fd = None
238     self._poller = None
239
240   def _check_unnotified(self):
241     """Throws an exception if already notified.
242
243     """
244     if self._notified:
245       raise RuntimeError("cannot use already notified condition")
246
247   def _Cleanup(self):
248     """Cleanup open file descriptors, if any.
249
250     """
251     if self._read_fd is not None:
252       os.close(self._read_fd)
253       self._read_fd = None
254
255     if self._write_fd is not None:
256       os.close(self._write_fd)
257       self._write_fd = None
258     self._poller = None
259
260   def wait(self, timeout=None):
261     """Wait for a notification.
262
263     @type timeout: float or None
264     @param timeout: Waiting timeout (can be None)
265
266     """
267     self._check_owned()
268     self._check_unnotified()
269
270     self._nwaiters += 1
271     try:
272       if self._poller is None:
273         (self._read_fd, self._write_fd) = os.pipe()
274         self._poller = select.poll()
275         self._poller.register(self._read_fd, select.POLLHUP)
276
277       wait_fn = self._waiter_class(self._poller, self._read_fd)
278       self.release()
279       try:
280         # Wait for notification
281         wait_fn(timeout)
282       finally:
283         # Re-acquire lock
284         self.acquire()
285     finally:
286       self._nwaiters -= 1
287       if self._nwaiters == 0:
288         self._Cleanup()
289
290   def notifyAll(self): # pylint: disable-msg=C0103
291     """Close the writing side of the pipe to notify all waiters.
292
293     """
294     self._check_owned()
295     self._check_unnotified()
296     self._notified = True
297     if self._write_fd is not None:
298       os.close(self._write_fd)
299       self._write_fd = None
300
301
302 class PipeCondition(_BaseCondition):
303   """Group-only non-polling condition with counters.
304
305   This condition class uses pipes and poll, internally, to be able to wait for
306   notification with a timeout, without resorting to polling. It is almost
307   compatible with Python's threading.Condition, but only supports notifyAll and
308   non-recursive locks. As an additional features it's able to report whether
309   there are any waiting threads.
310
311   """
312   __slots__ = [
313     "_nwaiters",
314     "_single_condition",
315     ]
316
317   _single_condition_class = SingleNotifyPipeCondition
318
319   def __init__(self, lock):
320     """Initializes this class.
321
322     """
323     _BaseCondition.__init__(self, lock)
324     self._nwaiters = 0
325     self._single_condition = self._single_condition_class(self._lock)
326
327   def wait(self, timeout=None):
328     """Wait for a notification.
329
330     @type timeout: float or None
331     @param timeout: Waiting timeout (can be None)
332
333     """
334     self._check_owned()
335
336     # Keep local reference to the pipe. It could be replaced by another thread
337     # notifying while we're waiting.
338     my_condition = self._single_condition
339
340     assert self._nwaiters >= 0
341     self._nwaiters += 1
342     try:
343       my_condition.wait(timeout)
344     finally:
345       assert self._nwaiters > 0
346       self._nwaiters -= 1
347
348   def notifyAll(self): # pylint: disable-msg=C0103
349     """Notify all currently waiting threads.
350
351     """
352     self._check_owned()
353     self._single_condition.notifyAll()
354     self._single_condition = self._single_condition_class(self._lock)
355
356   def has_waiting(self):
357     """Returns whether there are active waiters.
358
359     """
360     self._check_owned()
361
362     return bool(self._nwaiters)
363
364
365 class _CountingCondition(object):
366   """Wrapper for Python's built-in threading.Condition class.
367
368   This wrapper keeps a count of active waiters. We can't access the internal
369   "__waiters" attribute of threading.Condition because it's not thread-safe.
370
371   """
372   __slots__ = [
373     "_cond",
374     "_nwaiters",
375     ]
376
377   def __init__(self, lock):
378     """Initializes this class.
379
380     """
381     object.__init__(self)
382     self._cond = threading.Condition(lock=lock)
383     self._nwaiters = 0
384
385   def notifyAll(self): # pylint: disable-msg=C0103
386     """Notifies the condition.
387
388     """
389     return self._cond.notifyAll()
390
391   def wait(self, timeout=None):
392     """Waits for the condition to be notified.
393
394     @type timeout: float or None
395     @param timeout: Waiting timeout (can be None)
396
397     """
398     assert self._nwaiters >= 0
399
400     self._nwaiters += 1
401     try:
402       return self._cond.wait(timeout=timeout)
403     finally:
404       self._nwaiters -= 1
405
406   def has_waiting(self):
407     """Returns whether there are active waiters.
408
409     """
410     return bool(self._nwaiters)
411
412
413 class SharedLock(object):
414   """Implements a shared lock.
415
416   Multiple threads can acquire the lock in a shared way, calling
417   acquire_shared().  In order to acquire the lock in an exclusive way threads
418   can call acquire_exclusive().
419
420   The lock prevents starvation but does not guarantee that threads will acquire
421   the shared lock in the order they queued for it, just that they will
422   eventually do so.
423
424   """
425   __slots__ = [
426     "__active_shr_c",
427     "__inactive_shr_c",
428     "__deleted",
429     "__exc",
430     "__lock",
431     "__pending",
432     "__shr",
433     ]
434
435   __condition_class = PipeCondition
436
437   def __init__(self):
438     """Construct a new SharedLock.
439
440     """
441     object.__init__(self)
442
443     # Internal lock
444     self.__lock = threading.Lock()
445
446     # Queue containing waiting acquires
447     self.__pending = []
448
449     # Active and inactive conditions for shared locks
450     self.__active_shr_c = self.__condition_class(self.__lock)
451     self.__inactive_shr_c = self.__condition_class(self.__lock)
452
453     # Current lock holders
454     self.__shr = set()
455     self.__exc = None
456
457     # is this lock in the deleted state?
458     self.__deleted = False
459
460   def __check_deleted(self):
461     """Raises an exception if the lock has been deleted.
462
463     """
464     if self.__deleted:
465       raise errors.LockError("Deleted lock")
466
467   def __is_sharer(self):
468     """Is the current thread sharing the lock at this time?
469
470     """
471     return threading.currentThread() in self.__shr
472
473   def __is_exclusive(self):
474     """Is the current thread holding the lock exclusively at this time?
475
476     """
477     return threading.currentThread() == self.__exc
478
479   def __is_owned(self, shared=-1):
480     """Is the current thread somehow owning the lock at this time?
481
482     This is a private version of the function, which presumes you're holding
483     the internal lock.
484
485     """
486     if shared < 0:
487       return self.__is_sharer() or self.__is_exclusive()
488     elif shared:
489       return self.__is_sharer()
490     else:
491       return self.__is_exclusive()
492
493   def _is_owned(self, shared=-1):
494     """Is the current thread somehow owning the lock at this time?
495
496     @param shared:
497         - < 0: check for any type of ownership (default)
498         - 0: check for exclusive ownership
499         - > 0: check for shared ownership
500
501     """
502     self.__lock.acquire()
503     try:
504       return self.__is_owned(shared=shared)
505     finally:
506       self.__lock.release()
507
508   def _count_pending(self):
509     """Returns the number of pending acquires.
510
511     @rtype: int
512
513     """
514     self.__lock.acquire()
515     try:
516       return len(self.__pending)
517     finally:
518       self.__lock.release()
519
520   def __do_acquire(self, shared):
521     """Actually acquire the lock.
522
523     """
524     if shared:
525       self.__shr.add(threading.currentThread())
526     else:
527       self.__exc = threading.currentThread()
528
529   def __can_acquire(self, shared):
530     """Determine whether lock can be acquired.
531
532     """
533     if shared:
534       return self.__exc is None
535     else:
536       return len(self.__shr) == 0 and self.__exc is None
537
538   def __is_on_top(self, cond):
539     """Checks whether the passed condition is on top of the queue.
540
541     The caller must make sure the queue isn't empty.
542
543     """
544     return self.__pending[0] == cond
545
546   def __acquire_unlocked(self, shared, timeout):
547     """Acquire a shared lock.
548
549     @param shared: whether to acquire in shared mode; by default an
550         exclusive lock will be acquired
551     @param timeout: maximum waiting time before giving up
552
553     """
554     self.__check_deleted()
555
556     # We cannot acquire the lock if we already have it
557     assert not self.__is_owned(), "double acquire() on a non-recursive lock"
558
559     # Check whether someone else holds the lock or there are pending acquires.
560     if not self.__pending and self.__can_acquire(shared):
561       # Apparently not, can acquire lock directly.
562       self.__do_acquire(shared)
563       return True
564
565     if shared:
566       wait_condition = self.__active_shr_c
567
568       # Check if we're not yet in the queue
569       if wait_condition not in self.__pending:
570         self.__pending.append(wait_condition)
571     else:
572       wait_condition = self.__condition_class(self.__lock)
573       # Always add to queue
574       self.__pending.append(wait_condition)
575
576     try:
577       # Wait until we become the topmost acquire in the queue or the timeout
578       # expires.
579       while not (self.__is_on_top(wait_condition) and
580                  self.__can_acquire(shared)):
581         # Wait for notification
582         wait_condition.wait(timeout)
583         self.__check_deleted()
584
585         # A lot of code assumes blocking acquires always succeed. Loop
586         # internally for that case.
587         if timeout is not None:
588           break
589
590       if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
591         self.__do_acquire(shared)
592         return True
593     finally:
594       # Remove condition from queue if there are no more waiters
595       if not wait_condition.has_waiting() and not self.__deleted:
596         self.__pending.remove(wait_condition)
597
598     return False
599
600   def acquire(self, shared=0, timeout=None, test_notify=None):
601     """Acquire a shared lock.
602
603     @type shared: integer (0/1) used as a boolean
604     @param shared: whether to acquire in shared mode; by default an
605         exclusive lock will be acquired
606     @type timeout: float
607     @param timeout: maximum waiting time before giving up
608     @type test_notify: callable or None
609     @param test_notify: Special callback function for unittesting
610
611     """
612     self.__lock.acquire()
613     try:
614       # We already got the lock, notify now
615       if __debug__ and callable(test_notify):
616         test_notify()
617
618       return self.__acquire_unlocked(shared, timeout)
619     finally:
620       self.__lock.release()
621
622   def release(self):
623     """Release a Shared Lock.
624
625     You must have acquired the lock, either in shared or in exclusive mode,
626     before calling this function.
627
628     """
629     self.__lock.acquire()
630     try:
631       assert self.__is_exclusive() or self.__is_sharer(), \
632         "Cannot release non-owned lock"
633
634       # Autodetect release type
635       if self.__is_exclusive():
636         self.__exc = None
637       else:
638         self.__shr.remove(threading.currentThread())
639
640       # Notify topmost condition in queue
641       if self.__pending:
642         first_condition = self.__pending[0]
643         first_condition.notifyAll()
644
645         if first_condition == self.__active_shr_c:
646           self.__active_shr_c = self.__inactive_shr_c
647           self.__inactive_shr_c = first_condition
648
649     finally:
650       self.__lock.release()
651
652   def delete(self, timeout=None):
653     """Delete a Shared Lock.
654
655     This operation will declare the lock for removal. First the lock will be
656     acquired in exclusive mode if you don't already own it, then the lock
657     will be put in a state where any future and pending acquire() fail.
658
659     @type timeout: float
660     @param timeout: maximum waiting time before giving up
661
662     """
663     self.__lock.acquire()
664     try:
665       assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
666
667       self.__check_deleted()
668
669       # The caller is allowed to hold the lock exclusively already.
670       acquired = self.__is_exclusive()
671
672       if not acquired:
673         acquired = self.__acquire_unlocked(0, timeout)
674
675         assert self.__is_exclusive() and not self.__is_sharer(), \
676           "Lock wasn't acquired in exclusive mode"
677
678       if acquired:
679         self.__deleted = True
680         self.__exc = None
681
682         # Notify all acquires. They'll throw an error.
683         while self.__pending:
684           self.__pending.pop().notifyAll()
685
686       return acquired
687     finally:
688       self.__lock.release()
689
690
691 # Whenever we want to acquire a full LockSet we pass None as the value
692 # to acquire.  Hide this behind this nicely named constant.
693 ALL_SET = None
694
695
696 class _AcquireTimeout(Exception):
697   """Internal exception to abort an acquire on a timeout.
698
699   """
700
701
702 class LockSet:
703   """Implements a set of locks.
704
705   This abstraction implements a set of shared locks for the same resource type,
706   distinguished by name. The user can lock a subset of the resources and the
707   LockSet will take care of acquiring the locks always in the same order, thus
708   preventing deadlock.
709
710   All the locks needed in the same set must be acquired together, though.
711
712   """
713   def __init__(self, members=None):
714     """Constructs a new LockSet.
715
716     @type members: list of strings
717     @param members: initial members of the set
718
719     """
720     # Used internally to guarantee coherency.
721     self.__lock = SharedLock()
722
723     # The lockdict indexes the relationship name -> lock
724     # The order-of-locking is implied by the alphabetical order of names
725     self.__lockdict = {}
726
727     if members is not None:
728       for name in members:
729         self.__lockdict[name] = SharedLock()
730
731     # The owner dict contains the set of locks each thread owns. For
732     # performance each thread can access its own key without a global lock on
733     # this structure. It is paramount though that *no* other type of access is
734     # done to this structure (eg. no looping over its keys). *_owner helper
735     # function are defined to guarantee access is correct, but in general never
736     # do anything different than __owners[threading.currentThread()], or there
737     # will be trouble.
738     self.__owners = {}
739
740   def _is_owned(self):
741     """Is the current thread a current level owner?"""
742     return threading.currentThread() in self.__owners
743
744   def _add_owned(self, name=None):
745     """Note the current thread owns the given lock"""
746     if name is None:
747       if not self._is_owned():
748         self.__owners[threading.currentThread()] = set()
749     else:
750       if self._is_owned():
751         self.__owners[threading.currentThread()].add(name)
752       else:
753         self.__owners[threading.currentThread()] = set([name])
754
755   def _del_owned(self, name=None):
756     """Note the current thread owns the given lock"""
757
758     assert not (name is None and self.__lock._is_owned()), \
759            "Cannot hold internal lock when deleting owner status"
760
761     if name is not None:
762       self.__owners[threading.currentThread()].remove(name)
763
764     # Only remove the key if we don't hold the set-lock as well
765     if (not self.__lock._is_owned() and
766         not self.__owners[threading.currentThread()]):
767       del self.__owners[threading.currentThread()]
768
769   def _list_owned(self):
770     """Get the set of resource names owned by the current thread"""
771     if self._is_owned():
772       return self.__owners[threading.currentThread()].copy()
773     else:
774       return set()
775
776   def _release_and_delete_owned(self):
777     """Release and delete all resources owned by the current thread"""
778     for lname in self._list_owned():
779       lock = self.__lockdict[lname]
780       if lock._is_owned():
781         lock.release()
782       self._del_owned(name=lname)
783
784   def __names(self):
785     """Return the current set of names.
786
787     Only call this function while holding __lock and don't iterate on the
788     result after releasing the lock.
789
790     """
791     return self.__lockdict.keys()
792
793   def _names(self):
794     """Return a copy of the current set of elements.
795
796     Used only for debugging purposes.
797
798     """
799     # If we don't already own the set-level lock acquired
800     # we'll get it and note we need to release it later.
801     release_lock = False
802     if not self.__lock._is_owned():
803       release_lock = True
804       self.__lock.acquire(shared=1)
805     try:
806       result = self.__names()
807     finally:
808       if release_lock:
809         self.__lock.release()
810     return set(result)
811
812   def acquire(self, names, timeout=None, shared=0, test_notify=None):
813     """Acquire a set of resource locks.
814
815     @type names: list of strings (or string)
816     @param names: the names of the locks which shall be acquired
817         (special lock names, or instance/node names)
818     @type shared: integer (0/1) used as a boolean
819     @param shared: whether to acquire in shared mode; by default an
820         exclusive lock will be acquired
821     @type timeout: float or None
822     @param timeout: Maximum time to acquire all locks
823     @type test_notify: callable or None
824     @param test_notify: Special callback function for unittesting
825
826     @return: Set of all locks successfully acquired or None in case of timeout
827
828     @raise errors.LockError: when any lock we try to acquire has
829         been deleted before we succeed. In this case none of the
830         locks requested will be acquired.
831
832     """
833     assert timeout is None or timeout >= 0.0
834
835     # Check we don't already own locks at this level
836     assert not self._is_owned(), "Cannot acquire locks in the same set twice"
837
838     # We need to keep track of how long we spent waiting for a lock. The
839     # timeout passed to this function is over all lock acquires.
840     running_timeout = RunningTimeout(timeout, False)
841
842     try:
843       if names is not None:
844         # Support passing in a single resource to acquire rather than many
845         if isinstance(names, basestring):
846           names = [names]
847
848         return self.__acquire_inner(names, False, shared,
849                                     running_timeout.Remaining, test_notify)
850
851       else:
852         # If no names are given acquire the whole set by not letting new names
853         # being added before we release, and getting the current list of names.
854         # Some of them may then be deleted later, but we'll cope with this.
855         #
856         # We'd like to acquire this lock in a shared way, as it's nice if
857         # everybody else can use the instances at the same time. If are
858         # acquiring them exclusively though they won't be able to do this
859         # anyway, though, so we'll get the list lock exclusively as well in
860         # order to be able to do add() on the set while owning it.
861         if not self.__lock.acquire(shared=shared,
862                                    timeout=running_timeout.Remaining()):
863           raise _AcquireTimeout()
864         try:
865           # note we own the set-lock
866           self._add_owned()
867
868           return self.__acquire_inner(self.__names(), True, shared,
869                                       running_timeout.Remaining, test_notify)
870         except:
871           # We shouldn't have problems adding the lock to the owners list, but
872           # if we did we'll try to release this lock and re-raise exception.
873           # Of course something is going to be really wrong, after this.
874           self.__lock.release()
875           self._del_owned()
876           raise
877
878     except _AcquireTimeout:
879       return None
880
881   def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
882     """Inner logic for acquiring a number of locks.
883
884     @param names: Names of the locks to be acquired
885     @param want_all: Whether all locks in the set should be acquired
886     @param shared: Whether to acquire in shared mode
887     @param timeout_fn: Function returning remaining timeout
888     @param test_notify: Special callback function for unittesting
889
890     """
891     acquire_list = []
892
893     # First we look the locks up on __lockdict. We have no way of being sure
894     # they will still be there after, but this makes it a lot faster should
895     # just one of them be the already wrong. Using a sorted sequence to prevent
896     # deadlocks.
897     for lname in sorted(utils.UniqueSequence(names)):
898       try:
899         lock = self.__lockdict[lname] # raises KeyError if lock is not there
900       except KeyError:
901         if want_all:
902           # We are acquiring all the set, it doesn't matter if this particular
903           # element is not there anymore.
904           continue
905
906         raise errors.LockError("Non-existing lock in set (%s)" % lname)
907
908       acquire_list.append((lname, lock))
909
910     # This will hold the locknames we effectively acquired.
911     acquired = set()
912
913     try:
914       # Now acquire_list contains a sorted list of resources and locks we
915       # want.  In order to get them we loop on this (private) list and
916       # acquire() them.  We gave no real guarantee they will still exist till
917       # this is done but .acquire() itself is safe and will alert us if the
918       # lock gets deleted.
919       for (lname, lock) in acquire_list:
920         if __debug__ and callable(test_notify):
921           test_notify_fn = lambda: test_notify(lname)
922         else:
923           test_notify_fn = None
924
925         timeout = timeout_fn()
926
927         try:
928           # raises LockError if the lock was deleted
929           acq_success = lock.acquire(shared=shared, timeout=timeout,
930                                      test_notify=test_notify_fn)
931         except errors.LockError:
932           if want_all:
933             # We are acquiring all the set, it doesn't matter if this
934             # particular element is not there anymore.
935             continue
936
937           raise errors.LockError("Non-existing lock in set (%s)" % lname)
938
939         if not acq_success:
940           # Couldn't get lock or timeout occurred
941           if timeout is None:
942             # This shouldn't happen as SharedLock.acquire(timeout=None) is
943             # blocking.
944             raise errors.LockError("Failed to get lock %s" % lname)
945
946           raise _AcquireTimeout()
947
948         try:
949           # now the lock cannot be deleted, we have it!
950           self._add_owned(name=lname)
951           acquired.add(lname)
952
953         except:
954           # We shouldn't have problems adding the lock to the owners list, but
955           # if we did we'll try to release this lock and re-raise exception.
956           # Of course something is going to be really wrong after this.
957           if lock._is_owned():
958             lock.release()
959           raise
960
961     except:
962       # Release all owned locks
963       self._release_and_delete_owned()
964       raise
965
966     return acquired
967
968   def release(self, names=None):
969     """Release a set of resource locks, at the same level.
970
971     You must have acquired the locks, either in shared or in exclusive mode,
972     before releasing them.
973
974     @type names: list of strings, or None
975     @param names: the names of the locks which shall be released
976         (defaults to all the locks acquired at that level).
977
978     """
979     assert self._is_owned(), "release() on lock set while not owner"
980
981     # Support passing in a single resource to release rather than many
982     if isinstance(names, basestring):
983       names = [names]
984
985     if names is None:
986       names = self._list_owned()
987     else:
988       names = set(names)
989       assert self._list_owned().issuperset(names), (
990                "release() on unheld resources %s" %
991                names.difference(self._list_owned()))
992
993     # First of all let's release the "all elements" lock, if set.
994     # After this 'add' can work again
995     if self.__lock._is_owned():
996       self.__lock.release()
997       self._del_owned()
998
999     for lockname in names:
1000       # If we are sure the lock doesn't leave __lockdict without being
1001       # exclusively held we can do this...
1002       self.__lockdict[lockname].release()
1003       self._del_owned(name=lockname)
1004
1005   def add(self, names, acquired=0, shared=0):
1006     """Add a new set of elements to the set
1007
1008     @type names: list of strings
1009     @param names: names of the new elements to add
1010     @type acquired: integer (0/1) used as a boolean
1011     @param acquired: pre-acquire the new resource?
1012     @type shared: integer (0/1) used as a boolean
1013     @param shared: is the pre-acquisition shared?
1014
1015     """
1016     # Check we don't already own locks at this level
1017     assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1018       "Cannot add locks if the set is only partially owned, or shared"
1019
1020     # Support passing in a single resource to add rather than many
1021     if isinstance(names, basestring):
1022       names = [names]
1023
1024     # If we don't already own the set-level lock acquired in an exclusive way
1025     # we'll get it and note we need to release it later.
1026     release_lock = False
1027     if not self.__lock._is_owned():
1028       release_lock = True
1029       self.__lock.acquire()
1030
1031     try:
1032       invalid_names = set(self.__names()).intersection(names)
1033       if invalid_names:
1034         # This must be an explicit raise, not an assert, because assert is
1035         # turned off when using optimization, and this can happen because of
1036         # concurrency even if the user doesn't want it.
1037         raise errors.LockError("duplicate add() (%s)" % invalid_names)
1038
1039       for lockname in names:
1040         lock = SharedLock()
1041
1042         if acquired:
1043           lock.acquire(shared=shared)
1044           # now the lock cannot be deleted, we have it!
1045           try:
1046             self._add_owned(name=lockname)
1047           except:
1048             # We shouldn't have problems adding the lock to the owners list,
1049             # but if we did we'll try to release this lock and re-raise
1050             # exception.  Of course something is going to be really wrong,
1051             # after this.  On the other hand the lock hasn't been added to the
1052             # __lockdict yet so no other threads should be pending on it. This
1053             # release is just a safety measure.
1054             lock.release()
1055             raise
1056
1057         self.__lockdict[lockname] = lock
1058
1059     finally:
1060       # Only release __lock if we were not holding it previously.
1061       if release_lock:
1062         self.__lock.release()
1063
1064     return True
1065
1066   def remove(self, names):
1067     """Remove elements from the lock set.
1068
1069     You can either not hold anything in the lockset or already hold a superset
1070     of the elements you want to delete, exclusively.
1071
1072     @type names: list of strings
1073     @param names: names of the resource to remove.
1074
1075     @return: a list of locks which we removed; the list is always
1076         equal to the names list if we were holding all the locks
1077         exclusively
1078
1079     """
1080     # Support passing in a single resource to remove rather than many
1081     if isinstance(names, basestring):
1082       names = [names]
1083
1084     # If we own any subset of this lock it must be a superset of what we want
1085     # to delete. The ownership must also be exclusive, but that will be checked
1086     # by the lock itself.
1087     assert not self._is_owned() or self._list_owned().issuperset(names), (
1088       "remove() on acquired lockset while not owning all elements")
1089
1090     removed = []
1091
1092     for lname in names:
1093       # Calling delete() acquires the lock exclusively if we don't already own
1094       # it, and causes all pending and subsequent lock acquires to fail. It's
1095       # fine to call it out of order because delete() also implies release(),
1096       # and the assertion above guarantees that if we either already hold
1097       # everything we want to delete, or we hold none.
1098       try:
1099         self.__lockdict[lname].delete()
1100         removed.append(lname)
1101       except (KeyError, errors.LockError):
1102         # This cannot happen if we were already holding it, verify:
1103         assert not self._is_owned(), "remove failed while holding lockset"
1104       else:
1105         # If no LockError was raised we are the ones who deleted the lock.
1106         # This means we can safely remove it from lockdict, as any further or
1107         # pending delete() or acquire() will fail (and nobody can have the lock
1108         # since before our call to delete()).
1109         #
1110         # This is done in an else clause because if the exception was thrown
1111         # it's the job of the one who actually deleted it.
1112         del self.__lockdict[lname]
1113         # And let's remove it from our private list if we owned it.
1114         if self._is_owned():
1115           self._del_owned(name=lname)
1116
1117     return removed
1118
1119
1120 # Locking levels, must be acquired in increasing order.
1121 # Current rules are:
1122 #   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1123 #   acquired before performing any operation, either in shared or in exclusive
1124 #   mode. acquiring the BGL in exclusive mode is discouraged and should be
1125 #   avoided.
1126 #   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1127 #   If you need more than one node, or more than one instance, acquire them at
1128 #   the same time.
1129 LEVEL_CLUSTER = 0
1130 LEVEL_INSTANCE = 1
1131 LEVEL_NODE = 2
1132
1133 LEVELS = [LEVEL_CLUSTER,
1134           LEVEL_INSTANCE,
1135           LEVEL_NODE]
1136
1137 # Lock levels which are modifiable
1138 LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1139
1140 LEVEL_NAMES = {
1141   LEVEL_CLUSTER: "cluster",
1142   LEVEL_INSTANCE: "instance",
1143   LEVEL_NODE: "node",
1144   }
1145
1146 # Constant for the big ganeti lock
1147 BGL = 'BGL'
1148
1149
1150 class GanetiLockManager:
1151   """The Ganeti Locking Library
1152
1153   The purpose of this small library is to manage locking for ganeti clusters
1154   in a central place, while at the same time doing dynamic checks against
1155   possible deadlocks. It will also make it easier to transition to a different
1156   lock type should we migrate away from python threads.
1157
1158   """
1159   _instance = None
1160
1161   def __init__(self, nodes=None, instances=None):
1162     """Constructs a new GanetiLockManager object.
1163
1164     There should be only a GanetiLockManager object at any time, so this
1165     function raises an error if this is not the case.
1166
1167     @param nodes: list of node names
1168     @param instances: list of instance names
1169
1170     """
1171     assert self.__class__._instance is None, \
1172            "double GanetiLockManager instance"
1173
1174     self.__class__._instance = self
1175
1176     # The keyring contains all the locks, at their level and in the correct
1177     # locking order.
1178     self.__keyring = {
1179       LEVEL_CLUSTER: LockSet([BGL]),
1180       LEVEL_NODE: LockSet(nodes),
1181       LEVEL_INSTANCE: LockSet(instances),
1182     }
1183
1184   def _names(self, level):
1185     """List the lock names at the given level.
1186
1187     This can be used for debugging/testing purposes.
1188
1189     @param level: the level whose list of locks to get
1190
1191     """
1192     assert level in LEVELS, "Invalid locking level %s" % level
1193     return self.__keyring[level]._names()
1194
1195   def _is_owned(self, level):
1196     """Check whether we are owning locks at the given level
1197
1198     """
1199     return self.__keyring[level]._is_owned()
1200
1201   is_owned = _is_owned
1202
1203   def _list_owned(self, level):
1204     """Get the set of owned locks at the given level
1205
1206     """
1207     return self.__keyring[level]._list_owned()
1208
1209   def _upper_owned(self, level):
1210     """Check that we don't own any lock at a level greater than the given one.
1211
1212     """
1213     # This way of checking only works if LEVELS[i] = i, which we check for in
1214     # the test cases.
1215     return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1216
1217   def _BGL_owned(self): # pylint: disable-msg=C0103
1218     """Check if the current thread owns the BGL.
1219
1220     Both an exclusive or a shared acquisition work.
1221
1222     """
1223     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1224
1225   @staticmethod
1226   def _contains_BGL(level, names): # pylint: disable-msg=C0103
1227     """Check if the level contains the BGL.
1228
1229     Check if acting on the given level and set of names will change
1230     the status of the Big Ganeti Lock.
1231
1232     """
1233     return level == LEVEL_CLUSTER and (names is None or BGL in names)
1234
1235   def acquire(self, level, names, timeout=None, shared=0):
1236     """Acquire a set of resource locks, at the same level.
1237
1238     @type level: member of locking.LEVELS
1239     @param level: the level at which the locks shall be acquired
1240     @type names: list of strings (or string)
1241     @param names: the names of the locks which shall be acquired
1242         (special lock names, or instance/node names)
1243     @type shared: integer (0/1) used as a boolean
1244     @param shared: whether to acquire in shared mode; by default
1245         an exclusive lock will be acquired
1246     @type timeout: float
1247     @param timeout: Maximum time to acquire all locks
1248
1249     """
1250     assert level in LEVELS, "Invalid locking level %s" % level
1251
1252     # Check that we are either acquiring the Big Ganeti Lock or we already own
1253     # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1254     # so even if we've migrated we need to at least share the BGL to be
1255     # compatible with them. Of course if we own the BGL exclusively there's no
1256     # point in acquiring any other lock, unless perhaps we are half way through
1257     # the migration of the current opcode.
1258     assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1259             "You must own the Big Ganeti Lock before acquiring any other")
1260
1261     # Check we don't own locks at the same or upper levels.
1262     assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1263            " while owning some at a greater one")
1264
1265     # Acquire the locks in the set.
1266     return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
1267
1268   def release(self, level, names=None):
1269     """Release a set of resource locks, at the same level.
1270
1271     You must have acquired the locks, either in shared or in exclusive
1272     mode, before releasing them.
1273
1274     @type level: member of locking.LEVELS
1275     @param level: the level at which the locks shall be released
1276     @type names: list of strings, or None
1277     @param names: the names of the locks which shall be released
1278         (defaults to all the locks acquired at that level)
1279
1280     """
1281     assert level in LEVELS, "Invalid locking level %s" % level
1282     assert (not self._contains_BGL(level, names) or
1283             not self._upper_owned(LEVEL_CLUSTER)), (
1284             "Cannot release the Big Ganeti Lock while holding something"
1285             " at upper levels (%r)" %
1286             (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
1287                               for i in self.__keyring.keys()]), ))
1288
1289     # Release will complain if we don't own the locks already
1290     return self.__keyring[level].release(names)
1291
1292   def add(self, level, names, acquired=0, shared=0):
1293     """Add locks at the specified level.
1294
1295     @type level: member of locking.LEVELS_MOD
1296     @param level: the level at which the locks shall be added
1297     @type names: list of strings
1298     @param names: names of the locks to acquire
1299     @type acquired: integer (0/1) used as a boolean
1300     @param acquired: whether to acquire the newly added locks
1301     @type shared: integer (0/1) used as a boolean
1302     @param shared: whether the acquisition will be shared
1303
1304     """
1305     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1306     assert self._BGL_owned(), ("You must own the BGL before performing other"
1307            " operations")
1308     assert not self._upper_owned(level), ("Cannot add locks at a level"
1309            " while owning some at a greater one")
1310     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1311
1312   def remove(self, level, names):
1313     """Remove locks from the specified level.
1314
1315     You must either already own the locks you are trying to remove
1316     exclusively or not own any lock at an upper level.
1317
1318     @type level: member of locking.LEVELS_MOD
1319     @param level: the level at which the locks shall be removed
1320     @type names: list of strings
1321     @param names: the names of the locks which shall be removed
1322         (special lock names, or instance/node names)
1323
1324     """
1325     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1326     assert self._BGL_owned(), ("You must own the BGL before performing other"
1327            " operations")
1328     # Check we either own the level or don't own anything from here
1329     # up. LockSet.remove() will check the case in which we don't own
1330     # all the needed resources, or we have a shared ownership.
1331     assert self._is_owned(level) or not self._upper_owned(level), (
1332            "Cannot remove locks at a level while not owning it or"
1333            " owning some at a greater one")
1334     return self.__keyring[level].remove(names)