Populate OS variants if an api >= 15 is present
[ganeti-local] / lib / locking.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Module implementing the Ganeti locking code."""
22
23 import os
24 import select
25 import threading
26 import time
27 import errno
28
29 from ganeti import errors
30 from ganeti import utils
31
32
33 def ssynchronized(lock, shared=0):
34   """Shared Synchronization decorator.
35
36   Calls the function holding the given lock, either in exclusive or shared
37   mode. It requires the passed lock to be a SharedLock (or support its
38   semantics).
39
40   """
41   def wrap(fn):
42     def sync_function(*args, **kwargs):
43       lock.acquire(shared=shared)
44       try:
45         return fn(*args, **kwargs)
46       finally:
47         lock.release()
48     return sync_function
49   return wrap
50
51
52 class _SingleNotifyPipeConditionWaiter(object):
53   """Helper class for SingleNotifyPipeCondition
54
55   """
56   __slots__ = [
57     "_fd",
58     "_poller",
59     ]
60
61   def __init__(self, poller, fd):
62     """Constructor for _SingleNotifyPipeConditionWaiter
63
64     @type poller: select.poll
65     @param poller: Poller object
66     @type fd: int
67     @param fd: File descriptor to wait for
68
69     """
70     object.__init__(self)
71     self._poller = poller
72     self._fd = fd
73
74   def __call__(self, timeout):
75     """Wait for something to happen on the pipe.
76
77     @type timeout: float or None
78     @param timeout: Timeout for waiting (can be None)
79
80     """
81     start_time = time.time()
82     remaining_time = timeout
83
84     while timeout is None or remaining_time > 0:
85       try:
86         result = self._poller.poll(remaining_time)
87       except EnvironmentError, err:
88         if err.errno != errno.EINTR:
89           raise
90         result = None
91
92       # Check whether we were notified
93       if result and result[0][0] == self._fd:
94         break
95
96       # Re-calculate timeout if necessary
97       if timeout is not None:
98         remaining_time = start_time + timeout - time.time()
99
100
101 class _BaseCondition(object):
102   """Base class containing common code for conditions.
103
104   Some of this code is taken from python's threading module.
105
106   """
107   __slots__ = [
108     "_lock",
109     "acquire",
110     "release",
111     ]
112
113   def __init__(self, lock):
114     """Constructor for _BaseCondition.
115
116     @type lock: L{threading.Lock}
117     @param lock: condition base lock
118
119     """
120     object.__init__(self)
121
122     # Recursive locks are not supported
123     assert not hasattr(lock, "_acquire_restore")
124     assert not hasattr(lock, "_release_save")
125
126     self._lock = lock
127
128     # Export the lock's acquire() and release() methods
129     self.acquire = lock.acquire
130     self.release = lock.release
131
132   def _is_owned(self):
133     """Check whether lock is owned by current thread.
134
135     """
136     if self._lock.acquire(0):
137       self._lock.release()
138       return False
139
140     return True
141
142   def _check_owned(self):
143     """Raise an exception if the current thread doesn't own the lock.
144
145     """
146     if not self._is_owned():
147       raise RuntimeError("cannot work with un-aquired lock")
148
149
150 class SingleNotifyPipeCondition(_BaseCondition):
151   """Condition which can only be notified once.
152
153   This condition class uses pipes and poll, internally, to be able to wait for
154   notification with a timeout, without resorting to polling. It is almost
155   compatible with Python's threading.Condition, with the following differences:
156     - notifyAll can only be called once, and no wait can happen after that
157     - notify is not supported, only notifyAll
158
159   """
160
161   __slots__ = _BaseCondition.__slots__ + [
162     "_poller",
163     "_read_fd",
164     "_write_fd",
165     "_nwaiters",
166     "_notified",
167     ]
168
169   _waiter_class = _SingleNotifyPipeConditionWaiter
170
171   def __init__(self, lock):
172     """Constructor for SingleNotifyPipeCondition
173
174     """
175     _BaseCondition.__init__(self, lock)
176     self._nwaiters = 0
177     self._notified = False
178     self._read_fd = None
179     self._write_fd = None
180     self._poller = None
181
182   def _check_unnotified(self):
183     if self._notified:
184       raise RuntimeError("cannot use already notified condition")
185
186   def _Cleanup(self):
187     """Cleanup open file descriptors, if any.
188
189     """
190     if self._read_fd is not None:
191       os.close(self._read_fd)
192       self._read_fd = None
193
194     if self._write_fd is not None:
195       os.close(self._write_fd)
196       self._write_fd = None
197     self._poller = None
198
199   def wait(self, timeout=None):
200     """Wait for a notification.
201
202     @type timeout: float or None
203     @param timeout: Waiting timeout (can be None)
204
205     """
206     self._check_owned()
207     self._check_unnotified()
208
209     self._nwaiters += 1
210     try:
211       if self._poller is None:
212         (self._read_fd, self._write_fd) = os.pipe()
213         self._poller = select.poll()
214         self._poller.register(self._read_fd, select.POLLHUP)
215
216       wait_fn = self._waiter_class(self._poller, self._read_fd)
217       self.release()
218       try:
219         # Wait for notification
220         wait_fn(timeout)
221       finally:
222         # Re-acquire lock
223         self.acquire()
224     finally:
225       self._nwaiters -= 1
226       if self._nwaiters == 0:
227         self._Cleanup()
228
229   def notifyAll(self):
230     """Close the writing side of the pipe to notify all waiters.
231
232     """
233     self._check_owned()
234     self._check_unnotified()
235     self._notified = True
236     if self._write_fd is not None:
237       os.close(self._write_fd)
238       self._write_fd = None
239
240
241 class PipeCondition(_BaseCondition):
242   """Group-only non-polling condition with counters.
243
244   This condition class uses pipes and poll, internally, to be able to wait for
245   notification with a timeout, without resorting to polling. It is almost
246   compatible with Python's threading.Condition, but only supports notifyAll and
247   non-recursive locks. As an additional features it's able to report whether
248   there are any waiting threads.
249
250   """
251   __slots__ = _BaseCondition.__slots__ + [
252     "_nwaiters",
253     "_single_condition",
254     ]
255
256   _single_condition_class = SingleNotifyPipeCondition
257
258   def __init__(self, lock):
259     """Initializes this class.
260
261     """
262     _BaseCondition.__init__(self, lock)
263     self._nwaiters = 0
264     self._single_condition = self._single_condition_class(self._lock)
265
266   def wait(self, timeout=None):
267     """Wait for a notification.
268
269     @type timeout: float or None
270     @param timeout: Waiting timeout (can be None)
271
272     """
273     self._check_owned()
274
275     # Keep local reference to the pipe. It could be replaced by another thread
276     # notifying while we're waiting.
277     my_condition = self._single_condition
278
279     assert self._nwaiters >= 0
280     self._nwaiters += 1
281     try:
282       my_condition.wait(timeout)
283     finally:
284       assert self._nwaiters > 0
285       self._nwaiters -= 1
286
287   def notifyAll(self):
288     """Notify all currently waiting threads.
289
290     """
291     self._check_owned()
292     self._single_condition.notifyAll()
293     self._single_condition = self._single_condition_class(self._lock)
294
295   def has_waiting(self):
296     """Returns whether there are active waiters.
297
298     """
299     self._check_owned()
300
301     return bool(self._nwaiters)
302
303
304 class _CountingCondition(object):
305   """Wrapper for Python's built-in threading.Condition class.
306
307   This wrapper keeps a count of active waiters. We can't access the internal
308   "__waiters" attribute of threading.Condition because it's not thread-safe.
309
310   """
311   __slots__ = [
312     "_cond",
313     "_nwaiters",
314     ]
315
316   def __init__(self, lock):
317     """Initializes this class.
318
319     """
320     object.__init__(self)
321     self._cond = threading.Condition(lock=lock)
322     self._nwaiters = 0
323
324   def notifyAll(self):
325     """Notifies the condition.
326
327     """
328     return self._cond.notifyAll()
329
330   def wait(self, timeout=None):
331     """Waits for the condition to be notified.
332
333     @type timeout: float or None
334     @param timeout: Waiting timeout (can be None)
335
336     """
337     assert self._nwaiters >= 0
338
339     self._nwaiters += 1
340     try:
341       return self._cond.wait(timeout=timeout)
342     finally:
343       self._nwaiters -= 1
344
345   def has_waiting(self):
346     """Returns whether there are active waiters.
347
348     """
349     return bool(self._nwaiters)
350
351
352 class SharedLock(object):
353   """Implements a shared lock.
354
355   Multiple threads can acquire the lock in a shared way, calling
356   acquire_shared().  In order to acquire the lock in an exclusive way threads
357   can call acquire_exclusive().
358
359   The lock prevents starvation but does not guarantee that threads will acquire
360   the shared lock in the order they queued for it, just that they will
361   eventually do so.
362
363   """
364   __slots__ = [
365     "__active_shr_c",
366     "__inactive_shr_c",
367     "__deleted",
368     "__exc",
369     "__lock",
370     "__pending",
371     "__shr",
372     ]
373
374   __condition_class = PipeCondition
375
376   def __init__(self):
377     """Construct a new SharedLock.
378
379     """
380     object.__init__(self)
381
382     # Internal lock
383     self.__lock = threading.Lock()
384
385     # Queue containing waiting acquires
386     self.__pending = []
387
388     # Active and inactive conditions for shared locks
389     self.__active_shr_c = self.__condition_class(self.__lock)
390     self.__inactive_shr_c = self.__condition_class(self.__lock)
391
392     # Current lock holders
393     self.__shr = set()
394     self.__exc = None
395
396     # is this lock in the deleted state?
397     self.__deleted = False
398
399   def __check_deleted(self):
400     """Raises an exception if the lock has been deleted.
401
402     """
403     if self.__deleted:
404       raise errors.LockError("Deleted lock")
405
406   def __is_sharer(self):
407     """Is the current thread sharing the lock at this time?
408
409     """
410     return threading.currentThread() in self.__shr
411
412   def __is_exclusive(self):
413     """Is the current thread holding the lock exclusively at this time?
414
415     """
416     return threading.currentThread() == self.__exc
417
418   def __is_owned(self, shared=-1):
419     """Is the current thread somehow owning the lock at this time?
420
421     This is a private version of the function, which presumes you're holding
422     the internal lock.
423
424     """
425     if shared < 0:
426       return self.__is_sharer() or self.__is_exclusive()
427     elif shared:
428       return self.__is_sharer()
429     else:
430       return self.__is_exclusive()
431
432   def _is_owned(self, shared=-1):
433     """Is the current thread somehow owning the lock at this time?
434
435     @param shared:
436         - < 0: check for any type of ownership (default)
437         - 0: check for exclusive ownership
438         - > 0: check for shared ownership
439
440     """
441     self.__lock.acquire()
442     try:
443       return self.__is_owned(shared=shared)
444     finally:
445       self.__lock.release()
446
447   def _count_pending(self):
448     """Returns the number of pending acquires.
449
450     @rtype: int
451
452     """
453     self.__lock.acquire()
454     try:
455       return len(self.__pending)
456     finally:
457       self.__lock.release()
458
459   def __do_acquire(self, shared):
460     """Actually acquire the lock.
461
462     """
463     if shared:
464       self.__shr.add(threading.currentThread())
465     else:
466       self.__exc = threading.currentThread()
467
468   def __can_acquire(self, shared):
469     """Determine whether lock can be acquired.
470
471     """
472     if shared:
473       return self.__exc is None
474     else:
475       return len(self.__shr) == 0 and self.__exc is None
476
477   def __is_on_top(self, cond):
478     """Checks whether the passed condition is on top of the queue.
479
480     The caller must make sure the queue isn't empty.
481
482     """
483     return self.__pending[0] == cond
484
485   def __acquire_unlocked(self, shared, timeout):
486     """Acquire a shared lock.
487
488     @param shared: whether to acquire in shared mode; by default an
489         exclusive lock will be acquired
490     @param timeout: maximum waiting time before giving up
491
492     """
493     self.__check_deleted()
494
495     # We cannot acquire the lock if we already have it
496     assert not self.__is_owned(), "double acquire() on a non-recursive lock"
497
498     # Check whether someone else holds the lock or there are pending acquires.
499     if not self.__pending and self.__can_acquire(shared):
500       # Apparently not, can acquire lock directly.
501       self.__do_acquire(shared)
502       return True
503
504     if shared:
505       wait_condition = self.__active_shr_c
506
507       # Check if we're not yet in the queue
508       if wait_condition not in self.__pending:
509         self.__pending.append(wait_condition)
510     else:
511       wait_condition = self.__condition_class(self.__lock)
512       # Always add to queue
513       self.__pending.append(wait_condition)
514
515     try:
516       # Wait until we become the topmost acquire in the queue or the timeout
517       # expires.
518       while not (self.__is_on_top(wait_condition) and
519                  self.__can_acquire(shared)):
520         # Wait for notification
521         wait_condition.wait(timeout)
522         self.__check_deleted()
523
524         # A lot of code assumes blocking acquires always succeed. Loop
525         # internally for that case.
526         if timeout is not None:
527           break
528
529       if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
530         self.__do_acquire(shared)
531         return True
532     finally:
533       # Remove condition from queue if there are no more waiters
534       if not wait_condition.has_waiting() and not self.__deleted:
535         self.__pending.remove(wait_condition)
536
537     return False
538
539   def acquire(self, shared=0, timeout=None, test_notify=None):
540     """Acquire a shared lock.
541
542     @type shared: int
543     @param shared: whether to acquire in shared mode; by default an
544         exclusive lock will be acquired
545     @type timeout: float
546     @param timeout: maximum waiting time before giving up
547     @type test_notify: callable or None
548     @param test_notify: Special callback function for unittesting
549
550     """
551     self.__lock.acquire()
552     try:
553       # We already got the lock, notify now
554       if __debug__ and callable(test_notify):
555         test_notify()
556
557       return self.__acquire_unlocked(shared, timeout)
558     finally:
559       self.__lock.release()
560
561   def release(self):
562     """Release a Shared Lock.
563
564     You must have acquired the lock, either in shared or in exclusive mode,
565     before calling this function.
566
567     """
568     self.__lock.acquire()
569     try:
570       assert self.__is_exclusive() or self.__is_sharer(), \
571         "Cannot release non-owned lock"
572
573       # Autodetect release type
574       if self.__is_exclusive():
575         self.__exc = None
576       else:
577         self.__shr.remove(threading.currentThread())
578
579       # Notify topmost condition in queue
580       if self.__pending:
581         first_condition = self.__pending[0]
582         first_condition.notifyAll()
583
584         if first_condition == self.__active_shr_c:
585           self.__active_shr_c = self.__inactive_shr_c
586           self.__inactive_shr_c = first_condition
587
588     finally:
589       self.__lock.release()
590
591   def delete(self, timeout=None):
592     """Delete a Shared Lock.
593
594     This operation will declare the lock for removal. First the lock will be
595     acquired in exclusive mode if you don't already own it, then the lock
596     will be put in a state where any future and pending acquire() fail.
597
598     @type timeout: float
599     @param timeout: maximum waiting time before giving up
600
601     """
602     self.__lock.acquire()
603     try:
604       assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
605
606       self.__check_deleted()
607
608       # The caller is allowed to hold the lock exclusively already.
609       acquired = self.__is_exclusive()
610
611       if not acquired:
612         acquired = self.__acquire_unlocked(0, timeout)
613
614         assert self.__is_exclusive() and not self.__is_sharer(), \
615           "Lock wasn't acquired in exclusive mode"
616
617       if acquired:
618         self.__deleted = True
619         self.__exc = None
620
621         # Notify all acquires. They'll throw an error.
622         while self.__pending:
623           self.__pending.pop().notifyAll()
624
625       return acquired
626     finally:
627       self.__lock.release()
628
629
630 # Whenever we want to acquire a full LockSet we pass None as the value
631 # to acquire.  Hide this behind this nicely named constant.
632 ALL_SET = None
633
634
635 class LockSet:
636   """Implements a set of locks.
637
638   This abstraction implements a set of shared locks for the same resource type,
639   distinguished by name. The user can lock a subset of the resources and the
640   LockSet will take care of acquiring the locks always in the same order, thus
641   preventing deadlock.
642
643   All the locks needed in the same set must be acquired together, though.
644
645   """
646   def __init__(self, members=None):
647     """Constructs a new LockSet.
648
649     @param members: initial members of the set
650
651     """
652     # Used internally to guarantee coherency.
653     self.__lock = SharedLock()
654
655     # The lockdict indexes the relationship name -> lock
656     # The order-of-locking is implied by the alphabetical order of names
657     self.__lockdict = {}
658
659     if members is not None:
660       for name in members:
661         self.__lockdict[name] = SharedLock()
662
663     # The owner dict contains the set of locks each thread owns. For
664     # performance each thread can access its own key without a global lock on
665     # this structure. It is paramount though that *no* other type of access is
666     # done to this structure (eg. no looping over its keys). *_owner helper
667     # function are defined to guarantee access is correct, but in general never
668     # do anything different than __owners[threading.currentThread()], or there
669     # will be trouble.
670     self.__owners = {}
671
672   def _is_owned(self):
673     """Is the current thread a current level owner?"""
674     return threading.currentThread() in self.__owners
675
676   def _add_owned(self, name=None):
677     """Note the current thread owns the given lock"""
678     if name is None:
679       if not self._is_owned():
680         self.__owners[threading.currentThread()] = set()
681     else:
682       if self._is_owned():
683         self.__owners[threading.currentThread()].add(name)
684       else:
685         self.__owners[threading.currentThread()] = set([name])
686
687   def _del_owned(self, name=None):
688     """Note the current thread owns the given lock"""
689
690     if name is not None:
691       self.__owners[threading.currentThread()].remove(name)
692
693     # Only remove the key if we don't hold the set-lock as well
694     if (not self.__lock._is_owned() and
695         not self.__owners[threading.currentThread()]):
696       del self.__owners[threading.currentThread()]
697
698   def _list_owned(self):
699     """Get the set of resource names owned by the current thread"""
700     if self._is_owned():
701       return self.__owners[threading.currentThread()].copy()
702     else:
703       return set()
704
705   def __names(self):
706     """Return the current set of names.
707
708     Only call this function while holding __lock and don't iterate on the
709     result after releasing the lock.
710
711     """
712     return self.__lockdict.keys()
713
714   def _names(self):
715     """Return a copy of the current set of elements.
716
717     Used only for debugging purposes.
718
719     """
720     # If we don't already own the set-level lock acquired
721     # we'll get it and note we need to release it later.
722     release_lock = False
723     if not self.__lock._is_owned():
724       release_lock = True
725       self.__lock.acquire(shared=1)
726     try:
727       result = self.__names()
728     finally:
729       if release_lock:
730         self.__lock.release()
731     return set(result)
732
733   def acquire(self, names, timeout=None, shared=0):
734     """Acquire a set of resource locks.
735
736     @param names: the names of the locks which shall be acquired
737         (special lock names, or instance/node names)
738     @param shared: whether to acquire in shared mode; by default an
739         exclusive lock will be acquired
740     @type timeout: float
741     @param timeout: Maximum time to acquire all locks
742
743     @return: True when all the locks are successfully acquired
744
745     @raise errors.LockError: when any lock we try to acquire has
746         been deleted before we succeed. In this case none of the
747         locks requested will be acquired.
748
749     """
750     if timeout is not None:
751       raise NotImplementedError
752
753     # Check we don't already own locks at this level
754     assert not self._is_owned(), "Cannot acquire locks in the same set twice"
755
756     if names is None:
757       # If no names are given acquire the whole set by not letting new names
758       # being added before we release, and getting the current list of names.
759       # Some of them may then be deleted later, but we'll cope with this.
760       #
761       # We'd like to acquire this lock in a shared way, as it's nice if
762       # everybody else can use the instances at the same time. If are acquiring
763       # them exclusively though they won't be able to do this anyway, though,
764       # so we'll get the list lock exclusively as well in order to be able to
765       # do add() on the set while owning it.
766       self.__lock.acquire(shared=shared)
767       try:
768         # note we own the set-lock
769         self._add_owned()
770         names = self.__names()
771       except:
772         # We shouldn't have problems adding the lock to the owners list, but
773         # if we did we'll try to release this lock and re-raise exception.
774         # Of course something is going to be really wrong, after this.
775         self.__lock.release()
776         raise
777
778     try:
779       # Support passing in a single resource to acquire rather than many
780       if isinstance(names, basestring):
781         names = [names]
782       else:
783         names = sorted(names)
784
785       acquire_list = []
786       # First we look the locks up on __lockdict. We have no way of being sure
787       # they will still be there after, but this makes it a lot faster should
788       # just one of them be the already wrong
789       for lname in utils.UniqueSequence(names):
790         try:
791           lock = self.__lockdict[lname] # raises KeyError if lock is not there
792           acquire_list.append((lname, lock))
793         except (KeyError):
794           if self.__lock._is_owned():
795             # We are acquiring all the set, it doesn't matter if this
796             # particular element is not there anymore.
797             continue
798           else:
799             raise errors.LockError('non-existing lock in set (%s)' % lname)
800
801       # This will hold the locknames we effectively acquired.
802       acquired = set()
803       # Now acquire_list contains a sorted list of resources and locks we want.
804       # In order to get them we loop on this (private) list and acquire() them.
805       # We gave no real guarantee they will still exist till this is done but
806       # .acquire() itself is safe and will alert us if the lock gets deleted.
807       for (lname, lock) in acquire_list:
808         try:
809           lock.acquire(shared=shared) # raises LockError if the lock is deleted
810           # now the lock cannot be deleted, we have it!
811           self._add_owned(name=lname)
812           acquired.add(lname)
813         except (errors.LockError):
814           if self.__lock._is_owned():
815             # We are acquiring all the set, it doesn't matter if this
816             # particular element is not there anymore.
817             continue
818           else:
819             name_fail = lname
820             for lname in self._list_owned():
821               self.__lockdict[lname].release()
822               self._del_owned(name=lname)
823             raise errors.LockError('non-existing lock in set (%s)' % name_fail)
824         except:
825           # We shouldn't have problems adding the lock to the owners list, but
826           # if we did we'll try to release this lock and re-raise exception.
827           # Of course something is going to be really wrong, after this.
828           if lock._is_owned():
829             lock.release()
830           raise
831
832     except:
833       # If something went wrong and we had the set-lock let's release it...
834       if self.__lock._is_owned():
835         self.__lock.release()
836       raise
837
838     return acquired
839
840   def release(self, names=None):
841     """Release a set of resource locks, at the same level.
842
843     You must have acquired the locks, either in shared or in exclusive mode,
844     before releasing them.
845
846     @param names: the names of the locks which shall be released
847         (defaults to all the locks acquired at that level).
848
849     """
850     assert self._is_owned(), "release() on lock set while not owner"
851
852     # Support passing in a single resource to release rather than many
853     if isinstance(names, basestring):
854       names = [names]
855
856     if names is None:
857       names = self._list_owned()
858     else:
859       names = set(names)
860       assert self._list_owned().issuperset(names), (
861                "release() on unheld resources %s" %
862                names.difference(self._list_owned()))
863
864     # First of all let's release the "all elements" lock, if set.
865     # After this 'add' can work again
866     if self.__lock._is_owned():
867       self.__lock.release()
868       self._del_owned()
869
870     for lockname in names:
871       # If we are sure the lock doesn't leave __lockdict without being
872       # exclusively held we can do this...
873       self.__lockdict[lockname].release()
874       self._del_owned(name=lockname)
875
876   def add(self, names, acquired=0, shared=0):
877     """Add a new set of elements to the set
878
879     @param names: names of the new elements to add
880     @param acquired: pre-acquire the new resource?
881     @param shared: is the pre-acquisition shared?
882
883     """
884     # Check we don't already own locks at this level
885     assert not self._is_owned() or self.__lock._is_owned(shared=0), \
886       "Cannot add locks if the set is only partially owned, or shared"
887
888     # Support passing in a single resource to add rather than many
889     if isinstance(names, basestring):
890       names = [names]
891
892     # If we don't already own the set-level lock acquired in an exclusive way
893     # we'll get it and note we need to release it later.
894     release_lock = False
895     if not self.__lock._is_owned():
896       release_lock = True
897       self.__lock.acquire()
898
899     try:
900       invalid_names = set(self.__names()).intersection(names)
901       if invalid_names:
902         # This must be an explicit raise, not an assert, because assert is
903         # turned off when using optimization, and this can happen because of
904         # concurrency even if the user doesn't want it.
905         raise errors.LockError("duplicate add() (%s)" % invalid_names)
906
907       for lockname in names:
908         lock = SharedLock()
909
910         if acquired:
911           lock.acquire(shared=shared)
912           # now the lock cannot be deleted, we have it!
913           try:
914             self._add_owned(name=lockname)
915           except:
916             # We shouldn't have problems adding the lock to the owners list,
917             # but if we did we'll try to release this lock and re-raise
918             # exception.  Of course something is going to be really wrong,
919             # after this.  On the other hand the lock hasn't been added to the
920             # __lockdict yet so no other threads should be pending on it. This
921             # release is just a safety measure.
922             lock.release()
923             raise
924
925         self.__lockdict[lockname] = lock
926
927     finally:
928       # Only release __lock if we were not holding it previously.
929       if release_lock:
930         self.__lock.release()
931
932     return True
933
934   def remove(self, names):
935     """Remove elements from the lock set.
936
937     You can either not hold anything in the lockset or already hold a superset
938     of the elements you want to delete, exclusively.
939
940     @param names: names of the resource to remove.
941
942     @return:: a list of locks which we removed; the list is always
943         equal to the names list if we were holding all the locks
944         exclusively
945
946     """
947     # Support passing in a single resource to remove rather than many
948     if isinstance(names, basestring):
949       names = [names]
950
951     # If we own any subset of this lock it must be a superset of what we want
952     # to delete. The ownership must also be exclusive, but that will be checked
953     # by the lock itself.
954     assert not self._is_owned() or self._list_owned().issuperset(names), (
955       "remove() on acquired lockset while not owning all elements")
956
957     removed = []
958
959     for lname in names:
960       # Calling delete() acquires the lock exclusively if we don't already own
961       # it, and causes all pending and subsequent lock acquires to fail. It's
962       # fine to call it out of order because delete() also implies release(),
963       # and the assertion above guarantees that if we either already hold
964       # everything we want to delete, or we hold none.
965       try:
966         self.__lockdict[lname].delete()
967         removed.append(lname)
968       except (KeyError, errors.LockError):
969         # This cannot happen if we were already holding it, verify:
970         assert not self._is_owned(), "remove failed while holding lockset"
971       else:
972         # If no LockError was raised we are the ones who deleted the lock.
973         # This means we can safely remove it from lockdict, as any further or
974         # pending delete() or acquire() will fail (and nobody can have the lock
975         # since before our call to delete()).
976         #
977         # This is done in an else clause because if the exception was thrown
978         # it's the job of the one who actually deleted it.
979         del self.__lockdict[lname]
980         # And let's remove it from our private list if we owned it.
981         if self._is_owned():
982           self._del_owned(name=lname)
983
984     return removed
985
986
987 # Locking levels, must be acquired in increasing order.
988 # Current rules are:
989 #   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
990 #   acquired before performing any operation, either in shared or in exclusive
991 #   mode. acquiring the BGL in exclusive mode is discouraged and should be
992 #   avoided.
993 #   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
994 #   If you need more than one node, or more than one instance, acquire them at
995 #   the same time.
996 LEVEL_CLUSTER = 0
997 LEVEL_INSTANCE = 1
998 LEVEL_NODE = 2
999
1000 LEVELS = [LEVEL_CLUSTER,
1001           LEVEL_INSTANCE,
1002           LEVEL_NODE]
1003
1004 # Lock levels which are modifiable
1005 LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1006
1007 LEVEL_NAMES = {
1008   LEVEL_CLUSTER: "cluster",
1009   LEVEL_INSTANCE: "instance",
1010   LEVEL_NODE: "node",
1011   }
1012
1013 # Constant for the big ganeti lock
1014 BGL = 'BGL'
1015
1016
1017 class GanetiLockManager:
1018   """The Ganeti Locking Library
1019
1020   The purpose of this small library is to manage locking for ganeti clusters
1021   in a central place, while at the same time doing dynamic checks against
1022   possible deadlocks. It will also make it easier to transition to a different
1023   lock type should we migrate away from python threads.
1024
1025   """
1026   _instance = None
1027
1028   def __init__(self, nodes=None, instances=None):
1029     """Constructs a new GanetiLockManager object.
1030
1031     There should be only a GanetiLockManager object at any time, so this
1032     function raises an error if this is not the case.
1033
1034     @param nodes: list of node names
1035     @param instances: list of instance names
1036
1037     """
1038     assert self.__class__._instance is None, \
1039            "double GanetiLockManager instance"
1040
1041     self.__class__._instance = self
1042
1043     # The keyring contains all the locks, at their level and in the correct
1044     # locking order.
1045     self.__keyring = {
1046       LEVEL_CLUSTER: LockSet([BGL]),
1047       LEVEL_NODE: LockSet(nodes),
1048       LEVEL_INSTANCE: LockSet(instances),
1049     }
1050
1051   def _names(self, level):
1052     """List the lock names at the given level.
1053
1054     This can be used for debugging/testing purposes.
1055
1056     @param level: the level whose list of locks to get
1057
1058     """
1059     assert level in LEVELS, "Invalid locking level %s" % level
1060     return self.__keyring[level]._names()
1061
1062   def _is_owned(self, level):
1063     """Check whether we are owning locks at the given level
1064
1065     """
1066     return self.__keyring[level]._is_owned()
1067
1068   is_owned = _is_owned
1069
1070   def _list_owned(self, level):
1071     """Get the set of owned locks at the given level
1072
1073     """
1074     return self.__keyring[level]._list_owned()
1075
1076   def _upper_owned(self, level):
1077     """Check that we don't own any lock at a level greater than the given one.
1078
1079     """
1080     # This way of checking only works if LEVELS[i] = i, which we check for in
1081     # the test cases.
1082     return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1083
1084   def _BGL_owned(self):
1085     """Check if the current thread owns the BGL.
1086
1087     Both an exclusive or a shared acquisition work.
1088
1089     """
1090     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1091
1092   def _contains_BGL(self, level, names):
1093     """Check if the level contains the BGL.
1094
1095     Check if acting on the given level and set of names will change
1096     the status of the Big Ganeti Lock.
1097
1098     """
1099     return level == LEVEL_CLUSTER and (names is None or BGL in names)
1100
1101   def acquire(self, level, names, timeout=None, shared=0):
1102     """Acquire a set of resource locks, at the same level.
1103
1104     @param level: the level at which the locks shall be acquired;
1105         it must be a member of LEVELS.
1106     @param names: the names of the locks which shall be acquired
1107         (special lock names, or instance/node names)
1108     @param shared: whether to acquire in shared mode; by default
1109         an exclusive lock will be acquired
1110     @type timeout: float
1111     @param timeout: Maximum time to acquire all locks
1112
1113     """
1114     assert level in LEVELS, "Invalid locking level %s" % level
1115
1116     # Check that we are either acquiring the Big Ganeti Lock or we already own
1117     # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1118     # so even if we've migrated we need to at least share the BGL to be
1119     # compatible with them. Of course if we own the BGL exclusively there's no
1120     # point in acquiring any other lock, unless perhaps we are half way through
1121     # the migration of the current opcode.
1122     assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1123             "You must own the Big Ganeti Lock before acquiring any other")
1124
1125     # Check we don't own locks at the same or upper levels.
1126     assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1127            " while owning some at a greater one")
1128
1129     # Acquire the locks in the set.
1130     return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
1131
1132   def release(self, level, names=None):
1133     """Release a set of resource locks, at the same level.
1134
1135     You must have acquired the locks, either in shared or in exclusive
1136     mode, before releasing them.
1137
1138     @param level: the level at which the locks shall be released;
1139         it must be a member of LEVELS
1140     @param names: the names of the locks which shall be released
1141         (defaults to all the locks acquired at that level)
1142
1143     """
1144     assert level in LEVELS, "Invalid locking level %s" % level
1145     assert (not self._contains_BGL(level, names) or
1146             not self._upper_owned(LEVEL_CLUSTER)), (
1147             "Cannot release the Big Ganeti Lock while holding something"
1148             " at upper levels")
1149
1150     # Release will complain if we don't own the locks already
1151     return self.__keyring[level].release(names)
1152
1153   def add(self, level, names, acquired=0, shared=0):
1154     """Add locks at the specified level.
1155
1156     @param level: the level at which the locks shall be added;
1157         it must be a member of LEVELS_MOD.
1158     @param names: names of the locks to acquire
1159     @param acquired: whether to acquire the newly added locks
1160     @param shared: whether the acquisition will be shared
1161
1162     """
1163     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1164     assert self._BGL_owned(), ("You must own the BGL before performing other"
1165            " operations")
1166     assert not self._upper_owned(level), ("Cannot add locks at a level"
1167            " while owning some at a greater one")
1168     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1169
1170   def remove(self, level, names):
1171     """Remove locks from the specified level.
1172
1173     You must either already own the locks you are trying to remove
1174     exclusively or not own any lock at an upper level.
1175
1176     @param level: the level at which the locks shall be removed;
1177         it must be a member of LEVELS_MOD
1178     @param names: the names of the locks which shall be removed
1179         (special lock names, or instance/node names)
1180
1181     """
1182     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1183     assert self._BGL_owned(), ("You must own the BGL before performing other"
1184            " operations")
1185     # Check we either own the level or don't own anything from here
1186     # up. LockSet.remove() will check the case in which we don't own
1187     # all the needed resources, or we have a shared ownership.
1188     assert self._is_owned(level) or not self._upper_owned(level), (
1189            "Cannot remove locks at a level while not owning it or"
1190            " owning some at a greater one")
1191     return self.__keyring[level].remove(names)