jqueue: Add more debug output
[ganeti-local] / lib / locking.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Module implementing the Ganeti locking code."""
22
23 # pylint: disable-msg=W0212
24
25 # W0212 since e.g. LockSet methods use (a lot) the internals of
26 # SharedLock
27
28 import os
29 import select
30 import threading
31 import time
32 import errno
33
34 from ganeti import errors
35 from ganeti import utils
36 from ganeti import compat
37
38
39 def ssynchronized(mylock, shared=0):
40   """Shared Synchronization decorator.
41
42   Calls the function holding the given lock, either in exclusive or shared
43   mode. It requires the passed lock to be a SharedLock (or support its
44   semantics).
45
46   @type mylock: lockable object or string
47   @param mylock: lock to acquire or class member name of the lock to acquire
48
49   """
50   def wrap(fn):
51     def sync_function(*args, **kwargs):
52       if isinstance(mylock, basestring):
53         assert args, "cannot ssynchronize on non-class method: self not found"
54         # args[0] is "self"
55         lock = getattr(args[0], mylock)
56       else:
57         lock = mylock
58       lock.acquire(shared=shared)
59       try:
60         return fn(*args, **kwargs)
61       finally:
62         lock.release()
63     return sync_function
64   return wrap
65
66
67 class RunningTimeout(object):
68   """Class to calculate remaining timeout when doing several operations.
69
70   """
71   __slots__ = [
72     "_allow_negative",
73     "_start_time",
74     "_time_fn",
75     "_timeout",
76     ]
77
78   def __init__(self, timeout, allow_negative, _time_fn=time.time):
79     """Initializes this class.
80
81     @type timeout: float
82     @param timeout: Timeout duration
83     @type allow_negative: bool
84     @param allow_negative: Whether to return values below zero
85     @param _time_fn: Time function for unittests
86
87     """
88     object.__init__(self)
89
90     if timeout is not None and timeout < 0.0:
91       raise ValueError("Timeout must not be negative")
92
93     self._timeout = timeout
94     self._allow_negative = allow_negative
95     self._time_fn = _time_fn
96
97     self._start_time = None
98
99   def Remaining(self):
100     """Returns the remaining timeout.
101
102     """
103     if self._timeout is None:
104       return None
105
106     # Get start time on first calculation
107     if self._start_time is None:
108       self._start_time = self._time_fn()
109
110     # Calculate remaining time
111     remaining_timeout = self._start_time + self._timeout - self._time_fn()
112
113     if not self._allow_negative:
114       # Ensure timeout is always >= 0
115       return max(0.0, remaining_timeout)
116
117     return remaining_timeout
118
119
120 class _SingleNotifyPipeConditionWaiter(object):
121   """Helper class for SingleNotifyPipeCondition
122
123   """
124   __slots__ = [
125     "_fd",
126     "_poller",
127     ]
128
129   def __init__(self, poller, fd):
130     """Constructor for _SingleNotifyPipeConditionWaiter
131
132     @type poller: select.poll
133     @param poller: Poller object
134     @type fd: int
135     @param fd: File descriptor to wait for
136
137     """
138     object.__init__(self)
139     self._poller = poller
140     self._fd = fd
141
142   def __call__(self, timeout):
143     """Wait for something to happen on the pipe.
144
145     @type timeout: float or None
146     @param timeout: Timeout for waiting (can be None)
147
148     """
149     running_timeout = RunningTimeout(timeout, True)
150
151     while True:
152       remaining_time = running_timeout.Remaining()
153
154       if remaining_time is not None:
155         if remaining_time < 0.0:
156           break
157
158         # Our calculation uses seconds, poll() wants milliseconds
159         remaining_time *= 1000
160
161       try:
162         result = self._poller.poll(remaining_time)
163       except EnvironmentError, err:
164         if err.errno != errno.EINTR:
165           raise
166         result = None
167
168       # Check whether we were notified
169       if result and result[0][0] == self._fd:
170         break
171
172
173 class _BaseCondition(object):
174   """Base class containing common code for conditions.
175
176   Some of this code is taken from python's threading module.
177
178   """
179   __slots__ = [
180     "_lock",
181     "acquire",
182     "release",
183     "_is_owned",
184     "_acquire_restore",
185     "_release_save",
186     ]
187
188   def __init__(self, lock):
189     """Constructor for _BaseCondition.
190
191     @type lock: threading.Lock
192     @param lock: condition base lock
193
194     """
195     object.__init__(self)
196
197     try:
198       self._release_save = lock._release_save
199     except AttributeError:
200       self._release_save = self._base_release_save
201     try:
202       self._acquire_restore = lock._acquire_restore
203     except AttributeError:
204       self._acquire_restore = self._base_acquire_restore
205     try:
206       self._is_owned = lock._is_owned
207     except AttributeError:
208       self._is_owned = self._base_is_owned
209
210     self._lock = lock
211
212     # Export the lock's acquire() and release() methods
213     self.acquire = lock.acquire
214     self.release = lock.release
215
216   def _base_is_owned(self):
217     """Check whether lock is owned by current thread.
218
219     """
220     if self._lock.acquire(0):
221       self._lock.release()
222       return False
223     return True
224
225   def _base_release_save(self):
226     self._lock.release()
227
228   def _base_acquire_restore(self, _):
229     self._lock.acquire()
230
231   def _check_owned(self):
232     """Raise an exception if the current thread doesn't own the lock.
233
234     """
235     if not self._is_owned():
236       raise RuntimeError("cannot work with un-aquired lock")
237
238
239 class SingleNotifyPipeCondition(_BaseCondition):
240   """Condition which can only be notified once.
241
242   This condition class uses pipes and poll, internally, to be able to wait for
243   notification with a timeout, without resorting to polling. It is almost
244   compatible with Python's threading.Condition, with the following differences:
245     - notifyAll can only be called once, and no wait can happen after that
246     - notify is not supported, only notifyAll
247
248   """
249
250   __slots__ = [
251     "_poller",
252     "_read_fd",
253     "_write_fd",
254     "_nwaiters",
255     "_notified",
256     ]
257
258   _waiter_class = _SingleNotifyPipeConditionWaiter
259
260   def __init__(self, lock):
261     """Constructor for SingleNotifyPipeCondition
262
263     """
264     _BaseCondition.__init__(self, lock)
265     self._nwaiters = 0
266     self._notified = False
267     self._read_fd = None
268     self._write_fd = None
269     self._poller = None
270
271   def _check_unnotified(self):
272     """Throws an exception if already notified.
273
274     """
275     if self._notified:
276       raise RuntimeError("cannot use already notified condition")
277
278   def _Cleanup(self):
279     """Cleanup open file descriptors, if any.
280
281     """
282     if self._read_fd is not None:
283       os.close(self._read_fd)
284       self._read_fd = None
285
286     if self._write_fd is not None:
287       os.close(self._write_fd)
288       self._write_fd = None
289     self._poller = None
290
291   def wait(self, timeout=None):
292     """Wait for a notification.
293
294     @type timeout: float or None
295     @param timeout: Waiting timeout (can be None)
296
297     """
298     self._check_owned()
299     self._check_unnotified()
300
301     self._nwaiters += 1
302     try:
303       if self._poller is None:
304         (self._read_fd, self._write_fd) = os.pipe()
305         self._poller = select.poll()
306         self._poller.register(self._read_fd, select.POLLHUP)
307
308       wait_fn = self._waiter_class(self._poller, self._read_fd)
309       state = self._release_save()
310       try:
311         # Wait for notification
312         wait_fn(timeout)
313       finally:
314         # Re-acquire lock
315         self._acquire_restore(state)
316     finally:
317       self._nwaiters -= 1
318       if self._nwaiters == 0:
319         self._Cleanup()
320
321   def notifyAll(self): # pylint: disable-msg=C0103
322     """Close the writing side of the pipe to notify all waiters.
323
324     """
325     self._check_owned()
326     self._check_unnotified()
327     self._notified = True
328     if self._write_fd is not None:
329       os.close(self._write_fd)
330       self._write_fd = None
331
332
333 class PipeCondition(_BaseCondition):
334   """Group-only non-polling condition with counters.
335
336   This condition class uses pipes and poll, internally, to be able to wait for
337   notification with a timeout, without resorting to polling. It is almost
338   compatible with Python's threading.Condition, but only supports notifyAll and
339   non-recursive locks. As an additional features it's able to report whether
340   there are any waiting threads.
341
342   """
343   __slots__ = [
344     "_nwaiters",
345     "_single_condition",
346     ]
347
348   _single_condition_class = SingleNotifyPipeCondition
349
350   def __init__(self, lock):
351     """Initializes this class.
352
353     """
354     _BaseCondition.__init__(self, lock)
355     self._nwaiters = 0
356     self._single_condition = self._single_condition_class(self._lock)
357
358   def wait(self, timeout=None):
359     """Wait for a notification.
360
361     @type timeout: float or None
362     @param timeout: Waiting timeout (can be None)
363
364     """
365     self._check_owned()
366
367     # Keep local reference to the pipe. It could be replaced by another thread
368     # notifying while we're waiting.
369     my_condition = self._single_condition
370
371     assert self._nwaiters >= 0
372     self._nwaiters += 1
373     try:
374       my_condition.wait(timeout)
375     finally:
376       assert self._nwaiters > 0
377       self._nwaiters -= 1
378
379   def notifyAll(self): # pylint: disable-msg=C0103
380     """Notify all currently waiting threads.
381
382     """
383     self._check_owned()
384     self._single_condition.notifyAll()
385     self._single_condition = self._single_condition_class(self._lock)
386
387   def has_waiting(self):
388     """Returns whether there are active waiters.
389
390     """
391     self._check_owned()
392
393     return bool(self._nwaiters)
394
395
396 class SharedLock(object):
397   """Implements a shared lock.
398
399   Multiple threads can acquire the lock in a shared way, calling
400   acquire_shared().  In order to acquire the lock in an exclusive way threads
401   can call acquire_exclusive().
402
403   The lock prevents starvation but does not guarantee that threads will acquire
404   the shared lock in the order they queued for it, just that they will
405   eventually do so.
406
407   @type name: string
408   @ivar name: the name of the lock
409
410   """
411   __slots__ = [
412     "__active_shr_c",
413     "__inactive_shr_c",
414     "__deleted",
415     "__exc",
416     "__lock",
417     "__pending",
418     "__shr",
419     "name",
420     ]
421
422   __condition_class = PipeCondition
423
424   def __init__(self, name):
425     """Construct a new SharedLock.
426
427     @param name: the name of the lock
428
429     """
430     object.__init__(self)
431
432     self.name = name
433
434     # Internal lock
435     self.__lock = threading.Lock()
436
437     # Queue containing waiting acquires
438     self.__pending = []
439
440     # Active and inactive conditions for shared locks
441     self.__active_shr_c = self.__condition_class(self.__lock)
442     self.__inactive_shr_c = self.__condition_class(self.__lock)
443
444     # Current lock holders
445     self.__shr = set()
446     self.__exc = None
447
448     # is this lock in the deleted state?
449     self.__deleted = False
450
451   def __check_deleted(self):
452     """Raises an exception if the lock has been deleted.
453
454     """
455     if self.__deleted:
456       raise errors.LockError("Deleted lock %s" % self.name)
457
458   def __is_sharer(self):
459     """Is the current thread sharing the lock at this time?
460
461     """
462     return threading.currentThread() in self.__shr
463
464   def __is_exclusive(self):
465     """Is the current thread holding the lock exclusively at this time?
466
467     """
468     return threading.currentThread() == self.__exc
469
470   def __is_owned(self, shared=-1):
471     """Is the current thread somehow owning the lock at this time?
472
473     This is a private version of the function, which presumes you're holding
474     the internal lock.
475
476     """
477     if shared < 0:
478       return self.__is_sharer() or self.__is_exclusive()
479     elif shared:
480       return self.__is_sharer()
481     else:
482       return self.__is_exclusive()
483
484   def _is_owned(self, shared=-1):
485     """Is the current thread somehow owning the lock at this time?
486
487     @param shared:
488         - < 0: check for any type of ownership (default)
489         - 0: check for exclusive ownership
490         - > 0: check for shared ownership
491
492     """
493     self.__lock.acquire()
494     try:
495       return self.__is_owned(shared=shared)
496     finally:
497       self.__lock.release()
498
499   def _count_pending(self):
500     """Returns the number of pending acquires.
501
502     @rtype: int
503
504     """
505     self.__lock.acquire()
506     try:
507       return len(self.__pending)
508     finally:
509       self.__lock.release()
510
511   def __do_acquire(self, shared):
512     """Actually acquire the lock.
513
514     """
515     if shared:
516       self.__shr.add(threading.currentThread())
517     else:
518       self.__exc = threading.currentThread()
519
520   def __can_acquire(self, shared):
521     """Determine whether lock can be acquired.
522
523     """
524     if shared:
525       return self.__exc is None
526     else:
527       return len(self.__shr) == 0 and self.__exc is None
528
529   def __is_on_top(self, cond):
530     """Checks whether the passed condition is on top of the queue.
531
532     The caller must make sure the queue isn't empty.
533
534     """
535     return self.__pending[0] == cond
536
537   def __acquire_unlocked(self, shared, timeout):
538     """Acquire a shared lock.
539
540     @param shared: whether to acquire in shared mode; by default an
541         exclusive lock will be acquired
542     @param timeout: maximum waiting time before giving up
543
544     """
545     self.__check_deleted()
546
547     # We cannot acquire the lock if we already have it
548     assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
549                                    " %s" % self.name)
550
551     # Check whether someone else holds the lock or there are pending acquires.
552     if not self.__pending and self.__can_acquire(shared):
553       # Apparently not, can acquire lock directly.
554       self.__do_acquire(shared)
555       return True
556
557     if shared:
558       wait_condition = self.__active_shr_c
559
560       # Check if we're not yet in the queue
561       if wait_condition not in self.__pending:
562         self.__pending.append(wait_condition)
563     else:
564       wait_condition = self.__condition_class(self.__lock)
565       # Always add to queue
566       self.__pending.append(wait_condition)
567
568     try:
569       # Wait until we become the topmost acquire in the queue or the timeout
570       # expires.
571       while not (self.__is_on_top(wait_condition) and
572                  self.__can_acquire(shared)):
573         # Wait for notification
574         wait_condition.wait(timeout)
575         self.__check_deleted()
576
577         # A lot of code assumes blocking acquires always succeed. Loop
578         # internally for that case.
579         if timeout is not None:
580           break
581
582       if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
583         self.__do_acquire(shared)
584         return True
585     finally:
586       # Remove condition from queue if there are no more waiters
587       if not wait_condition.has_waiting() and not self.__deleted:
588         self.__pending.remove(wait_condition)
589
590     return False
591
592   def acquire(self, shared=0, timeout=None, test_notify=None):
593     """Acquire a shared lock.
594
595     @type shared: integer (0/1) used as a boolean
596     @param shared: whether to acquire in shared mode; by default an
597         exclusive lock will be acquired
598     @type timeout: float
599     @param timeout: maximum waiting time before giving up
600     @type test_notify: callable or None
601     @param test_notify: Special callback function for unittesting
602
603     """
604     self.__lock.acquire()
605     try:
606       # We already got the lock, notify now
607       if __debug__ and callable(test_notify):
608         test_notify()
609
610       return self.__acquire_unlocked(shared, timeout)
611     finally:
612       self.__lock.release()
613
614   def release(self):
615     """Release a Shared Lock.
616
617     You must have acquired the lock, either in shared or in exclusive mode,
618     before calling this function.
619
620     """
621     self.__lock.acquire()
622     try:
623       assert self.__is_exclusive() or self.__is_sharer(), \
624         "Cannot release non-owned lock"
625
626       # Autodetect release type
627       if self.__is_exclusive():
628         self.__exc = None
629       else:
630         self.__shr.remove(threading.currentThread())
631
632       # Notify topmost condition in queue
633       if self.__pending:
634         first_condition = self.__pending[0]
635         first_condition.notifyAll()
636
637         if first_condition == self.__active_shr_c:
638           self.__active_shr_c = self.__inactive_shr_c
639           self.__inactive_shr_c = first_condition
640
641     finally:
642       self.__lock.release()
643
644   def delete(self, timeout=None):
645     """Delete a Shared Lock.
646
647     This operation will declare the lock for removal. First the lock will be
648     acquired in exclusive mode if you don't already own it, then the lock
649     will be put in a state where any future and pending acquire() fail.
650
651     @type timeout: float
652     @param timeout: maximum waiting time before giving up
653
654     """
655     self.__lock.acquire()
656     try:
657       assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
658
659       self.__check_deleted()
660
661       # The caller is allowed to hold the lock exclusively already.
662       acquired = self.__is_exclusive()
663
664       if not acquired:
665         acquired = self.__acquire_unlocked(0, timeout)
666
667         assert self.__is_exclusive() and not self.__is_sharer(), \
668           "Lock wasn't acquired in exclusive mode"
669
670       if acquired:
671         self.__deleted = True
672         self.__exc = None
673
674         # Notify all acquires. They'll throw an error.
675         while self.__pending:
676           self.__pending.pop().notifyAll()
677
678       return acquired
679     finally:
680       self.__lock.release()
681
682   def _release_save(self):
683     shared = self.__is_sharer()
684     self.release()
685     return shared
686
687   def _acquire_restore(self, shared):
688     self.acquire(shared=shared)
689
690
691 # Whenever we want to acquire a full LockSet we pass None as the value
692 # to acquire.  Hide this behind this nicely named constant.
693 ALL_SET = None
694
695
696 class _AcquireTimeout(Exception):
697   """Internal exception to abort an acquire on a timeout.
698
699   """
700
701
702 class LockSet:
703   """Implements a set of locks.
704
705   This abstraction implements a set of shared locks for the same resource type,
706   distinguished by name. The user can lock a subset of the resources and the
707   LockSet will take care of acquiring the locks always in the same order, thus
708   preventing deadlock.
709
710   All the locks needed in the same set must be acquired together, though.
711
712   @type name: string
713   @ivar name: the name of the lockset
714
715   """
716   def __init__(self, members, name):
717     """Constructs a new LockSet.
718
719     @type members: list of strings
720     @param members: initial members of the set
721
722     """
723     assert members is not None, "members parameter is not a list"
724     self.name = name
725
726     # Used internally to guarantee coherency.
727     self.__lock = SharedLock(name)
728
729     # The lockdict indexes the relationship name -> lock
730     # The order-of-locking is implied by the alphabetical order of names
731     self.__lockdict = {}
732
733     for mname in members:
734       self.__lockdict[mname] = SharedLock("%s/%s" % (name, mname))
735
736     # The owner dict contains the set of locks each thread owns. For
737     # performance each thread can access its own key without a global lock on
738     # this structure. It is paramount though that *no* other type of access is
739     # done to this structure (eg. no looping over its keys). *_owner helper
740     # function are defined to guarantee access is correct, but in general never
741     # do anything different than __owners[threading.currentThread()], or there
742     # will be trouble.
743     self.__owners = {}
744
745   def _is_owned(self):
746     """Is the current thread a current level owner?"""
747     return threading.currentThread() in self.__owners
748
749   def _add_owned(self, name=None):
750     """Note the current thread owns the given lock"""
751     if name is None:
752       if not self._is_owned():
753         self.__owners[threading.currentThread()] = set()
754     else:
755       if self._is_owned():
756         self.__owners[threading.currentThread()].add(name)
757       else:
758         self.__owners[threading.currentThread()] = set([name])
759
760   def _del_owned(self, name=None):
761     """Note the current thread owns the given lock"""
762
763     assert not (name is None and self.__lock._is_owned()), \
764            "Cannot hold internal lock when deleting owner status"
765
766     if name is not None:
767       self.__owners[threading.currentThread()].remove(name)
768
769     # Only remove the key if we don't hold the set-lock as well
770     if (not self.__lock._is_owned() and
771         not self.__owners[threading.currentThread()]):
772       del self.__owners[threading.currentThread()]
773
774   def _list_owned(self):
775     """Get the set of resource names owned by the current thread"""
776     if self._is_owned():
777       return self.__owners[threading.currentThread()].copy()
778     else:
779       return set()
780
781   def _release_and_delete_owned(self):
782     """Release and delete all resources owned by the current thread"""
783     for lname in self._list_owned():
784       lock = self.__lockdict[lname]
785       if lock._is_owned():
786         lock.release()
787       self._del_owned(name=lname)
788
789   def __names(self):
790     """Return the current set of names.
791
792     Only call this function while holding __lock and don't iterate on the
793     result after releasing the lock.
794
795     """
796     return self.__lockdict.keys()
797
798   def _names(self):
799     """Return a copy of the current set of elements.
800
801     Used only for debugging purposes.
802
803     """
804     # If we don't already own the set-level lock acquired
805     # we'll get it and note we need to release it later.
806     release_lock = False
807     if not self.__lock._is_owned():
808       release_lock = True
809       self.__lock.acquire(shared=1)
810     try:
811       result = self.__names()
812     finally:
813       if release_lock:
814         self.__lock.release()
815     return set(result)
816
817   def acquire(self, names, timeout=None, shared=0, test_notify=None):
818     """Acquire a set of resource locks.
819
820     @type names: list of strings (or string)
821     @param names: the names of the locks which shall be acquired
822         (special lock names, or instance/node names)
823     @type shared: integer (0/1) used as a boolean
824     @param shared: whether to acquire in shared mode; by default an
825         exclusive lock will be acquired
826     @type timeout: float or None
827     @param timeout: Maximum time to acquire all locks
828     @type test_notify: callable or None
829     @param test_notify: Special callback function for unittesting
830
831     @return: Set of all locks successfully acquired or None in case of timeout
832
833     @raise errors.LockError: when any lock we try to acquire has
834         been deleted before we succeed. In this case none of the
835         locks requested will be acquired.
836
837     """
838     assert timeout is None or timeout >= 0.0
839
840     # Check we don't already own locks at this level
841     assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
842                                   " (lockset %s)" % self.name)
843
844     # We need to keep track of how long we spent waiting for a lock. The
845     # timeout passed to this function is over all lock acquires.
846     running_timeout = RunningTimeout(timeout, False)
847
848     try:
849       if names is not None:
850         # Support passing in a single resource to acquire rather than many
851         if isinstance(names, basestring):
852           names = [names]
853
854         return self.__acquire_inner(names, False, shared,
855                                     running_timeout.Remaining, test_notify)
856
857       else:
858         # If no names are given acquire the whole set by not letting new names
859         # being added before we release, and getting the current list of names.
860         # Some of them may then be deleted later, but we'll cope with this.
861         #
862         # We'd like to acquire this lock in a shared way, as it's nice if
863         # everybody else can use the instances at the same time. If are
864         # acquiring them exclusively though they won't be able to do this
865         # anyway, though, so we'll get the list lock exclusively as well in
866         # order to be able to do add() on the set while owning it.
867         if not self.__lock.acquire(shared=shared,
868                                    timeout=running_timeout.Remaining()):
869           raise _AcquireTimeout()
870         try:
871           # note we own the set-lock
872           self._add_owned()
873
874           return self.__acquire_inner(self.__names(), True, shared,
875                                       running_timeout.Remaining, test_notify)
876         except:
877           # We shouldn't have problems adding the lock to the owners list, but
878           # if we did we'll try to release this lock and re-raise exception.
879           # Of course something is going to be really wrong, after this.
880           self.__lock.release()
881           self._del_owned()
882           raise
883
884     except _AcquireTimeout:
885       return None
886
887   def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
888     """Inner logic for acquiring a number of locks.
889
890     @param names: Names of the locks to be acquired
891     @param want_all: Whether all locks in the set should be acquired
892     @param shared: Whether to acquire in shared mode
893     @param timeout_fn: Function returning remaining timeout
894     @param test_notify: Special callback function for unittesting
895
896     """
897     acquire_list = []
898
899     # First we look the locks up on __lockdict. We have no way of being sure
900     # they will still be there after, but this makes it a lot faster should
901     # just one of them be the already wrong. Using a sorted sequence to prevent
902     # deadlocks.
903     for lname in sorted(utils.UniqueSequence(names)):
904       try:
905         lock = self.__lockdict[lname] # raises KeyError if lock is not there
906       except KeyError:
907         if want_all:
908           # We are acquiring all the set, it doesn't matter if this particular
909           # element is not there anymore.
910           continue
911
912         raise errors.LockError("Non-existing lock %s in set %s" %
913                                (lname, self.name))
914
915       acquire_list.append((lname, lock))
916
917     # This will hold the locknames we effectively acquired.
918     acquired = set()
919
920     try:
921       # Now acquire_list contains a sorted list of resources and locks we
922       # want.  In order to get them we loop on this (private) list and
923       # acquire() them.  We gave no real guarantee they will still exist till
924       # this is done but .acquire() itself is safe and will alert us if the
925       # lock gets deleted.
926       for (lname, lock) in acquire_list:
927         if __debug__ and callable(test_notify):
928           test_notify_fn = lambda: test_notify(lname)
929         else:
930           test_notify_fn = None
931
932         timeout = timeout_fn()
933
934         try:
935           # raises LockError if the lock was deleted
936           acq_success = lock.acquire(shared=shared, timeout=timeout,
937                                      test_notify=test_notify_fn)
938         except errors.LockError:
939           if want_all:
940             # We are acquiring all the set, it doesn't matter if this
941             # particular element is not there anymore.
942             continue
943
944           raise errors.LockError("Non-existing lock %s in set %s" %
945                                  (lname, self.name))
946
947         if not acq_success:
948           # Couldn't get lock or timeout occurred
949           if timeout is None:
950             # This shouldn't happen as SharedLock.acquire(timeout=None) is
951             # blocking.
952             raise errors.LockError("Failed to get lock %s (set %s)" %
953                                    (lname, self.name))
954
955           raise _AcquireTimeout()
956
957         try:
958           # now the lock cannot be deleted, we have it!
959           self._add_owned(name=lname)
960           acquired.add(lname)
961
962         except:
963           # We shouldn't have problems adding the lock to the owners list, but
964           # if we did we'll try to release this lock and re-raise exception.
965           # Of course something is going to be really wrong after this.
966           if lock._is_owned():
967             lock.release()
968           raise
969
970     except:
971       # Release all owned locks
972       self._release_and_delete_owned()
973       raise
974
975     return acquired
976
977   def release(self, names=None):
978     """Release a set of resource locks, at the same level.
979
980     You must have acquired the locks, either in shared or in exclusive mode,
981     before releasing them.
982
983     @type names: list of strings, or None
984     @param names: the names of the locks which shall be released
985         (defaults to all the locks acquired at that level).
986
987     """
988     assert self._is_owned(), ("release() on lock set %s while not owner" %
989                               self.name)
990
991     # Support passing in a single resource to release rather than many
992     if isinstance(names, basestring):
993       names = [names]
994
995     if names is None:
996       names = self._list_owned()
997     else:
998       names = set(names)
999       assert self._list_owned().issuperset(names), (
1000                "release() on unheld resources %s (set %s)" %
1001                (names.difference(self._list_owned()), self.name))
1002
1003     # First of all let's release the "all elements" lock, if set.
1004     # After this 'add' can work again
1005     if self.__lock._is_owned():
1006       self.__lock.release()
1007       self._del_owned()
1008
1009     for lockname in names:
1010       # If we are sure the lock doesn't leave __lockdict without being
1011       # exclusively held we can do this...
1012       self.__lockdict[lockname].release()
1013       self._del_owned(name=lockname)
1014
1015   def add(self, names, acquired=0, shared=0):
1016     """Add a new set of elements to the set
1017
1018     @type names: list of strings
1019     @param names: names of the new elements to add
1020     @type acquired: integer (0/1) used as a boolean
1021     @param acquired: pre-acquire the new resource?
1022     @type shared: integer (0/1) used as a boolean
1023     @param shared: is the pre-acquisition shared?
1024
1025     """
1026     # Check we don't already own locks at this level
1027     assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1028       ("Cannot add locks if the set %s is only partially owned, or shared" %
1029        self.name)
1030
1031     # Support passing in a single resource to add rather than many
1032     if isinstance(names, basestring):
1033       names = [names]
1034
1035     # If we don't already own the set-level lock acquired in an exclusive way
1036     # we'll get it and note we need to release it later.
1037     release_lock = False
1038     if not self.__lock._is_owned():
1039       release_lock = True
1040       self.__lock.acquire()
1041
1042     try:
1043       invalid_names = set(self.__names()).intersection(names)
1044       if invalid_names:
1045         # This must be an explicit raise, not an assert, because assert is
1046         # turned off when using optimization, and this can happen because of
1047         # concurrency even if the user doesn't want it.
1048         raise errors.LockError("duplicate add(%s) on lockset %s" %
1049                                (invalid_names, self.name))
1050
1051       for lockname in names:
1052         lock = SharedLock("%s/%s" % (self.name, lockname))
1053
1054         if acquired:
1055           lock.acquire(shared=shared)
1056           # now the lock cannot be deleted, we have it!
1057           try:
1058             self._add_owned(name=lockname)
1059           except:
1060             # We shouldn't have problems adding the lock to the owners list,
1061             # but if we did we'll try to release this lock and re-raise
1062             # exception.  Of course something is going to be really wrong,
1063             # after this.  On the other hand the lock hasn't been added to the
1064             # __lockdict yet so no other threads should be pending on it. This
1065             # release is just a safety measure.
1066             lock.release()
1067             raise
1068
1069         self.__lockdict[lockname] = lock
1070
1071     finally:
1072       # Only release __lock if we were not holding it previously.
1073       if release_lock:
1074         self.__lock.release()
1075
1076     return True
1077
1078   def remove(self, names):
1079     """Remove elements from the lock set.
1080
1081     You can either not hold anything in the lockset or already hold a superset
1082     of the elements you want to delete, exclusively.
1083
1084     @type names: list of strings
1085     @param names: names of the resource to remove.
1086
1087     @return: a list of locks which we removed; the list is always
1088         equal to the names list if we were holding all the locks
1089         exclusively
1090
1091     """
1092     # Support passing in a single resource to remove rather than many
1093     if isinstance(names, basestring):
1094       names = [names]
1095
1096     # If we own any subset of this lock it must be a superset of what we want
1097     # to delete. The ownership must also be exclusive, but that will be checked
1098     # by the lock itself.
1099     assert not self._is_owned() or self._list_owned().issuperset(names), (
1100       "remove() on acquired lockset %s while not owning all elements" %
1101       self.name)
1102
1103     removed = []
1104
1105     for lname in names:
1106       # Calling delete() acquires the lock exclusively if we don't already own
1107       # it, and causes all pending and subsequent lock acquires to fail. It's
1108       # fine to call it out of order because delete() also implies release(),
1109       # and the assertion above guarantees that if we either already hold
1110       # everything we want to delete, or we hold none.
1111       try:
1112         self.__lockdict[lname].delete()
1113         removed.append(lname)
1114       except (KeyError, errors.LockError):
1115         # This cannot happen if we were already holding it, verify:
1116         assert not self._is_owned(), ("remove failed while holding lockset %s"
1117                                       % self.name)
1118       else:
1119         # If no LockError was raised we are the ones who deleted the lock.
1120         # This means we can safely remove it from lockdict, as any further or
1121         # pending delete() or acquire() will fail (and nobody can have the lock
1122         # since before our call to delete()).
1123         #
1124         # This is done in an else clause because if the exception was thrown
1125         # it's the job of the one who actually deleted it.
1126         del self.__lockdict[lname]
1127         # And let's remove it from our private list if we owned it.
1128         if self._is_owned():
1129           self._del_owned(name=lname)
1130
1131     return removed
1132
1133
1134 # Locking levels, must be acquired in increasing order.
1135 # Current rules are:
1136 #   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1137 #   acquired before performing any operation, either in shared or in exclusive
1138 #   mode. acquiring the BGL in exclusive mode is discouraged and should be
1139 #   avoided.
1140 #   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1141 #   If you need more than one node, or more than one instance, acquire them at
1142 #   the same time.
1143 LEVEL_CLUSTER = 0
1144 LEVEL_INSTANCE = 1
1145 LEVEL_NODE = 2
1146
1147 LEVELS = [LEVEL_CLUSTER,
1148           LEVEL_INSTANCE,
1149           LEVEL_NODE]
1150
1151 # Lock levels which are modifiable
1152 LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1153
1154 LEVEL_NAMES = {
1155   LEVEL_CLUSTER: "cluster",
1156   LEVEL_INSTANCE: "instance",
1157   LEVEL_NODE: "node",
1158   }
1159
1160 # Constant for the big ganeti lock
1161 BGL = 'BGL'
1162
1163
1164 class GanetiLockManager:
1165   """The Ganeti Locking Library
1166
1167   The purpose of this small library is to manage locking for ganeti clusters
1168   in a central place, while at the same time doing dynamic checks against
1169   possible deadlocks. It will also make it easier to transition to a different
1170   lock type should we migrate away from python threads.
1171
1172   """
1173   _instance = None
1174
1175   def __init__(self, nodes=None, instances=None):
1176     """Constructs a new GanetiLockManager object.
1177
1178     There should be only a GanetiLockManager object at any time, so this
1179     function raises an error if this is not the case.
1180
1181     @param nodes: list of node names
1182     @param instances: list of instance names
1183
1184     """
1185     assert self.__class__._instance is None, \
1186            "double GanetiLockManager instance"
1187
1188     self.__class__._instance = self
1189
1190     # The keyring contains all the locks, at their level and in the correct
1191     # locking order.
1192     self.__keyring = {
1193       LEVEL_CLUSTER: LockSet([BGL], "bgl lockset"),
1194       LEVEL_NODE: LockSet(nodes, "nodes lockset"),
1195       LEVEL_INSTANCE: LockSet(instances, "instances lockset"),
1196     }
1197
1198   def _names(self, level):
1199     """List the lock names at the given level.
1200
1201     This can be used for debugging/testing purposes.
1202
1203     @param level: the level whose list of locks to get
1204
1205     """
1206     assert level in LEVELS, "Invalid locking level %s" % level
1207     return self.__keyring[level]._names()
1208
1209   def _is_owned(self, level):
1210     """Check whether we are owning locks at the given level
1211
1212     """
1213     return self.__keyring[level]._is_owned()
1214
1215   is_owned = _is_owned
1216
1217   def _list_owned(self, level):
1218     """Get the set of owned locks at the given level
1219
1220     """
1221     return self.__keyring[level]._list_owned()
1222
1223   def _upper_owned(self, level):
1224     """Check that we don't own any lock at a level greater than the given one.
1225
1226     """
1227     # This way of checking only works if LEVELS[i] = i, which we check for in
1228     # the test cases.
1229     return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1230
1231   def _BGL_owned(self): # pylint: disable-msg=C0103
1232     """Check if the current thread owns the BGL.
1233
1234     Both an exclusive or a shared acquisition work.
1235
1236     """
1237     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1238
1239   @staticmethod
1240   def _contains_BGL(level, names): # pylint: disable-msg=C0103
1241     """Check if the level contains the BGL.
1242
1243     Check if acting on the given level and set of names will change
1244     the status of the Big Ganeti Lock.
1245
1246     """
1247     return level == LEVEL_CLUSTER and (names is None or BGL in names)
1248
1249   def acquire(self, level, names, timeout=None, shared=0):
1250     """Acquire a set of resource locks, at the same level.
1251
1252     @type level: member of locking.LEVELS
1253     @param level: the level at which the locks shall be acquired
1254     @type names: list of strings (or string)
1255     @param names: the names of the locks which shall be acquired
1256         (special lock names, or instance/node names)
1257     @type shared: integer (0/1) used as a boolean
1258     @param shared: whether to acquire in shared mode; by default
1259         an exclusive lock will be acquired
1260     @type timeout: float
1261     @param timeout: Maximum time to acquire all locks
1262
1263     """
1264     assert level in LEVELS, "Invalid locking level %s" % level
1265
1266     # Check that we are either acquiring the Big Ganeti Lock or we already own
1267     # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1268     # so even if we've migrated we need to at least share the BGL to be
1269     # compatible with them. Of course if we own the BGL exclusively there's no
1270     # point in acquiring any other lock, unless perhaps we are half way through
1271     # the migration of the current opcode.
1272     assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1273             "You must own the Big Ganeti Lock before acquiring any other")
1274
1275     # Check we don't own locks at the same or upper levels.
1276     assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1277            " while owning some at a greater one")
1278
1279     # Acquire the locks in the set.
1280     return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
1281
1282   def release(self, level, names=None):
1283     """Release a set of resource locks, at the same level.
1284
1285     You must have acquired the locks, either in shared or in exclusive
1286     mode, before releasing them.
1287
1288     @type level: member of locking.LEVELS
1289     @param level: the level at which the locks shall be released
1290     @type names: list of strings, or None
1291     @param names: the names of the locks which shall be released
1292         (defaults to all the locks acquired at that level)
1293
1294     """
1295     assert level in LEVELS, "Invalid locking level %s" % level
1296     assert (not self._contains_BGL(level, names) or
1297             not self._upper_owned(LEVEL_CLUSTER)), (
1298             "Cannot release the Big Ganeti Lock while holding something"
1299             " at upper levels (%r)" %
1300             (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
1301                               for i in self.__keyring.keys()]), ))
1302
1303     # Release will complain if we don't own the locks already
1304     return self.__keyring[level].release(names)
1305
1306   def add(self, level, names, acquired=0, shared=0):
1307     """Add locks at the specified level.
1308
1309     @type level: member of locking.LEVELS_MOD
1310     @param level: the level at which the locks shall be added
1311     @type names: list of strings
1312     @param names: names of the locks to acquire
1313     @type acquired: integer (0/1) used as a boolean
1314     @param acquired: whether to acquire the newly added locks
1315     @type shared: integer (0/1) used as a boolean
1316     @param shared: whether the acquisition will be shared
1317
1318     """
1319     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1320     assert self._BGL_owned(), ("You must own the BGL before performing other"
1321            " operations")
1322     assert not self._upper_owned(level), ("Cannot add locks at a level"
1323            " while owning some at a greater one")
1324     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1325
1326   def remove(self, level, names):
1327     """Remove locks from the specified level.
1328
1329     You must either already own the locks you are trying to remove
1330     exclusively or not own any lock at an upper level.
1331
1332     @type level: member of locking.LEVELS_MOD
1333     @param level: the level at which the locks shall be removed
1334     @type names: list of strings
1335     @param names: the names of the locks which shall be removed
1336         (special lock names, or instance/node names)
1337
1338     """
1339     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1340     assert self._BGL_owned(), ("You must own the BGL before performing other"
1341            " operations")
1342     # Check we either own the level or don't own anything from here
1343     # up. LockSet.remove() will check the case in which we don't own
1344     # all the needed resources, or we have a shared ownership.
1345     assert self._is_owned(level) or not self._upper_owned(level), (
1346            "Cannot remove locks at a level while not owning it or"
1347            " owning some at a greater one")
1348     return self.__keyring[level].remove(names)