locking: Simplify condition
[ganeti-local] / lib / locking.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Module implementing the Ganeti locking code."""
22
23 # pylint: disable=W0212
24
25 # W0212 since e.g. LockSet methods use (a lot) the internals of
26 # SharedLock
27
28 import os
29 import select
30 import threading
31 import errno
32 import weakref
33 import logging
34 import heapq
35 import itertools
36 import time
37
38 from ganeti import errors
39 from ganeti import utils
40 from ganeti import compat
41 from ganeti import query
42
43
44 _EXCLUSIVE_TEXT = "exclusive"
45 _SHARED_TEXT = "shared"
46 _DELETED_TEXT = "deleted"
47
48 _DEFAULT_PRIORITY = 0
49
50 #: Minimum timeout required to consider scheduling a pending acquisition
51 #: (seconds)
52 _LOCK_ACQUIRE_MIN_TIMEOUT = (1.0 / 1000)
53
54
55 def ssynchronized(mylock, shared=0):
56   """Shared Synchronization decorator.
57
58   Calls the function holding the given lock, either in exclusive or shared
59   mode. It requires the passed lock to be a SharedLock (or support its
60   semantics).
61
62   @type mylock: lockable object or string
63   @param mylock: lock to acquire or class member name of the lock to acquire
64
65   """
66   def wrap(fn):
67     def sync_function(*args, **kwargs):
68       if isinstance(mylock, basestring):
69         assert args, "cannot ssynchronize on non-class method: self not found"
70         # args[0] is "self"
71         lock = getattr(args[0], mylock)
72       else:
73         lock = mylock
74       lock.acquire(shared=shared)
75       try:
76         return fn(*args, **kwargs)
77       finally:
78         lock.release()
79     return sync_function
80   return wrap
81
82
83 class _SingleNotifyPipeConditionWaiter(object):
84   """Helper class for SingleNotifyPipeCondition
85
86   """
87   __slots__ = [
88     "_fd",
89     "_poller",
90     ]
91
92   def __init__(self, poller, fd):
93     """Constructor for _SingleNotifyPipeConditionWaiter
94
95     @type poller: select.poll
96     @param poller: Poller object
97     @type fd: int
98     @param fd: File descriptor to wait for
99
100     """
101     object.__init__(self)
102     self._poller = poller
103     self._fd = fd
104
105   def __call__(self, timeout):
106     """Wait for something to happen on the pipe.
107
108     @type timeout: float or None
109     @param timeout: Timeout for waiting (can be None)
110
111     """
112     running_timeout = utils.RunningTimeout(timeout, True)
113
114     while True:
115       remaining_time = running_timeout.Remaining()
116
117       if remaining_time is not None:
118         if remaining_time < 0.0:
119           break
120
121         # Our calculation uses seconds, poll() wants milliseconds
122         remaining_time *= 1000
123
124       try:
125         result = self._poller.poll(remaining_time)
126       except EnvironmentError, err:
127         if err.errno != errno.EINTR:
128           raise
129         result = None
130
131       # Check whether we were notified
132       if result and result[0][0] == self._fd:
133         break
134
135
136 class _BaseCondition(object):
137   """Base class containing common code for conditions.
138
139   Some of this code is taken from python's threading module.
140
141   """
142   __slots__ = [
143     "_lock",
144     "acquire",
145     "release",
146     "_is_owned",
147     "_acquire_restore",
148     "_release_save",
149     ]
150
151   def __init__(self, lock):
152     """Constructor for _BaseCondition.
153
154     @type lock: threading.Lock
155     @param lock: condition base lock
156
157     """
158     object.__init__(self)
159
160     try:
161       self._release_save = lock._release_save
162     except AttributeError:
163       self._release_save = self._base_release_save
164     try:
165       self._acquire_restore = lock._acquire_restore
166     except AttributeError:
167       self._acquire_restore = self._base_acquire_restore
168     try:
169       self._is_owned = lock.is_owned
170     except AttributeError:
171       self._is_owned = self._base_is_owned
172
173     self._lock = lock
174
175     # Export the lock's acquire() and release() methods
176     self.acquire = lock.acquire
177     self.release = lock.release
178
179   def _base_is_owned(self):
180     """Check whether lock is owned by current thread.
181
182     """
183     if self._lock.acquire(0):
184       self._lock.release()
185       return False
186     return True
187
188   def _base_release_save(self):
189     self._lock.release()
190
191   def _base_acquire_restore(self, _):
192     self._lock.acquire()
193
194   def _check_owned(self):
195     """Raise an exception if the current thread doesn't own the lock.
196
197     """
198     if not self._is_owned():
199       raise RuntimeError("cannot work with un-aquired lock")
200
201
202 class SingleNotifyPipeCondition(_BaseCondition):
203   """Condition which can only be notified once.
204
205   This condition class uses pipes and poll, internally, to be able to wait for
206   notification with a timeout, without resorting to polling. It is almost
207   compatible with Python's threading.Condition, with the following differences:
208     - notifyAll can only be called once, and no wait can happen after that
209     - notify is not supported, only notifyAll
210
211   """
212
213   __slots__ = [
214     "_poller",
215     "_read_fd",
216     "_write_fd",
217     "_nwaiters",
218     "_notified",
219     ]
220
221   _waiter_class = _SingleNotifyPipeConditionWaiter
222
223   def __init__(self, lock):
224     """Constructor for SingleNotifyPipeCondition
225
226     """
227     _BaseCondition.__init__(self, lock)
228     self._nwaiters = 0
229     self._notified = False
230     self._read_fd = None
231     self._write_fd = None
232     self._poller = None
233
234   def _check_unnotified(self):
235     """Throws an exception if already notified.
236
237     """
238     if self._notified:
239       raise RuntimeError("cannot use already notified condition")
240
241   def _Cleanup(self):
242     """Cleanup open file descriptors, if any.
243
244     """
245     if self._read_fd is not None:
246       os.close(self._read_fd)
247       self._read_fd = None
248
249     if self._write_fd is not None:
250       os.close(self._write_fd)
251       self._write_fd = None
252     self._poller = None
253
254   def wait(self, timeout):
255     """Wait for a notification.
256
257     @type timeout: float or None
258     @param timeout: Waiting timeout (can be None)
259
260     """
261     self._check_owned()
262     self._check_unnotified()
263
264     self._nwaiters += 1
265     try:
266       if self._poller is None:
267         (self._read_fd, self._write_fd) = os.pipe()
268         self._poller = select.poll()
269         self._poller.register(self._read_fd, select.POLLHUP)
270
271       wait_fn = self._waiter_class(self._poller, self._read_fd)
272       state = self._release_save()
273       try:
274         # Wait for notification
275         wait_fn(timeout)
276       finally:
277         # Re-acquire lock
278         self._acquire_restore(state)
279     finally:
280       self._nwaiters -= 1
281       if self._nwaiters == 0:
282         self._Cleanup()
283
284   def notifyAll(self): # pylint: disable=C0103
285     """Close the writing side of the pipe to notify all waiters.
286
287     """
288     self._check_owned()
289     self._check_unnotified()
290     self._notified = True
291     if self._write_fd is not None:
292       os.close(self._write_fd)
293       self._write_fd = None
294
295
296 class PipeCondition(_BaseCondition):
297   """Group-only non-polling condition with counters.
298
299   This condition class uses pipes and poll, internally, to be able to wait for
300   notification with a timeout, without resorting to polling. It is almost
301   compatible with Python's threading.Condition, but only supports notifyAll and
302   non-recursive locks. As an additional features it's able to report whether
303   there are any waiting threads.
304
305   """
306   __slots__ = [
307     "_waiters",
308     "_single_condition",
309     ]
310
311   _single_condition_class = SingleNotifyPipeCondition
312
313   def __init__(self, lock):
314     """Initializes this class.
315
316     """
317     _BaseCondition.__init__(self, lock)
318     self._waiters = set()
319     self._single_condition = self._single_condition_class(self._lock)
320
321   def wait(self, timeout):
322     """Wait for a notification.
323
324     @type timeout: float or None
325     @param timeout: Waiting timeout (can be None)
326
327     """
328     self._check_owned()
329
330     # Keep local reference to the pipe. It could be replaced by another thread
331     # notifying while we're waiting.
332     cond = self._single_condition
333
334     self._waiters.add(threading.currentThread())
335     try:
336       cond.wait(timeout)
337     finally:
338       self._check_owned()
339       self._waiters.remove(threading.currentThread())
340
341   def notifyAll(self): # pylint: disable=C0103
342     """Notify all currently waiting threads.
343
344     """
345     self._check_owned()
346     self._single_condition.notifyAll()
347     self._single_condition = self._single_condition_class(self._lock)
348
349   def get_waiting(self):
350     """Returns a list of all waiting threads.
351
352     """
353     self._check_owned()
354
355     return self._waiters
356
357   def has_waiting(self):
358     """Returns whether there are active waiters.
359
360     """
361     self._check_owned()
362
363     return bool(self._waiters)
364
365   def __repr__(self):
366     return ("<%s.%s waiters=%s at %#x>" %
367             (self.__class__.__module__, self.__class__.__name__,
368              self._waiters, id(self)))
369
370
371 class _PipeConditionWithMode(PipeCondition):
372   __slots__ = [
373     "shared",
374     ]
375
376   def __init__(self, lock, shared):
377     """Initializes this class.
378
379     """
380     self.shared = shared
381     PipeCondition.__init__(self, lock)
382
383
384 class SharedLock(object):
385   """Implements a shared lock.
386
387   Multiple threads can acquire the lock in a shared way by calling
388   C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
389   threads can call C{acquire(shared=0)}.
390
391   Notes on data structures: C{__pending} contains a priority queue (heapq) of
392   all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
393   ...]}. Each per-priority queue contains a normal in-order list of conditions
394   to be notified when the lock can be acquired. Shared locks are grouped
395   together by priority and the condition for them is stored in
396   C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
397   references for the per-priority queues indexed by priority for faster access.
398
399   @type name: string
400   @ivar name: the name of the lock
401
402   """
403   __slots__ = [
404     "__weakref__",
405     "__deleted",
406     "__exc",
407     "__lock",
408     "__pending",
409     "__pending_by_prio",
410     "__pending_shared",
411     "__shr",
412     "__time_fn",
413     "name",
414     ]
415
416   __condition_class = _PipeConditionWithMode
417
418   def __init__(self, name, monitor=None, _time_fn=time.time):
419     """Construct a new SharedLock.
420
421     @param name: the name of the lock
422     @type monitor: L{LockMonitor}
423     @param monitor: Lock monitor with which to register
424
425     """
426     object.__init__(self)
427
428     self.name = name
429
430     # Used for unittesting
431     self.__time_fn = _time_fn
432
433     # Internal lock
434     self.__lock = threading.Lock()
435
436     # Queue containing waiting acquires
437     self.__pending = []
438     self.__pending_by_prio = {}
439     self.__pending_shared = {}
440
441     # Current lock holders
442     self.__shr = set()
443     self.__exc = None
444
445     # is this lock in the deleted state?
446     self.__deleted = False
447
448     # Register with lock monitor
449     if monitor:
450       logging.debug("Adding lock %s to monitor", name)
451       monitor.RegisterLock(self)
452
453   def __repr__(self):
454     return ("<%s.%s name=%s at %#x>" %
455             (self.__class__.__module__, self.__class__.__name__,
456              self.name, id(self)))
457
458   def GetLockInfo(self, requested):
459     """Retrieves information for querying locks.
460
461     @type requested: set
462     @param requested: Requested information, see C{query.LQ_*}
463
464     """
465     self.__lock.acquire()
466     try:
467       # Note: to avoid unintentional race conditions, no references to
468       # modifiable objects should be returned unless they were created in this
469       # function.
470       mode = None
471       owner_names = None
472
473       if query.LQ_MODE in requested:
474         if self.__deleted:
475           mode = _DELETED_TEXT
476           assert not (self.__exc or self.__shr)
477         elif self.__exc:
478           mode = _EXCLUSIVE_TEXT
479         elif self.__shr:
480           mode = _SHARED_TEXT
481
482       # Current owner(s) are wanted
483       if query.LQ_OWNER in requested:
484         if self.__exc:
485           owner = [self.__exc]
486         else:
487           owner = self.__shr
488
489         if owner:
490           assert not self.__deleted
491           owner_names = [i.getName() for i in owner]
492
493       # Pending acquires are wanted
494       if query.LQ_PENDING in requested:
495         pending = []
496
497         # Sorting instead of copying and using heaq functions for simplicity
498         for (_, prioqueue) in sorted(self.__pending):
499           for cond in prioqueue:
500             if cond.shared:
501               pendmode = _SHARED_TEXT
502             else:
503               pendmode = _EXCLUSIVE_TEXT
504
505             # List of names will be sorted in L{query._GetLockPending}
506             pending.append((pendmode, [i.getName()
507                                        for i in cond.get_waiting()]))
508       else:
509         pending = None
510
511       return [(self.name, mode, owner_names, pending)]
512     finally:
513       self.__lock.release()
514
515   def __check_deleted(self):
516     """Raises an exception if the lock has been deleted.
517
518     """
519     if self.__deleted:
520       raise errors.LockError("Deleted lock %s" % self.name)
521
522   def __is_sharer(self):
523     """Is the current thread sharing the lock at this time?
524
525     """
526     return threading.currentThread() in self.__shr
527
528   def __is_exclusive(self):
529     """Is the current thread holding the lock exclusively at this time?
530
531     """
532     return threading.currentThread() == self.__exc
533
534   def __is_owned(self, shared=-1):
535     """Is the current thread somehow owning the lock at this time?
536
537     This is a private version of the function, which presumes you're holding
538     the internal lock.
539
540     """
541     if shared < 0:
542       return self.__is_sharer() or self.__is_exclusive()
543     elif shared:
544       return self.__is_sharer()
545     else:
546       return self.__is_exclusive()
547
548   def is_owned(self, shared=-1):
549     """Is the current thread somehow owning the lock at this time?
550
551     @param shared:
552         - < 0: check for any type of ownership (default)
553         - 0: check for exclusive ownership
554         - > 0: check for shared ownership
555
556     """
557     self.__lock.acquire()
558     try:
559       return self.__is_owned(shared=shared)
560     finally:
561       self.__lock.release()
562
563   #: Necessary to remain compatible with threading.Condition, which tries to
564   #: retrieve a locks' "_is_owned" attribute
565   _is_owned = is_owned
566
567   def _count_pending(self):
568     """Returns the number of pending acquires.
569
570     @rtype: int
571
572     """
573     self.__lock.acquire()
574     try:
575       return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
576     finally:
577       self.__lock.release()
578
579   def _check_empty(self):
580     """Checks whether there are any pending acquires.
581
582     @rtype: bool
583
584     """
585     self.__lock.acquire()
586     try:
587       # Order is important: __find_first_pending_queue modifies __pending
588       (_, prioqueue) = self.__find_first_pending_queue()
589
590       return not (prioqueue or
591                   self.__pending or
592                   self.__pending_by_prio or
593                   self.__pending_shared)
594     finally:
595       self.__lock.release()
596
597   def __do_acquire(self, shared):
598     """Actually acquire the lock.
599
600     """
601     if shared:
602       self.__shr.add(threading.currentThread())
603     else:
604       self.__exc = threading.currentThread()
605
606   def __can_acquire(self, shared):
607     """Determine whether lock can be acquired.
608
609     """
610     if shared:
611       return self.__exc is None
612     else:
613       return len(self.__shr) == 0 and self.__exc is None
614
615   def __find_first_pending_queue(self):
616     """Tries to find the topmost queued entry with pending acquires.
617
618     Removes empty entries while going through the list.
619
620     """
621     while self.__pending:
622       (priority, prioqueue) = self.__pending[0]
623
624       if prioqueue:
625         return (priority, prioqueue)
626
627       # Remove empty queue
628       heapq.heappop(self.__pending)
629       del self.__pending_by_prio[priority]
630       assert priority not in self.__pending_shared
631
632     return (None, None)
633
634   def __is_on_top(self, cond):
635     """Checks whether the passed condition is on top of the queue.
636
637     The caller must make sure the queue isn't empty.
638
639     """
640     (_, prioqueue) = self.__find_first_pending_queue()
641
642     return cond == prioqueue[0]
643
644   def __acquire_unlocked(self, shared, timeout, priority):
645     """Acquire a shared lock.
646
647     @param shared: whether to acquire in shared mode; by default an
648         exclusive lock will be acquired
649     @param timeout: maximum waiting time before giving up
650     @type priority: integer
651     @param priority: Priority for acquiring lock
652
653     """
654     self.__check_deleted()
655
656     # We cannot acquire the lock if we already have it
657     assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
658                                    " %s" % self.name)
659
660     # Remove empty entries from queue
661     self.__find_first_pending_queue()
662
663     # Check whether someone else holds the lock or there are pending acquires.
664     if not self.__pending and self.__can_acquire(shared):
665       # Apparently not, can acquire lock directly.
666       self.__do_acquire(shared)
667       return True
668
669     # The lock couldn't be acquired right away, so if a timeout is given and is
670     # considered too short, return right away as scheduling a pending
671     # acquisition is quite expensive
672     if timeout is not None and timeout < _LOCK_ACQUIRE_MIN_TIMEOUT:
673       return False
674
675     prioqueue = self.__pending_by_prio.get(priority, None)
676
677     if shared:
678       # Try to re-use condition for shared acquire
679       wait_condition = self.__pending_shared.get(priority, None)
680       assert (wait_condition is None or
681               (wait_condition.shared and wait_condition in prioqueue))
682     else:
683       wait_condition = None
684
685     if wait_condition is None:
686       if prioqueue is None:
687         assert priority not in self.__pending_by_prio
688
689         prioqueue = []
690         heapq.heappush(self.__pending, (priority, prioqueue))
691         self.__pending_by_prio[priority] = prioqueue
692
693       wait_condition = self.__condition_class(self.__lock, shared)
694       prioqueue.append(wait_condition)
695
696       if shared:
697         # Keep reference for further shared acquires on same priority. This is
698         # better than trying to find it in the list of pending acquires.
699         assert priority not in self.__pending_shared
700         self.__pending_shared[priority] = wait_condition
701
702     wait_start = self.__time_fn()
703     acquired = False
704
705     try:
706       # Wait until we become the topmost acquire in the queue or the timeout
707       # expires.
708       while True:
709         if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
710           self.__do_acquire(shared)
711           acquired = True
712           break
713
714         # A lot of code assumes blocking acquires always succeed, therefore we
715         # can never return False for a blocking acquire
716         if (timeout is not None and
717             utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
718           break
719
720         # Wait for notification
721         wait_condition.wait(timeout)
722         self.__check_deleted()
723     finally:
724       # Remove condition from queue if there are no more waiters
725       if not wait_condition.has_waiting():
726         prioqueue.remove(wait_condition)
727         if wait_condition.shared:
728           # Remove from list of shared acquires if it wasn't while releasing
729           # (e.g. on lock deletion)
730           self.__pending_shared.pop(priority, None)
731
732     return acquired
733
734   def acquire(self, shared=0, timeout=None, priority=None,
735               test_notify=None):
736     """Acquire a shared lock.
737
738     @type shared: integer (0/1) used as a boolean
739     @param shared: whether to acquire in shared mode; by default an
740         exclusive lock will be acquired
741     @type timeout: float
742     @param timeout: maximum waiting time before giving up
743     @type priority: integer
744     @param priority: Priority for acquiring lock
745     @type test_notify: callable or None
746     @param test_notify: Special callback function for unittesting
747
748     """
749     if priority is None:
750       priority = _DEFAULT_PRIORITY
751
752     self.__lock.acquire()
753     try:
754       # We already got the lock, notify now
755       if __debug__ and callable(test_notify):
756         test_notify()
757
758       return self.__acquire_unlocked(shared, timeout, priority)
759     finally:
760       self.__lock.release()
761
762   def downgrade(self):
763     """Changes the lock mode from exclusive to shared.
764
765     Pending acquires in shared mode on the same priority will go ahead.
766
767     """
768     self.__lock.acquire()
769     try:
770       assert self.__is_owned(), "Lock must be owned"
771
772       if self.__is_exclusive():
773         # Do nothing if the lock is already acquired in shared mode
774         self.__exc = None
775         self.__do_acquire(1)
776
777         # Important: pending shared acquires should only jump ahead if there
778         # was a transition from exclusive to shared, otherwise an owner of a
779         # shared lock can keep calling this function to push incoming shared
780         # acquires
781         (priority, prioqueue) = self.__find_first_pending_queue()
782         if prioqueue:
783           # Is there a pending shared acquire on this priority?
784           cond = self.__pending_shared.pop(priority, None)
785           if cond:
786             assert cond.shared
787             assert cond in prioqueue
788
789             # Ensure shared acquire is on top of queue
790             if len(prioqueue) > 1:
791               prioqueue.remove(cond)
792               prioqueue.insert(0, cond)
793
794             # Notify
795             cond.notifyAll()
796
797       assert not self.__is_exclusive()
798       assert self.__is_sharer()
799
800       return True
801     finally:
802       self.__lock.release()
803
804   def release(self):
805     """Release a Shared Lock.
806
807     You must have acquired the lock, either in shared or in exclusive mode,
808     before calling this function.
809
810     """
811     self.__lock.acquire()
812     try:
813       assert self.__is_exclusive() or self.__is_sharer(), \
814         "Cannot release non-owned lock"
815
816       # Autodetect release type
817       if self.__is_exclusive():
818         self.__exc = None
819         notify = True
820       else:
821         self.__shr.remove(threading.currentThread())
822         notify = not self.__shr
823
824       # Notify topmost condition in queue if there are no owners left (for
825       # shared locks)
826       if notify:
827         self.__notify_topmost()
828     finally:
829       self.__lock.release()
830
831   def __notify_topmost(self):
832     """Notifies topmost condition in queue of pending acquires.
833
834     """
835     (priority, prioqueue) = self.__find_first_pending_queue()
836     if prioqueue:
837       cond = prioqueue[0]
838       cond.notifyAll()
839       if cond.shared:
840         # Prevent further shared acquires from sneaking in while waiters are
841         # notified
842         self.__pending_shared.pop(priority, None)
843
844   def _notify_topmost(self):
845     """Exported version of L{__notify_topmost}.
846
847     """
848     self.__lock.acquire()
849     try:
850       return self.__notify_topmost()
851     finally:
852       self.__lock.release()
853
854   def delete(self, timeout=None, priority=None):
855     """Delete a Shared Lock.
856
857     This operation will declare the lock for removal. First the lock will be
858     acquired in exclusive mode if you don't already own it, then the lock
859     will be put in a state where any future and pending acquire() fail.
860
861     @type timeout: float
862     @param timeout: maximum waiting time before giving up
863     @type priority: integer
864     @param priority: Priority for acquiring lock
865
866     """
867     if priority is None:
868       priority = _DEFAULT_PRIORITY
869
870     self.__lock.acquire()
871     try:
872       assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
873
874       self.__check_deleted()
875
876       # The caller is allowed to hold the lock exclusively already.
877       acquired = self.__is_exclusive()
878
879       if not acquired:
880         acquired = self.__acquire_unlocked(0, timeout, priority)
881
882       if acquired:
883         assert self.__is_exclusive() and not self.__is_sharer(), \
884           "Lock wasn't acquired in exclusive mode"
885
886         self.__deleted = True
887         self.__exc = None
888
889         assert not (self.__exc or self.__shr), "Found owner during deletion"
890
891         # Notify all acquires. They'll throw an error.
892         for (_, prioqueue) in self.__pending:
893           for cond in prioqueue:
894             cond.notifyAll()
895
896         assert self.__deleted
897
898       return acquired
899     finally:
900       self.__lock.release()
901
902   def _release_save(self):
903     shared = self.__is_sharer()
904     self.release()
905     return shared
906
907   def _acquire_restore(self, shared):
908     self.acquire(shared=shared)
909
910
911 # Whenever we want to acquire a full LockSet we pass None as the value
912 # to acquire.  Hide this behind this nicely named constant.
913 ALL_SET = None
914
915
916 class _AcquireTimeout(Exception):
917   """Internal exception to abort an acquire on a timeout.
918
919   """
920
921
922 class LockSet:
923   """Implements a set of locks.
924
925   This abstraction implements a set of shared locks for the same resource type,
926   distinguished by name. The user can lock a subset of the resources and the
927   LockSet will take care of acquiring the locks always in the same order, thus
928   preventing deadlock.
929
930   All the locks needed in the same set must be acquired together, though.
931
932   @type name: string
933   @ivar name: the name of the lockset
934
935   """
936   def __init__(self, members, name, monitor=None):
937     """Constructs a new LockSet.
938
939     @type members: list of strings
940     @param members: initial members of the set
941     @type monitor: L{LockMonitor}
942     @param monitor: Lock monitor with which to register member locks
943
944     """
945     assert members is not None, "members parameter is not a list"
946     self.name = name
947
948     # Lock monitor
949     self.__monitor = monitor
950
951     # Used internally to guarantee coherency
952     self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)
953
954     # The lockdict indexes the relationship name -> lock
955     # The order-of-locking is implied by the alphabetical order of names
956     self.__lockdict = {}
957
958     for mname in members:
959       self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
960                                           monitor=monitor)
961
962     # The owner dict contains the set of locks each thread owns. For
963     # performance each thread can access its own key without a global lock on
964     # this structure. It is paramount though that *no* other type of access is
965     # done to this structure (eg. no looping over its keys). *_owner helper
966     # function are defined to guarantee access is correct, but in general never
967     # do anything different than __owners[threading.currentThread()], or there
968     # will be trouble.
969     self.__owners = {}
970
971   def _GetLockName(self, mname):
972     """Returns the name for a member lock.
973
974     """
975     return "%s/%s" % (self.name, mname)
976
977   def _get_lock(self):
978     """Returns the lockset-internal lock.
979
980     """
981     return self.__lock
982
983   def _get_lockdict(self):
984     """Returns the lockset-internal lock dictionary.
985
986     Accessing this structure is only safe in single-thread usage or when the
987     lockset-internal lock is held.
988
989     """
990     return self.__lockdict
991
992   def is_owned(self):
993     """Is the current thread a current level owner?
994
995     @note: Use L{check_owned} to check if a specific lock is held
996
997     """
998     return threading.currentThread() in self.__owners
999
1000   def check_owned(self, names, shared=-1):
1001     """Check if locks are owned in a specific mode.
1002
1003     @type names: sequence or string
1004     @param names: Lock names (or a single lock name)
1005     @param shared: See L{SharedLock.is_owned}
1006     @rtype: bool
1007     @note: Use L{is_owned} to check if the current thread holds I{any} lock and
1008       L{list_owned} to get the names of all owned locks
1009
1010     """
1011     if isinstance(names, basestring):
1012       names = [names]
1013
1014     # Avoid check if no locks are owned anyway
1015     if names and self.is_owned():
1016       candidates = []
1017
1018       # Gather references to all locks (in case they're deleted in the meantime)
1019       for lname in names:
1020         try:
1021           lock = self.__lockdict[lname]
1022         except KeyError:
1023           raise errors.LockError("Non-existing lock '%s' in set '%s' (it may"
1024                                  " have been removed)" % (lname, self.name))
1025         else:
1026           candidates.append(lock)
1027
1028       return compat.all(lock.is_owned(shared=shared) for lock in candidates)
1029     else:
1030       return False
1031
1032   def _add_owned(self, name=None):
1033     """Note the current thread owns the given lock"""
1034     if name is None:
1035       if not self.is_owned():
1036         self.__owners[threading.currentThread()] = set()
1037     else:
1038       if self.is_owned():
1039         self.__owners[threading.currentThread()].add(name)
1040       else:
1041         self.__owners[threading.currentThread()] = set([name])
1042
1043   def _del_owned(self, name=None):
1044     """Note the current thread owns the given lock"""
1045
1046     assert not (name is None and self.__lock.is_owned()), \
1047            "Cannot hold internal lock when deleting owner status"
1048
1049     if name is not None:
1050       self.__owners[threading.currentThread()].remove(name)
1051
1052     # Only remove the key if we don't hold the set-lock as well
1053     if not (self.__lock.is_owned() or
1054             self.__owners[threading.currentThread()]):
1055       del self.__owners[threading.currentThread()]
1056
1057   def list_owned(self):
1058     """Get the set of resource names owned by the current thread"""
1059     if self.is_owned():
1060       return self.__owners[threading.currentThread()].copy()
1061     else:
1062       return set()
1063
1064   def _release_and_delete_owned(self):
1065     """Release and delete all resources owned by the current thread"""
1066     for lname in self.list_owned():
1067       lock = self.__lockdict[lname]
1068       if lock.is_owned():
1069         lock.release()
1070       self._del_owned(name=lname)
1071
1072   def __names(self):
1073     """Return the current set of names.
1074
1075     Only call this function while holding __lock and don't iterate on the
1076     result after releasing the lock.
1077
1078     """
1079     return self.__lockdict.keys()
1080
1081   def _names(self):
1082     """Return a copy of the current set of elements.
1083
1084     Used only for debugging purposes.
1085
1086     """
1087     # If we don't already own the set-level lock acquired
1088     # we'll get it and note we need to release it later.
1089     release_lock = False
1090     if not self.__lock.is_owned():
1091       release_lock = True
1092       self.__lock.acquire(shared=1)
1093     try:
1094       result = self.__names()
1095     finally:
1096       if release_lock:
1097         self.__lock.release()
1098     return set(result)
1099
1100   def acquire(self, names, timeout=None, shared=0, priority=None,
1101               test_notify=None):
1102     """Acquire a set of resource locks.
1103
1104     @type names: list of strings (or string)
1105     @param names: the names of the locks which shall be acquired
1106         (special lock names, or instance/node names)
1107     @type shared: integer (0/1) used as a boolean
1108     @param shared: whether to acquire in shared mode; by default an
1109         exclusive lock will be acquired
1110     @type timeout: float or None
1111     @param timeout: Maximum time to acquire all locks
1112     @type priority: integer
1113     @param priority: Priority for acquiring locks
1114     @type test_notify: callable or None
1115     @param test_notify: Special callback function for unittesting
1116
1117     @return: Set of all locks successfully acquired or None in case of timeout
1118
1119     @raise errors.LockError: when any lock we try to acquire has
1120         been deleted before we succeed. In this case none of the
1121         locks requested will be acquired.
1122
1123     """
1124     assert timeout is None or timeout >= 0.0
1125
1126     # Check we don't already own locks at this level
1127     assert not self.is_owned(), ("Cannot acquire locks in the same set twice"
1128                                  " (lockset %s)" % self.name)
1129
1130     if priority is None:
1131       priority = _DEFAULT_PRIORITY
1132
1133     # We need to keep track of how long we spent waiting for a lock. The
1134     # timeout passed to this function is over all lock acquires.
1135     running_timeout = utils.RunningTimeout(timeout, False)
1136
1137     try:
1138       if names is not None:
1139         # Support passing in a single resource to acquire rather than many
1140         if isinstance(names, basestring):
1141           names = [names]
1142
1143         return self.__acquire_inner(names, False, shared, priority,
1144                                     running_timeout.Remaining, test_notify)
1145
1146       else:
1147         # If no names are given acquire the whole set by not letting new names
1148         # being added before we release, and getting the current list of names.
1149         # Some of them may then be deleted later, but we'll cope with this.
1150         #
1151         # We'd like to acquire this lock in a shared way, as it's nice if
1152         # everybody else can use the instances at the same time. If we are
1153         # acquiring them exclusively though they won't be able to do this
1154         # anyway, though, so we'll get the list lock exclusively as well in
1155         # order to be able to do add() on the set while owning it.
1156         if not self.__lock.acquire(shared=shared, priority=priority,
1157                                    timeout=running_timeout.Remaining()):
1158           raise _AcquireTimeout()
1159         try:
1160           # note we own the set-lock
1161           self._add_owned()
1162
1163           return self.__acquire_inner(self.__names(), True, shared, priority,
1164                                       running_timeout.Remaining, test_notify)
1165         except:
1166           # We shouldn't have problems adding the lock to the owners list, but
1167           # if we did we'll try to release this lock and re-raise exception.
1168           # Of course something is going to be really wrong, after this.
1169           self.__lock.release()
1170           self._del_owned()
1171           raise
1172
1173     except _AcquireTimeout:
1174       return None
1175
1176   def __acquire_inner(self, names, want_all, shared, priority,
1177                       timeout_fn, test_notify):
1178     """Inner logic for acquiring a number of locks.
1179
1180     @param names: Names of the locks to be acquired
1181     @param want_all: Whether all locks in the set should be acquired
1182     @param shared: Whether to acquire in shared mode
1183     @param timeout_fn: Function returning remaining timeout
1184     @param priority: Priority for acquiring locks
1185     @param test_notify: Special callback function for unittesting
1186
1187     """
1188     acquire_list = []
1189
1190     # First we look the locks up on __lockdict. We have no way of being sure
1191     # they will still be there after, but this makes it a lot faster should
1192     # just one of them be the already wrong. Using a sorted sequence to prevent
1193     # deadlocks.
1194     for lname in sorted(utils.UniqueSequence(names)):
1195       try:
1196         lock = self.__lockdict[lname] # raises KeyError if lock is not there
1197       except KeyError:
1198         if want_all:
1199           # We are acquiring all the set, it doesn't matter if this particular
1200           # element is not there anymore.
1201           continue
1202
1203         raise errors.LockError("Non-existing lock %s in set %s (it may have"
1204                                " been removed)" % (lname, self.name))
1205
1206       acquire_list.append((lname, lock))
1207
1208     # This will hold the locknames we effectively acquired.
1209     acquired = set()
1210
1211     try:
1212       # Now acquire_list contains a sorted list of resources and locks we
1213       # want.  In order to get them we loop on this (private) list and
1214       # acquire() them.  We gave no real guarantee they will still exist till
1215       # this is done but .acquire() itself is safe and will alert us if the
1216       # lock gets deleted.
1217       for (lname, lock) in acquire_list:
1218         if __debug__ and callable(test_notify):
1219           test_notify_fn = lambda: test_notify(lname)
1220         else:
1221           test_notify_fn = None
1222
1223         timeout = timeout_fn()
1224
1225         try:
1226           # raises LockError if the lock was deleted
1227           acq_success = lock.acquire(shared=shared, timeout=timeout,
1228                                      priority=priority,
1229                                      test_notify=test_notify_fn)
1230         except errors.LockError:
1231           if want_all:
1232             # We are acquiring all the set, it doesn't matter if this
1233             # particular element is not there anymore.
1234             continue
1235
1236           raise errors.LockError("Non-existing lock %s in set %s (it may"
1237                                  " have been removed)" % (lname, self.name))
1238
1239         if not acq_success:
1240           # Couldn't get lock or timeout occurred
1241           if timeout is None:
1242             # This shouldn't happen as SharedLock.acquire(timeout=None) is
1243             # blocking.
1244             raise errors.LockError("Failed to get lock %s (set %s)" %
1245                                    (lname, self.name))
1246
1247           raise _AcquireTimeout()
1248
1249         try:
1250           # now the lock cannot be deleted, we have it!
1251           self._add_owned(name=lname)
1252           acquired.add(lname)
1253
1254         except:
1255           # We shouldn't have problems adding the lock to the owners list, but
1256           # if we did we'll try to release this lock and re-raise exception.
1257           # Of course something is going to be really wrong after this.
1258           if lock.is_owned():
1259             lock.release()
1260           raise
1261
1262     except:
1263       # Release all owned locks
1264       self._release_and_delete_owned()
1265       raise
1266
1267     return acquired
1268
1269   def downgrade(self, names=None):
1270     """Downgrade a set of resource locks from exclusive to shared mode.
1271
1272     The locks must have been acquired in exclusive mode.
1273
1274     """
1275     assert self.is_owned(), ("downgrade on lockset %s while not owning any"
1276                              " lock" % self.name)
1277
1278     # Support passing in a single resource to downgrade rather than many
1279     if isinstance(names, basestring):
1280       names = [names]
1281
1282     owned = self.list_owned()
1283
1284     if names is None:
1285       names = owned
1286     else:
1287       names = set(names)
1288       assert owned.issuperset(names), \
1289         ("downgrade() on unheld resources %s (set %s)" %
1290          (names.difference(owned), self.name))
1291
1292     for lockname in names:
1293       self.__lockdict[lockname].downgrade()
1294
1295     # Do we own the lockset in exclusive mode?
1296     if self.__lock.is_owned(shared=0):
1297       # Have all locks been downgraded?
1298       if not compat.any(lock.is_owned(shared=0)
1299                         for lock in self.__lockdict.values()):
1300         self.__lock.downgrade()
1301         assert self.__lock.is_owned(shared=1)
1302
1303     return True
1304
1305   def release(self, names=None):
1306     """Release a set of resource locks, at the same level.
1307
1308     You must have acquired the locks, either in shared or in exclusive mode,
1309     before releasing them.
1310
1311     @type names: list of strings, or None
1312     @param names: the names of the locks which shall be released
1313         (defaults to all the locks acquired at that level).
1314
1315     """
1316     assert self.is_owned(), ("release() on lock set %s while not owner" %
1317                              self.name)
1318
1319     # Support passing in a single resource to release rather than many
1320     if isinstance(names, basestring):
1321       names = [names]
1322
1323     if names is None:
1324       names = self.list_owned()
1325     else:
1326       names = set(names)
1327       assert self.list_owned().issuperset(names), (
1328                "release() on unheld resources %s (set %s)" %
1329                (names.difference(self.list_owned()), self.name))
1330
1331     # First of all let's release the "all elements" lock, if set.
1332     # After this 'add' can work again
1333     if self.__lock.is_owned():
1334       self.__lock.release()
1335       self._del_owned()
1336
1337     for lockname in names:
1338       # If we are sure the lock doesn't leave __lockdict without being
1339       # exclusively held we can do this...
1340       self.__lockdict[lockname].release()
1341       self._del_owned(name=lockname)
1342
1343   def add(self, names, acquired=0, shared=0):
1344     """Add a new set of elements to the set
1345
1346     @type names: list of strings
1347     @param names: names of the new elements to add
1348     @type acquired: integer (0/1) used as a boolean
1349     @param acquired: pre-acquire the new resource?
1350     @type shared: integer (0/1) used as a boolean
1351     @param shared: is the pre-acquisition shared?
1352
1353     """
1354     # Check we don't already own locks at this level
1355     assert not self.is_owned() or self.__lock.is_owned(shared=0), \
1356       ("Cannot add locks if the set %s is only partially owned, or shared" %
1357        self.name)
1358
1359     # Support passing in a single resource to add rather than many
1360     if isinstance(names, basestring):
1361       names = [names]
1362
1363     # If we don't already own the set-level lock acquired in an exclusive way
1364     # we'll get it and note we need to release it later.
1365     release_lock = False
1366     if not self.__lock.is_owned():
1367       release_lock = True
1368       self.__lock.acquire()
1369
1370     try:
1371       invalid_names = set(self.__names()).intersection(names)
1372       if invalid_names:
1373         # This must be an explicit raise, not an assert, because assert is
1374         # turned off when using optimization, and this can happen because of
1375         # concurrency even if the user doesn't want it.
1376         raise errors.LockError("duplicate add(%s) on lockset %s" %
1377                                (invalid_names, self.name))
1378
1379       for lockname in names:
1380         lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1381
1382         if acquired:
1383           # No need for priority or timeout here as this lock has just been
1384           # created
1385           lock.acquire(shared=shared)
1386           # now the lock cannot be deleted, we have it!
1387           try:
1388             self._add_owned(name=lockname)
1389           except:
1390             # We shouldn't have problems adding the lock to the owners list,
1391             # but if we did we'll try to release this lock and re-raise
1392             # exception.  Of course something is going to be really wrong,
1393             # after this.  On the other hand the lock hasn't been added to the
1394             # __lockdict yet so no other threads should be pending on it. This
1395             # release is just a safety measure.
1396             lock.release()
1397             raise
1398
1399         self.__lockdict[lockname] = lock
1400
1401     finally:
1402       # Only release __lock if we were not holding it previously.
1403       if release_lock:
1404         self.__lock.release()
1405
1406     return True
1407
1408   def remove(self, names):
1409     """Remove elements from the lock set.
1410
1411     You can either not hold anything in the lockset or already hold a superset
1412     of the elements you want to delete, exclusively.
1413
1414     @type names: list of strings
1415     @param names: names of the resource to remove.
1416
1417     @return: a list of locks which we removed; the list is always
1418         equal to the names list if we were holding all the locks
1419         exclusively
1420
1421     """
1422     # Support passing in a single resource to remove rather than many
1423     if isinstance(names, basestring):
1424       names = [names]
1425
1426     # If we own any subset of this lock it must be a superset of what we want
1427     # to delete. The ownership must also be exclusive, but that will be checked
1428     # by the lock itself.
1429     assert not self.is_owned() or self.list_owned().issuperset(names), (
1430       "remove() on acquired lockset %s while not owning all elements" %
1431       self.name)
1432
1433     removed = []
1434
1435     for lname in names:
1436       # Calling delete() acquires the lock exclusively if we don't already own
1437       # it, and causes all pending and subsequent lock acquires to fail. It's
1438       # fine to call it out of order because delete() also implies release(),
1439       # and the assertion above guarantees that if we either already hold
1440       # everything we want to delete, or we hold none.
1441       try:
1442         self.__lockdict[lname].delete()
1443         removed.append(lname)
1444       except (KeyError, errors.LockError):
1445         # This cannot happen if we were already holding it, verify:
1446         assert not self.is_owned(), ("remove failed while holding lockset %s" %
1447                                      self.name)
1448       else:
1449         # If no LockError was raised we are the ones who deleted the lock.
1450         # This means we can safely remove it from lockdict, as any further or
1451         # pending delete() or acquire() will fail (and nobody can have the lock
1452         # since before our call to delete()).
1453         #
1454         # This is done in an else clause because if the exception was thrown
1455         # it's the job of the one who actually deleted it.
1456         del self.__lockdict[lname]
1457         # And let's remove it from our private list if we owned it.
1458         if self.is_owned():
1459           self._del_owned(name=lname)
1460
1461     return removed
1462
1463
1464 # Locking levels, must be acquired in increasing order.
1465 # Current rules are:
1466 #   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1467 #   acquired before performing any operation, either in shared or in exclusive
1468 #   mode. acquiring the BGL in exclusive mode is discouraged and should be
1469 #   avoided.
1470 #   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1471 #   If you need more than one node, or more than one instance, acquire them at
1472 #   the same time.
1473 LEVEL_CLUSTER = 0
1474 LEVEL_INSTANCE = 1
1475 LEVEL_NODEGROUP = 2
1476 LEVEL_NODE = 3
1477 #: Level for node resources, used for operations with possibly high impact on
1478 #: the node's disks.
1479 LEVEL_NODE_RES = 4
1480
1481 LEVELS = [
1482   LEVEL_CLUSTER,
1483   LEVEL_INSTANCE,
1484   LEVEL_NODEGROUP,
1485   LEVEL_NODE,
1486   LEVEL_NODE_RES,
1487   ]
1488
1489 # Lock levels which are modifiable
1490 LEVELS_MOD = frozenset([
1491   LEVEL_NODE_RES,
1492   LEVEL_NODE,
1493   LEVEL_NODEGROUP,
1494   LEVEL_INSTANCE,
1495   ])
1496
1497 #: Lock level names (make sure to use singular form)
1498 LEVEL_NAMES = {
1499   LEVEL_CLUSTER: "cluster",
1500   LEVEL_INSTANCE: "instance",
1501   LEVEL_NODEGROUP: "nodegroup",
1502   LEVEL_NODE: "node",
1503   LEVEL_NODE_RES: "node-res",
1504   }
1505
1506 # Constant for the big ganeti lock
1507 BGL = "BGL"
1508
1509
1510 class GanetiLockManager:
1511   """The Ganeti Locking Library
1512
1513   The purpose of this small library is to manage locking for ganeti clusters
1514   in a central place, while at the same time doing dynamic checks against
1515   possible deadlocks. It will also make it easier to transition to a different
1516   lock type should we migrate away from python threads.
1517
1518   """
1519   _instance = None
1520
1521   def __init__(self, nodes, nodegroups, instances):
1522     """Constructs a new GanetiLockManager object.
1523
1524     There should be only a GanetiLockManager object at any time, so this
1525     function raises an error if this is not the case.
1526
1527     @param nodes: list of node names
1528     @param nodegroups: list of nodegroup uuids
1529     @param instances: list of instance names
1530
1531     """
1532     assert self.__class__._instance is None, \
1533            "double GanetiLockManager instance"
1534
1535     self.__class__._instance = self
1536
1537     self._monitor = LockMonitor()
1538
1539     # The keyring contains all the locks, at their level and in the correct
1540     # locking order.
1541     self.__keyring = {
1542       LEVEL_CLUSTER: LockSet([BGL], "cluster", monitor=self._monitor),
1543       LEVEL_NODE: LockSet(nodes, "node", monitor=self._monitor),
1544       LEVEL_NODE_RES: LockSet(nodes, "node-res", monitor=self._monitor),
1545       LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroup", monitor=self._monitor),
1546       LEVEL_INSTANCE: LockSet(instances, "instance",
1547                               monitor=self._monitor),
1548       }
1549
1550     assert compat.all(ls.name == LEVEL_NAMES[level]
1551                       for (level, ls) in self.__keyring.items())
1552
1553   def AddToLockMonitor(self, provider):
1554     """Registers a new lock with the monitor.
1555
1556     See L{LockMonitor.RegisterLock}.
1557
1558     """
1559     return self._monitor.RegisterLock(provider)
1560
1561   def QueryLocks(self, fields):
1562     """Queries information from all locks.
1563
1564     See L{LockMonitor.QueryLocks}.
1565
1566     """
1567     return self._monitor.QueryLocks(fields)
1568
1569   def _names(self, level):
1570     """List the lock names at the given level.
1571
1572     This can be used for debugging/testing purposes.
1573
1574     @param level: the level whose list of locks to get
1575
1576     """
1577     assert level in LEVELS, "Invalid locking level %s" % level
1578     return self.__keyring[level]._names()
1579
1580   def is_owned(self, level):
1581     """Check whether we are owning locks at the given level
1582
1583     """
1584     return self.__keyring[level].is_owned()
1585
1586   def list_owned(self, level):
1587     """Get the set of owned locks at the given level
1588
1589     """
1590     return self.__keyring[level].list_owned()
1591
1592   def check_owned(self, level, names, shared=-1):
1593     """Check if locks at a certain level are owned in a specific mode.
1594
1595     @see: L{LockSet.check_owned}
1596
1597     """
1598     return self.__keyring[level].check_owned(names, shared=shared)
1599
1600   def _upper_owned(self, level):
1601     """Check that we don't own any lock at a level greater than the given one.
1602
1603     """
1604     # This way of checking only works if LEVELS[i] = i, which we check for in
1605     # the test cases.
1606     return compat.any((self.is_owned(l) for l in LEVELS[level + 1:]))
1607
1608   def _BGL_owned(self): # pylint: disable=C0103
1609     """Check if the current thread owns the BGL.
1610
1611     Both an exclusive or a shared acquisition work.
1612
1613     """
1614     return BGL in self.__keyring[LEVEL_CLUSTER].list_owned()
1615
1616   @staticmethod
1617   def _contains_BGL(level, names): # pylint: disable=C0103
1618     """Check if the level contains the BGL.
1619
1620     Check if acting on the given level and set of names will change
1621     the status of the Big Ganeti Lock.
1622
1623     """
1624     return level == LEVEL_CLUSTER and (names is None or BGL in names)
1625
1626   def acquire(self, level, names, timeout=None, shared=0, priority=None):
1627     """Acquire a set of resource locks, at the same level.
1628
1629     @type level: member of locking.LEVELS
1630     @param level: the level at which the locks shall be acquired
1631     @type names: list of strings (or string)
1632     @param names: the names of the locks which shall be acquired
1633         (special lock names, or instance/node names)
1634     @type shared: integer (0/1) used as a boolean
1635     @param shared: whether to acquire in shared mode; by default
1636         an exclusive lock will be acquired
1637     @type timeout: float
1638     @param timeout: Maximum time to acquire all locks
1639     @type priority: integer
1640     @param priority: Priority for acquiring lock
1641
1642     """
1643     assert level in LEVELS, "Invalid locking level %s" % level
1644
1645     # Check that we are either acquiring the Big Ganeti Lock or we already own
1646     # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1647     # so even if we've migrated we need to at least share the BGL to be
1648     # compatible with them. Of course if we own the BGL exclusively there's no
1649     # point in acquiring any other lock, unless perhaps we are half way through
1650     # the migration of the current opcode.
1651     assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1652       "You must own the Big Ganeti Lock before acquiring any other")
1653
1654     # Check we don't own locks at the same or upper levels.
1655     assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1656                                           " while owning some at a greater one")
1657
1658     # Acquire the locks in the set.
1659     return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
1660                                          priority=priority)
1661
1662   def downgrade(self, level, names=None):
1663     """Downgrade a set of resource locks from exclusive to shared mode.
1664
1665     You must have acquired the locks in exclusive mode.
1666
1667     @type level: member of locking.LEVELS
1668     @param level: the level at which the locks shall be downgraded
1669     @type names: list of strings, or None
1670     @param names: the names of the locks which shall be downgraded
1671         (defaults to all the locks acquired at the level)
1672
1673     """
1674     assert level in LEVELS, "Invalid locking level %s" % level
1675
1676     return self.__keyring[level].downgrade(names=names)
1677
1678   def release(self, level, names=None):
1679     """Release a set of resource locks, at the same level.
1680
1681     You must have acquired the locks, either in shared or in exclusive
1682     mode, before releasing them.
1683
1684     @type level: member of locking.LEVELS
1685     @param level: the level at which the locks shall be released
1686     @type names: list of strings, or None
1687     @param names: the names of the locks which shall be released
1688         (defaults to all the locks acquired at that level)
1689
1690     """
1691     assert level in LEVELS, "Invalid locking level %s" % level
1692     assert (not self._contains_BGL(level, names) or
1693             not self._upper_owned(LEVEL_CLUSTER)), (
1694               "Cannot release the Big Ganeti Lock while holding something"
1695               " at upper levels (%r)" %
1696               (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self.list_owned(i))
1697                                 for i in self.__keyring.keys()]), ))
1698
1699     # Release will complain if we don't own the locks already
1700     return self.__keyring[level].release(names)
1701
1702   def add(self, level, names, acquired=0, shared=0):
1703     """Add locks at the specified level.
1704
1705     @type level: member of locking.LEVELS_MOD
1706     @param level: the level at which the locks shall be added
1707     @type names: list of strings
1708     @param names: names of the locks to acquire
1709     @type acquired: integer (0/1) used as a boolean
1710     @param acquired: whether to acquire the newly added locks
1711     @type shared: integer (0/1) used as a boolean
1712     @param shared: whether the acquisition will be shared
1713
1714     """
1715     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1716     assert self._BGL_owned(), ("You must own the BGL before performing other"
1717                                " operations")
1718     assert not self._upper_owned(level), ("Cannot add locks at a level"
1719                                           " while owning some at a greater one")
1720     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1721
1722   def remove(self, level, names):
1723     """Remove locks from the specified level.
1724
1725     You must either already own the locks you are trying to remove
1726     exclusively or not own any lock at an upper level.
1727
1728     @type level: member of locking.LEVELS_MOD
1729     @param level: the level at which the locks shall be removed
1730     @type names: list of strings
1731     @param names: the names of the locks which shall be removed
1732         (special lock names, or instance/node names)
1733
1734     """
1735     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1736     assert self._BGL_owned(), ("You must own the BGL before performing other"
1737                                " operations")
1738     # Check we either own the level or don't own anything from here
1739     # up. LockSet.remove() will check the case in which we don't own
1740     # all the needed resources, or we have a shared ownership.
1741     assert self.is_owned(level) or not self._upper_owned(level), (
1742            "Cannot remove locks at a level while not owning it or"
1743            " owning some at a greater one")
1744     return self.__keyring[level].remove(names)
1745
1746
1747 def _MonitorSortKey((item, idx, num)):
1748   """Sorting key function.
1749
1750   Sort by name, registration order and then order of information. This provides
1751   a stable sort order over different providers, even if they return the same
1752   name.
1753
1754   """
1755   (name, _, _, _) = item
1756
1757   return (utils.NiceSortKey(name), num, idx)
1758
1759
1760 class LockMonitor(object):
1761   _LOCK_ATTR = "_lock"
1762
1763   def __init__(self):
1764     """Initializes this class.
1765
1766     """
1767     self._lock = SharedLock("LockMonitor")
1768
1769     # Counter for stable sorting
1770     self._counter = itertools.count(0)
1771
1772     # Tracked locks. Weak references are used to avoid issues with circular
1773     # references and deletion.
1774     self._locks = weakref.WeakKeyDictionary()
1775
1776   @ssynchronized(_LOCK_ATTR)
1777   def RegisterLock(self, provider):
1778     """Registers a new lock.
1779
1780     @param provider: Object with a callable method named C{GetLockInfo}, taking
1781       a single C{set} containing the requested information items
1782     @note: It would be nicer to only receive the function generating the
1783       requested information but, as it turns out, weak references to bound
1784       methods (e.g. C{self.GetLockInfo}) are tricky; there are several
1785       workarounds, but none of the ones I found works properly in combination
1786       with a standard C{WeakKeyDictionary}
1787
1788     """
1789     assert provider not in self._locks, "Duplicate registration"
1790
1791     # There used to be a check for duplicate names here. As it turned out, when
1792     # a lock is re-created with the same name in a very short timeframe, the
1793     # previous instance might not yet be removed from the weakref dictionary.
1794     # By keeping track of the order of incoming registrations, a stable sort
1795     # ordering can still be guaranteed.
1796
1797     self._locks[provider] = self._counter.next()
1798
1799   def _GetLockInfo(self, requested):
1800     """Get information from all locks.
1801
1802     """
1803     # Must hold lock while getting consistent list of tracked items
1804     self._lock.acquire(shared=1)
1805     try:
1806       items = self._locks.items()
1807     finally:
1808       self._lock.release()
1809
1810     return [(info, idx, num)
1811             for (provider, num) in items
1812             for (idx, info) in enumerate(provider.GetLockInfo(requested))]
1813
1814   def _Query(self, fields):
1815     """Queries information from all locks.
1816
1817     @type fields: list of strings
1818     @param fields: List of fields to return
1819
1820     """
1821     qobj = query.Query(query.LOCK_FIELDS, fields)
1822
1823     # Get all data with internal lock held and then sort by name and incoming
1824     # order
1825     lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
1826                       key=_MonitorSortKey)
1827
1828     # Extract lock information and build query data
1829     return (qobj, query.LockQueryData(map(compat.fst, lockinfo)))
1830
1831   def QueryLocks(self, fields):
1832     """Queries information from all locks.
1833
1834     @type fields: list of strings
1835     @param fields: List of fields to return
1836
1837     """
1838     (qobj, ctx) = self._Query(fields)
1839
1840     # Prepare query response
1841     return query.GetQueryResponse(qobj, ctx)