Add a new cluster parameter maintain_node_health
[ganeti-local] / lib / locking.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Module implementing the Ganeti locking code."""
22
23 # pylint: disable-msg=W0212
24
25 # W0212 since e.g. LockSet methods use (a lot) the internals of
26 # SharedLock
27
28 import os
29 import select
30 import threading
31 import time
32 import errno
33
34 from ganeti import errors
35 from ganeti import utils
36
37
38 def ssynchronized(lock, shared=0):
39   """Shared Synchronization decorator.
40
41   Calls the function holding the given lock, either in exclusive or shared
42   mode. It requires the passed lock to be a SharedLock (or support its
43   semantics).
44
45   """
46   def wrap(fn):
47     def sync_function(*args, **kwargs):
48       lock.acquire(shared=shared)
49       try:
50         return fn(*args, **kwargs)
51       finally:
52         lock.release()
53     return sync_function
54   return wrap
55
56
57 class RunningTimeout(object):
58   """Class to calculate remaining timeout when doing several operations.
59
60   """
61   __slots__ = [
62     "_allow_negative",
63     "_start_time",
64     "_time_fn",
65     "_timeout",
66     ]
67
68   def __init__(self, timeout, allow_negative, _time_fn=time.time):
69     """Initializes this class.
70
71     @type timeout: float
72     @param timeout: Timeout duration
73     @type allow_negative: bool
74     @param allow_negative: Whether to return values below zero
75     @param _time_fn: Time function for unittests
76
77     """
78     object.__init__(self)
79
80     if timeout is not None and timeout < 0.0:
81       raise ValueError("Timeout must not be negative")
82
83     self._timeout = timeout
84     self._allow_negative = allow_negative
85     self._time_fn = _time_fn
86
87     self._start_time = None
88
89   def Remaining(self):
90     """Returns the remaining timeout.
91
92     """
93     if self._timeout is None:
94       return None
95
96     # Get start time on first calculation
97     if self._start_time is None:
98       self._start_time = self._time_fn()
99
100     # Calculate remaining time
101     remaining_timeout = self._start_time + self._timeout - self._time_fn()
102
103     if not self._allow_negative:
104       # Ensure timeout is always >= 0
105       return max(0.0, remaining_timeout)
106
107     return remaining_timeout
108
109
110 class _SingleNotifyPipeConditionWaiter(object):
111   """Helper class for SingleNotifyPipeCondition
112
113   """
114   __slots__ = [
115     "_fd",
116     "_poller",
117     ]
118
119   def __init__(self, poller, fd):
120     """Constructor for _SingleNotifyPipeConditionWaiter
121
122     @type poller: select.poll
123     @param poller: Poller object
124     @type fd: int
125     @param fd: File descriptor to wait for
126
127     """
128     object.__init__(self)
129     self._poller = poller
130     self._fd = fd
131
132   def __call__(self, timeout):
133     """Wait for something to happen on the pipe.
134
135     @type timeout: float or None
136     @param timeout: Timeout for waiting (can be None)
137
138     """
139     running_timeout = RunningTimeout(timeout, True)
140
141     while True:
142       remaining_time = running_timeout.Remaining()
143
144       if remaining_time is not None:
145         if remaining_time < 0.0:
146           break
147
148         # Our calculation uses seconds, poll() wants milliseconds
149         remaining_time *= 1000
150
151       try:
152         result = self._poller.poll(remaining_time)
153       except EnvironmentError, err:
154         if err.errno != errno.EINTR:
155           raise
156         result = None
157
158       # Check whether we were notified
159       if result and result[0][0] == self._fd:
160         break
161
162
163 class _BaseCondition(object):
164   """Base class containing common code for conditions.
165
166   Some of this code is taken from python's threading module.
167
168   """
169   __slots__ = [
170     "_lock",
171     "acquire",
172     "release",
173     ]
174
175   def __init__(self, lock):
176     """Constructor for _BaseCondition.
177
178     @type lock: threading.Lock
179     @param lock: condition base lock
180
181     """
182     object.__init__(self)
183
184     # Recursive locks are not supported
185     assert not hasattr(lock, "_acquire_restore")
186     assert not hasattr(lock, "_release_save")
187
188     self._lock = lock
189
190     # Export the lock's acquire() and release() methods
191     self.acquire = lock.acquire
192     self.release = lock.release
193
194   def _is_owned(self):
195     """Check whether lock is owned by current thread.
196
197     """
198     if self._lock.acquire(0):
199       self._lock.release()
200       return False
201
202     return True
203
204   def _check_owned(self):
205     """Raise an exception if the current thread doesn't own the lock.
206
207     """
208     if not self._is_owned():
209       raise RuntimeError("cannot work with un-aquired lock")
210
211
212 class SingleNotifyPipeCondition(_BaseCondition):
213   """Condition which can only be notified once.
214
215   This condition class uses pipes and poll, internally, to be able to wait for
216   notification with a timeout, without resorting to polling. It is almost
217   compatible with Python's threading.Condition, with the following differences:
218     - notifyAll can only be called once, and no wait can happen after that
219     - notify is not supported, only notifyAll
220
221   """
222
223   __slots__ = [
224     "_poller",
225     "_read_fd",
226     "_write_fd",
227     "_nwaiters",
228     "_notified",
229     ]
230
231   _waiter_class = _SingleNotifyPipeConditionWaiter
232
233   def __init__(self, lock):
234     """Constructor for SingleNotifyPipeCondition
235
236     """
237     _BaseCondition.__init__(self, lock)
238     self._nwaiters = 0
239     self._notified = False
240     self._read_fd = None
241     self._write_fd = None
242     self._poller = None
243
244   def _check_unnotified(self):
245     """Throws an exception if already notified.
246
247     """
248     if self._notified:
249       raise RuntimeError("cannot use already notified condition")
250
251   def _Cleanup(self):
252     """Cleanup open file descriptors, if any.
253
254     """
255     if self._read_fd is not None:
256       os.close(self._read_fd)
257       self._read_fd = None
258
259     if self._write_fd is not None:
260       os.close(self._write_fd)
261       self._write_fd = None
262     self._poller = None
263
264   def wait(self, timeout=None):
265     """Wait for a notification.
266
267     @type timeout: float or None
268     @param timeout: Waiting timeout (can be None)
269
270     """
271     self._check_owned()
272     self._check_unnotified()
273
274     self._nwaiters += 1
275     try:
276       if self._poller is None:
277         (self._read_fd, self._write_fd) = os.pipe()
278         self._poller = select.poll()
279         self._poller.register(self._read_fd, select.POLLHUP)
280
281       wait_fn = self._waiter_class(self._poller, self._read_fd)
282       self.release()
283       try:
284         # Wait for notification
285         wait_fn(timeout)
286       finally:
287         # Re-acquire lock
288         self.acquire()
289     finally:
290       self._nwaiters -= 1
291       if self._nwaiters == 0:
292         self._Cleanup()
293
294   def notifyAll(self): # pylint: disable-msg=C0103
295     """Close the writing side of the pipe to notify all waiters.
296
297     """
298     self._check_owned()
299     self._check_unnotified()
300     self._notified = True
301     if self._write_fd is not None:
302       os.close(self._write_fd)
303       self._write_fd = None
304
305
306 class PipeCondition(_BaseCondition):
307   """Group-only non-polling condition with counters.
308
309   This condition class uses pipes and poll, internally, to be able to wait for
310   notification with a timeout, without resorting to polling. It is almost
311   compatible with Python's threading.Condition, but only supports notifyAll and
312   non-recursive locks. As an additional features it's able to report whether
313   there are any waiting threads.
314
315   """
316   __slots__ = [
317     "_nwaiters",
318     "_single_condition",
319     ]
320
321   _single_condition_class = SingleNotifyPipeCondition
322
323   def __init__(self, lock):
324     """Initializes this class.
325
326     """
327     _BaseCondition.__init__(self, lock)
328     self._nwaiters = 0
329     self._single_condition = self._single_condition_class(self._lock)
330
331   def wait(self, timeout=None):
332     """Wait for a notification.
333
334     @type timeout: float or None
335     @param timeout: Waiting timeout (can be None)
336
337     """
338     self._check_owned()
339
340     # Keep local reference to the pipe. It could be replaced by another thread
341     # notifying while we're waiting.
342     my_condition = self._single_condition
343
344     assert self._nwaiters >= 0
345     self._nwaiters += 1
346     try:
347       my_condition.wait(timeout)
348     finally:
349       assert self._nwaiters > 0
350       self._nwaiters -= 1
351
352   def notifyAll(self): # pylint: disable-msg=C0103
353     """Notify all currently waiting threads.
354
355     """
356     self._check_owned()
357     self._single_condition.notifyAll()
358     self._single_condition = self._single_condition_class(self._lock)
359
360   def has_waiting(self):
361     """Returns whether there are active waiters.
362
363     """
364     self._check_owned()
365
366     return bool(self._nwaiters)
367
368
369 class _CountingCondition(object):
370   """Wrapper for Python's built-in threading.Condition class.
371
372   This wrapper keeps a count of active waiters. We can't access the internal
373   "__waiters" attribute of threading.Condition because it's not thread-safe.
374
375   """
376   __slots__ = [
377     "_cond",
378     "_nwaiters",
379     ]
380
381   def __init__(self, lock):
382     """Initializes this class.
383
384     """
385     object.__init__(self)
386     self._cond = threading.Condition(lock=lock)
387     self._nwaiters = 0
388
389   def notifyAll(self): # pylint: disable-msg=C0103
390     """Notifies the condition.
391
392     """
393     return self._cond.notifyAll()
394
395   def wait(self, timeout=None):
396     """Waits for the condition to be notified.
397
398     @type timeout: float or None
399     @param timeout: Waiting timeout (can be None)
400
401     """
402     assert self._nwaiters >= 0
403
404     self._nwaiters += 1
405     try:
406       return self._cond.wait(timeout=timeout)
407     finally:
408       self._nwaiters -= 1
409
410   def has_waiting(self):
411     """Returns whether there are active waiters.
412
413     """
414     return bool(self._nwaiters)
415
416
417 class SharedLock(object):
418   """Implements a shared lock.
419
420   Multiple threads can acquire the lock in a shared way, calling
421   acquire_shared().  In order to acquire the lock in an exclusive way threads
422   can call acquire_exclusive().
423
424   The lock prevents starvation but does not guarantee that threads will acquire
425   the shared lock in the order they queued for it, just that they will
426   eventually do so.
427
428   """
429   __slots__ = [
430     "__active_shr_c",
431     "__inactive_shr_c",
432     "__deleted",
433     "__exc",
434     "__lock",
435     "__pending",
436     "__shr",
437     ]
438
439   __condition_class = PipeCondition
440
441   def __init__(self):
442     """Construct a new SharedLock.
443
444     """
445     object.__init__(self)
446
447     # Internal lock
448     self.__lock = threading.Lock()
449
450     # Queue containing waiting acquires
451     self.__pending = []
452
453     # Active and inactive conditions for shared locks
454     self.__active_shr_c = self.__condition_class(self.__lock)
455     self.__inactive_shr_c = self.__condition_class(self.__lock)
456
457     # Current lock holders
458     self.__shr = set()
459     self.__exc = None
460
461     # is this lock in the deleted state?
462     self.__deleted = False
463
464   def __check_deleted(self):
465     """Raises an exception if the lock has been deleted.
466
467     """
468     if self.__deleted:
469       raise errors.LockError("Deleted lock")
470
471   def __is_sharer(self):
472     """Is the current thread sharing the lock at this time?
473
474     """
475     return threading.currentThread() in self.__shr
476
477   def __is_exclusive(self):
478     """Is the current thread holding the lock exclusively at this time?
479
480     """
481     return threading.currentThread() == self.__exc
482
483   def __is_owned(self, shared=-1):
484     """Is the current thread somehow owning the lock at this time?
485
486     This is a private version of the function, which presumes you're holding
487     the internal lock.
488
489     """
490     if shared < 0:
491       return self.__is_sharer() or self.__is_exclusive()
492     elif shared:
493       return self.__is_sharer()
494     else:
495       return self.__is_exclusive()
496
497   def _is_owned(self, shared=-1):
498     """Is the current thread somehow owning the lock at this time?
499
500     @param shared:
501         - < 0: check for any type of ownership (default)
502         - 0: check for exclusive ownership
503         - > 0: check for shared ownership
504
505     """
506     self.__lock.acquire()
507     try:
508       return self.__is_owned(shared=shared)
509     finally:
510       self.__lock.release()
511
512   def _count_pending(self):
513     """Returns the number of pending acquires.
514
515     @rtype: int
516
517     """
518     self.__lock.acquire()
519     try:
520       return len(self.__pending)
521     finally:
522       self.__lock.release()
523
524   def __do_acquire(self, shared):
525     """Actually acquire the lock.
526
527     """
528     if shared:
529       self.__shr.add(threading.currentThread())
530     else:
531       self.__exc = threading.currentThread()
532
533   def __can_acquire(self, shared):
534     """Determine whether lock can be acquired.
535
536     """
537     if shared:
538       return self.__exc is None
539     else:
540       return len(self.__shr) == 0 and self.__exc is None
541
542   def __is_on_top(self, cond):
543     """Checks whether the passed condition is on top of the queue.
544
545     The caller must make sure the queue isn't empty.
546
547     """
548     return self.__pending[0] == cond
549
550   def __acquire_unlocked(self, shared, timeout):
551     """Acquire a shared lock.
552
553     @param shared: whether to acquire in shared mode; by default an
554         exclusive lock will be acquired
555     @param timeout: maximum waiting time before giving up
556
557     """
558     self.__check_deleted()
559
560     # We cannot acquire the lock if we already have it
561     assert not self.__is_owned(), "double acquire() on a non-recursive lock"
562
563     # Check whether someone else holds the lock or there are pending acquires.
564     if not self.__pending and self.__can_acquire(shared):
565       # Apparently not, can acquire lock directly.
566       self.__do_acquire(shared)
567       return True
568
569     if shared:
570       wait_condition = self.__active_shr_c
571
572       # Check if we're not yet in the queue
573       if wait_condition not in self.__pending:
574         self.__pending.append(wait_condition)
575     else:
576       wait_condition = self.__condition_class(self.__lock)
577       # Always add to queue
578       self.__pending.append(wait_condition)
579
580     try:
581       # Wait until we become the topmost acquire in the queue or the timeout
582       # expires.
583       while not (self.__is_on_top(wait_condition) and
584                  self.__can_acquire(shared)):
585         # Wait for notification
586         wait_condition.wait(timeout)
587         self.__check_deleted()
588
589         # A lot of code assumes blocking acquires always succeed. Loop
590         # internally for that case.
591         if timeout is not None:
592           break
593
594       if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
595         self.__do_acquire(shared)
596         return True
597     finally:
598       # Remove condition from queue if there are no more waiters
599       if not wait_condition.has_waiting() and not self.__deleted:
600         self.__pending.remove(wait_condition)
601
602     return False
603
604   def acquire(self, shared=0, timeout=None, test_notify=None):
605     """Acquire a shared lock.
606
607     @type shared: integer (0/1) used as a boolean
608     @param shared: whether to acquire in shared mode; by default an
609         exclusive lock will be acquired
610     @type timeout: float
611     @param timeout: maximum waiting time before giving up
612     @type test_notify: callable or None
613     @param test_notify: Special callback function for unittesting
614
615     """
616     self.__lock.acquire()
617     try:
618       # We already got the lock, notify now
619       if __debug__ and callable(test_notify):
620         test_notify()
621
622       return self.__acquire_unlocked(shared, timeout)
623     finally:
624       self.__lock.release()
625
626   def release(self):
627     """Release a Shared Lock.
628
629     You must have acquired the lock, either in shared or in exclusive mode,
630     before calling this function.
631
632     """
633     self.__lock.acquire()
634     try:
635       assert self.__is_exclusive() or self.__is_sharer(), \
636         "Cannot release non-owned lock"
637
638       # Autodetect release type
639       if self.__is_exclusive():
640         self.__exc = None
641       else:
642         self.__shr.remove(threading.currentThread())
643
644       # Notify topmost condition in queue
645       if self.__pending:
646         first_condition = self.__pending[0]
647         first_condition.notifyAll()
648
649         if first_condition == self.__active_shr_c:
650           self.__active_shr_c = self.__inactive_shr_c
651           self.__inactive_shr_c = first_condition
652
653     finally:
654       self.__lock.release()
655
656   def delete(self, timeout=None):
657     """Delete a Shared Lock.
658
659     This operation will declare the lock for removal. First the lock will be
660     acquired in exclusive mode if you don't already own it, then the lock
661     will be put in a state where any future and pending acquire() fail.
662
663     @type timeout: float
664     @param timeout: maximum waiting time before giving up
665
666     """
667     self.__lock.acquire()
668     try:
669       assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
670
671       self.__check_deleted()
672
673       # The caller is allowed to hold the lock exclusively already.
674       acquired = self.__is_exclusive()
675
676       if not acquired:
677         acquired = self.__acquire_unlocked(0, timeout)
678
679         assert self.__is_exclusive() and not self.__is_sharer(), \
680           "Lock wasn't acquired in exclusive mode"
681
682       if acquired:
683         self.__deleted = True
684         self.__exc = None
685
686         # Notify all acquires. They'll throw an error.
687         while self.__pending:
688           self.__pending.pop().notifyAll()
689
690       return acquired
691     finally:
692       self.__lock.release()
693
694
695 # Whenever we want to acquire a full LockSet we pass None as the value
696 # to acquire.  Hide this behind this nicely named constant.
697 ALL_SET = None
698
699
700 class _AcquireTimeout(Exception):
701   """Internal exception to abort an acquire on a timeout.
702
703   """
704
705
706 class LockSet:
707   """Implements a set of locks.
708
709   This abstraction implements a set of shared locks for the same resource type,
710   distinguished by name. The user can lock a subset of the resources and the
711   LockSet will take care of acquiring the locks always in the same order, thus
712   preventing deadlock.
713
714   All the locks needed in the same set must be acquired together, though.
715
716   """
717   def __init__(self, members=None):
718     """Constructs a new LockSet.
719
720     @type members: list of strings
721     @param members: initial members of the set
722
723     """
724     # Used internally to guarantee coherency.
725     self.__lock = SharedLock()
726
727     # The lockdict indexes the relationship name -> lock
728     # The order-of-locking is implied by the alphabetical order of names
729     self.__lockdict = {}
730
731     if members is not None:
732       for name in members:
733         self.__lockdict[name] = SharedLock()
734
735     # The owner dict contains the set of locks each thread owns. For
736     # performance each thread can access its own key without a global lock on
737     # this structure. It is paramount though that *no* other type of access is
738     # done to this structure (eg. no looping over its keys). *_owner helper
739     # function are defined to guarantee access is correct, but in general never
740     # do anything different than __owners[threading.currentThread()], or there
741     # will be trouble.
742     self.__owners = {}
743
744   def _is_owned(self):
745     """Is the current thread a current level owner?"""
746     return threading.currentThread() in self.__owners
747
748   def _add_owned(self, name=None):
749     """Note the current thread owns the given lock"""
750     if name is None:
751       if not self._is_owned():
752         self.__owners[threading.currentThread()] = set()
753     else:
754       if self._is_owned():
755         self.__owners[threading.currentThread()].add(name)
756       else:
757         self.__owners[threading.currentThread()] = set([name])
758
759   def _del_owned(self, name=None):
760     """Note the current thread owns the given lock"""
761
762     assert not (name is None and self.__lock._is_owned()), \
763            "Cannot hold internal lock when deleting owner status"
764
765     if name is not None:
766       self.__owners[threading.currentThread()].remove(name)
767
768     # Only remove the key if we don't hold the set-lock as well
769     if (not self.__lock._is_owned() and
770         not self.__owners[threading.currentThread()]):
771       del self.__owners[threading.currentThread()]
772
773   def _list_owned(self):
774     """Get the set of resource names owned by the current thread"""
775     if self._is_owned():
776       return self.__owners[threading.currentThread()].copy()
777     else:
778       return set()
779
780   def _release_and_delete_owned(self):
781     """Release and delete all resources owned by the current thread"""
782     for lname in self._list_owned():
783       lock = self.__lockdict[lname]
784       if lock._is_owned():
785         lock.release()
786       self._del_owned(name=lname)
787
788   def __names(self):
789     """Return the current set of names.
790
791     Only call this function while holding __lock and don't iterate on the
792     result after releasing the lock.
793
794     """
795     return self.__lockdict.keys()
796
797   def _names(self):
798     """Return a copy of the current set of elements.
799
800     Used only for debugging purposes.
801
802     """
803     # If we don't already own the set-level lock acquired
804     # we'll get it and note we need to release it later.
805     release_lock = False
806     if not self.__lock._is_owned():
807       release_lock = True
808       self.__lock.acquire(shared=1)
809     try:
810       result = self.__names()
811     finally:
812       if release_lock:
813         self.__lock.release()
814     return set(result)
815
816   def acquire(self, names, timeout=None, shared=0, test_notify=None):
817     """Acquire a set of resource locks.
818
819     @type names: list of strings (or string)
820     @param names: the names of the locks which shall be acquired
821         (special lock names, or instance/node names)
822     @type shared: integer (0/1) used as a boolean
823     @param shared: whether to acquire in shared mode; by default an
824         exclusive lock will be acquired
825     @type timeout: float or None
826     @param timeout: Maximum time to acquire all locks
827     @type test_notify: callable or None
828     @param test_notify: Special callback function for unittesting
829
830     @return: Set of all locks successfully acquired or None in case of timeout
831
832     @raise errors.LockError: when any lock we try to acquire has
833         been deleted before we succeed. In this case none of the
834         locks requested will be acquired.
835
836     """
837     assert timeout is None or timeout >= 0.0
838
839     # Check we don't already own locks at this level
840     assert not self._is_owned(), "Cannot acquire locks in the same set twice"
841
842     # We need to keep track of how long we spent waiting for a lock. The
843     # timeout passed to this function is over all lock acquires.
844     running_timeout = RunningTimeout(timeout, False)
845
846     try:
847       if names is not None:
848         # Support passing in a single resource to acquire rather than many
849         if isinstance(names, basestring):
850           names = [names]
851
852         return self.__acquire_inner(names, False, shared,
853                                     running_timeout.Remaining, test_notify)
854
855       else:
856         # If no names are given acquire the whole set by not letting new names
857         # being added before we release, and getting the current list of names.
858         # Some of them may then be deleted later, but we'll cope with this.
859         #
860         # We'd like to acquire this lock in a shared way, as it's nice if
861         # everybody else can use the instances at the same time. If are
862         # acquiring them exclusively though they won't be able to do this
863         # anyway, though, so we'll get the list lock exclusively as well in
864         # order to be able to do add() on the set while owning it.
865         if not self.__lock.acquire(shared=shared,
866                                    timeout=running_timeout.Remaining()):
867           raise _AcquireTimeout()
868         try:
869           # note we own the set-lock
870           self._add_owned()
871
872           return self.__acquire_inner(self.__names(), True, shared,
873                                       running_timeout.Remaining, test_notify)
874         except:
875           # We shouldn't have problems adding the lock to the owners list, but
876           # if we did we'll try to release this lock and re-raise exception.
877           # Of course something is going to be really wrong, after this.
878           self.__lock.release()
879           self._del_owned()
880           raise
881
882     except _AcquireTimeout:
883       return None
884
885   def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
886     """Inner logic for acquiring a number of locks.
887
888     @param names: Names of the locks to be acquired
889     @param want_all: Whether all locks in the set should be acquired
890     @param shared: Whether to acquire in shared mode
891     @param timeout_fn: Function returning remaining timeout
892     @param test_notify: Special callback function for unittesting
893
894     """
895     acquire_list = []
896
897     # First we look the locks up on __lockdict. We have no way of being sure
898     # they will still be there after, but this makes it a lot faster should
899     # just one of them be the already wrong. Using a sorted sequence to prevent
900     # deadlocks.
901     for lname in sorted(utils.UniqueSequence(names)):
902       try:
903         lock = self.__lockdict[lname] # raises KeyError if lock is not there
904       except KeyError:
905         if want_all:
906           # We are acquiring all the set, it doesn't matter if this particular
907           # element is not there anymore.
908           continue
909
910         raise errors.LockError("Non-existing lock in set (%s)" % lname)
911
912       acquire_list.append((lname, lock))
913
914     # This will hold the locknames we effectively acquired.
915     acquired = set()
916
917     try:
918       # Now acquire_list contains a sorted list of resources and locks we
919       # want.  In order to get them we loop on this (private) list and
920       # acquire() them.  We gave no real guarantee they will still exist till
921       # this is done but .acquire() itself is safe and will alert us if the
922       # lock gets deleted.
923       for (lname, lock) in acquire_list:
924         if __debug__ and callable(test_notify):
925           test_notify_fn = lambda: test_notify(lname)
926         else:
927           test_notify_fn = None
928
929         timeout = timeout_fn()
930
931         try:
932           # raises LockError if the lock was deleted
933           acq_success = lock.acquire(shared=shared, timeout=timeout,
934                                      test_notify=test_notify_fn)
935         except errors.LockError:
936           if want_all:
937             # We are acquiring all the set, it doesn't matter if this
938             # particular element is not there anymore.
939             continue
940
941           raise errors.LockError("Non-existing lock in set (%s)" % lname)
942
943         if not acq_success:
944           # Couldn't get lock or timeout occurred
945           if timeout is None:
946             # This shouldn't happen as SharedLock.acquire(timeout=None) is
947             # blocking.
948             raise errors.LockError("Failed to get lock %s" % lname)
949
950           raise _AcquireTimeout()
951
952         try:
953           # now the lock cannot be deleted, we have it!
954           self._add_owned(name=lname)
955           acquired.add(lname)
956
957         except:
958           # We shouldn't have problems adding the lock to the owners list, but
959           # if we did we'll try to release this lock and re-raise exception.
960           # Of course something is going to be really wrong after this.
961           if lock._is_owned():
962             lock.release()
963           raise
964
965     except:
966       # Release all owned locks
967       self._release_and_delete_owned()
968       raise
969
970     return acquired
971
972   def release(self, names=None):
973     """Release a set of resource locks, at the same level.
974
975     You must have acquired the locks, either in shared or in exclusive mode,
976     before releasing them.
977
978     @type names: list of strings, or None
979     @param names: the names of the locks which shall be released
980         (defaults to all the locks acquired at that level).
981
982     """
983     assert self._is_owned(), "release() on lock set while not owner"
984
985     # Support passing in a single resource to release rather than many
986     if isinstance(names, basestring):
987       names = [names]
988
989     if names is None:
990       names = self._list_owned()
991     else:
992       names = set(names)
993       assert self._list_owned().issuperset(names), (
994                "release() on unheld resources %s" %
995                names.difference(self._list_owned()))
996
997     # First of all let's release the "all elements" lock, if set.
998     # After this 'add' can work again
999     if self.__lock._is_owned():
1000       self.__lock.release()
1001       self._del_owned()
1002
1003     for lockname in names:
1004       # If we are sure the lock doesn't leave __lockdict without being
1005       # exclusively held we can do this...
1006       self.__lockdict[lockname].release()
1007       self._del_owned(name=lockname)
1008
1009   def add(self, names, acquired=0, shared=0):
1010     """Add a new set of elements to the set
1011
1012     @type names: list of strings
1013     @param names: names of the new elements to add
1014     @type acquired: integer (0/1) used as a boolean
1015     @param acquired: pre-acquire the new resource?
1016     @type shared: integer (0/1) used as a boolean
1017     @param shared: is the pre-acquisition shared?
1018
1019     """
1020     # Check we don't already own locks at this level
1021     assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1022       "Cannot add locks if the set is only partially owned, or shared"
1023
1024     # Support passing in a single resource to add rather than many
1025     if isinstance(names, basestring):
1026       names = [names]
1027
1028     # If we don't already own the set-level lock acquired in an exclusive way
1029     # we'll get it and note we need to release it later.
1030     release_lock = False
1031     if not self.__lock._is_owned():
1032       release_lock = True
1033       self.__lock.acquire()
1034
1035     try:
1036       invalid_names = set(self.__names()).intersection(names)
1037       if invalid_names:
1038         # This must be an explicit raise, not an assert, because assert is
1039         # turned off when using optimization, and this can happen because of
1040         # concurrency even if the user doesn't want it.
1041         raise errors.LockError("duplicate add() (%s)" % invalid_names)
1042
1043       for lockname in names:
1044         lock = SharedLock()
1045
1046         if acquired:
1047           lock.acquire(shared=shared)
1048           # now the lock cannot be deleted, we have it!
1049           try:
1050             self._add_owned(name=lockname)
1051           except:
1052             # We shouldn't have problems adding the lock to the owners list,
1053             # but if we did we'll try to release this lock and re-raise
1054             # exception.  Of course something is going to be really wrong,
1055             # after this.  On the other hand the lock hasn't been added to the
1056             # __lockdict yet so no other threads should be pending on it. This
1057             # release is just a safety measure.
1058             lock.release()
1059             raise
1060
1061         self.__lockdict[lockname] = lock
1062
1063     finally:
1064       # Only release __lock if we were not holding it previously.
1065       if release_lock:
1066         self.__lock.release()
1067
1068     return True
1069
1070   def remove(self, names):
1071     """Remove elements from the lock set.
1072
1073     You can either not hold anything in the lockset or already hold a superset
1074     of the elements you want to delete, exclusively.
1075
1076     @type names: list of strings
1077     @param names: names of the resource to remove.
1078
1079     @return: a list of locks which we removed; the list is always
1080         equal to the names list if we were holding all the locks
1081         exclusively
1082
1083     """
1084     # Support passing in a single resource to remove rather than many
1085     if isinstance(names, basestring):
1086       names = [names]
1087
1088     # If we own any subset of this lock it must be a superset of what we want
1089     # to delete. The ownership must also be exclusive, but that will be checked
1090     # by the lock itself.
1091     assert not self._is_owned() or self._list_owned().issuperset(names), (
1092       "remove() on acquired lockset while not owning all elements")
1093
1094     removed = []
1095
1096     for lname in names:
1097       # Calling delete() acquires the lock exclusively if we don't already own
1098       # it, and causes all pending and subsequent lock acquires to fail. It's
1099       # fine to call it out of order because delete() also implies release(),
1100       # and the assertion above guarantees that if we either already hold
1101       # everything we want to delete, or we hold none.
1102       try:
1103         self.__lockdict[lname].delete()
1104         removed.append(lname)
1105       except (KeyError, errors.LockError):
1106         # This cannot happen if we were already holding it, verify:
1107         assert not self._is_owned(), "remove failed while holding lockset"
1108       else:
1109         # If no LockError was raised we are the ones who deleted the lock.
1110         # This means we can safely remove it from lockdict, as any further or
1111         # pending delete() or acquire() will fail (and nobody can have the lock
1112         # since before our call to delete()).
1113         #
1114         # This is done in an else clause because if the exception was thrown
1115         # it's the job of the one who actually deleted it.
1116         del self.__lockdict[lname]
1117         # And let's remove it from our private list if we owned it.
1118         if self._is_owned():
1119           self._del_owned(name=lname)
1120
1121     return removed
1122
1123
1124 # Locking levels, must be acquired in increasing order.
1125 # Current rules are:
1126 #   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1127 #   acquired before performing any operation, either in shared or in exclusive
1128 #   mode. acquiring the BGL in exclusive mode is discouraged and should be
1129 #   avoided.
1130 #   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1131 #   If you need more than one node, or more than one instance, acquire them at
1132 #   the same time.
1133 LEVEL_CLUSTER = 0
1134 LEVEL_INSTANCE = 1
1135 LEVEL_NODE = 2
1136
1137 LEVELS = [LEVEL_CLUSTER,
1138           LEVEL_INSTANCE,
1139           LEVEL_NODE]
1140
1141 # Lock levels which are modifiable
1142 LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1143
1144 LEVEL_NAMES = {
1145   LEVEL_CLUSTER: "cluster",
1146   LEVEL_INSTANCE: "instance",
1147   LEVEL_NODE: "node",
1148   }
1149
1150 # Constant for the big ganeti lock
1151 BGL = 'BGL'
1152
1153
1154 class GanetiLockManager:
1155   """The Ganeti Locking Library
1156
1157   The purpose of this small library is to manage locking for ganeti clusters
1158   in a central place, while at the same time doing dynamic checks against
1159   possible deadlocks. It will also make it easier to transition to a different
1160   lock type should we migrate away from python threads.
1161
1162   """
1163   _instance = None
1164
1165   def __init__(self, nodes=None, instances=None):
1166     """Constructs a new GanetiLockManager object.
1167
1168     There should be only a GanetiLockManager object at any time, so this
1169     function raises an error if this is not the case.
1170
1171     @param nodes: list of node names
1172     @param instances: list of instance names
1173
1174     """
1175     assert self.__class__._instance is None, \
1176            "double GanetiLockManager instance"
1177
1178     self.__class__._instance = self
1179
1180     # The keyring contains all the locks, at their level and in the correct
1181     # locking order.
1182     self.__keyring = {
1183       LEVEL_CLUSTER: LockSet([BGL]),
1184       LEVEL_NODE: LockSet(nodes),
1185       LEVEL_INSTANCE: LockSet(instances),
1186     }
1187
1188   def _names(self, level):
1189     """List the lock names at the given level.
1190
1191     This can be used for debugging/testing purposes.
1192
1193     @param level: the level whose list of locks to get
1194
1195     """
1196     assert level in LEVELS, "Invalid locking level %s" % level
1197     return self.__keyring[level]._names()
1198
1199   def _is_owned(self, level):
1200     """Check whether we are owning locks at the given level
1201
1202     """
1203     return self.__keyring[level]._is_owned()
1204
1205   is_owned = _is_owned
1206
1207   def _list_owned(self, level):
1208     """Get the set of owned locks at the given level
1209
1210     """
1211     return self.__keyring[level]._list_owned()
1212
1213   def _upper_owned(self, level):
1214     """Check that we don't own any lock at a level greater than the given one.
1215
1216     """
1217     # This way of checking only works if LEVELS[i] = i, which we check for in
1218     # the test cases.
1219     return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1220
1221   def _BGL_owned(self): # pylint: disable-msg=C0103
1222     """Check if the current thread owns the BGL.
1223
1224     Both an exclusive or a shared acquisition work.
1225
1226     """
1227     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1228
1229   @staticmethod
1230   def _contains_BGL(level, names): # pylint: disable-msg=C0103
1231     """Check if the level contains the BGL.
1232
1233     Check if acting on the given level and set of names will change
1234     the status of the Big Ganeti Lock.
1235
1236     """
1237     return level == LEVEL_CLUSTER and (names is None or BGL in names)
1238
1239   def acquire(self, level, names, timeout=None, shared=0):
1240     """Acquire a set of resource locks, at the same level.
1241
1242     @type level: member of locking.LEVELS
1243     @param level: the level at which the locks shall be acquired
1244     @type names: list of strings (or string)
1245     @param names: the names of the locks which shall be acquired
1246         (special lock names, or instance/node names)
1247     @type shared: integer (0/1) used as a boolean
1248     @param shared: whether to acquire in shared mode; by default
1249         an exclusive lock will be acquired
1250     @type timeout: float
1251     @param timeout: Maximum time to acquire all locks
1252
1253     """
1254     assert level in LEVELS, "Invalid locking level %s" % level
1255
1256     # Check that we are either acquiring the Big Ganeti Lock or we already own
1257     # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1258     # so even if we've migrated we need to at least share the BGL to be
1259     # compatible with them. Of course if we own the BGL exclusively there's no
1260     # point in acquiring any other lock, unless perhaps we are half way through
1261     # the migration of the current opcode.
1262     assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1263             "You must own the Big Ganeti Lock before acquiring any other")
1264
1265     # Check we don't own locks at the same or upper levels.
1266     assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1267            " while owning some at a greater one")
1268
1269     # Acquire the locks in the set.
1270     return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
1271
1272   def release(self, level, names=None):
1273     """Release a set of resource locks, at the same level.
1274
1275     You must have acquired the locks, either in shared or in exclusive
1276     mode, before releasing them.
1277
1278     @type level: member of locking.LEVELS
1279     @param level: the level at which the locks shall be released
1280     @type names: list of strings, or None
1281     @param names: the names of the locks which shall be released
1282         (defaults to all the locks acquired at that level)
1283
1284     """
1285     assert level in LEVELS, "Invalid locking level %s" % level
1286     assert (not self._contains_BGL(level, names) or
1287             not self._upper_owned(LEVEL_CLUSTER)), (
1288             "Cannot release the Big Ganeti Lock while holding something"
1289             " at upper levels (%r)" %
1290             (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
1291                               for i in self.__keyring.keys()]), ))
1292
1293     # Release will complain if we don't own the locks already
1294     return self.__keyring[level].release(names)
1295
1296   def add(self, level, names, acquired=0, shared=0):
1297     """Add locks at the specified level.
1298
1299     @type level: member of locking.LEVELS_MOD
1300     @param level: the level at which the locks shall be added
1301     @type names: list of strings
1302     @param names: names of the locks to acquire
1303     @type acquired: integer (0/1) used as a boolean
1304     @param acquired: whether to acquire the newly added locks
1305     @type shared: integer (0/1) used as a boolean
1306     @param shared: whether the acquisition will be shared
1307
1308     """
1309     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1310     assert self._BGL_owned(), ("You must own the BGL before performing other"
1311            " operations")
1312     assert not self._upper_owned(level), ("Cannot add locks at a level"
1313            " while owning some at a greater one")
1314     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1315
1316   def remove(self, level, names):
1317     """Remove locks from the specified level.
1318
1319     You must either already own the locks you are trying to remove
1320     exclusively or not own any lock at an upper level.
1321
1322     @type level: member of locking.LEVELS_MOD
1323     @param level: the level at which the locks shall be removed
1324     @type names: list of strings
1325     @param names: the names of the locks which shall be removed
1326         (special lock names, or instance/node names)
1327
1328     """
1329     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1330     assert self._BGL_owned(), ("You must own the BGL before performing other"
1331            " operations")
1332     # Check we either own the level or don't own anything from here
1333     # up. LockSet.remove() will check the case in which we don't own
1334     # all the needed resources, or we have a shared ownership.
1335     assert self._is_owned(level) or not self._upper_owned(level), (
1336            "Cannot remove locks at a level while not owning it or"
1337            " owning some at a greater one")
1338     return self.__keyring[level].remove(names)