LUSetNodeParams: Don't break config on mc demotion.
[ganeti-local] / lib / locking.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Module implementing the Ganeti locking code."""
22
23 import os
24 import select
25 import threading
26 import time
27 import errno
28
29 from ganeti import errors
30 from ganeti import utils
31
32
33 def ssynchronized(lock, shared=0):
34   """Shared Synchronization decorator.
35
36   Calls the function holding the given lock, either in exclusive or shared
37   mode. It requires the passed lock to be a SharedLock (or support its
38   semantics).
39
40   """
41   def wrap(fn):
42     def sync_function(*args, **kwargs):
43       lock.acquire(shared=shared)
44       try:
45         return fn(*args, **kwargs)
46       finally:
47         lock.release()
48     return sync_function
49   return wrap
50
51
52 class _SingleNotifyPipeConditionWaiter(object):
53   """Helper class for SingleNotifyPipeCondition
54
55   """
56   __slots__ = [
57     "_fd",
58     "_poller",
59     ]
60
61   def __init__(self, poller, fd):
62     """Constructor for _SingleNotifyPipeConditionWaiter
63
64     @type poller: select.poll
65     @param poller: Poller object
66     @type fd: int
67     @param fd: File descriptor to wait for
68
69     """
70     object.__init__(self)
71     self._poller = poller
72     self._fd = fd
73
74   def __call__(self, timeout):
75     """Wait for something to happen on the pipe.
76
77     @type timeout: float or None
78     @param timeout: Timeout for waiting (can be None)
79
80     """
81     start_time = time.time()
82     remaining_time = timeout
83
84     while timeout is None or remaining_time > 0:
85       try:
86         result = self._poller.poll(remaining_time)
87       except EnvironmentError, err:
88         if err.errno != errno.EINTR:
89           raise
90         result = None
91
92       # Check whether we were notified
93       if result and result[0][0] == self._fd:
94         break
95
96       # Re-calculate timeout if necessary
97       if timeout is not None:
98         remaining_time = start_time + timeout - time.time()
99
100
101 class _BaseCondition(object):
102   """Base class containing common code for conditions.
103
104   Some of this code is taken from python's threading module.
105
106   """
107   __slots__ = [
108     "_lock",
109     "acquire",
110     "release",
111     ]
112
113   def __init__(self, lock):
114     """Constructor for _BaseCondition.
115
116     @type lock: L{threading.Lock}
117     @param lock: condition base lock
118
119     """
120     object.__init__(self)
121
122     # Recursive locks are not supported
123     assert not hasattr(lock, "_acquire_restore")
124     assert not hasattr(lock, "_release_save")
125
126     self._lock = lock
127
128     # Export the lock's acquire() and release() methods
129     self.acquire = lock.acquire
130     self.release = lock.release
131
132   def _is_owned(self):
133     """Check whether lock is owned by current thread.
134
135     """
136     if self._lock.acquire(0):
137       self._lock.release()
138       return False
139
140     return True
141
142   def _check_owned(self):
143     """Raise an exception if the current thread doesn't own the lock.
144
145     """
146     if not self._is_owned():
147       raise RuntimeError("cannot work with un-aquired lock")
148
149
150 class SingleNotifyPipeCondition(_BaseCondition):
151   """Condition which can only be notified once.
152
153   This condition class uses pipes and poll, internally, to be able to wait for
154   notification with a timeout, without resorting to polling. It is almost
155   compatible with Python's threading.Condition, with the following differences:
156     - notifyAll can only be called once, and no wait can happen after that
157     - notify is not supported, only notifyAll
158
159   """
160
161   __slots__ = _BaseCondition.__slots__ + [
162     "_poller",
163     "_read_fd",
164     "_write_fd",
165     "_nwaiters",
166     "_notified",
167     ]
168
169   _waiter_class = _SingleNotifyPipeConditionWaiter
170
171   def __init__(self, lock):
172     """Constructor for SingleNotifyPipeCondition
173
174     """
175     _BaseCondition.__init__(self, lock)
176     self._nwaiters = 0
177     self._notified = False
178     self._read_fd = None
179     self._write_fd = None
180     self._poller = None
181
182   def _check_unnotified(self):
183     if self._notified:
184       raise RuntimeError("cannot use already notified condition")
185
186   def _Cleanup(self):
187     """Cleanup open file descriptors, if any.
188
189     """
190     if self._read_fd is not None:
191       os.close(self._read_fd)
192       self._read_fd = None
193
194     if self._write_fd is not None:
195       os.close(self._write_fd)
196       self._write_fd = None
197     self._poller = None
198
199   def wait(self, timeout=None):
200     """Wait for a notification.
201
202     @type timeout: float or None
203     @param timeout: Waiting timeout (can be None)
204
205     """
206     self._check_owned()
207     self._check_unnotified()
208
209     self._nwaiters += 1
210     try:
211       if self._poller is None:
212         (self._read_fd, self._write_fd) = os.pipe()
213         self._poller = select.poll()
214         self._poller.register(self._read_fd, select.POLLHUP)
215
216       wait_fn = self._waiter_class(self._poller, self._read_fd)
217       self.release()
218       try:
219         # Wait for notification
220         wait_fn(timeout)
221       finally:
222         # Re-acquire lock
223         self.acquire()
224     finally:
225       self._nwaiters -= 1
226       if self._nwaiters == 0:
227         self._Cleanup()
228
229   def notifyAll(self):
230     """Close the writing side of the pipe to notify all waiters.
231
232     """
233     self._check_owned()
234     self._check_unnotified()
235     self._notified = True
236     if self._write_fd is not None:
237       os.close(self._write_fd)
238       self._write_fd = None
239
240
241 class PipeCondition(_BaseCondition):
242   """Group-only non-polling condition with counters.
243
244   This condition class uses pipes and poll, internally, to be able to wait for
245   notification with a timeout, without resorting to polling. It is almost
246   compatible with Python's threading.Condition, but only supports notifyAll and
247   non-recursive locks. As an additional features it's able to report whether
248   there are any waiting threads.
249
250   """
251   __slots__ = _BaseCondition.__slots__ + [
252     "_nwaiters",
253     "_single_condition",
254     ]
255
256   _single_condition_class = SingleNotifyPipeCondition
257
258   def __init__(self, lock):
259     """Initializes this class.
260
261     """
262     _BaseCondition.__init__(self, lock)
263     self._nwaiters = 0
264     self._single_condition = self._single_condition_class(self._lock)
265
266   def wait(self, timeout=None):
267     """Wait for a notification.
268
269     @type timeout: float or None
270     @param timeout: Waiting timeout (can be None)
271
272     """
273     self._check_owned()
274
275     # Keep local reference to the pipe. It could be replaced by another thread
276     # notifying while we're waiting.
277     my_condition = self._single_condition
278
279     assert self._nwaiters >= 0
280     self._nwaiters += 1
281     try:
282       my_condition.wait(timeout)
283     finally:
284       assert self._nwaiters > 0
285       self._nwaiters -= 1
286
287   def notifyAll(self):
288     """Notify all currently waiting threads.
289
290     """
291     self._check_owned()
292     self._single_condition.notifyAll()
293     self._single_condition = self._single_condition_class(self._lock)
294
295   def has_waiting(self):
296     """Returns whether there are active waiters.
297
298     """
299     self._check_owned()
300
301     return bool(self._nwaiters)
302
303
304 class _CountingCondition(object):
305   """Wrapper for Python's built-in threading.Condition class.
306
307   This wrapper keeps a count of active waiters. We can't access the internal
308   "__waiters" attribute of threading.Condition because it's not thread-safe.
309
310   """
311   __slots__ = [
312     "_cond",
313     "_nwaiters",
314     ]
315
316   def __init__(self, lock):
317     """Initializes this class.
318
319     """
320     object.__init__(self)
321     self._cond = threading.Condition(lock=lock)
322     self._nwaiters = 0
323
324   def notifyAll(self):
325     """Notifies the condition.
326
327     """
328     return self._cond.notifyAll()
329
330   def wait(self, timeout=None):
331     """Waits for the condition to be notified.
332
333     @type timeout: float or None
334     @param timeout: Waiting timeout (can be None)
335
336     """
337     assert self._nwaiters >= 0
338
339     self._nwaiters += 1
340     try:
341       return self._cond.wait(timeout=timeout)
342     finally:
343       self._nwaiters -= 1
344
345   def has_waiting(self):
346     """Returns whether there are active waiters.
347
348     """
349     return bool(self._nwaiters)
350
351
352 class SharedLock(object):
353   """Implements a shared lock.
354
355   Multiple threads can acquire the lock in a shared way, calling
356   acquire_shared().  In order to acquire the lock in an exclusive way threads
357   can call acquire_exclusive().
358
359   The lock prevents starvation but does not guarantee that threads will acquire
360   the shared lock in the order they queued for it, just that they will
361   eventually do so.
362
363   """
364   __slots__ = [
365     "__active_shr_c",
366     "__inactive_shr_c",
367     "__deleted",
368     "__exc",
369     "__lock",
370     "__pending",
371     "__shr",
372     ]
373
374   __condition_class = PipeCondition
375
376   def __init__(self):
377     """Construct a new SharedLock.
378
379     """
380     object.__init__(self)
381
382     # Internal lock
383     self.__lock = threading.Lock()
384
385     # Queue containing waiting acquires
386     self.__pending = []
387
388     # Active and inactive conditions for shared locks
389     self.__active_shr_c = self.__condition_class(self.__lock)
390     self.__inactive_shr_c = self.__condition_class(self.__lock)
391
392     # Current lock holders
393     self.__shr = set()
394     self.__exc = None
395
396     # is this lock in the deleted state?
397     self.__deleted = False
398
399   def __check_deleted(self):
400     """Raises an exception if the lock has been deleted.
401
402     """
403     if self.__deleted:
404       raise errors.LockError("Deleted lock")
405
406   def __is_sharer(self):
407     """Is the current thread sharing the lock at this time?
408
409     """
410     return threading.currentThread() in self.__shr
411
412   def __is_exclusive(self):
413     """Is the current thread holding the lock exclusively at this time?
414
415     """
416     return threading.currentThread() == self.__exc
417
418   def __is_owned(self, shared=-1):
419     """Is the current thread somehow owning the lock at this time?
420
421     This is a private version of the function, which presumes you're holding
422     the internal lock.
423
424     """
425     if shared < 0:
426       return self.__is_sharer() or self.__is_exclusive()
427     elif shared:
428       return self.__is_sharer()
429     else:
430       return self.__is_exclusive()
431
432   def _is_owned(self, shared=-1):
433     """Is the current thread somehow owning the lock at this time?
434
435     @param shared:
436         - < 0: check for any type of ownership (default)
437         - 0: check for exclusive ownership
438         - > 0: check for shared ownership
439
440     """
441     self.__lock.acquire()
442     try:
443       return self.__is_owned(shared=shared)
444     finally:
445       self.__lock.release()
446
447   def _count_pending(self):
448     """Returns the number of pending acquires.
449
450     @rtype: int
451
452     """
453     self.__lock.acquire()
454     try:
455       return len(self.__pending)
456     finally:
457       self.__lock.release()
458
459   def __do_acquire(self, shared):
460     """Actually acquire the lock.
461
462     """
463     if shared:
464       self.__shr.add(threading.currentThread())
465     else:
466       self.__exc = threading.currentThread()
467
468   def __can_acquire(self, shared):
469     """Determine whether lock can be acquired.
470
471     """
472     if shared:
473       return self.__exc is None
474     else:
475       return len(self.__shr) == 0 and self.__exc is None
476
477   def __is_on_top(self, cond):
478     """Checks whether the passed condition is on top of the queue.
479
480     The caller must make sure the queue isn't empty.
481
482     """
483     return self.__pending[0] == cond
484
485   def __acquire_unlocked(self, shared, timeout):
486     """Acquire a shared lock.
487
488     @param shared: whether to acquire in shared mode; by default an
489         exclusive lock will be acquired
490     @param timeout: maximum waiting time before giving up
491
492     """
493     self.__check_deleted()
494
495     # We cannot acquire the lock if we already have it
496     assert not self.__is_owned(), "double acquire() on a non-recursive lock"
497
498     # Check whether someone else holds the lock or there are pending acquires.
499     if not self.__pending and self.__can_acquire(shared):
500       # Apparently not, can acquire lock directly.
501       self.__do_acquire(shared)
502       return True
503
504     if shared:
505       wait_condition = self.__active_shr_c
506
507       # Check if we're not yet in the queue
508       if wait_condition not in self.__pending:
509         self.__pending.append(wait_condition)
510     else:
511       wait_condition = self.__condition_class(self.__lock)
512       # Always add to queue
513       self.__pending.append(wait_condition)
514
515     try:
516       # Wait until we become the topmost acquire in the queue or the timeout
517       # expires.
518       while not (self.__is_on_top(wait_condition) and
519                  self.__can_acquire(shared)):
520         # Wait for notification
521         wait_condition.wait(timeout)
522         self.__check_deleted()
523
524         # A lot of code assumes blocking acquires always succeed. Loop
525         # internally for that case.
526         if timeout is not None:
527           break
528
529       if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
530         self.__do_acquire(shared)
531         return True
532     finally:
533       # Remove condition from queue if there are no more waiters
534       if not wait_condition.has_waiting() and not self.__deleted:
535         self.__pending.remove(wait_condition)
536
537     return False
538
539   def acquire(self, shared=0, timeout=None):
540     """Acquire a shared lock.
541
542     @type shared: int
543     @param shared: whether to acquire in shared mode; by default an
544         exclusive lock will be acquired
545     @type timeout: float
546     @param timeout: maximum waiting time before giving up
547
548     """
549     self.__lock.acquire()
550     try:
551       return self.__acquire_unlocked(shared, timeout)
552     finally:
553       self.__lock.release()
554
555   def release(self):
556     """Release a Shared Lock.
557
558     You must have acquired the lock, either in shared or in exclusive mode,
559     before calling this function.
560
561     """
562     self.__lock.acquire()
563     try:
564       assert self.__is_exclusive() or self.__is_sharer(), \
565         "Cannot release non-owned lock"
566
567       # Autodetect release type
568       if self.__is_exclusive():
569         self.__exc = None
570       else:
571         self.__shr.remove(threading.currentThread())
572
573       # Notify topmost condition in queue
574       if self.__pending:
575         first_condition = self.__pending[0]
576         first_condition.notifyAll()
577
578         if first_condition == self.__active_shr_c:
579           self.__active_shr_c = self.__inactive_shr_c
580           self.__inactive_shr_c = first_condition
581
582     finally:
583       self.__lock.release()
584
585   def delete(self, timeout=None):
586     """Delete a Shared Lock.
587
588     This operation will declare the lock for removal. First the lock will be
589     acquired in exclusive mode if you don't already own it, then the lock
590     will be put in a state where any future and pending acquire() fail.
591
592     @type timeout: float
593     @param timeout: maximum waiting time before giving up
594
595     """
596     self.__lock.acquire()
597     try:
598       assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
599
600       self.__check_deleted()
601
602       # The caller is allowed to hold the lock exclusively already.
603       acquired = self.__is_exclusive()
604
605       if not acquired:
606         acquired = self.__acquire_unlocked(0, timeout)
607
608         assert self.__is_exclusive() and not self.__is_sharer(), \
609           "Lock wasn't acquired in exclusive mode"
610
611       if acquired:
612         self.__deleted = True
613         self.__exc = None
614
615         # Notify all acquires. They'll throw an error.
616         while self.__pending:
617           self.__pending.pop().notifyAll()
618
619       return acquired
620     finally:
621       self.__lock.release()
622
623
624 # Whenever we want to acquire a full LockSet we pass None as the value
625 # to acquire.  Hide this behind this nicely named constant.
626 ALL_SET = None
627
628
629 class LockSet:
630   """Implements a set of locks.
631
632   This abstraction implements a set of shared locks for the same resource type,
633   distinguished by name. The user can lock a subset of the resources and the
634   LockSet will take care of acquiring the locks always in the same order, thus
635   preventing deadlock.
636
637   All the locks needed in the same set must be acquired together, though.
638
639   """
640   def __init__(self, members=None):
641     """Constructs a new LockSet.
642
643     @param members: initial members of the set
644
645     """
646     # Used internally to guarantee coherency.
647     self.__lock = SharedLock()
648
649     # The lockdict indexes the relationship name -> lock
650     # The order-of-locking is implied by the alphabetical order of names
651     self.__lockdict = {}
652
653     if members is not None:
654       for name in members:
655         self.__lockdict[name] = SharedLock()
656
657     # The owner dict contains the set of locks each thread owns. For
658     # performance each thread can access its own key without a global lock on
659     # this structure. It is paramount though that *no* other type of access is
660     # done to this structure (eg. no looping over its keys). *_owner helper
661     # function are defined to guarantee access is correct, but in general never
662     # do anything different than __owners[threading.currentThread()], or there
663     # will be trouble.
664     self.__owners = {}
665
666   def _is_owned(self):
667     """Is the current thread a current level owner?"""
668     return threading.currentThread() in self.__owners
669
670   def _add_owned(self, name=None):
671     """Note the current thread owns the given lock"""
672     if name is None:
673       if not self._is_owned():
674         self.__owners[threading.currentThread()] = set()
675     else:
676       if self._is_owned():
677         self.__owners[threading.currentThread()].add(name)
678       else:
679         self.__owners[threading.currentThread()] = set([name])
680
681   def _del_owned(self, name=None):
682     """Note the current thread owns the given lock"""
683
684     if name is not None:
685       self.__owners[threading.currentThread()].remove(name)
686
687     # Only remove the key if we don't hold the set-lock as well
688     if (not self.__lock._is_owned() and
689         not self.__owners[threading.currentThread()]):
690       del self.__owners[threading.currentThread()]
691
692   def _list_owned(self):
693     """Get the set of resource names owned by the current thread"""
694     if self._is_owned():
695       return self.__owners[threading.currentThread()].copy()
696     else:
697       return set()
698
699   def __names(self):
700     """Return the current set of names.
701
702     Only call this function while holding __lock and don't iterate on the
703     result after releasing the lock.
704
705     """
706     return self.__lockdict.keys()
707
708   def _names(self):
709     """Return a copy of the current set of elements.
710
711     Used only for debugging purposes.
712
713     """
714     # If we don't already own the set-level lock acquired
715     # we'll get it and note we need to release it later.
716     release_lock = False
717     if not self.__lock._is_owned():
718       release_lock = True
719       self.__lock.acquire(shared=1)
720     try:
721       result = self.__names()
722     finally:
723       if release_lock:
724         self.__lock.release()
725     return set(result)
726
727   def acquire(self, names, timeout=None, shared=0):
728     """Acquire a set of resource locks.
729
730     @param names: the names of the locks which shall be acquired
731         (special lock names, or instance/node names)
732     @param shared: whether to acquire in shared mode; by default an
733         exclusive lock will be acquired
734     @type timeout: float
735     @param timeout: Maximum time to acquire all locks
736
737     @return: True when all the locks are successfully acquired
738
739     @raise errors.LockError: when any lock we try to acquire has
740         been deleted before we succeed. In this case none of the
741         locks requested will be acquired.
742
743     """
744     if timeout is not None:
745       raise NotImplementedError
746
747     # Check we don't already own locks at this level
748     assert not self._is_owned(), "Cannot acquire locks in the same set twice"
749
750     if names is None:
751       # If no names are given acquire the whole set by not letting new names
752       # being added before we release, and getting the current list of names.
753       # Some of them may then be deleted later, but we'll cope with this.
754       #
755       # We'd like to acquire this lock in a shared way, as it's nice if
756       # everybody else can use the instances at the same time. If are acquiring
757       # them exclusively though they won't be able to do this anyway, though,
758       # so we'll get the list lock exclusively as well in order to be able to
759       # do add() on the set while owning it.
760       self.__lock.acquire(shared=shared)
761       try:
762         # note we own the set-lock
763         self._add_owned()
764         names = self.__names()
765       except:
766         # We shouldn't have problems adding the lock to the owners list, but
767         # if we did we'll try to release this lock and re-raise exception.
768         # Of course something is going to be really wrong, after this.
769         self.__lock.release()
770         raise
771
772     try:
773       # Support passing in a single resource to acquire rather than many
774       if isinstance(names, basestring):
775         names = [names]
776       else:
777         names = sorted(names)
778
779       acquire_list = []
780       # First we look the locks up on __lockdict. We have no way of being sure
781       # they will still be there after, but this makes it a lot faster should
782       # just one of them be the already wrong
783       for lname in utils.UniqueSequence(names):
784         try:
785           lock = self.__lockdict[lname] # raises KeyError if lock is not there
786           acquire_list.append((lname, lock))
787         except (KeyError):
788           if self.__lock._is_owned():
789             # We are acquiring all the set, it doesn't matter if this
790             # particular element is not there anymore.
791             continue
792           else:
793             raise errors.LockError('non-existing lock in set (%s)' % lname)
794
795       # This will hold the locknames we effectively acquired.
796       acquired = set()
797       # Now acquire_list contains a sorted list of resources and locks we want.
798       # In order to get them we loop on this (private) list and acquire() them.
799       # We gave no real guarantee they will still exist till this is done but
800       # .acquire() itself is safe and will alert us if the lock gets deleted.
801       for (lname, lock) in acquire_list:
802         try:
803           lock.acquire(shared=shared) # raises LockError if the lock is deleted
804           # now the lock cannot be deleted, we have it!
805           self._add_owned(name=lname)
806           acquired.add(lname)
807         except (errors.LockError):
808           if self.__lock._is_owned():
809             # We are acquiring all the set, it doesn't matter if this
810             # particular element is not there anymore.
811             continue
812           else:
813             name_fail = lname
814             for lname in self._list_owned():
815               self.__lockdict[lname].release()
816               self._del_owned(name=lname)
817             raise errors.LockError('non-existing lock in set (%s)' % name_fail)
818         except:
819           # We shouldn't have problems adding the lock to the owners list, but
820           # if we did we'll try to release this lock and re-raise exception.
821           # Of course something is going to be really wrong, after this.
822           if lock._is_owned():
823             lock.release()
824           raise
825
826     except:
827       # If something went wrong and we had the set-lock let's release it...
828       if self.__lock._is_owned():
829         self.__lock.release()
830       raise
831
832     return acquired
833
834   def release(self, names=None):
835     """Release a set of resource locks, at the same level.
836
837     You must have acquired the locks, either in shared or in exclusive mode,
838     before releasing them.
839
840     @param names: the names of the locks which shall be released
841         (defaults to all the locks acquired at that level).
842
843     """
844     assert self._is_owned(), "release() on lock set while not owner"
845
846     # Support passing in a single resource to release rather than many
847     if isinstance(names, basestring):
848       names = [names]
849
850     if names is None:
851       names = self._list_owned()
852     else:
853       names = set(names)
854       assert self._list_owned().issuperset(names), (
855                "release() on unheld resources %s" %
856                names.difference(self._list_owned()))
857
858     # First of all let's release the "all elements" lock, if set.
859     # After this 'add' can work again
860     if self.__lock._is_owned():
861       self.__lock.release()
862       self._del_owned()
863
864     for lockname in names:
865       # If we are sure the lock doesn't leave __lockdict without being
866       # exclusively held we can do this...
867       self.__lockdict[lockname].release()
868       self._del_owned(name=lockname)
869
870   def add(self, names, acquired=0, shared=0):
871     """Add a new set of elements to the set
872
873     @param names: names of the new elements to add
874     @param acquired: pre-acquire the new resource?
875     @param shared: is the pre-acquisition shared?
876
877     """
878     # Check we don't already own locks at this level
879     assert not self._is_owned() or self.__lock._is_owned(shared=0), \
880       "Cannot add locks if the set is only partially owned, or shared"
881
882     # Support passing in a single resource to add rather than many
883     if isinstance(names, basestring):
884       names = [names]
885
886     # If we don't already own the set-level lock acquired in an exclusive way
887     # we'll get it and note we need to release it later.
888     release_lock = False
889     if not self.__lock._is_owned():
890       release_lock = True
891       self.__lock.acquire()
892
893     try:
894       invalid_names = set(self.__names()).intersection(names)
895       if invalid_names:
896         # This must be an explicit raise, not an assert, because assert is
897         # turned off when using optimization, and this can happen because of
898         # concurrency even if the user doesn't want it.
899         raise errors.LockError("duplicate add() (%s)" % invalid_names)
900
901       for lockname in names:
902         lock = SharedLock()
903
904         if acquired:
905           lock.acquire(shared=shared)
906           # now the lock cannot be deleted, we have it!
907           try:
908             self._add_owned(name=lockname)
909           except:
910             # We shouldn't have problems adding the lock to the owners list,
911             # but if we did we'll try to release this lock and re-raise
912             # exception.  Of course something is going to be really wrong,
913             # after this.  On the other hand the lock hasn't been added to the
914             # __lockdict yet so no other threads should be pending on it. This
915             # release is just a safety measure.
916             lock.release()
917             raise
918
919         self.__lockdict[lockname] = lock
920
921     finally:
922       # Only release __lock if we were not holding it previously.
923       if release_lock:
924         self.__lock.release()
925
926     return True
927
928   def remove(self, names):
929     """Remove elements from the lock set.
930
931     You can either not hold anything in the lockset or already hold a superset
932     of the elements you want to delete, exclusively.
933
934     @param names: names of the resource to remove.
935
936     @return:: a list of locks which we removed; the list is always
937         equal to the names list if we were holding all the locks
938         exclusively
939
940     """
941     # Support passing in a single resource to remove rather than many
942     if isinstance(names, basestring):
943       names = [names]
944
945     # If we own any subset of this lock it must be a superset of what we want
946     # to delete. The ownership must also be exclusive, but that will be checked
947     # by the lock itself.
948     assert not self._is_owned() or self._list_owned().issuperset(names), (
949       "remove() on acquired lockset while not owning all elements")
950
951     removed = []
952
953     for lname in names:
954       # Calling delete() acquires the lock exclusively if we don't already own
955       # it, and causes all pending and subsequent lock acquires to fail. It's
956       # fine to call it out of order because delete() also implies release(),
957       # and the assertion above guarantees that if we either already hold
958       # everything we want to delete, or we hold none.
959       try:
960         self.__lockdict[lname].delete()
961         removed.append(lname)
962       except (KeyError, errors.LockError):
963         # This cannot happen if we were already holding it, verify:
964         assert not self._is_owned(), "remove failed while holding lockset"
965       else:
966         # If no LockError was raised we are the ones who deleted the lock.
967         # This means we can safely remove it from lockdict, as any further or
968         # pending delete() or acquire() will fail (and nobody can have the lock
969         # since before our call to delete()).
970         #
971         # This is done in an else clause because if the exception was thrown
972         # it's the job of the one who actually deleted it.
973         del self.__lockdict[lname]
974         # And let's remove it from our private list if we owned it.
975         if self._is_owned():
976           self._del_owned(name=lname)
977
978     return removed
979
980
981 # Locking levels, must be acquired in increasing order.
982 # Current rules are:
983 #   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
984 #   acquired before performing any operation, either in shared or in exclusive
985 #   mode. acquiring the BGL in exclusive mode is discouraged and should be
986 #   avoided.
987 #   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
988 #   If you need more than one node, or more than one instance, acquire them at
989 #   the same time.
990 LEVEL_CLUSTER = 0
991 LEVEL_INSTANCE = 1
992 LEVEL_NODE = 2
993
994 LEVELS = [LEVEL_CLUSTER,
995           LEVEL_INSTANCE,
996           LEVEL_NODE]
997
998 # Lock levels which are modifiable
999 LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1000
1001 LEVEL_NAMES = {
1002   LEVEL_CLUSTER: "cluster",
1003   LEVEL_INSTANCE: "instance",
1004   LEVEL_NODE: "node",
1005   }
1006
1007 # Constant for the big ganeti lock
1008 BGL = 'BGL'
1009
1010
1011 class GanetiLockManager:
1012   """The Ganeti Locking Library
1013
1014   The purpose of this small library is to manage locking for ganeti clusters
1015   in a central place, while at the same time doing dynamic checks against
1016   possible deadlocks. It will also make it easier to transition to a different
1017   lock type should we migrate away from python threads.
1018
1019   """
1020   _instance = None
1021
1022   def __init__(self, nodes=None, instances=None):
1023     """Constructs a new GanetiLockManager object.
1024
1025     There should be only a GanetiLockManager object at any time, so this
1026     function raises an error if this is not the case.
1027
1028     @param nodes: list of node names
1029     @param instances: list of instance names
1030
1031     """
1032     assert self.__class__._instance is None, \
1033            "double GanetiLockManager instance"
1034
1035     self.__class__._instance = self
1036
1037     # The keyring contains all the locks, at their level and in the correct
1038     # locking order.
1039     self.__keyring = {
1040       LEVEL_CLUSTER: LockSet([BGL]),
1041       LEVEL_NODE: LockSet(nodes),
1042       LEVEL_INSTANCE: LockSet(instances),
1043     }
1044
1045   def _names(self, level):
1046     """List the lock names at the given level.
1047
1048     This can be used for debugging/testing purposes.
1049
1050     @param level: the level whose list of locks to get
1051
1052     """
1053     assert level in LEVELS, "Invalid locking level %s" % level
1054     return self.__keyring[level]._names()
1055
1056   def _is_owned(self, level):
1057     """Check whether we are owning locks at the given level
1058
1059     """
1060     return self.__keyring[level]._is_owned()
1061
1062   is_owned = _is_owned
1063
1064   def _list_owned(self, level):
1065     """Get the set of owned locks at the given level
1066
1067     """
1068     return self.__keyring[level]._list_owned()
1069
1070   def _upper_owned(self, level):
1071     """Check that we don't own any lock at a level greater than the given one.
1072
1073     """
1074     # This way of checking only works if LEVELS[i] = i, which we check for in
1075     # the test cases.
1076     return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1077
1078   def _BGL_owned(self):
1079     """Check if the current thread owns the BGL.
1080
1081     Both an exclusive or a shared acquisition work.
1082
1083     """
1084     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1085
1086   def _contains_BGL(self, level, names):
1087     """Check if the level contains the BGL.
1088
1089     Check if acting on the given level and set of names will change
1090     the status of the Big Ganeti Lock.
1091
1092     """
1093     return level == LEVEL_CLUSTER and (names is None or BGL in names)
1094
1095   def acquire(self, level, names, timeout=None, shared=0):
1096     """Acquire a set of resource locks, at the same level.
1097
1098     @param level: the level at which the locks shall be acquired;
1099         it must be a member of LEVELS.
1100     @param names: the names of the locks which shall be acquired
1101         (special lock names, or instance/node names)
1102     @param shared: whether to acquire in shared mode; by default
1103         an exclusive lock will be acquired
1104     @type timeout: float
1105     @param timeout: Maximum time to acquire all locks
1106
1107     """
1108     assert level in LEVELS, "Invalid locking level %s" % level
1109
1110     # Check that we are either acquiring the Big Ganeti Lock or we already own
1111     # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1112     # so even if we've migrated we need to at least share the BGL to be
1113     # compatible with them. Of course if we own the BGL exclusively there's no
1114     # point in acquiring any other lock, unless perhaps we are half way through
1115     # the migration of the current opcode.
1116     assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1117             "You must own the Big Ganeti Lock before acquiring any other")
1118
1119     # Check we don't own locks at the same or upper levels.
1120     assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1121            " while owning some at a greater one")
1122
1123     # Acquire the locks in the set.
1124     return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
1125
1126   def release(self, level, names=None):
1127     """Release a set of resource locks, at the same level.
1128
1129     You must have acquired the locks, either in shared or in exclusive
1130     mode, before releasing them.
1131
1132     @param level: the level at which the locks shall be released;
1133         it must be a member of LEVELS
1134     @param names: the names of the locks which shall be released
1135         (defaults to all the locks acquired at that level)
1136
1137     """
1138     assert level in LEVELS, "Invalid locking level %s" % level
1139     assert (not self._contains_BGL(level, names) or
1140             not self._upper_owned(LEVEL_CLUSTER)), (
1141             "Cannot release the Big Ganeti Lock while holding something"
1142             " at upper levels")
1143
1144     # Release will complain if we don't own the locks already
1145     return self.__keyring[level].release(names)
1146
1147   def add(self, level, names, acquired=0, shared=0):
1148     """Add locks at the specified level.
1149
1150     @param level: the level at which the locks shall be added;
1151         it must be a member of LEVELS_MOD.
1152     @param names: names of the locks to acquire
1153     @param acquired: whether to acquire the newly added locks
1154     @param shared: whether the acquisition will be shared
1155
1156     """
1157     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1158     assert self._BGL_owned(), ("You must own the BGL before performing other"
1159            " operations")
1160     assert not self._upper_owned(level), ("Cannot add locks at a level"
1161            " while owning some at a greater one")
1162     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1163
1164   def remove(self, level, names):
1165     """Remove locks from the specified level.
1166
1167     You must either already own the locks you are trying to remove
1168     exclusively or not own any lock at an upper level.
1169
1170     @param level: the level at which the locks shall be removed;
1171         it must be a member of LEVELS_MOD
1172     @param names: the names of the locks which shall be removed
1173         (special lock names, or instance/node names)
1174
1175     """
1176     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1177     assert self._BGL_owned(), ("You must own the BGL before performing other"
1178            " operations")
1179     # Check we either own the level or don't own anything from here
1180     # up. LockSet.remove() will check the case in which we don't own
1181     # all the needed resources, or we have a shared ownership.
1182     assert self._is_owned(level) or not self._upper_owned(level), (
1183            "Cannot remove locks at a level while not owning it or"
1184            " owning some at a greater one")
1185     return self.__keyring[level].remove(names)