Improve import/export timeout settings
[ganeti-local] / lib / locking.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Module implementing the Ganeti locking code."""
22
23 # pylint: disable-msg=W0212
24
25 # W0212 since e.g. LockSet methods use (a lot) the internals of
26 # SharedLock
27
28 import os
29 import select
30 import threading
31 import time
32 import errno
33 import weakref
34 import logging
35 import heapq
36
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import compat
40
41
42 _EXCLUSIVE_TEXT = "exclusive"
43 _SHARED_TEXT = "shared"
44
45 _DEFAULT_PRIORITY = 0
46
47
48 def ssynchronized(mylock, shared=0):
49   """Shared Synchronization decorator.
50
51   Calls the function holding the given lock, either in exclusive or shared
52   mode. It requires the passed lock to be a SharedLock (or support its
53   semantics).
54
55   @type mylock: lockable object or string
56   @param mylock: lock to acquire or class member name of the lock to acquire
57
58   """
59   def wrap(fn):
60     def sync_function(*args, **kwargs):
61       if isinstance(mylock, basestring):
62         assert args, "cannot ssynchronize on non-class method: self not found"
63         # args[0] is "self"
64         lock = getattr(args[0], mylock)
65       else:
66         lock = mylock
67       lock.acquire(shared=shared)
68       try:
69         return fn(*args, **kwargs)
70       finally:
71         lock.release()
72     return sync_function
73   return wrap
74
75
76 class RunningTimeout(object):
77   """Class to calculate remaining timeout when doing several operations.
78
79   """
80   __slots__ = [
81     "_allow_negative",
82     "_start_time",
83     "_time_fn",
84     "_timeout",
85     ]
86
87   def __init__(self, timeout, allow_negative, _time_fn=time.time):
88     """Initializes this class.
89
90     @type timeout: float
91     @param timeout: Timeout duration
92     @type allow_negative: bool
93     @param allow_negative: Whether to return values below zero
94     @param _time_fn: Time function for unittests
95
96     """
97     object.__init__(self)
98
99     if timeout is not None and timeout < 0.0:
100       raise ValueError("Timeout must not be negative")
101
102     self._timeout = timeout
103     self._allow_negative = allow_negative
104     self._time_fn = _time_fn
105
106     self._start_time = None
107
108   def Remaining(self):
109     """Returns the remaining timeout.
110
111     """
112     if self._timeout is None:
113       return None
114
115     # Get start time on first calculation
116     if self._start_time is None:
117       self._start_time = self._time_fn()
118
119     # Calculate remaining time
120     remaining_timeout = self._start_time + self._timeout - self._time_fn()
121
122     if not self._allow_negative:
123       # Ensure timeout is always >= 0
124       return max(0.0, remaining_timeout)
125
126     return remaining_timeout
127
128
129 class _SingleNotifyPipeConditionWaiter(object):
130   """Helper class for SingleNotifyPipeCondition
131
132   """
133   __slots__ = [
134     "_fd",
135     "_poller",
136     ]
137
138   def __init__(self, poller, fd):
139     """Constructor for _SingleNotifyPipeConditionWaiter
140
141     @type poller: select.poll
142     @param poller: Poller object
143     @type fd: int
144     @param fd: File descriptor to wait for
145
146     """
147     object.__init__(self)
148     self._poller = poller
149     self._fd = fd
150
151   def __call__(self, timeout):
152     """Wait for something to happen on the pipe.
153
154     @type timeout: float or None
155     @param timeout: Timeout for waiting (can be None)
156
157     """
158     running_timeout = RunningTimeout(timeout, True)
159
160     while True:
161       remaining_time = running_timeout.Remaining()
162
163       if remaining_time is not None:
164         if remaining_time < 0.0:
165           break
166
167         # Our calculation uses seconds, poll() wants milliseconds
168         remaining_time *= 1000
169
170       try:
171         result = self._poller.poll(remaining_time)
172       except EnvironmentError, err:
173         if err.errno != errno.EINTR:
174           raise
175         result = None
176
177       # Check whether we were notified
178       if result and result[0][0] == self._fd:
179         break
180
181
182 class _BaseCondition(object):
183   """Base class containing common code for conditions.
184
185   Some of this code is taken from python's threading module.
186
187   """
188   __slots__ = [
189     "_lock",
190     "acquire",
191     "release",
192     "_is_owned",
193     "_acquire_restore",
194     "_release_save",
195     ]
196
197   def __init__(self, lock):
198     """Constructor for _BaseCondition.
199
200     @type lock: threading.Lock
201     @param lock: condition base lock
202
203     """
204     object.__init__(self)
205
206     try:
207       self._release_save = lock._release_save
208     except AttributeError:
209       self._release_save = self._base_release_save
210     try:
211       self._acquire_restore = lock._acquire_restore
212     except AttributeError:
213       self._acquire_restore = self._base_acquire_restore
214     try:
215       self._is_owned = lock._is_owned
216     except AttributeError:
217       self._is_owned = self._base_is_owned
218
219     self._lock = lock
220
221     # Export the lock's acquire() and release() methods
222     self.acquire = lock.acquire
223     self.release = lock.release
224
225   def _base_is_owned(self):
226     """Check whether lock is owned by current thread.
227
228     """
229     if self._lock.acquire(0):
230       self._lock.release()
231       return False
232     return True
233
234   def _base_release_save(self):
235     self._lock.release()
236
237   def _base_acquire_restore(self, _):
238     self._lock.acquire()
239
240   def _check_owned(self):
241     """Raise an exception if the current thread doesn't own the lock.
242
243     """
244     if not self._is_owned():
245       raise RuntimeError("cannot work with un-aquired lock")
246
247
248 class SingleNotifyPipeCondition(_BaseCondition):
249   """Condition which can only be notified once.
250
251   This condition class uses pipes and poll, internally, to be able to wait for
252   notification with a timeout, without resorting to polling. It is almost
253   compatible with Python's threading.Condition, with the following differences:
254     - notifyAll can only be called once, and no wait can happen after that
255     - notify is not supported, only notifyAll
256
257   """
258
259   __slots__ = [
260     "_poller",
261     "_read_fd",
262     "_write_fd",
263     "_nwaiters",
264     "_notified",
265     ]
266
267   _waiter_class = _SingleNotifyPipeConditionWaiter
268
269   def __init__(self, lock):
270     """Constructor for SingleNotifyPipeCondition
271
272     """
273     _BaseCondition.__init__(self, lock)
274     self._nwaiters = 0
275     self._notified = False
276     self._read_fd = None
277     self._write_fd = None
278     self._poller = None
279
280   def _check_unnotified(self):
281     """Throws an exception if already notified.
282
283     """
284     if self._notified:
285       raise RuntimeError("cannot use already notified condition")
286
287   def _Cleanup(self):
288     """Cleanup open file descriptors, if any.
289
290     """
291     if self._read_fd is not None:
292       os.close(self._read_fd)
293       self._read_fd = None
294
295     if self._write_fd is not None:
296       os.close(self._write_fd)
297       self._write_fd = None
298     self._poller = None
299
300   def wait(self, timeout=None):
301     """Wait for a notification.
302
303     @type timeout: float or None
304     @param timeout: Waiting timeout (can be None)
305
306     """
307     self._check_owned()
308     self._check_unnotified()
309
310     self._nwaiters += 1
311     try:
312       if self._poller is None:
313         (self._read_fd, self._write_fd) = os.pipe()
314         self._poller = select.poll()
315         self._poller.register(self._read_fd, select.POLLHUP)
316
317       wait_fn = self._waiter_class(self._poller, self._read_fd)
318       state = self._release_save()
319       try:
320         # Wait for notification
321         wait_fn(timeout)
322       finally:
323         # Re-acquire lock
324         self._acquire_restore(state)
325     finally:
326       self._nwaiters -= 1
327       if self._nwaiters == 0:
328         self._Cleanup()
329
330   def notifyAll(self): # pylint: disable-msg=C0103
331     """Close the writing side of the pipe to notify all waiters.
332
333     """
334     self._check_owned()
335     self._check_unnotified()
336     self._notified = True
337     if self._write_fd is not None:
338       os.close(self._write_fd)
339       self._write_fd = None
340
341
342 class PipeCondition(_BaseCondition):
343   """Group-only non-polling condition with counters.
344
345   This condition class uses pipes and poll, internally, to be able to wait for
346   notification with a timeout, without resorting to polling. It is almost
347   compatible with Python's threading.Condition, but only supports notifyAll and
348   non-recursive locks. As an additional features it's able to report whether
349   there are any waiting threads.
350
351   """
352   __slots__ = [
353     "_waiters",
354     "_single_condition",
355     ]
356
357   _single_condition_class = SingleNotifyPipeCondition
358
359   def __init__(self, lock):
360     """Initializes this class.
361
362     """
363     _BaseCondition.__init__(self, lock)
364     self._waiters = set()
365     self._single_condition = self._single_condition_class(self._lock)
366
367   def wait(self, timeout=None):
368     """Wait for a notification.
369
370     @type timeout: float or None
371     @param timeout: Waiting timeout (can be None)
372
373     """
374     self._check_owned()
375
376     # Keep local reference to the pipe. It could be replaced by another thread
377     # notifying while we're waiting.
378     cond = self._single_condition
379
380     self._waiters.add(threading.currentThread())
381     try:
382       cond.wait(timeout)
383     finally:
384       self._check_owned()
385       self._waiters.remove(threading.currentThread())
386
387   def notifyAll(self): # pylint: disable-msg=C0103
388     """Notify all currently waiting threads.
389
390     """
391     self._check_owned()
392     self._single_condition.notifyAll()
393     self._single_condition = self._single_condition_class(self._lock)
394
395   def get_waiting(self):
396     """Returns a list of all waiting threads.
397
398     """
399     self._check_owned()
400
401     return self._waiters
402
403   def has_waiting(self):
404     """Returns whether there are active waiters.
405
406     """
407     self._check_owned()
408
409     return bool(self._waiters)
410
411
412 class _PipeConditionWithMode(PipeCondition):
413   __slots__ = [
414     "shared",
415     ]
416
417   def __init__(self, lock, shared):
418     """Initializes this class.
419
420     """
421     self.shared = shared
422     PipeCondition.__init__(self, lock)
423
424
425 class SharedLock(object):
426   """Implements a shared lock.
427
428   Multiple threads can acquire the lock in a shared way by calling
429   C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
430   threads can call C{acquire(shared=0)}.
431
432   Notes on data structures: C{__pending} contains a priority queue (heapq) of
433   all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
434   ...]}. Each per-priority queue contains a normal in-order list of conditions
435   to be notified when the lock can be acquired. Shared locks are grouped
436   together by priority and the condition for them is stored in
437   C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
438   references for the per-priority queues indexed by priority for faster access.
439
440   @type name: string
441   @ivar name: the name of the lock
442
443   """
444   __slots__ = [
445     "__weakref__",
446     "__deleted",
447     "__exc",
448     "__lock",
449     "__pending",
450     "__pending_by_prio",
451     "__pending_shared",
452     "__shr",
453     "name",
454     ]
455
456   __condition_class = _PipeConditionWithMode
457
458   def __init__(self, name, monitor=None):
459     """Construct a new SharedLock.
460
461     @param name: the name of the lock
462     @type monitor: L{LockMonitor}
463     @param monitor: Lock monitor with which to register
464
465     """
466     object.__init__(self)
467
468     self.name = name
469
470     # Internal lock
471     self.__lock = threading.Lock()
472
473     # Queue containing waiting acquires
474     self.__pending = []
475     self.__pending_by_prio = {}
476     self.__pending_shared = {}
477
478     # Current lock holders
479     self.__shr = set()
480     self.__exc = None
481
482     # is this lock in the deleted state?
483     self.__deleted = False
484
485     # Register with lock monitor
486     if monitor:
487       monitor.RegisterLock(self)
488
489   def GetInfo(self, fields):
490     """Retrieves information for querying locks.
491
492     @type fields: list of strings
493     @param fields: List of fields to return
494
495     """
496     self.__lock.acquire()
497     try:
498       info = []
499
500       # Note: to avoid unintentional race conditions, no references to
501       # modifiable objects should be returned unless they were created in this
502       # function.
503       for fname in fields:
504         if fname == "name":
505           info.append(self.name)
506         elif fname == "mode":
507           if self.__deleted:
508             info.append("deleted")
509             assert not (self.__exc or self.__shr)
510           elif self.__exc:
511             info.append(_EXCLUSIVE_TEXT)
512           elif self.__shr:
513             info.append(_SHARED_TEXT)
514           else:
515             info.append(None)
516         elif fname == "owner":
517           if self.__exc:
518             owner = [self.__exc]
519           else:
520             owner = self.__shr
521
522           if owner:
523             assert not self.__deleted
524             info.append([i.getName() for i in owner])
525           else:
526             info.append(None)
527         elif fname == "pending":
528           data = []
529
530           # Sorting instead of copying and using heaq functions for simplicity
531           for (_, prioqueue) in sorted(self.__pending):
532             for cond in prioqueue:
533               if cond.shared:
534                 mode = _SHARED_TEXT
535               else:
536                 mode = _EXCLUSIVE_TEXT
537
538               # This function should be fast as it runs with the lock held.
539               # Hence not using utils.NiceSort.
540               data.append((mode, sorted(i.getName()
541                                         for i in cond.get_waiting())))
542
543           info.append(data)
544         else:
545           raise errors.OpExecError("Invalid query field '%s'" % fname)
546
547       return info
548     finally:
549       self.__lock.release()
550
551   def __check_deleted(self):
552     """Raises an exception if the lock has been deleted.
553
554     """
555     if self.__deleted:
556       raise errors.LockError("Deleted lock %s" % self.name)
557
558   def __is_sharer(self):
559     """Is the current thread sharing the lock at this time?
560
561     """
562     return threading.currentThread() in self.__shr
563
564   def __is_exclusive(self):
565     """Is the current thread holding the lock exclusively at this time?
566
567     """
568     return threading.currentThread() == self.__exc
569
570   def __is_owned(self, shared=-1):
571     """Is the current thread somehow owning the lock at this time?
572
573     This is a private version of the function, which presumes you're holding
574     the internal lock.
575
576     """
577     if shared < 0:
578       return self.__is_sharer() or self.__is_exclusive()
579     elif shared:
580       return self.__is_sharer()
581     else:
582       return self.__is_exclusive()
583
584   def _is_owned(self, shared=-1):
585     """Is the current thread somehow owning the lock at this time?
586
587     @param shared:
588         - < 0: check for any type of ownership (default)
589         - 0: check for exclusive ownership
590         - > 0: check for shared ownership
591
592     """
593     self.__lock.acquire()
594     try:
595       return self.__is_owned(shared=shared)
596     finally:
597       self.__lock.release()
598
599   def _count_pending(self):
600     """Returns the number of pending acquires.
601
602     @rtype: int
603
604     """
605     self.__lock.acquire()
606     try:
607       return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
608     finally:
609       self.__lock.release()
610
611   def _check_empty(self):
612     """Checks whether there are any pending acquires.
613
614     @rtype: bool
615
616     """
617     self.__lock.acquire()
618     try:
619       # Order is important: __find_first_pending_queue modifies __pending
620       return not (self.__find_first_pending_queue() or
621                   self.__pending or
622                   self.__pending_by_prio or
623                   self.__pending_shared)
624     finally:
625       self.__lock.release()
626
627   def __do_acquire(self, shared):
628     """Actually acquire the lock.
629
630     """
631     if shared:
632       self.__shr.add(threading.currentThread())
633     else:
634       self.__exc = threading.currentThread()
635
636   def __can_acquire(self, shared):
637     """Determine whether lock can be acquired.
638
639     """
640     if shared:
641       return self.__exc is None
642     else:
643       return len(self.__shr) == 0 and self.__exc is None
644
645   def __find_first_pending_queue(self):
646     """Tries to find the topmost queued entry with pending acquires.
647
648     Removes empty entries while going through the list.
649
650     """
651     while self.__pending:
652       (priority, prioqueue) = self.__pending[0]
653
654       if not prioqueue:
655         heapq.heappop(self.__pending)
656         del self.__pending_by_prio[priority]
657         assert priority not in self.__pending_shared
658         continue
659
660       if prioqueue:
661         return prioqueue
662
663     return None
664
665   def __is_on_top(self, cond):
666     """Checks whether the passed condition is on top of the queue.
667
668     The caller must make sure the queue isn't empty.
669
670     """
671     return cond == self.__find_first_pending_queue()[0]
672
673   def __acquire_unlocked(self, shared, timeout, priority):
674     """Acquire a shared lock.
675
676     @param shared: whether to acquire in shared mode; by default an
677         exclusive lock will be acquired
678     @param timeout: maximum waiting time before giving up
679     @type priority: integer
680     @param priority: Priority for acquiring lock
681
682     """
683     self.__check_deleted()
684
685     # We cannot acquire the lock if we already have it
686     assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
687                                    " %s" % self.name)
688
689     # Remove empty entries from queue
690     self.__find_first_pending_queue()
691
692     # Check whether someone else holds the lock or there are pending acquires.
693     if not self.__pending and self.__can_acquire(shared):
694       # Apparently not, can acquire lock directly.
695       self.__do_acquire(shared)
696       return True
697
698     prioqueue = self.__pending_by_prio.get(priority, None)
699
700     if shared:
701       # Try to re-use condition for shared acquire
702       wait_condition = self.__pending_shared.get(priority, None)
703       assert (wait_condition is None or
704               (wait_condition.shared and wait_condition in prioqueue))
705     else:
706       wait_condition = None
707
708     if wait_condition is None:
709       if prioqueue is None:
710         assert priority not in self.__pending_by_prio
711
712         prioqueue = []
713         heapq.heappush(self.__pending, (priority, prioqueue))
714         self.__pending_by_prio[priority] = prioqueue
715
716       wait_condition = self.__condition_class(self.__lock, shared)
717       prioqueue.append(wait_condition)
718
719       if shared:
720         # Keep reference for further shared acquires on same priority. This is
721         # better than trying to find it in the list of pending acquires.
722         assert priority not in self.__pending_shared
723         self.__pending_shared[priority] = wait_condition
724
725     try:
726       # Wait until we become the topmost acquire in the queue or the timeout
727       # expires.
728       # TODO: Decrease timeout with spurious notifications
729       while not (self.__is_on_top(wait_condition) and
730                  self.__can_acquire(shared)):
731         # Wait for notification
732         wait_condition.wait(timeout)
733         self.__check_deleted()
734
735         # A lot of code assumes blocking acquires always succeed. Loop
736         # internally for that case.
737         if timeout is not None:
738           break
739
740       if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
741         self.__do_acquire(shared)
742         return True
743     finally:
744       # Remove condition from queue if there are no more waiters
745       if not wait_condition.has_waiting():
746         prioqueue.remove(wait_condition)
747         if wait_condition.shared:
748           del self.__pending_shared[priority]
749
750     return False
751
752   def acquire(self, shared=0, timeout=None, priority=None,
753               test_notify=None):
754     """Acquire a shared lock.
755
756     @type shared: integer (0/1) used as a boolean
757     @param shared: whether to acquire in shared mode; by default an
758         exclusive lock will be acquired
759     @type timeout: float
760     @param timeout: maximum waiting time before giving up
761     @type priority: integer
762     @param priority: Priority for acquiring lock
763     @type test_notify: callable or None
764     @param test_notify: Special callback function for unittesting
765
766     """
767     if priority is None:
768       priority = _DEFAULT_PRIORITY
769
770     self.__lock.acquire()
771     try:
772       # We already got the lock, notify now
773       if __debug__ and callable(test_notify):
774         test_notify()
775
776       return self.__acquire_unlocked(shared, timeout, priority)
777     finally:
778       self.__lock.release()
779
780   def release(self):
781     """Release a Shared Lock.
782
783     You must have acquired the lock, either in shared or in exclusive mode,
784     before calling this function.
785
786     """
787     self.__lock.acquire()
788     try:
789       assert self.__is_exclusive() or self.__is_sharer(), \
790         "Cannot release non-owned lock"
791
792       # Autodetect release type
793       if self.__is_exclusive():
794         self.__exc = None
795       else:
796         self.__shr.remove(threading.currentThread())
797
798       # Notify topmost condition in queue
799       prioqueue = self.__find_first_pending_queue()
800       if prioqueue:
801         prioqueue[0].notifyAll()
802
803     finally:
804       self.__lock.release()
805
806   def delete(self, timeout=None, priority=None):
807     """Delete a Shared Lock.
808
809     This operation will declare the lock for removal. First the lock will be
810     acquired in exclusive mode if you don't already own it, then the lock
811     will be put in a state where any future and pending acquire() fail.
812
813     @type timeout: float
814     @param timeout: maximum waiting time before giving up
815     @type priority: integer
816     @param priority: Priority for acquiring lock
817
818     """
819     if priority is None:
820       priority = _DEFAULT_PRIORITY
821
822     self.__lock.acquire()
823     try:
824       assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
825
826       self.__check_deleted()
827
828       # The caller is allowed to hold the lock exclusively already.
829       acquired = self.__is_exclusive()
830
831       if not acquired:
832         acquired = self.__acquire_unlocked(0, timeout, priority)
833
834         assert self.__is_exclusive() and not self.__is_sharer(), \
835           "Lock wasn't acquired in exclusive mode"
836
837       if acquired:
838         self.__deleted = True
839         self.__exc = None
840
841         assert not (self.__exc or self.__shr), "Found owner during deletion"
842
843         # Notify all acquires. They'll throw an error.
844         for (_, prioqueue) in self.__pending:
845           for cond in prioqueue:
846             cond.notifyAll()
847
848         assert self.__deleted
849
850       return acquired
851     finally:
852       self.__lock.release()
853
854   def _release_save(self):
855     shared = self.__is_sharer()
856     self.release()
857     return shared
858
859   def _acquire_restore(self, shared):
860     self.acquire(shared=shared)
861
862
863 # Whenever we want to acquire a full LockSet we pass None as the value
864 # to acquire.  Hide this behind this nicely named constant.
865 ALL_SET = None
866
867
868 class _AcquireTimeout(Exception):
869   """Internal exception to abort an acquire on a timeout.
870
871   """
872
873
874 class LockSet:
875   """Implements a set of locks.
876
877   This abstraction implements a set of shared locks for the same resource type,
878   distinguished by name. The user can lock a subset of the resources and the
879   LockSet will take care of acquiring the locks always in the same order, thus
880   preventing deadlock.
881
882   All the locks needed in the same set must be acquired together, though.
883
884   @type name: string
885   @ivar name: the name of the lockset
886
887   """
888   def __init__(self, members, name, monitor=None):
889     """Constructs a new LockSet.
890
891     @type members: list of strings
892     @param members: initial members of the set
893     @type monitor: L{LockMonitor}
894     @param monitor: Lock monitor with which to register member locks
895
896     """
897     assert members is not None, "members parameter is not a list"
898     self.name = name
899
900     # Lock monitor
901     self.__monitor = monitor
902
903     # Used internally to guarantee coherency.
904     self.__lock = SharedLock(name)
905
906     # The lockdict indexes the relationship name -> lock
907     # The order-of-locking is implied by the alphabetical order of names
908     self.__lockdict = {}
909
910     for mname in members:
911       self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
912                                           monitor=monitor)
913
914     # The owner dict contains the set of locks each thread owns. For
915     # performance each thread can access its own key without a global lock on
916     # this structure. It is paramount though that *no* other type of access is
917     # done to this structure (eg. no looping over its keys). *_owner helper
918     # function are defined to guarantee access is correct, but in general never
919     # do anything different than __owners[threading.currentThread()], or there
920     # will be trouble.
921     self.__owners = {}
922
923   def _GetLockName(self, mname):
924     """Returns the name for a member lock.
925
926     """
927     return "%s/%s" % (self.name, mname)
928
929   def _is_owned(self):
930     """Is the current thread a current level owner?"""
931     return threading.currentThread() in self.__owners
932
933   def _add_owned(self, name=None):
934     """Note the current thread owns the given lock"""
935     if name is None:
936       if not self._is_owned():
937         self.__owners[threading.currentThread()] = set()
938     else:
939       if self._is_owned():
940         self.__owners[threading.currentThread()].add(name)
941       else:
942         self.__owners[threading.currentThread()] = set([name])
943
944   def _del_owned(self, name=None):
945     """Note the current thread owns the given lock"""
946
947     assert not (name is None and self.__lock._is_owned()), \
948            "Cannot hold internal lock when deleting owner status"
949
950     if name is not None:
951       self.__owners[threading.currentThread()].remove(name)
952
953     # Only remove the key if we don't hold the set-lock as well
954     if (not self.__lock._is_owned() and
955         not self.__owners[threading.currentThread()]):
956       del self.__owners[threading.currentThread()]
957
958   def _list_owned(self):
959     """Get the set of resource names owned by the current thread"""
960     if self._is_owned():
961       return self.__owners[threading.currentThread()].copy()
962     else:
963       return set()
964
965   def _release_and_delete_owned(self):
966     """Release and delete all resources owned by the current thread"""
967     for lname in self._list_owned():
968       lock = self.__lockdict[lname]
969       if lock._is_owned():
970         lock.release()
971       self._del_owned(name=lname)
972
973   def __names(self):
974     """Return the current set of names.
975
976     Only call this function while holding __lock and don't iterate on the
977     result after releasing the lock.
978
979     """
980     return self.__lockdict.keys()
981
982   def _names(self):
983     """Return a copy of the current set of elements.
984
985     Used only for debugging purposes.
986
987     """
988     # If we don't already own the set-level lock acquired
989     # we'll get it and note we need to release it later.
990     release_lock = False
991     if not self.__lock._is_owned():
992       release_lock = True
993       self.__lock.acquire(shared=1)
994     try:
995       result = self.__names()
996     finally:
997       if release_lock:
998         self.__lock.release()
999     return set(result)
1000
1001   def acquire(self, names, timeout=None, shared=0, priority=None,
1002               test_notify=None):
1003     """Acquire a set of resource locks.
1004
1005     @type names: list of strings (or string)
1006     @param names: the names of the locks which shall be acquired
1007         (special lock names, or instance/node names)
1008     @type shared: integer (0/1) used as a boolean
1009     @param shared: whether to acquire in shared mode; by default an
1010         exclusive lock will be acquired
1011     @type timeout: float or None
1012     @param timeout: Maximum time to acquire all locks
1013     @type priority: integer
1014     @param priority: Priority for acquiring locks
1015     @type test_notify: callable or None
1016     @param test_notify: Special callback function for unittesting
1017
1018     @return: Set of all locks successfully acquired or None in case of timeout
1019
1020     @raise errors.LockError: when any lock we try to acquire has
1021         been deleted before we succeed. In this case none of the
1022         locks requested will be acquired.
1023
1024     """
1025     assert timeout is None or timeout >= 0.0
1026
1027     # Check we don't already own locks at this level
1028     assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
1029                                   " (lockset %s)" % self.name)
1030
1031     if priority is None:
1032       priority = _DEFAULT_PRIORITY
1033
1034     # We need to keep track of how long we spent waiting for a lock. The
1035     # timeout passed to this function is over all lock acquires.
1036     running_timeout = RunningTimeout(timeout, False)
1037
1038     try:
1039       if names is not None:
1040         # Support passing in a single resource to acquire rather than many
1041         if isinstance(names, basestring):
1042           names = [names]
1043
1044         return self.__acquire_inner(names, False, shared, priority,
1045                                     running_timeout.Remaining, test_notify)
1046
1047       else:
1048         # If no names are given acquire the whole set by not letting new names
1049         # being added before we release, and getting the current list of names.
1050         # Some of them may then be deleted later, but we'll cope with this.
1051         #
1052         # We'd like to acquire this lock in a shared way, as it's nice if
1053         # everybody else can use the instances at the same time. If we are
1054         # acquiring them exclusively though they won't be able to do this
1055         # anyway, though, so we'll get the list lock exclusively as well in
1056         # order to be able to do add() on the set while owning it.
1057         if not self.__lock.acquire(shared=shared, priority=priority,
1058                                    timeout=running_timeout.Remaining()):
1059           raise _AcquireTimeout()
1060         try:
1061           # note we own the set-lock
1062           self._add_owned()
1063
1064           return self.__acquire_inner(self.__names(), True, shared, priority,
1065                                       running_timeout.Remaining, test_notify)
1066         except:
1067           # We shouldn't have problems adding the lock to the owners list, but
1068           # if we did we'll try to release this lock and re-raise exception.
1069           # Of course something is going to be really wrong, after this.
1070           self.__lock.release()
1071           self._del_owned()
1072           raise
1073
1074     except _AcquireTimeout:
1075       return None
1076
1077   def __acquire_inner(self, names, want_all, shared, priority,
1078                       timeout_fn, test_notify):
1079     """Inner logic for acquiring a number of locks.
1080
1081     @param names: Names of the locks to be acquired
1082     @param want_all: Whether all locks in the set should be acquired
1083     @param shared: Whether to acquire in shared mode
1084     @param timeout_fn: Function returning remaining timeout
1085     @param priority: Priority for acquiring locks
1086     @param test_notify: Special callback function for unittesting
1087
1088     """
1089     acquire_list = []
1090
1091     # First we look the locks up on __lockdict. We have no way of being sure
1092     # they will still be there after, but this makes it a lot faster should
1093     # just one of them be the already wrong. Using a sorted sequence to prevent
1094     # deadlocks.
1095     for lname in sorted(utils.UniqueSequence(names)):
1096       try:
1097         lock = self.__lockdict[lname] # raises KeyError if lock is not there
1098       except KeyError:
1099         if want_all:
1100           # We are acquiring all the set, it doesn't matter if this particular
1101           # element is not there anymore.
1102           continue
1103
1104         raise errors.LockError("Non-existing lock %s in set %s (it may have"
1105                                " been removed)" % (lname, self.name))
1106
1107       acquire_list.append((lname, lock))
1108
1109     # This will hold the locknames we effectively acquired.
1110     acquired = set()
1111
1112     try:
1113       # Now acquire_list contains a sorted list of resources and locks we
1114       # want.  In order to get them we loop on this (private) list and
1115       # acquire() them.  We gave no real guarantee they will still exist till
1116       # this is done but .acquire() itself is safe and will alert us if the
1117       # lock gets deleted.
1118       for (lname, lock) in acquire_list:
1119         if __debug__ and callable(test_notify):
1120           test_notify_fn = lambda: test_notify(lname)
1121         else:
1122           test_notify_fn = None
1123
1124         timeout = timeout_fn()
1125
1126         try:
1127           # raises LockError if the lock was deleted
1128           acq_success = lock.acquire(shared=shared, timeout=timeout,
1129                                      priority=priority,
1130                                      test_notify=test_notify_fn)
1131         except errors.LockError:
1132           if want_all:
1133             # We are acquiring all the set, it doesn't matter if this
1134             # particular element is not there anymore.
1135             continue
1136
1137           raise errors.LockError("Non-existing lock %s in set %s (it may"
1138                                  " have been removed)" % (lname, self.name))
1139
1140         if not acq_success:
1141           # Couldn't get lock or timeout occurred
1142           if timeout is None:
1143             # This shouldn't happen as SharedLock.acquire(timeout=None) is
1144             # blocking.
1145             raise errors.LockError("Failed to get lock %s (set %s)" %
1146                                    (lname, self.name))
1147
1148           raise _AcquireTimeout()
1149
1150         try:
1151           # now the lock cannot be deleted, we have it!
1152           self._add_owned(name=lname)
1153           acquired.add(lname)
1154
1155         except:
1156           # We shouldn't have problems adding the lock to the owners list, but
1157           # if we did we'll try to release this lock and re-raise exception.
1158           # Of course something is going to be really wrong after this.
1159           if lock._is_owned():
1160             lock.release()
1161           raise
1162
1163     except:
1164       # Release all owned locks
1165       self._release_and_delete_owned()
1166       raise
1167
1168     return acquired
1169
1170   def release(self, names=None):
1171     """Release a set of resource locks, at the same level.
1172
1173     You must have acquired the locks, either in shared or in exclusive mode,
1174     before releasing them.
1175
1176     @type names: list of strings, or None
1177     @param names: the names of the locks which shall be released
1178         (defaults to all the locks acquired at that level).
1179
1180     """
1181     assert self._is_owned(), ("release() on lock set %s while not owner" %
1182                               self.name)
1183
1184     # Support passing in a single resource to release rather than many
1185     if isinstance(names, basestring):
1186       names = [names]
1187
1188     if names is None:
1189       names = self._list_owned()
1190     else:
1191       names = set(names)
1192       assert self._list_owned().issuperset(names), (
1193                "release() on unheld resources %s (set %s)" %
1194                (names.difference(self._list_owned()), self.name))
1195
1196     # First of all let's release the "all elements" lock, if set.
1197     # After this 'add' can work again
1198     if self.__lock._is_owned():
1199       self.__lock.release()
1200       self._del_owned()
1201
1202     for lockname in names:
1203       # If we are sure the lock doesn't leave __lockdict without being
1204       # exclusively held we can do this...
1205       self.__lockdict[lockname].release()
1206       self._del_owned(name=lockname)
1207
1208   def add(self, names, acquired=0, shared=0):
1209     """Add a new set of elements to the set
1210
1211     @type names: list of strings
1212     @param names: names of the new elements to add
1213     @type acquired: integer (0/1) used as a boolean
1214     @param acquired: pre-acquire the new resource?
1215     @type shared: integer (0/1) used as a boolean
1216     @param shared: is the pre-acquisition shared?
1217
1218     """
1219     # Check we don't already own locks at this level
1220     assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1221       ("Cannot add locks if the set %s is only partially owned, or shared" %
1222        self.name)
1223
1224     # Support passing in a single resource to add rather than many
1225     if isinstance(names, basestring):
1226       names = [names]
1227
1228     # If we don't already own the set-level lock acquired in an exclusive way
1229     # we'll get it and note we need to release it later.
1230     release_lock = False
1231     if not self.__lock._is_owned():
1232       release_lock = True
1233       self.__lock.acquire()
1234
1235     try:
1236       invalid_names = set(self.__names()).intersection(names)
1237       if invalid_names:
1238         # This must be an explicit raise, not an assert, because assert is
1239         # turned off when using optimization, and this can happen because of
1240         # concurrency even if the user doesn't want it.
1241         raise errors.LockError("duplicate add(%s) on lockset %s" %
1242                                (invalid_names, self.name))
1243
1244       for lockname in names:
1245         lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1246
1247         if acquired:
1248           # No need for priority or timeout here as this lock has just been
1249           # created
1250           lock.acquire(shared=shared)
1251           # now the lock cannot be deleted, we have it!
1252           try:
1253             self._add_owned(name=lockname)
1254           except:
1255             # We shouldn't have problems adding the lock to the owners list,
1256             # but if we did we'll try to release this lock and re-raise
1257             # exception.  Of course something is going to be really wrong,
1258             # after this.  On the other hand the lock hasn't been added to the
1259             # __lockdict yet so no other threads should be pending on it. This
1260             # release is just a safety measure.
1261             lock.release()
1262             raise
1263
1264         self.__lockdict[lockname] = lock
1265
1266     finally:
1267       # Only release __lock if we were not holding it previously.
1268       if release_lock:
1269         self.__lock.release()
1270
1271     return True
1272
1273   def remove(self, names):
1274     """Remove elements from the lock set.
1275
1276     You can either not hold anything in the lockset or already hold a superset
1277     of the elements you want to delete, exclusively.
1278
1279     @type names: list of strings
1280     @param names: names of the resource to remove.
1281
1282     @return: a list of locks which we removed; the list is always
1283         equal to the names list if we were holding all the locks
1284         exclusively
1285
1286     """
1287     # Support passing in a single resource to remove rather than many
1288     if isinstance(names, basestring):
1289       names = [names]
1290
1291     # If we own any subset of this lock it must be a superset of what we want
1292     # to delete. The ownership must also be exclusive, but that will be checked
1293     # by the lock itself.
1294     assert not self._is_owned() or self._list_owned().issuperset(names), (
1295       "remove() on acquired lockset %s while not owning all elements" %
1296       self.name)
1297
1298     removed = []
1299
1300     for lname in names:
1301       # Calling delete() acquires the lock exclusively if we don't already own
1302       # it, and causes all pending and subsequent lock acquires to fail. It's
1303       # fine to call it out of order because delete() also implies release(),
1304       # and the assertion above guarantees that if we either already hold
1305       # everything we want to delete, or we hold none.
1306       try:
1307         self.__lockdict[lname].delete()
1308         removed.append(lname)
1309       except (KeyError, errors.LockError):
1310         # This cannot happen if we were already holding it, verify:
1311         assert not self._is_owned(), ("remove failed while holding lockset %s"
1312                                       % self.name)
1313       else:
1314         # If no LockError was raised we are the ones who deleted the lock.
1315         # This means we can safely remove it from lockdict, as any further or
1316         # pending delete() or acquire() will fail (and nobody can have the lock
1317         # since before our call to delete()).
1318         #
1319         # This is done in an else clause because if the exception was thrown
1320         # it's the job of the one who actually deleted it.
1321         del self.__lockdict[lname]
1322         # And let's remove it from our private list if we owned it.
1323         if self._is_owned():
1324           self._del_owned(name=lname)
1325
1326     return removed
1327
1328
1329 # Locking levels, must be acquired in increasing order.
1330 # Current rules are:
1331 #   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1332 #   acquired before performing any operation, either in shared or in exclusive
1333 #   mode. acquiring the BGL in exclusive mode is discouraged and should be
1334 #   avoided.
1335 #   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1336 #   If you need more than one node, or more than one instance, acquire them at
1337 #   the same time.
1338 LEVEL_CLUSTER = 0
1339 LEVEL_INSTANCE = 1
1340 LEVEL_NODE = 2
1341
1342 LEVELS = [LEVEL_CLUSTER,
1343           LEVEL_INSTANCE,
1344           LEVEL_NODE]
1345
1346 # Lock levels which are modifiable
1347 LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1348
1349 LEVEL_NAMES = {
1350   LEVEL_CLUSTER: "cluster",
1351   LEVEL_INSTANCE: "instance",
1352   LEVEL_NODE: "node",
1353   }
1354
1355 # Constant for the big ganeti lock
1356 BGL = 'BGL'
1357
1358
1359 class GanetiLockManager:
1360   """The Ganeti Locking Library
1361
1362   The purpose of this small library is to manage locking for ganeti clusters
1363   in a central place, while at the same time doing dynamic checks against
1364   possible deadlocks. It will also make it easier to transition to a different
1365   lock type should we migrate away from python threads.
1366
1367   """
1368   _instance = None
1369
1370   def __init__(self, nodes, instances):
1371     """Constructs a new GanetiLockManager object.
1372
1373     There should be only a GanetiLockManager object at any time, so this
1374     function raises an error if this is not the case.
1375
1376     @param nodes: list of node names
1377     @param instances: list of instance names
1378
1379     """
1380     assert self.__class__._instance is None, \
1381            "double GanetiLockManager instance"
1382
1383     self.__class__._instance = self
1384
1385     self._monitor = LockMonitor()
1386
1387     # The keyring contains all the locks, at their level and in the correct
1388     # locking order.
1389     self.__keyring = {
1390       LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
1391       LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
1392       LEVEL_INSTANCE: LockSet(instances, "instances",
1393                               monitor=self._monitor),
1394       }
1395
1396   def QueryLocks(self, fields, sync):
1397     """Queries information from all locks.
1398
1399     See L{LockMonitor.QueryLocks}.
1400
1401     """
1402     return self._monitor.QueryLocks(fields, sync)
1403
1404   def _names(self, level):
1405     """List the lock names at the given level.
1406
1407     This can be used for debugging/testing purposes.
1408
1409     @param level: the level whose list of locks to get
1410
1411     """
1412     assert level in LEVELS, "Invalid locking level %s" % level
1413     return self.__keyring[level]._names()
1414
1415   def _is_owned(self, level):
1416     """Check whether we are owning locks at the given level
1417
1418     """
1419     return self.__keyring[level]._is_owned()
1420
1421   is_owned = _is_owned
1422
1423   def _list_owned(self, level):
1424     """Get the set of owned locks at the given level
1425
1426     """
1427     return self.__keyring[level]._list_owned()
1428
1429   def _upper_owned(self, level):
1430     """Check that we don't own any lock at a level greater than the given one.
1431
1432     """
1433     # This way of checking only works if LEVELS[i] = i, which we check for in
1434     # the test cases.
1435     return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1436
1437   def _BGL_owned(self): # pylint: disable-msg=C0103
1438     """Check if the current thread owns the BGL.
1439
1440     Both an exclusive or a shared acquisition work.
1441
1442     """
1443     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1444
1445   @staticmethod
1446   def _contains_BGL(level, names): # pylint: disable-msg=C0103
1447     """Check if the level contains the BGL.
1448
1449     Check if acting on the given level and set of names will change
1450     the status of the Big Ganeti Lock.
1451
1452     """
1453     return level == LEVEL_CLUSTER and (names is None or BGL in names)
1454
1455   def acquire(self, level, names, timeout=None, shared=0, priority=None):
1456     """Acquire a set of resource locks, at the same level.
1457
1458     @type level: member of locking.LEVELS
1459     @param level: the level at which the locks shall be acquired
1460     @type names: list of strings (or string)
1461     @param names: the names of the locks which shall be acquired
1462         (special lock names, or instance/node names)
1463     @type shared: integer (0/1) used as a boolean
1464     @param shared: whether to acquire in shared mode; by default
1465         an exclusive lock will be acquired
1466     @type timeout: float
1467     @param timeout: Maximum time to acquire all locks
1468     @type priority: integer
1469     @param priority: Priority for acquiring lock
1470
1471     """
1472     assert level in LEVELS, "Invalid locking level %s" % level
1473
1474     # Check that we are either acquiring the Big Ganeti Lock or we already own
1475     # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1476     # so even if we've migrated we need to at least share the BGL to be
1477     # compatible with them. Of course if we own the BGL exclusively there's no
1478     # point in acquiring any other lock, unless perhaps we are half way through
1479     # the migration of the current opcode.
1480     assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1481             "You must own the Big Ganeti Lock before acquiring any other")
1482
1483     # Check we don't own locks at the same or upper levels.
1484     assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1485            " while owning some at a greater one")
1486
1487     # Acquire the locks in the set.
1488     return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
1489                                          priority=priority)
1490
1491   def release(self, level, names=None):
1492     """Release a set of resource locks, at the same level.
1493
1494     You must have acquired the locks, either in shared or in exclusive
1495     mode, before releasing them.
1496
1497     @type level: member of locking.LEVELS
1498     @param level: the level at which the locks shall be released
1499     @type names: list of strings, or None
1500     @param names: the names of the locks which shall be released
1501         (defaults to all the locks acquired at that level)
1502
1503     """
1504     assert level in LEVELS, "Invalid locking level %s" % level
1505     assert (not self._contains_BGL(level, names) or
1506             not self._upper_owned(LEVEL_CLUSTER)), (
1507             "Cannot release the Big Ganeti Lock while holding something"
1508             " at upper levels (%r)" %
1509             (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
1510                               for i in self.__keyring.keys()]), ))
1511
1512     # Release will complain if we don't own the locks already
1513     return self.__keyring[level].release(names)
1514
1515   def add(self, level, names, acquired=0, shared=0):
1516     """Add locks at the specified level.
1517
1518     @type level: member of locking.LEVELS_MOD
1519     @param level: the level at which the locks shall be added
1520     @type names: list of strings
1521     @param names: names of the locks to acquire
1522     @type acquired: integer (0/1) used as a boolean
1523     @param acquired: whether to acquire the newly added locks
1524     @type shared: integer (0/1) used as a boolean
1525     @param shared: whether the acquisition will be shared
1526
1527     """
1528     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1529     assert self._BGL_owned(), ("You must own the BGL before performing other"
1530            " operations")
1531     assert not self._upper_owned(level), ("Cannot add locks at a level"
1532            " while owning some at a greater one")
1533     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1534
1535   def remove(self, level, names):
1536     """Remove locks from the specified level.
1537
1538     You must either already own the locks you are trying to remove
1539     exclusively or not own any lock at an upper level.
1540
1541     @type level: member of locking.LEVELS_MOD
1542     @param level: the level at which the locks shall be removed
1543     @type names: list of strings
1544     @param names: the names of the locks which shall be removed
1545         (special lock names, or instance/node names)
1546
1547     """
1548     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1549     assert self._BGL_owned(), ("You must own the BGL before performing other"
1550            " operations")
1551     # Check we either own the level or don't own anything from here
1552     # up. LockSet.remove() will check the case in which we don't own
1553     # all the needed resources, or we have a shared ownership.
1554     assert self._is_owned(level) or not self._upper_owned(level), (
1555            "Cannot remove locks at a level while not owning it or"
1556            " owning some at a greater one")
1557     return self.__keyring[level].remove(names)
1558
1559
1560 class LockMonitor(object):
1561   _LOCK_ATTR = "_lock"
1562
1563   def __init__(self):
1564     """Initializes this class.
1565
1566     """
1567     self._lock = SharedLock("LockMonitor")
1568
1569     # Tracked locks. Weak references are used to avoid issues with circular
1570     # references and deletion.
1571     self._locks = weakref.WeakKeyDictionary()
1572
1573   @ssynchronized(_LOCK_ATTR)
1574   def RegisterLock(self, lock):
1575     """Registers a new lock.
1576
1577     """
1578     logging.debug("Registering lock %s", lock.name)
1579     assert lock not in self._locks, "Duplicate lock registration"
1580     assert not compat.any(lock.name == i.name for i in self._locks.keys()), \
1581            "Found duplicate lock name"
1582     self._locks[lock] = None
1583
1584   @ssynchronized(_LOCK_ATTR)
1585   def _GetLockInfo(self, fields):
1586     """Get information from all locks while the monitor lock is held.
1587
1588     """
1589     result = {}
1590
1591     for lock in self._locks.keys():
1592       assert lock.name not in result, "Found duplicate lock name"
1593       result[lock.name] = lock.GetInfo(fields)
1594
1595     return result
1596
1597   def QueryLocks(self, fields, sync):
1598     """Queries information from all locks.
1599
1600     @type fields: list of strings
1601     @param fields: List of fields to return
1602     @type sync: boolean
1603     @param sync: Whether to operate in synchronous mode
1604
1605     """
1606     if sync:
1607       raise NotImplementedError("Synchronous queries are not implemented")
1608
1609     # Get all data without sorting
1610     result = self._GetLockInfo(fields)
1611
1612     # Sort by name
1613     return [result[name] for name in utils.NiceSort(result.keys())]