901fdac80489133ee1fc8343b5fbfe7bac1a7d90
[ganeti-local] / lib / locking.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Module implementing the Ganeti locking code."""
22
23 import os
24 import select
25 import threading
26 import time
27 import errno
28
29 from ganeti import errors
30 from ganeti import utils
31
32
33 def ssynchronized(lock, shared=0):
34   """Shared Synchronization decorator.
35
36   Calls the function holding the given lock, either in exclusive or shared
37   mode. It requires the passed lock to be a SharedLock (or support its
38   semantics).
39
40   """
41   def wrap(fn):
42     def sync_function(*args, **kwargs):
43       lock.acquire(shared=shared)
44       try:
45         return fn(*args, **kwargs)
46       finally:
47         lock.release()
48     return sync_function
49   return wrap
50
51
52 class _SingleActionPipeConditionWaiter(object):
53   """Callable helper class for _SingleActionPipeCondition.
54
55   """
56   __slots__ = [
57     "_cond",
58     "_fd",
59     "_poller",
60     ]
61
62   def __init__(self, cond, poller, fd):
63     """Initializes this class.
64
65     @type cond: L{_SingleActionPipeCondition}
66     @param cond: Parent condition
67     @type poller: select.poll
68     @param poller: Poller object
69     @type fd: int
70     @param fd: File descriptor to wait for
71
72     """
73     object.__init__(self)
74
75     self._cond = cond
76     self._poller = poller
77     self._fd = fd
78
79   def __call__(self, timeout):
80     """Wait for something to happen on the pipe.
81
82     @type timeout: float or None
83     @param timeout: Timeout for waiting (can be None)
84
85     """
86     start_time = time.time()
87     remaining_time = timeout
88
89     while timeout is None or remaining_time > 0:
90       try:
91         result = self._poller.poll(remaining_time)
92       except EnvironmentError, err:
93         if err.errno != errno.EINTR:
94           raise
95         result = None
96
97       # Check whether we were notified
98       if result and result[0][0] == self._fd:
99         break
100
101       # Re-calculate timeout if necessary
102       if timeout is not None:
103         remaining_time = start_time + timeout - time.time()
104
105
106 class _SingleActionPipeCondition(object):
107   """Wrapper around a pipe for usage inside conditions.
108
109   This class contains a POSIX pipe(2) and a poller to poll it. The pipe is
110   always allocated when constructing the class. Extra care is taken to always
111   close the file descriptors.
112
113   An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for
114   notifications.
115
116   Warning: This class is designed to be used as the underlying component of a
117   locking condition, but is not by itself thread safe, and needs to be
118   protected by an external lock.
119
120   """
121   __slots__ = [
122     "_poller",
123     "_read_fd",
124     "_write_fd",
125     "_nwaiters",
126     ]
127
128   _waiter_class = _SingleActionPipeConditionWaiter
129
130   def __init__(self):
131     """Initializes this class.
132
133     """
134     object.__init__(self)
135
136     self._nwaiters = 0
137
138     # Just assume the unpacking is successful, otherwise error handling gets
139     # very complicated.
140     (self._read_fd, self._write_fd) = os.pipe()
141     try:
142       # The poller looks for closure of the write side
143       poller = select.poll()
144       poller.register(self._read_fd, select.POLLHUP)
145
146       self._poller = poller
147     except:
148       if self._read_fd is not None:
149         os.close(self._read_fd)
150       if self._write_fd is not None:
151         os.close(self._write_fd)
152       raise
153
154     # There should be no code here anymore, otherwise the pipe file descriptors
155     # may be not be cleaned up properly in case of errors.
156
157   def StartWaiting(self):
158     """Return function to wait for notification.
159
160     @rtype: L{_SingleActionPipeConditionWaiter}
161     @return: Function to wait for notification
162
163     """
164     assert self._nwaiters >= 0
165
166     if self._poller is None:
167       raise RuntimeError("Already cleaned up")
168
169     # Create waiter function and increase number of waiters
170     wait_fn = self._waiter_class(self, self._poller, self._read_fd)
171     self._nwaiters += 1
172     return wait_fn
173
174   def DoneWaiting(self):
175     """Decrement number of waiters and automatic cleanup.
176
177     Must be called after waiting for a notification.
178
179     @rtype: bool
180     @return: Whether this was the last waiter
181
182     """
183     assert self._nwaiters > 0
184
185     self._nwaiters -= 1
186
187     if self._nwaiters == 0:
188       self._Cleanup()
189       return True
190
191     return False
192
193   def notifyAll(self):
194     """Close the writing side of the pipe to notify all waiters.
195
196     """
197     if self._write_fd is None:
198       raise RuntimeError("Can only notify once")
199
200     os.close(self._write_fd)
201     self._write_fd = None
202
203   def _Cleanup(self):
204     """Close all file descriptors.
205
206     """
207     if self._read_fd is not None:
208       os.close(self._read_fd)
209       self._read_fd = None
210
211     if self._write_fd is not None:
212       os.close(self._write_fd)
213       self._write_fd = None
214
215     self._poller = None
216
217   def __del__(self):
218     """Called on object deletion.
219
220     Ensure no file descriptors are left open.
221
222     """
223     self._Cleanup()
224
225
226 class _PipeCondition(object):
227   """Group-only non-polling condition with counters.
228
229   This condition class uses pipes and poll, internally, to be able to wait for
230   notification with a timeout, without resorting to polling. It is almost
231   compatible with Python's threading.Condition, but only supports notifyAll and
232   non-recursive locks. As an additional features it's able to report whether
233   there are any waiting threads.
234
235   """
236   __slots__ = [
237     "_lock",
238     "_nwaiters",
239     "_pipe",
240     "acquire",
241     "release",
242     ]
243
244   _pipe_class = _SingleActionPipeCondition
245
246   def __init__(self, lock):
247     """Initializes this class.
248
249     """
250     object.__init__(self)
251
252     # Recursive locks are not supported
253     assert not hasattr(lock, "_acquire_restore")
254     assert not hasattr(lock, "_release_save")
255
256     self._lock = lock
257
258     # Export the lock's acquire() and release() methods
259     self.acquire = lock.acquire
260     self.release = lock.release
261
262     self._nwaiters = 0
263     self._pipe = None
264
265   def _is_owned(self):
266     """Check whether lock is owned by current thread.
267
268     """
269     if self._lock.acquire(0):
270       self._lock.release()
271       return False
272
273     return True
274
275   def _check_owned(self):
276     """Raise an exception if the current thread doesn't own the lock.
277
278     """
279     if not self._is_owned():
280       raise RuntimeError("cannot work with un-aquired lock")
281
282   def wait(self, timeout=None):
283     """Wait for a notification.
284
285     @type timeout: float or None
286     @param timeout: Waiting timeout (can be None)
287
288     """
289     self._check_owned()
290
291     if not self._pipe:
292       self._pipe = self._pipe_class()
293
294     # Keep local reference to the pipe. It could be replaced by another thread
295     # notifying while we're waiting.
296     pipe = self._pipe
297
298     assert self._nwaiters >= 0
299     self._nwaiters += 1
300     try:
301       # Get function to wait on the pipe
302       wait_fn = pipe.StartWaiting()
303       try:
304         # Release lock while waiting
305         self.release()
306         try:
307           # Wait for notification
308           wait_fn(timeout)
309         finally:
310           # Re-acquire lock
311           self.acquire()
312       finally:
313         # Destroy pipe if this was the last waiter and the current pipe is
314         # still the same. The same pipe cannot be reused after cleanup.
315         if pipe.DoneWaiting() and pipe == self._pipe:
316           self._pipe = None
317     finally:
318       assert self._nwaiters > 0
319       self._nwaiters -= 1
320
321   def notifyAll(self):
322     """Notify all currently waiting threads.
323
324     """
325     self._check_owned()
326
327     # Notify and forget pipe. A new one will be created on the next call to
328     # wait.
329     if self._pipe is not None:
330       self._pipe.notifyAll()
331       self._pipe = None
332
333   def has_waiting(self):
334     """Returns whether there are active waiters.
335
336     """
337     self._check_owned()
338
339     return bool(self._nwaiters)
340
341
342 class _CountingCondition(object):
343   """Wrapper for Python's built-in threading.Condition class.
344
345   This wrapper keeps a count of active waiters. We can't access the internal
346   "__waiters" attribute of threading.Condition because it's not thread-safe.
347
348   """
349   __slots__ = [
350     "_cond",
351     "_nwaiters",
352     ]
353
354   def __init__(self, lock):
355     """Initializes this class.
356
357     """
358     object.__init__(self)
359     self._cond = threading.Condition(lock=lock)
360     self._nwaiters = 0
361
362   def notifyAll(self):
363     """Notifies the condition.
364
365     """
366     return self._cond.notifyAll()
367
368   def wait(self, timeout=None):
369     """Waits for the condition to be notified.
370
371     @type timeout: float or None
372     @param timeout: Timeout in seconds
373
374     """
375     assert self._nwaiters >= 0
376
377     self._nwaiters += 1
378     try:
379       return self._cond.wait(timeout=timeout)
380     finally:
381       self._nwaiters -= 1
382
383   def has_waiting(self):
384     """Returns whether there are active waiters.
385
386     """
387     return bool(self._nwaiters)
388
389
390 class SharedLock(object):
391   """Implements a shared lock.
392
393   Multiple threads can acquire the lock in a shared way, calling
394   acquire_shared().  In order to acquire the lock in an exclusive way threads
395   can call acquire_exclusive().
396
397   The lock prevents starvation but does not guarantee that threads will acquire
398   the shared lock in the order they queued for it, just that they will
399   eventually do so.
400
401   """
402   __slots__ = [
403     "__active_shr_c",
404     "__inactive_shr_c",
405     "__deleted",
406     "__exc",
407     "__lock",
408     "__pending",
409     "__shr",
410     ]
411
412   __condition_class = _CountingCondition
413
414   def __init__(self):
415     """Construct a new SharedLock.
416
417     """
418     object.__init__(self)
419
420     # Internal lock
421     self.__lock = threading.Lock()
422
423     # Queue containing waiting acquires
424     self.__pending = []
425
426     # Active and inactive conditions for shared locks
427     self.__active_shr_c = self.__condition_class(self.__lock)
428     self.__inactive_shr_c = self.__condition_class(self.__lock)
429
430     # Current lock holders
431     self.__shr = set()
432     self.__exc = None
433
434     # is this lock in the deleted state?
435     self.__deleted = False
436
437   def __check_deleted(self):
438     """Raises an exception if the lock has been deleted.
439
440     """
441     if self.__deleted:
442       raise errors.LockError("Deleted lock")
443
444   def __is_sharer(self):
445     """Is the current thread sharing the lock at this time?
446
447     """
448     return threading.currentThread() in self.__shr
449
450   def __is_exclusive(self):
451     """Is the current thread holding the lock exclusively at this time?
452
453     """
454     return threading.currentThread() == self.__exc
455
456   def __is_owned(self, shared=-1):
457     """Is the current thread somehow owning the lock at this time?
458
459     This is a private version of the function, which presumes you're holding
460     the internal lock.
461
462     """
463     if shared < 0:
464       return self.__is_sharer() or self.__is_exclusive()
465     elif shared:
466       return self.__is_sharer()
467     else:
468       return self.__is_exclusive()
469
470   def _is_owned(self, shared=-1):
471     """Is the current thread somehow owning the lock at this time?
472
473     @param shared:
474         - < 0: check for any type of ownership (default)
475         - 0: check for exclusive ownership
476         - > 0: check for shared ownership
477
478     """
479     self.__lock.acquire()
480     try:
481       return self.__is_owned(shared=shared)
482     finally:
483       self.__lock.release()
484
485   def _count_pending(self):
486     """Returns the number of pending acquires.
487
488     @rtype: int
489
490     """
491     self.__lock.acquire()
492     try:
493       return len(self.__pending)
494     finally:
495       self.__lock.release()
496
497   def __do_acquire(self, shared):
498     """Actually acquire the lock.
499
500     """
501     if shared:
502       self.__shr.add(threading.currentThread())
503     else:
504       self.__exc = threading.currentThread()
505
506   def __can_acquire(self, shared):
507     """Determine whether lock can be acquired.
508
509     """
510     if shared:
511       return self.__exc is None
512     else:
513       return len(self.__shr) == 0 and self.__exc is None
514
515   def __is_on_top(self, cond):
516     """Checks whether the passed condition is on top of the queue.
517
518     The caller must make sure the queue isn't empty.
519
520     """
521     return self.__pending[0] == cond
522
523   def __acquire_unlocked(self, shared=0, timeout=None):
524     """Acquire a shared lock.
525
526     @param shared: whether to acquire in shared mode; by default an
527         exclusive lock will be acquired
528     @param timeout: maximum waiting time before giving up
529
530     """
531     self.__check_deleted()
532
533     # We cannot acquire the lock if we already have it
534     assert not self.__is_owned(), "double acquire() on a non-recursive lock"
535
536     # Check whether someone else holds the lock or there are pending acquires.
537     if not self.__pending and self.__can_acquire(shared):
538       # Apparently not, can acquire lock directly.
539       self.__do_acquire(shared)
540       return True
541
542     if shared:
543       wait_condition = self.__active_shr_c
544
545       # Check if we're not yet in the queue
546       if wait_condition not in self.__pending:
547         self.__pending.append(wait_condition)
548     else:
549       wait_condition = self.__condition_class(self.__lock)
550       # Always add to queue
551       self.__pending.append(wait_condition)
552
553     try:
554       # Wait until we become the topmost acquire in the queue or the timeout
555       # expires.
556       while not (self.__is_on_top(wait_condition) and
557                  self.__can_acquire(shared)):
558         # Wait for notification
559         wait_condition.wait(timeout)
560         self.__check_deleted()
561
562         # A lot of code assumes blocking acquires always succeed. Loop
563         # internally for that case.
564         if timeout is not None:
565           break
566
567       if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
568         self.__do_acquire(shared)
569         return True
570     finally:
571       # Remove condition from queue if there are no more waiters
572       if not wait_condition.has_waiting() and not self.__deleted:
573         self.__pending.remove(wait_condition)
574
575     return False
576
577   def acquire(self, shared=0, timeout=None):
578     """Acquire a shared lock.
579
580     @type shared: int
581     @param shared: whether to acquire in shared mode; by default an
582         exclusive lock will be acquired
583     @type timeout: float
584     @param timeout: maximum waiting time before giving up
585
586     """
587     self.__lock.acquire()
588     try:
589       return self.__acquire_unlocked(shared, timeout)
590     finally:
591       self.__lock.release()
592
593   def release(self):
594     """Release a Shared Lock.
595
596     You must have acquired the lock, either in shared or in exclusive mode,
597     before calling this function.
598
599     """
600     self.__lock.acquire()
601     try:
602       assert self.__is_exclusive() or self.__is_sharer(), \
603         "Cannot release non-owned lock"
604
605       # Autodetect release type
606       if self.__is_exclusive():
607         self.__exc = None
608       else:
609         self.__shr.remove(threading.currentThread())
610
611       # Notify topmost condition in queue
612       if self.__pending:
613         first_condition = self.__pending[0]
614         first_condition.notifyAll()
615
616         if first_condition == self.__active_shr_c:
617           self.__active_shr_c = self.__inactive_shr_c
618           self.__inactive_shr_c = first_condition
619
620     finally:
621       self.__lock.release()
622
623   def delete(self, timeout=None):
624     """Delete a Shared Lock.
625
626     This operation will declare the lock for removal. First the lock will be
627     acquired in exclusive mode if you don't already own it, then the lock
628     will be put in a state where any future and pending acquire() fail.
629
630     @type timeout: float
631     @param timeout: maximum waiting time before giving up
632
633     """
634     self.__lock.acquire()
635     try:
636       assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
637
638       self.__check_deleted()
639
640       # The caller is allowed to hold the lock exclusively already.
641       acquired = self.__is_exclusive()
642
643       if not acquired:
644         acquired = self.__acquire_unlocked(timeout)
645
646       if acquired:
647         self.__deleted = True
648         self.__exc = None
649
650         # Notify all acquires. They'll throw an error.
651         while self.__pending:
652           self.__pending.pop().notifyAll()
653
654       return acquired
655     finally:
656       self.__lock.release()
657
658
659 # Whenever we want to acquire a full LockSet we pass None as the value
660 # to acquire.  Hide this behind this nicely named constant.
661 ALL_SET = None
662
663
664 class LockSet:
665   """Implements a set of locks.
666
667   This abstraction implements a set of shared locks for the same resource type,
668   distinguished by name. The user can lock a subset of the resources and the
669   LockSet will take care of acquiring the locks always in the same order, thus
670   preventing deadlock.
671
672   All the locks needed in the same set must be acquired together, though.
673
674   """
675   def __init__(self, members=None):
676     """Constructs a new LockSet.
677
678     @param members: initial members of the set
679
680     """
681     # Used internally to guarantee coherency.
682     self.__lock = SharedLock()
683
684     # The lockdict indexes the relationship name -> lock
685     # The order-of-locking is implied by the alphabetical order of names
686     self.__lockdict = {}
687
688     if members is not None:
689       for name in members:
690         self.__lockdict[name] = SharedLock()
691
692     # The owner dict contains the set of locks each thread owns. For
693     # performance each thread can access its own key without a global lock on
694     # this structure. It is paramount though that *no* other type of access is
695     # done to this structure (eg. no looping over its keys). *_owner helper
696     # function are defined to guarantee access is correct, but in general never
697     # do anything different than __owners[threading.currentThread()], or there
698     # will be trouble.
699     self.__owners = {}
700
701   def _is_owned(self):
702     """Is the current thread a current level owner?"""
703     return threading.currentThread() in self.__owners
704
705   def _add_owned(self, name=None):
706     """Note the current thread owns the given lock"""
707     if name is None:
708       if not self._is_owned():
709         self.__owners[threading.currentThread()] = set()
710     else:
711       if self._is_owned():
712         self.__owners[threading.currentThread()].add(name)
713       else:
714         self.__owners[threading.currentThread()] = set([name])
715
716   def _del_owned(self, name=None):
717     """Note the current thread owns the given lock"""
718
719     if name is not None:
720       self.__owners[threading.currentThread()].remove(name)
721
722     # Only remove the key if we don't hold the set-lock as well
723     if (not self.__lock._is_owned() and
724         not self.__owners[threading.currentThread()]):
725       del self.__owners[threading.currentThread()]
726
727   def _list_owned(self):
728     """Get the set of resource names owned by the current thread"""
729     if self._is_owned():
730       return self.__owners[threading.currentThread()].copy()
731     else:
732       return set()
733
734   def __names(self):
735     """Return the current set of names.
736
737     Only call this function while holding __lock and don't iterate on the
738     result after releasing the lock.
739
740     """
741     return self.__lockdict.keys()
742
743   def _names(self):
744     """Return a copy of the current set of elements.
745
746     Used only for debugging purposes.
747
748     """
749     # If we don't already own the set-level lock acquired
750     # we'll get it and note we need to release it later.
751     release_lock = False
752     if not self.__lock._is_owned():
753       release_lock = True
754       self.__lock.acquire(shared=1)
755     try:
756       result = self.__names()
757     finally:
758       if release_lock:
759         self.__lock.release()
760     return set(result)
761
762   def acquire(self, names, blocking=1, shared=0):
763     """Acquire a set of resource locks.
764
765     @param names: the names of the locks which shall be acquired
766         (special lock names, or instance/node names)
767     @param shared: whether to acquire in shared mode; by default an
768         exclusive lock will be acquired
769     @param blocking: whether to block while trying to acquire or to
770         operate in try-lock mode (this locking mode is not supported yet)
771
772     @return: True when all the locks are successfully acquired
773
774     @raise errors.LockError: when any lock we try to acquire has
775         been deleted before we succeed. In this case none of the
776         locks requested will be acquired.
777
778     """
779     if not blocking:
780       # We don't have non-blocking mode for now
781       raise NotImplementedError
782
783     # Check we don't already own locks at this level
784     assert not self._is_owned(), "Cannot acquire locks in the same set twice"
785
786     if names is None:
787       # If no names are given acquire the whole set by not letting new names
788       # being added before we release, and getting the current list of names.
789       # Some of them may then be deleted later, but we'll cope with this.
790       #
791       # We'd like to acquire this lock in a shared way, as it's nice if
792       # everybody else can use the instances at the same time. If are acquiring
793       # them exclusively though they won't be able to do this anyway, though,
794       # so we'll get the list lock exclusively as well in order to be able to
795       # do add() on the set while owning it.
796       self.__lock.acquire(shared=shared)
797       try:
798         # note we own the set-lock
799         self._add_owned()
800         names = self.__names()
801       except:
802         # We shouldn't have problems adding the lock to the owners list, but
803         # if we did we'll try to release this lock and re-raise exception.
804         # Of course something is going to be really wrong, after this.
805         self.__lock.release()
806         raise
807
808     try:
809       # Support passing in a single resource to acquire rather than many
810       if isinstance(names, basestring):
811         names = [names]
812       else:
813         names = sorted(names)
814
815       acquire_list = []
816       # First we look the locks up on __lockdict. We have no way of being sure
817       # they will still be there after, but this makes it a lot faster should
818       # just one of them be the already wrong
819       for lname in utils.UniqueSequence(names):
820         try:
821           lock = self.__lockdict[lname] # raises KeyError if lock is not there
822           acquire_list.append((lname, lock))
823         except (KeyError):
824           if self.__lock._is_owned():
825             # We are acquiring all the set, it doesn't matter if this
826             # particular element is not there anymore.
827             continue
828           else:
829             raise errors.LockError('non-existing lock in set (%s)' % lname)
830
831       # This will hold the locknames we effectively acquired.
832       acquired = set()
833       # Now acquire_list contains a sorted list of resources and locks we want.
834       # In order to get them we loop on this (private) list and acquire() them.
835       # We gave no real guarantee they will still exist till this is done but
836       # .acquire() itself is safe and will alert us if the lock gets deleted.
837       for (lname, lock) in acquire_list:
838         try:
839           lock.acquire(shared=shared) # raises LockError if the lock is deleted
840           # now the lock cannot be deleted, we have it!
841           self._add_owned(name=lname)
842           acquired.add(lname)
843         except (errors.LockError):
844           if self.__lock._is_owned():
845             # We are acquiring all the set, it doesn't matter if this
846             # particular element is not there anymore.
847             continue
848           else:
849             name_fail = lname
850             for lname in self._list_owned():
851               self.__lockdict[lname].release()
852               self._del_owned(name=lname)
853             raise errors.LockError('non-existing lock in set (%s)' % name_fail)
854         except:
855           # We shouldn't have problems adding the lock to the owners list, but
856           # if we did we'll try to release this lock and re-raise exception.
857           # Of course something is going to be really wrong, after this.
858           if lock._is_owned():
859             lock.release()
860           raise
861
862     except:
863       # If something went wrong and we had the set-lock let's release it...
864       if self.__lock._is_owned():
865         self.__lock.release()
866       raise
867
868     return acquired
869
870   def release(self, names=None):
871     """Release a set of resource locks, at the same level.
872
873     You must have acquired the locks, either in shared or in exclusive mode,
874     before releasing them.
875
876     @param names: the names of the locks which shall be released
877         (defaults to all the locks acquired at that level).
878
879     """
880     assert self._is_owned(), "release() on lock set while not owner"
881
882     # Support passing in a single resource to release rather than many
883     if isinstance(names, basestring):
884       names = [names]
885
886     if names is None:
887       names = self._list_owned()
888     else:
889       names = set(names)
890       assert self._list_owned().issuperset(names), (
891                "release() on unheld resources %s" %
892                names.difference(self._list_owned()))
893
894     # First of all let's release the "all elements" lock, if set.
895     # After this 'add' can work again
896     if self.__lock._is_owned():
897       self.__lock.release()
898       self._del_owned()
899
900     for lockname in names:
901       # If we are sure the lock doesn't leave __lockdict without being
902       # exclusively held we can do this...
903       self.__lockdict[lockname].release()
904       self._del_owned(name=lockname)
905
906   def add(self, names, acquired=0, shared=0):
907     """Add a new set of elements to the set
908
909     @param names: names of the new elements to add
910     @param acquired: pre-acquire the new resource?
911     @param shared: is the pre-acquisition shared?
912
913     """
914     # Check we don't already own locks at this level
915     assert not self._is_owned() or self.__lock._is_owned(shared=0), \
916       "Cannot add locks if the set is only partially owned, or shared"
917
918     # Support passing in a single resource to add rather than many
919     if isinstance(names, basestring):
920       names = [names]
921
922     # If we don't already own the set-level lock acquired in an exclusive way
923     # we'll get it and note we need to release it later.
924     release_lock = False
925     if not self.__lock._is_owned():
926       release_lock = True
927       self.__lock.acquire()
928
929     try:
930       invalid_names = set(self.__names()).intersection(names)
931       if invalid_names:
932         # This must be an explicit raise, not an assert, because assert is
933         # turned off when using optimization, and this can happen because of
934         # concurrency even if the user doesn't want it.
935         raise errors.LockError("duplicate add() (%s)" % invalid_names)
936
937       for lockname in names:
938         lock = SharedLock()
939
940         if acquired:
941           lock.acquire(shared=shared)
942           # now the lock cannot be deleted, we have it!
943           try:
944             self._add_owned(name=lockname)
945           except:
946             # We shouldn't have problems adding the lock to the owners list,
947             # but if we did we'll try to release this lock and re-raise
948             # exception.  Of course something is going to be really wrong,
949             # after this.  On the other hand the lock hasn't been added to the
950             # __lockdict yet so no other threads should be pending on it. This
951             # release is just a safety measure.
952             lock.release()
953             raise
954
955         self.__lockdict[lockname] = lock
956
957     finally:
958       # Only release __lock if we were not holding it previously.
959       if release_lock:
960         self.__lock.release()
961
962     return True
963
964   def remove(self, names, blocking=1):
965     """Remove elements from the lock set.
966
967     You can either not hold anything in the lockset or already hold a superset
968     of the elements you want to delete, exclusively.
969
970     @param names: names of the resource to remove.
971     @param blocking: whether to block while trying to acquire or to
972         operate in try-lock mode (this locking mode is not supported
973         yet unless you are already holding exclusively the locks)
974
975     @return:: a list of locks which we removed; the list is always
976         equal to the names list if we were holding all the locks
977         exclusively
978
979     """
980     if not blocking and not self._is_owned():
981       # We don't have non-blocking mode for now
982       raise NotImplementedError
983
984     # Support passing in a single resource to remove rather than many
985     if isinstance(names, basestring):
986       names = [names]
987
988     # If we own any subset of this lock it must be a superset of what we want
989     # to delete. The ownership must also be exclusive, but that will be checked
990     # by the lock itself.
991     assert not self._is_owned() or self._list_owned().issuperset(names), (
992       "remove() on acquired lockset while not owning all elements")
993
994     removed = []
995
996     for lname in names:
997       # Calling delete() acquires the lock exclusively if we don't already own
998       # it, and causes all pending and subsequent lock acquires to fail. It's
999       # fine to call it out of order because delete() also implies release(),
1000       # and the assertion above guarantees that if we either already hold
1001       # everything we want to delete, or we hold none.
1002       try:
1003         self.__lockdict[lname].delete()
1004         removed.append(lname)
1005       except (KeyError, errors.LockError):
1006         # This cannot happen if we were already holding it, verify:
1007         assert not self._is_owned(), "remove failed while holding lockset"
1008       else:
1009         # If no LockError was raised we are the ones who deleted the lock.
1010         # This means we can safely remove it from lockdict, as any further or
1011         # pending delete() or acquire() will fail (and nobody can have the lock
1012         # since before our call to delete()).
1013         #
1014         # This is done in an else clause because if the exception was thrown
1015         # it's the job of the one who actually deleted it.
1016         del self.__lockdict[lname]
1017         # And let's remove it from our private list if we owned it.
1018         if self._is_owned():
1019           self._del_owned(name=lname)
1020
1021     return removed
1022
1023
1024 # Locking levels, must be acquired in increasing order.
1025 # Current rules are:
1026 #   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1027 #   acquired before performing any operation, either in shared or in exclusive
1028 #   mode. acquiring the BGL in exclusive mode is discouraged and should be
1029 #   avoided.
1030 #   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1031 #   If you need more than one node, or more than one instance, acquire them at
1032 #   the same time.
1033 LEVEL_CLUSTER = 0
1034 LEVEL_INSTANCE = 1
1035 LEVEL_NODE = 2
1036
1037 LEVELS = [LEVEL_CLUSTER,
1038           LEVEL_INSTANCE,
1039           LEVEL_NODE]
1040
1041 # Lock levels which are modifiable
1042 LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1043
1044 LEVEL_NAMES = {
1045   LEVEL_CLUSTER: "cluster",
1046   LEVEL_INSTANCE: "instance",
1047   LEVEL_NODE: "node",
1048   }
1049
1050 # Constant for the big ganeti lock
1051 BGL = 'BGL'
1052
1053
1054 class GanetiLockManager:
1055   """The Ganeti Locking Library
1056
1057   The purpose of this small library is to manage locking for ganeti clusters
1058   in a central place, while at the same time doing dynamic checks against
1059   possible deadlocks. It will also make it easier to transition to a different
1060   lock type should we migrate away from python threads.
1061
1062   """
1063   _instance = None
1064
1065   def __init__(self, nodes=None, instances=None):
1066     """Constructs a new GanetiLockManager object.
1067
1068     There should be only a GanetiLockManager object at any time, so this
1069     function raises an error if this is not the case.
1070
1071     @param nodes: list of node names
1072     @param instances: list of instance names
1073
1074     """
1075     assert self.__class__._instance is None, \
1076            "double GanetiLockManager instance"
1077
1078     self.__class__._instance = self
1079
1080     # The keyring contains all the locks, at their level and in the correct
1081     # locking order.
1082     self.__keyring = {
1083       LEVEL_CLUSTER: LockSet([BGL]),
1084       LEVEL_NODE: LockSet(nodes),
1085       LEVEL_INSTANCE: LockSet(instances),
1086     }
1087
1088   def _names(self, level):
1089     """List the lock names at the given level.
1090
1091     This can be used for debugging/testing purposes.
1092
1093     @param level: the level whose list of locks to get
1094
1095     """
1096     assert level in LEVELS, "Invalid locking level %s" % level
1097     return self.__keyring[level]._names()
1098
1099   def _is_owned(self, level):
1100     """Check whether we are owning locks at the given level
1101
1102     """
1103     return self.__keyring[level]._is_owned()
1104
1105   is_owned = _is_owned
1106
1107   def _list_owned(self, level):
1108     """Get the set of owned locks at the given level
1109
1110     """
1111     return self.__keyring[level]._list_owned()
1112
1113   def _upper_owned(self, level):
1114     """Check that we don't own any lock at a level greater than the given one.
1115
1116     """
1117     # This way of checking only works if LEVELS[i] = i, which we check for in
1118     # the test cases.
1119     return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1120
1121   def _BGL_owned(self):
1122     """Check if the current thread owns the BGL.
1123
1124     Both an exclusive or a shared acquisition work.
1125
1126     """
1127     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1128
1129   def _contains_BGL(self, level, names):
1130     """Check if the level contains the BGL.
1131
1132     Check if acting on the given level and set of names will change
1133     the status of the Big Ganeti Lock.
1134
1135     """
1136     return level == LEVEL_CLUSTER and (names is None or BGL in names)
1137
1138   def acquire(self, level, names, blocking=1, shared=0):
1139     """Acquire a set of resource locks, at the same level.
1140
1141     @param level: the level at which the locks shall be acquired;
1142         it must be a member of LEVELS.
1143     @param names: the names of the locks which shall be acquired
1144         (special lock names, or instance/node names)
1145     @param shared: whether to acquire in shared mode; by default
1146         an exclusive lock will be acquired
1147     @param blocking: whether to block while trying to acquire or to
1148         operate in try-lock mode (this locking mode is not supported yet)
1149
1150     """
1151     assert level in LEVELS, "Invalid locking level %s" % level
1152
1153     # Check that we are either acquiring the Big Ganeti Lock or we already own
1154     # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1155     # so even if we've migrated we need to at least share the BGL to be
1156     # compatible with them. Of course if we own the BGL exclusively there's no
1157     # point in acquiring any other lock, unless perhaps we are half way through
1158     # the migration of the current opcode.
1159     assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1160             "You must own the Big Ganeti Lock before acquiring any other")
1161
1162     # Check we don't own locks at the same or upper levels.
1163     assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1164            " while owning some at a greater one")
1165
1166     # Acquire the locks in the set.
1167     return self.__keyring[level].acquire(names, shared=shared,
1168                                          blocking=blocking)
1169
1170   def release(self, level, names=None):
1171     """Release a set of resource locks, at the same level.
1172
1173     You must have acquired the locks, either in shared or in exclusive
1174     mode, before releasing them.
1175
1176     @param level: the level at which the locks shall be released;
1177         it must be a member of LEVELS
1178     @param names: the names of the locks which shall be released
1179         (defaults to all the locks acquired at that level)
1180
1181     """
1182     assert level in LEVELS, "Invalid locking level %s" % level
1183     assert (not self._contains_BGL(level, names) or
1184             not self._upper_owned(LEVEL_CLUSTER)), (
1185             "Cannot release the Big Ganeti Lock while holding something"
1186             " at upper levels")
1187
1188     # Release will complain if we don't own the locks already
1189     return self.__keyring[level].release(names)
1190
1191   def add(self, level, names, acquired=0, shared=0):
1192     """Add locks at the specified level.
1193
1194     @param level: the level at which the locks shall be added;
1195         it must be a member of LEVELS_MOD.
1196     @param names: names of the locks to acquire
1197     @param acquired: whether to acquire the newly added locks
1198     @param shared: whether the acquisition will be shared
1199
1200     """
1201     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1202     assert self._BGL_owned(), ("You must own the BGL before performing other"
1203            " operations")
1204     assert not self._upper_owned(level), ("Cannot add locks at a level"
1205            " while owning some at a greater one")
1206     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1207
1208   def remove(self, level, names, blocking=1):
1209     """Remove locks from the specified level.
1210
1211     You must either already own the locks you are trying to remove
1212     exclusively or not own any lock at an upper level.
1213
1214     @param level: the level at which the locks shall be removed;
1215         it must be a member of LEVELS_MOD
1216     @param names: the names of the locks which shall be removed
1217         (special lock names, or instance/node names)
1218     @param blocking: whether to block while trying to operate in
1219         try-lock mode (this locking mode is not supported yet)
1220
1221     """
1222     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1223     assert self._BGL_owned(), ("You must own the BGL before performing other"
1224            " operations")
1225     # Check we either own the level or don't own anything from here
1226     # up. LockSet.remove() will check the case in which we don't own
1227     # all the needed resources, or we have a shared ownership.
1228     assert self._is_owned(level) or not self._upper_owned(level), (
1229            "Cannot remove locks at a level while not owning it or"
1230            " owning some at a greater one")
1231     return self.__keyring[level].remove(names, blocking=blocking)