Transition into and out of offline instance state
[ganeti-local] / lib / locking.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Module implementing the Ganeti locking code."""
22
23 # pylint: disable=W0212
24
25 # W0212 since e.g. LockSet methods use (a lot) the internals of
26 # SharedLock
27
28 import os
29 import select
30 import threading
31 import errno
32 import weakref
33 import logging
34 import heapq
35 import itertools
36
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import compat
40 from ganeti import query
41
42
43 _EXCLUSIVE_TEXT = "exclusive"
44 _SHARED_TEXT = "shared"
45 _DELETED_TEXT = "deleted"
46
47 _DEFAULT_PRIORITY = 0
48
49
50 def ssynchronized(mylock, shared=0):
51   """Shared Synchronization decorator.
52
53   Calls the function holding the given lock, either in exclusive or shared
54   mode. It requires the passed lock to be a SharedLock (or support its
55   semantics).
56
57   @type mylock: lockable object or string
58   @param mylock: lock to acquire or class member name of the lock to acquire
59
60   """
61   def wrap(fn):
62     def sync_function(*args, **kwargs):
63       if isinstance(mylock, basestring):
64         assert args, "cannot ssynchronize on non-class method: self not found"
65         # args[0] is "self"
66         lock = getattr(args[0], mylock)
67       else:
68         lock = mylock
69       lock.acquire(shared=shared)
70       try:
71         return fn(*args, **kwargs)
72       finally:
73         lock.release()
74     return sync_function
75   return wrap
76
77
78 class _SingleNotifyPipeConditionWaiter(object):
79   """Helper class for SingleNotifyPipeCondition
80
81   """
82   __slots__ = [
83     "_fd",
84     "_poller",
85     ]
86
87   def __init__(self, poller, fd):
88     """Constructor for _SingleNotifyPipeConditionWaiter
89
90     @type poller: select.poll
91     @param poller: Poller object
92     @type fd: int
93     @param fd: File descriptor to wait for
94
95     """
96     object.__init__(self)
97     self._poller = poller
98     self._fd = fd
99
100   def __call__(self, timeout):
101     """Wait for something to happen on the pipe.
102
103     @type timeout: float or None
104     @param timeout: Timeout for waiting (can be None)
105
106     """
107     running_timeout = utils.RunningTimeout(timeout, True)
108
109     while True:
110       remaining_time = running_timeout.Remaining()
111
112       if remaining_time is not None:
113         if remaining_time < 0.0:
114           break
115
116         # Our calculation uses seconds, poll() wants milliseconds
117         remaining_time *= 1000
118
119       try:
120         result = self._poller.poll(remaining_time)
121       except EnvironmentError, err:
122         if err.errno != errno.EINTR:
123           raise
124         result = None
125
126       # Check whether we were notified
127       if result and result[0][0] == self._fd:
128         break
129
130
131 class _BaseCondition(object):
132   """Base class containing common code for conditions.
133
134   Some of this code is taken from python's threading module.
135
136   """
137   __slots__ = [
138     "_lock",
139     "acquire",
140     "release",
141     "_is_owned",
142     "_acquire_restore",
143     "_release_save",
144     ]
145
146   def __init__(self, lock):
147     """Constructor for _BaseCondition.
148
149     @type lock: threading.Lock
150     @param lock: condition base lock
151
152     """
153     object.__init__(self)
154
155     try:
156       self._release_save = lock._release_save
157     except AttributeError:
158       self._release_save = self._base_release_save
159     try:
160       self._acquire_restore = lock._acquire_restore
161     except AttributeError:
162       self._acquire_restore = self._base_acquire_restore
163     try:
164       self._is_owned = lock.is_owned
165     except AttributeError:
166       self._is_owned = self._base_is_owned
167
168     self._lock = lock
169
170     # Export the lock's acquire() and release() methods
171     self.acquire = lock.acquire
172     self.release = lock.release
173
174   def _base_is_owned(self):
175     """Check whether lock is owned by current thread.
176
177     """
178     if self._lock.acquire(0):
179       self._lock.release()
180       return False
181     return True
182
183   def _base_release_save(self):
184     self._lock.release()
185
186   def _base_acquire_restore(self, _):
187     self._lock.acquire()
188
189   def _check_owned(self):
190     """Raise an exception if the current thread doesn't own the lock.
191
192     """
193     if not self._is_owned():
194       raise RuntimeError("cannot work with un-aquired lock")
195
196
197 class SingleNotifyPipeCondition(_BaseCondition):
198   """Condition which can only be notified once.
199
200   This condition class uses pipes and poll, internally, to be able to wait for
201   notification with a timeout, without resorting to polling. It is almost
202   compatible with Python's threading.Condition, with the following differences:
203     - notifyAll can only be called once, and no wait can happen after that
204     - notify is not supported, only notifyAll
205
206   """
207
208   __slots__ = [
209     "_poller",
210     "_read_fd",
211     "_write_fd",
212     "_nwaiters",
213     "_notified",
214     ]
215
216   _waiter_class = _SingleNotifyPipeConditionWaiter
217
218   def __init__(self, lock):
219     """Constructor for SingleNotifyPipeCondition
220
221     """
222     _BaseCondition.__init__(self, lock)
223     self._nwaiters = 0
224     self._notified = False
225     self._read_fd = None
226     self._write_fd = None
227     self._poller = None
228
229   def _check_unnotified(self):
230     """Throws an exception if already notified.
231
232     """
233     if self._notified:
234       raise RuntimeError("cannot use already notified condition")
235
236   def _Cleanup(self):
237     """Cleanup open file descriptors, if any.
238
239     """
240     if self._read_fd is not None:
241       os.close(self._read_fd)
242       self._read_fd = None
243
244     if self._write_fd is not None:
245       os.close(self._write_fd)
246       self._write_fd = None
247     self._poller = None
248
249   def wait(self, timeout):
250     """Wait for a notification.
251
252     @type timeout: float or None
253     @param timeout: Waiting timeout (can be None)
254
255     """
256     self._check_owned()
257     self._check_unnotified()
258
259     self._nwaiters += 1
260     try:
261       if self._poller is None:
262         (self._read_fd, self._write_fd) = os.pipe()
263         self._poller = select.poll()
264         self._poller.register(self._read_fd, select.POLLHUP)
265
266       wait_fn = self._waiter_class(self._poller, self._read_fd)
267       state = self._release_save()
268       try:
269         # Wait for notification
270         wait_fn(timeout)
271       finally:
272         # Re-acquire lock
273         self._acquire_restore(state)
274     finally:
275       self._nwaiters -= 1
276       if self._nwaiters == 0:
277         self._Cleanup()
278
279   def notifyAll(self): # pylint: disable=C0103
280     """Close the writing side of the pipe to notify all waiters.
281
282     """
283     self._check_owned()
284     self._check_unnotified()
285     self._notified = True
286     if self._write_fd is not None:
287       os.close(self._write_fd)
288       self._write_fd = None
289
290
291 class PipeCondition(_BaseCondition):
292   """Group-only non-polling condition with counters.
293
294   This condition class uses pipes and poll, internally, to be able to wait for
295   notification with a timeout, without resorting to polling. It is almost
296   compatible with Python's threading.Condition, but only supports notifyAll and
297   non-recursive locks. As an additional features it's able to report whether
298   there are any waiting threads.
299
300   """
301   __slots__ = [
302     "_waiters",
303     "_single_condition",
304     ]
305
306   _single_condition_class = SingleNotifyPipeCondition
307
308   def __init__(self, lock):
309     """Initializes this class.
310
311     """
312     _BaseCondition.__init__(self, lock)
313     self._waiters = set()
314     self._single_condition = self._single_condition_class(self._lock)
315
316   def wait(self, timeout):
317     """Wait for a notification.
318
319     @type timeout: float or None
320     @param timeout: Waiting timeout (can be None)
321
322     """
323     self._check_owned()
324
325     # Keep local reference to the pipe. It could be replaced by another thread
326     # notifying while we're waiting.
327     cond = self._single_condition
328
329     self._waiters.add(threading.currentThread())
330     try:
331       cond.wait(timeout)
332     finally:
333       self._check_owned()
334       self._waiters.remove(threading.currentThread())
335
336   def notifyAll(self): # pylint: disable=C0103
337     """Notify all currently waiting threads.
338
339     """
340     self._check_owned()
341     self._single_condition.notifyAll()
342     self._single_condition = self._single_condition_class(self._lock)
343
344   def get_waiting(self):
345     """Returns a list of all waiting threads.
346
347     """
348     self._check_owned()
349
350     return self._waiters
351
352   def has_waiting(self):
353     """Returns whether there are active waiters.
354
355     """
356     self._check_owned()
357
358     return bool(self._waiters)
359
360
361 class _PipeConditionWithMode(PipeCondition):
362   __slots__ = [
363     "shared",
364     ]
365
366   def __init__(self, lock, shared):
367     """Initializes this class.
368
369     """
370     self.shared = shared
371     PipeCondition.__init__(self, lock)
372
373
374 class SharedLock(object):
375   """Implements a shared lock.
376
377   Multiple threads can acquire the lock in a shared way by calling
378   C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
379   threads can call C{acquire(shared=0)}.
380
381   Notes on data structures: C{__pending} contains a priority queue (heapq) of
382   all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
383   ...]}. Each per-priority queue contains a normal in-order list of conditions
384   to be notified when the lock can be acquired. Shared locks are grouped
385   together by priority and the condition for them is stored in
386   C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
387   references for the per-priority queues indexed by priority for faster access.
388
389   @type name: string
390   @ivar name: the name of the lock
391
392   """
393   __slots__ = [
394     "__weakref__",
395     "__deleted",
396     "__exc",
397     "__lock",
398     "__pending",
399     "__pending_by_prio",
400     "__pending_shared",
401     "__shr",
402     "name",
403     ]
404
405   __condition_class = _PipeConditionWithMode
406
407   def __init__(self, name, monitor=None):
408     """Construct a new SharedLock.
409
410     @param name: the name of the lock
411     @type monitor: L{LockMonitor}
412     @param monitor: Lock monitor with which to register
413
414     """
415     object.__init__(self)
416
417     self.name = name
418
419     # Internal lock
420     self.__lock = threading.Lock()
421
422     # Queue containing waiting acquires
423     self.__pending = []
424     self.__pending_by_prio = {}
425     self.__pending_shared = {}
426
427     # Current lock holders
428     self.__shr = set()
429     self.__exc = None
430
431     # is this lock in the deleted state?
432     self.__deleted = False
433
434     # Register with lock monitor
435     if monitor:
436       logging.debug("Adding lock %s to monitor", name)
437       monitor.RegisterLock(self)
438
439   def GetLockInfo(self, requested):
440     """Retrieves information for querying locks.
441
442     @type requested: set
443     @param requested: Requested information, see C{query.LQ_*}
444
445     """
446     self.__lock.acquire()
447     try:
448       # Note: to avoid unintentional race conditions, no references to
449       # modifiable objects should be returned unless they were created in this
450       # function.
451       mode = None
452       owner_names = None
453
454       if query.LQ_MODE in requested:
455         if self.__deleted:
456           mode = _DELETED_TEXT
457           assert not (self.__exc or self.__shr)
458         elif self.__exc:
459           mode = _EXCLUSIVE_TEXT
460         elif self.__shr:
461           mode = _SHARED_TEXT
462
463       # Current owner(s) are wanted
464       if query.LQ_OWNER in requested:
465         if self.__exc:
466           owner = [self.__exc]
467         else:
468           owner = self.__shr
469
470         if owner:
471           assert not self.__deleted
472           owner_names = [i.getName() for i in owner]
473
474       # Pending acquires are wanted
475       if query.LQ_PENDING in requested:
476         pending = []
477
478         # Sorting instead of copying and using heaq functions for simplicity
479         for (_, prioqueue) in sorted(self.__pending):
480           for cond in prioqueue:
481             if cond.shared:
482               pendmode = _SHARED_TEXT
483             else:
484               pendmode = _EXCLUSIVE_TEXT
485
486             # List of names will be sorted in L{query._GetLockPending}
487             pending.append((pendmode, [i.getName()
488                                        for i in cond.get_waiting()]))
489       else:
490         pending = None
491
492       return [(self.name, mode, owner_names, pending)]
493     finally:
494       self.__lock.release()
495
496   def __check_deleted(self):
497     """Raises an exception if the lock has been deleted.
498
499     """
500     if self.__deleted:
501       raise errors.LockError("Deleted lock %s" % self.name)
502
503   def __is_sharer(self):
504     """Is the current thread sharing the lock at this time?
505
506     """
507     return threading.currentThread() in self.__shr
508
509   def __is_exclusive(self):
510     """Is the current thread holding the lock exclusively at this time?
511
512     """
513     return threading.currentThread() == self.__exc
514
515   def __is_owned(self, shared=-1):
516     """Is the current thread somehow owning the lock at this time?
517
518     This is a private version of the function, which presumes you're holding
519     the internal lock.
520
521     """
522     if shared < 0:
523       return self.__is_sharer() or self.__is_exclusive()
524     elif shared:
525       return self.__is_sharer()
526     else:
527       return self.__is_exclusive()
528
529   def is_owned(self, shared=-1):
530     """Is the current thread somehow owning the lock at this time?
531
532     @param shared:
533         - < 0: check for any type of ownership (default)
534         - 0: check for exclusive ownership
535         - > 0: check for shared ownership
536
537     """
538     self.__lock.acquire()
539     try:
540       return self.__is_owned(shared=shared)
541     finally:
542       self.__lock.release()
543
544   #: Necessary to remain compatible with threading.Condition, which tries to
545   #: retrieve a locks' "_is_owned" attribute
546   _is_owned = is_owned
547
548   def _count_pending(self):
549     """Returns the number of pending acquires.
550
551     @rtype: int
552
553     """
554     self.__lock.acquire()
555     try:
556       return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
557     finally:
558       self.__lock.release()
559
560   def _check_empty(self):
561     """Checks whether there are any pending acquires.
562
563     @rtype: bool
564
565     """
566     self.__lock.acquire()
567     try:
568       # Order is important: __find_first_pending_queue modifies __pending
569       (_, prioqueue) = self.__find_first_pending_queue()
570
571       return not (prioqueue or
572                   self.__pending or
573                   self.__pending_by_prio or
574                   self.__pending_shared)
575     finally:
576       self.__lock.release()
577
578   def __do_acquire(self, shared):
579     """Actually acquire the lock.
580
581     """
582     if shared:
583       self.__shr.add(threading.currentThread())
584     else:
585       self.__exc = threading.currentThread()
586
587   def __can_acquire(self, shared):
588     """Determine whether lock can be acquired.
589
590     """
591     if shared:
592       return self.__exc is None
593     else:
594       return len(self.__shr) == 0 and self.__exc is None
595
596   def __find_first_pending_queue(self):
597     """Tries to find the topmost queued entry with pending acquires.
598
599     Removes empty entries while going through the list.
600
601     """
602     while self.__pending:
603       (priority, prioqueue) = self.__pending[0]
604
605       if prioqueue:
606         return (priority, prioqueue)
607
608       # Remove empty queue
609       heapq.heappop(self.__pending)
610       del self.__pending_by_prio[priority]
611       assert priority not in self.__pending_shared
612
613     return (None, None)
614
615   def __is_on_top(self, cond):
616     """Checks whether the passed condition is on top of the queue.
617
618     The caller must make sure the queue isn't empty.
619
620     """
621     (_, prioqueue) = self.__find_first_pending_queue()
622
623     return cond == prioqueue[0]
624
625   def __acquire_unlocked(self, shared, timeout, priority):
626     """Acquire a shared lock.
627
628     @param shared: whether to acquire in shared mode; by default an
629         exclusive lock will be acquired
630     @param timeout: maximum waiting time before giving up
631     @type priority: integer
632     @param priority: Priority for acquiring lock
633
634     """
635     self.__check_deleted()
636
637     # We cannot acquire the lock if we already have it
638     assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
639                                    " %s" % self.name)
640
641     # Remove empty entries from queue
642     self.__find_first_pending_queue()
643
644     # Check whether someone else holds the lock or there are pending acquires.
645     if not self.__pending and self.__can_acquire(shared):
646       # Apparently not, can acquire lock directly.
647       self.__do_acquire(shared)
648       return True
649
650     prioqueue = self.__pending_by_prio.get(priority, None)
651
652     if shared:
653       # Try to re-use condition for shared acquire
654       wait_condition = self.__pending_shared.get(priority, None)
655       assert (wait_condition is None or
656               (wait_condition.shared and wait_condition in prioqueue))
657     else:
658       wait_condition = None
659
660     if wait_condition is None:
661       if prioqueue is None:
662         assert priority not in self.__pending_by_prio
663
664         prioqueue = []
665         heapq.heappush(self.__pending, (priority, prioqueue))
666         self.__pending_by_prio[priority] = prioqueue
667
668       wait_condition = self.__condition_class(self.__lock, shared)
669       prioqueue.append(wait_condition)
670
671       if shared:
672         # Keep reference for further shared acquires on same priority. This is
673         # better than trying to find it in the list of pending acquires.
674         assert priority not in self.__pending_shared
675         self.__pending_shared[priority] = wait_condition
676
677     try:
678       # Wait until we become the topmost acquire in the queue or the timeout
679       # expires.
680       # TODO: Decrease timeout with spurious notifications
681       while not (self.__is_on_top(wait_condition) and
682                  self.__can_acquire(shared)):
683         # Wait for notification
684         wait_condition.wait(timeout)
685         self.__check_deleted()
686
687         # A lot of code assumes blocking acquires always succeed. Loop
688         # internally for that case.
689         if timeout is not None:
690           break
691
692       if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
693         self.__do_acquire(shared)
694         return True
695     finally:
696       # Remove condition from queue if there are no more waiters
697       if not wait_condition.has_waiting():
698         prioqueue.remove(wait_condition)
699         if wait_condition.shared:
700           # Remove from list of shared acquires if it wasn't while releasing
701           # (e.g. on lock deletion)
702           self.__pending_shared.pop(priority, None)
703
704     return False
705
706   def acquire(self, shared=0, timeout=None, priority=None,
707               test_notify=None):
708     """Acquire a shared lock.
709
710     @type shared: integer (0/1) used as a boolean
711     @param shared: whether to acquire in shared mode; by default an
712         exclusive lock will be acquired
713     @type timeout: float
714     @param timeout: maximum waiting time before giving up
715     @type priority: integer
716     @param priority: Priority for acquiring lock
717     @type test_notify: callable or None
718     @param test_notify: Special callback function for unittesting
719
720     """
721     if priority is None:
722       priority = _DEFAULT_PRIORITY
723
724     self.__lock.acquire()
725     try:
726       # We already got the lock, notify now
727       if __debug__ and callable(test_notify):
728         test_notify()
729
730       return self.__acquire_unlocked(shared, timeout, priority)
731     finally:
732       self.__lock.release()
733
734   def downgrade(self):
735     """Changes the lock mode from exclusive to shared.
736
737     Pending acquires in shared mode on the same priority will go ahead.
738
739     """
740     self.__lock.acquire()
741     try:
742       assert self.__is_owned(), "Lock must be owned"
743
744       if self.__is_exclusive():
745         # Do nothing if the lock is already acquired in shared mode
746         self.__exc = None
747         self.__do_acquire(1)
748
749         # Important: pending shared acquires should only jump ahead if there
750         # was a transition from exclusive to shared, otherwise an owner of a
751         # shared lock can keep calling this function to push incoming shared
752         # acquires
753         (priority, prioqueue) = self.__find_first_pending_queue()
754         if prioqueue:
755           # Is there a pending shared acquire on this priority?
756           cond = self.__pending_shared.pop(priority, None)
757           if cond:
758             assert cond.shared
759             assert cond in prioqueue
760
761             # Ensure shared acquire is on top of queue
762             if len(prioqueue) > 1:
763               prioqueue.remove(cond)
764               prioqueue.insert(0, cond)
765
766             # Notify
767             cond.notifyAll()
768
769       assert not self.__is_exclusive()
770       assert self.__is_sharer()
771
772       return True
773     finally:
774       self.__lock.release()
775
776   def release(self):
777     """Release a Shared Lock.
778
779     You must have acquired the lock, either in shared or in exclusive mode,
780     before calling this function.
781
782     """
783     self.__lock.acquire()
784     try:
785       assert self.__is_exclusive() or self.__is_sharer(), \
786         "Cannot release non-owned lock"
787
788       # Autodetect release type
789       if self.__is_exclusive():
790         self.__exc = None
791       else:
792         self.__shr.remove(threading.currentThread())
793
794       # Notify topmost condition in queue
795       (priority, prioqueue) = self.__find_first_pending_queue()
796       if prioqueue:
797         cond = prioqueue[0]
798         cond.notifyAll()
799         if cond.shared:
800           # Prevent further shared acquires from sneaking in while waiters are
801           # notified
802           self.__pending_shared.pop(priority, None)
803
804     finally:
805       self.__lock.release()
806
807   def delete(self, timeout=None, priority=None):
808     """Delete a Shared Lock.
809
810     This operation will declare the lock for removal. First the lock will be
811     acquired in exclusive mode if you don't already own it, then the lock
812     will be put in a state where any future and pending acquire() fail.
813
814     @type timeout: float
815     @param timeout: maximum waiting time before giving up
816     @type priority: integer
817     @param priority: Priority for acquiring lock
818
819     """
820     if priority is None:
821       priority = _DEFAULT_PRIORITY
822
823     self.__lock.acquire()
824     try:
825       assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
826
827       self.__check_deleted()
828
829       # The caller is allowed to hold the lock exclusively already.
830       acquired = self.__is_exclusive()
831
832       if not acquired:
833         acquired = self.__acquire_unlocked(0, timeout, priority)
834
835         assert self.__is_exclusive() and not self.__is_sharer(), \
836           "Lock wasn't acquired in exclusive mode"
837
838       if acquired:
839         self.__deleted = True
840         self.__exc = None
841
842         assert not (self.__exc or self.__shr), "Found owner during deletion"
843
844         # Notify all acquires. They'll throw an error.
845         for (_, prioqueue) in self.__pending:
846           for cond in prioqueue:
847             cond.notifyAll()
848
849         assert self.__deleted
850
851       return acquired
852     finally:
853       self.__lock.release()
854
855   def _release_save(self):
856     shared = self.__is_sharer()
857     self.release()
858     return shared
859
860   def _acquire_restore(self, shared):
861     self.acquire(shared=shared)
862
863
864 # Whenever we want to acquire a full LockSet we pass None as the value
865 # to acquire.  Hide this behind this nicely named constant.
866 ALL_SET = None
867
868
869 class _AcquireTimeout(Exception):
870   """Internal exception to abort an acquire on a timeout.
871
872   """
873
874
875 class LockSet:
876   """Implements a set of locks.
877
878   This abstraction implements a set of shared locks for the same resource type,
879   distinguished by name. The user can lock a subset of the resources and the
880   LockSet will take care of acquiring the locks always in the same order, thus
881   preventing deadlock.
882
883   All the locks needed in the same set must be acquired together, though.
884
885   @type name: string
886   @ivar name: the name of the lockset
887
888   """
889   def __init__(self, members, name, monitor=None):
890     """Constructs a new LockSet.
891
892     @type members: list of strings
893     @param members: initial members of the set
894     @type monitor: L{LockMonitor}
895     @param monitor: Lock monitor with which to register member locks
896
897     """
898     assert members is not None, "members parameter is not a list"
899     self.name = name
900
901     # Lock monitor
902     self.__monitor = monitor
903
904     # Used internally to guarantee coherency
905     self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)
906
907     # The lockdict indexes the relationship name -> lock
908     # The order-of-locking is implied by the alphabetical order of names
909     self.__lockdict = {}
910
911     for mname in members:
912       self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
913                                           monitor=monitor)
914
915     # The owner dict contains the set of locks each thread owns. For
916     # performance each thread can access its own key without a global lock on
917     # this structure. It is paramount though that *no* other type of access is
918     # done to this structure (eg. no looping over its keys). *_owner helper
919     # function are defined to guarantee access is correct, but in general never
920     # do anything different than __owners[threading.currentThread()], or there
921     # will be trouble.
922     self.__owners = {}
923
924   def _GetLockName(self, mname):
925     """Returns the name for a member lock.
926
927     """
928     return "%s/%s" % (self.name, mname)
929
930   def _get_lock(self):
931     """Returns the lockset-internal lock.
932
933     """
934     return self.__lock
935
936   def _get_lockdict(self):
937     """Returns the lockset-internal lock dictionary.
938
939     Accessing this structure is only safe in single-thread usage or when the
940     lockset-internal lock is held.
941
942     """
943     return self.__lockdict
944
945   def is_owned(self):
946     """Is the current thread a current level owner?"""
947     return threading.currentThread() in self.__owners
948
949   def _add_owned(self, name=None):
950     """Note the current thread owns the given lock"""
951     if name is None:
952       if not self.is_owned():
953         self.__owners[threading.currentThread()] = set()
954     else:
955       if self.is_owned():
956         self.__owners[threading.currentThread()].add(name)
957       else:
958         self.__owners[threading.currentThread()] = set([name])
959
960   def _del_owned(self, name=None):
961     """Note the current thread owns the given lock"""
962
963     assert not (name is None and self.__lock.is_owned()), \
964            "Cannot hold internal lock when deleting owner status"
965
966     if name is not None:
967       self.__owners[threading.currentThread()].remove(name)
968
969     # Only remove the key if we don't hold the set-lock as well
970     if (not self.__lock.is_owned() and
971         not self.__owners[threading.currentThread()]):
972       del self.__owners[threading.currentThread()]
973
974   def list_owned(self):
975     """Get the set of resource names owned by the current thread"""
976     if self.is_owned():
977       return self.__owners[threading.currentThread()].copy()
978     else:
979       return set()
980
981   def _release_and_delete_owned(self):
982     """Release and delete all resources owned by the current thread"""
983     for lname in self.list_owned():
984       lock = self.__lockdict[lname]
985       if lock.is_owned():
986         lock.release()
987       self._del_owned(name=lname)
988
989   def __names(self):
990     """Return the current set of names.
991
992     Only call this function while holding __lock and don't iterate on the
993     result after releasing the lock.
994
995     """
996     return self.__lockdict.keys()
997
998   def _names(self):
999     """Return a copy of the current set of elements.
1000
1001     Used only for debugging purposes.
1002
1003     """
1004     # If we don't already own the set-level lock acquired
1005     # we'll get it and note we need to release it later.
1006     release_lock = False
1007     if not self.__lock.is_owned():
1008       release_lock = True
1009       self.__lock.acquire(shared=1)
1010     try:
1011       result = self.__names()
1012     finally:
1013       if release_lock:
1014         self.__lock.release()
1015     return set(result)
1016
1017   def acquire(self, names, timeout=None, shared=0, priority=None,
1018               test_notify=None):
1019     """Acquire a set of resource locks.
1020
1021     @type names: list of strings (or string)
1022     @param names: the names of the locks which shall be acquired
1023         (special lock names, or instance/node names)
1024     @type shared: integer (0/1) used as a boolean
1025     @param shared: whether to acquire in shared mode; by default an
1026         exclusive lock will be acquired
1027     @type timeout: float or None
1028     @param timeout: Maximum time to acquire all locks
1029     @type priority: integer
1030     @param priority: Priority for acquiring locks
1031     @type test_notify: callable or None
1032     @param test_notify: Special callback function for unittesting
1033
1034     @return: Set of all locks successfully acquired or None in case of timeout
1035
1036     @raise errors.LockError: when any lock we try to acquire has
1037         been deleted before we succeed. In this case none of the
1038         locks requested will be acquired.
1039
1040     """
1041     assert timeout is None or timeout >= 0.0
1042
1043     # Check we don't already own locks at this level
1044     assert not self.is_owned(), ("Cannot acquire locks in the same set twice"
1045                                  " (lockset %s)" % self.name)
1046
1047     if priority is None:
1048       priority = _DEFAULT_PRIORITY
1049
1050     # We need to keep track of how long we spent waiting for a lock. The
1051     # timeout passed to this function is over all lock acquires.
1052     running_timeout = utils.RunningTimeout(timeout, False)
1053
1054     try:
1055       if names is not None:
1056         # Support passing in a single resource to acquire rather than many
1057         if isinstance(names, basestring):
1058           names = [names]
1059
1060         return self.__acquire_inner(names, False, shared, priority,
1061                                     running_timeout.Remaining, test_notify)
1062
1063       else:
1064         # If no names are given acquire the whole set by not letting new names
1065         # being added before we release, and getting the current list of names.
1066         # Some of them may then be deleted later, but we'll cope with this.
1067         #
1068         # We'd like to acquire this lock in a shared way, as it's nice if
1069         # everybody else can use the instances at the same time. If we are
1070         # acquiring them exclusively though they won't be able to do this
1071         # anyway, though, so we'll get the list lock exclusively as well in
1072         # order to be able to do add() on the set while owning it.
1073         if not self.__lock.acquire(shared=shared, priority=priority,
1074                                    timeout=running_timeout.Remaining()):
1075           raise _AcquireTimeout()
1076         try:
1077           # note we own the set-lock
1078           self._add_owned()
1079
1080           return self.__acquire_inner(self.__names(), True, shared, priority,
1081                                       running_timeout.Remaining, test_notify)
1082         except:
1083           # We shouldn't have problems adding the lock to the owners list, but
1084           # if we did we'll try to release this lock and re-raise exception.
1085           # Of course something is going to be really wrong, after this.
1086           self.__lock.release()
1087           self._del_owned()
1088           raise
1089
1090     except _AcquireTimeout:
1091       return None
1092
1093   def __acquire_inner(self, names, want_all, shared, priority,
1094                       timeout_fn, test_notify):
1095     """Inner logic for acquiring a number of locks.
1096
1097     @param names: Names of the locks to be acquired
1098     @param want_all: Whether all locks in the set should be acquired
1099     @param shared: Whether to acquire in shared mode
1100     @param timeout_fn: Function returning remaining timeout
1101     @param priority: Priority for acquiring locks
1102     @param test_notify: Special callback function for unittesting
1103
1104     """
1105     acquire_list = []
1106
1107     # First we look the locks up on __lockdict. We have no way of being sure
1108     # they will still be there after, but this makes it a lot faster should
1109     # just one of them be the already wrong. Using a sorted sequence to prevent
1110     # deadlocks.
1111     for lname in sorted(utils.UniqueSequence(names)):
1112       try:
1113         lock = self.__lockdict[lname] # raises KeyError if lock is not there
1114       except KeyError:
1115         if want_all:
1116           # We are acquiring all the set, it doesn't matter if this particular
1117           # element is not there anymore.
1118           continue
1119
1120         raise errors.LockError("Non-existing lock %s in set %s (it may have"
1121                                " been removed)" % (lname, self.name))
1122
1123       acquire_list.append((lname, lock))
1124
1125     # This will hold the locknames we effectively acquired.
1126     acquired = set()
1127
1128     try:
1129       # Now acquire_list contains a sorted list of resources and locks we
1130       # want.  In order to get them we loop on this (private) list and
1131       # acquire() them.  We gave no real guarantee they will still exist till
1132       # this is done but .acquire() itself is safe and will alert us if the
1133       # lock gets deleted.
1134       for (lname, lock) in acquire_list:
1135         if __debug__ and callable(test_notify):
1136           test_notify_fn = lambda: test_notify(lname)
1137         else:
1138           test_notify_fn = None
1139
1140         timeout = timeout_fn()
1141
1142         try:
1143           # raises LockError if the lock was deleted
1144           acq_success = lock.acquire(shared=shared, timeout=timeout,
1145                                      priority=priority,
1146                                      test_notify=test_notify_fn)
1147         except errors.LockError:
1148           if want_all:
1149             # We are acquiring all the set, it doesn't matter if this
1150             # particular element is not there anymore.
1151             continue
1152
1153           raise errors.LockError("Non-existing lock %s in set %s (it may"
1154                                  " have been removed)" % (lname, self.name))
1155
1156         if not acq_success:
1157           # Couldn't get lock or timeout occurred
1158           if timeout is None:
1159             # This shouldn't happen as SharedLock.acquire(timeout=None) is
1160             # blocking.
1161             raise errors.LockError("Failed to get lock %s (set %s)" %
1162                                    (lname, self.name))
1163
1164           raise _AcquireTimeout()
1165
1166         try:
1167           # now the lock cannot be deleted, we have it!
1168           self._add_owned(name=lname)
1169           acquired.add(lname)
1170
1171         except:
1172           # We shouldn't have problems adding the lock to the owners list, but
1173           # if we did we'll try to release this lock and re-raise exception.
1174           # Of course something is going to be really wrong after this.
1175           if lock.is_owned():
1176             lock.release()
1177           raise
1178
1179     except:
1180       # Release all owned locks
1181       self._release_and_delete_owned()
1182       raise
1183
1184     return acquired
1185
1186   def downgrade(self, names=None):
1187     """Downgrade a set of resource locks from exclusive to shared mode.
1188
1189     The locks must have been acquired in exclusive mode.
1190
1191     """
1192     assert self.is_owned(), ("downgrade on lockset %s while not owning any"
1193                              " lock" % self.name)
1194
1195     # Support passing in a single resource to downgrade rather than many
1196     if isinstance(names, basestring):
1197       names = [names]
1198
1199     owned = self.list_owned()
1200
1201     if names is None:
1202       names = owned
1203     else:
1204       names = set(names)
1205       assert owned.issuperset(names), \
1206         ("downgrade() on unheld resources %s (set %s)" %
1207          (names.difference(owned), self.name))
1208
1209     for lockname in names:
1210       self.__lockdict[lockname].downgrade()
1211
1212     # Do we own the lockset in exclusive mode?
1213     if self.__lock.is_owned(shared=0):
1214       # Have all locks been downgraded?
1215       if not compat.any(lock.is_owned(shared=0)
1216                         for lock in self.__lockdict.values()):
1217         self.__lock.downgrade()
1218         assert self.__lock.is_owned(shared=1)
1219
1220     return True
1221
1222   def release(self, names=None):
1223     """Release a set of resource locks, at the same level.
1224
1225     You must have acquired the locks, either in shared or in exclusive mode,
1226     before releasing them.
1227
1228     @type names: list of strings, or None
1229     @param names: the names of the locks which shall be released
1230         (defaults to all the locks acquired at that level).
1231
1232     """
1233     assert self.is_owned(), ("release() on lock set %s while not owner" %
1234                              self.name)
1235
1236     # Support passing in a single resource to release rather than many
1237     if isinstance(names, basestring):
1238       names = [names]
1239
1240     if names is None:
1241       names = self.list_owned()
1242     else:
1243       names = set(names)
1244       assert self.list_owned().issuperset(names), (
1245                "release() on unheld resources %s (set %s)" %
1246                (names.difference(self.list_owned()), self.name))
1247
1248     # First of all let's release the "all elements" lock, if set.
1249     # After this 'add' can work again
1250     if self.__lock.is_owned():
1251       self.__lock.release()
1252       self._del_owned()
1253
1254     for lockname in names:
1255       # If we are sure the lock doesn't leave __lockdict without being
1256       # exclusively held we can do this...
1257       self.__lockdict[lockname].release()
1258       self._del_owned(name=lockname)
1259
1260   def add(self, names, acquired=0, shared=0):
1261     """Add a new set of elements to the set
1262
1263     @type names: list of strings
1264     @param names: names of the new elements to add
1265     @type acquired: integer (0/1) used as a boolean
1266     @param acquired: pre-acquire the new resource?
1267     @type shared: integer (0/1) used as a boolean
1268     @param shared: is the pre-acquisition shared?
1269
1270     """
1271     # Check we don't already own locks at this level
1272     assert not self.is_owned() or self.__lock.is_owned(shared=0), \
1273       ("Cannot add locks if the set %s is only partially owned, or shared" %
1274        self.name)
1275
1276     # Support passing in a single resource to add rather than many
1277     if isinstance(names, basestring):
1278       names = [names]
1279
1280     # If we don't already own the set-level lock acquired in an exclusive way
1281     # we'll get it and note we need to release it later.
1282     release_lock = False
1283     if not self.__lock.is_owned():
1284       release_lock = True
1285       self.__lock.acquire()
1286
1287     try:
1288       invalid_names = set(self.__names()).intersection(names)
1289       if invalid_names:
1290         # This must be an explicit raise, not an assert, because assert is
1291         # turned off when using optimization, and this can happen because of
1292         # concurrency even if the user doesn't want it.
1293         raise errors.LockError("duplicate add(%s) on lockset %s" %
1294                                (invalid_names, self.name))
1295
1296       for lockname in names:
1297         lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1298
1299         if acquired:
1300           # No need for priority or timeout here as this lock has just been
1301           # created
1302           lock.acquire(shared=shared)
1303           # now the lock cannot be deleted, we have it!
1304           try:
1305             self._add_owned(name=lockname)
1306           except:
1307             # We shouldn't have problems adding the lock to the owners list,
1308             # but if we did we'll try to release this lock and re-raise
1309             # exception.  Of course something is going to be really wrong,
1310             # after this.  On the other hand the lock hasn't been added to the
1311             # __lockdict yet so no other threads should be pending on it. This
1312             # release is just a safety measure.
1313             lock.release()
1314             raise
1315
1316         self.__lockdict[lockname] = lock
1317
1318     finally:
1319       # Only release __lock if we were not holding it previously.
1320       if release_lock:
1321         self.__lock.release()
1322
1323     return True
1324
1325   def remove(self, names):
1326     """Remove elements from the lock set.
1327
1328     You can either not hold anything in the lockset or already hold a superset
1329     of the elements you want to delete, exclusively.
1330
1331     @type names: list of strings
1332     @param names: names of the resource to remove.
1333
1334     @return: a list of locks which we removed; the list is always
1335         equal to the names list if we were holding all the locks
1336         exclusively
1337
1338     """
1339     # Support passing in a single resource to remove rather than many
1340     if isinstance(names, basestring):
1341       names = [names]
1342
1343     # If we own any subset of this lock it must be a superset of what we want
1344     # to delete. The ownership must also be exclusive, but that will be checked
1345     # by the lock itself.
1346     assert not self.is_owned() or self.list_owned().issuperset(names), (
1347       "remove() on acquired lockset %s while not owning all elements" %
1348       self.name)
1349
1350     removed = []
1351
1352     for lname in names:
1353       # Calling delete() acquires the lock exclusively if we don't already own
1354       # it, and causes all pending and subsequent lock acquires to fail. It's
1355       # fine to call it out of order because delete() also implies release(),
1356       # and the assertion above guarantees that if we either already hold
1357       # everything we want to delete, or we hold none.
1358       try:
1359         self.__lockdict[lname].delete()
1360         removed.append(lname)
1361       except (KeyError, errors.LockError):
1362         # This cannot happen if we were already holding it, verify:
1363         assert not self.is_owned(), ("remove failed while holding lockset %s" %
1364                                      self.name)
1365       else:
1366         # If no LockError was raised we are the ones who deleted the lock.
1367         # This means we can safely remove it from lockdict, as any further or
1368         # pending delete() or acquire() will fail (and nobody can have the lock
1369         # since before our call to delete()).
1370         #
1371         # This is done in an else clause because if the exception was thrown
1372         # it's the job of the one who actually deleted it.
1373         del self.__lockdict[lname]
1374         # And let's remove it from our private list if we owned it.
1375         if self.is_owned():
1376           self._del_owned(name=lname)
1377
1378     return removed
1379
1380
1381 # Locking levels, must be acquired in increasing order.
1382 # Current rules are:
1383 #   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1384 #   acquired before performing any operation, either in shared or in exclusive
1385 #   mode. acquiring the BGL in exclusive mode is discouraged and should be
1386 #   avoided.
1387 #   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1388 #   If you need more than one node, or more than one instance, acquire them at
1389 #   the same time.
1390 LEVEL_CLUSTER = 0
1391 LEVEL_INSTANCE = 1
1392 LEVEL_NODEGROUP = 2
1393 LEVEL_NODE = 3
1394 LEVEL_NODE_RES = 4
1395
1396 LEVELS = [
1397   LEVEL_CLUSTER,
1398   LEVEL_INSTANCE,
1399   LEVEL_NODEGROUP,
1400   LEVEL_NODE,
1401   LEVEL_NODE_RES,
1402   ]
1403
1404 # Lock levels which are modifiable
1405 LEVELS_MOD = frozenset([
1406   LEVEL_NODE_RES,
1407   LEVEL_NODE,
1408   LEVEL_NODEGROUP,
1409   LEVEL_INSTANCE,
1410   ])
1411
1412 #: Lock level names (make sure to use singular form)
1413 LEVEL_NAMES = {
1414   LEVEL_CLUSTER: "cluster",
1415   LEVEL_INSTANCE: "instance",
1416   LEVEL_NODEGROUP: "nodegroup",
1417   LEVEL_NODE: "node",
1418   LEVEL_NODE_RES: "node-res",
1419   }
1420
1421 # Constant for the big ganeti lock
1422 BGL = 'BGL'
1423
1424
1425 class GanetiLockManager:
1426   """The Ganeti Locking Library
1427
1428   The purpose of this small library is to manage locking for ganeti clusters
1429   in a central place, while at the same time doing dynamic checks against
1430   possible deadlocks. It will also make it easier to transition to a different
1431   lock type should we migrate away from python threads.
1432
1433   """
1434   _instance = None
1435
1436   def __init__(self, nodes, nodegroups, instances):
1437     """Constructs a new GanetiLockManager object.
1438
1439     There should be only a GanetiLockManager object at any time, so this
1440     function raises an error if this is not the case.
1441
1442     @param nodes: list of node names
1443     @param nodegroups: list of nodegroup uuids
1444     @param instances: list of instance names
1445
1446     """
1447     assert self.__class__._instance is None, \
1448            "double GanetiLockManager instance"
1449
1450     self.__class__._instance = self
1451
1452     self._monitor = LockMonitor()
1453
1454     # The keyring contains all the locks, at their level and in the correct
1455     # locking order.
1456     self.__keyring = {
1457       LEVEL_CLUSTER: LockSet([BGL], "cluster", monitor=self._monitor),
1458       LEVEL_NODE: LockSet(nodes, "node", monitor=self._monitor),
1459       LEVEL_NODE_RES: LockSet(nodes, "node-res", monitor=self._monitor),
1460       LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroup", monitor=self._monitor),
1461       LEVEL_INSTANCE: LockSet(instances, "instance",
1462                               monitor=self._monitor),
1463       }
1464
1465     assert compat.all(ls.name == LEVEL_NAMES[level]
1466                       for (level, ls) in self.__keyring.items())
1467
1468   def AddToLockMonitor(self, provider):
1469     """Registers a new lock with the monitor.
1470
1471     See L{LockMonitor.RegisterLock}.
1472
1473     """
1474     return self._monitor.RegisterLock(provider)
1475
1476   def QueryLocks(self, fields):
1477     """Queries information from all locks.
1478
1479     See L{LockMonitor.QueryLocks}.
1480
1481     """
1482     return self._monitor.QueryLocks(fields)
1483
1484   def OldStyleQueryLocks(self, fields):
1485     """Queries information from all locks, returning old-style data.
1486
1487     See L{LockMonitor.OldStyleQueryLocks}.
1488
1489     """
1490     return self._monitor.OldStyleQueryLocks(fields)
1491
1492   def _names(self, level):
1493     """List the lock names at the given level.
1494
1495     This can be used for debugging/testing purposes.
1496
1497     @param level: the level whose list of locks to get
1498
1499     """
1500     assert level in LEVELS, "Invalid locking level %s" % level
1501     return self.__keyring[level]._names()
1502
1503   def is_owned(self, level):
1504     """Check whether we are owning locks at the given level
1505
1506     """
1507     return self.__keyring[level].is_owned()
1508
1509   def list_owned(self, level):
1510     """Get the set of owned locks at the given level
1511
1512     """
1513     return self.__keyring[level].list_owned()
1514
1515   def _upper_owned(self, level):
1516     """Check that we don't own any lock at a level greater than the given one.
1517
1518     """
1519     # This way of checking only works if LEVELS[i] = i, which we check for in
1520     # the test cases.
1521     return compat.any((self.is_owned(l) for l in LEVELS[level + 1:]))
1522
1523   def _BGL_owned(self): # pylint: disable=C0103
1524     """Check if the current thread owns the BGL.
1525
1526     Both an exclusive or a shared acquisition work.
1527
1528     """
1529     return BGL in self.__keyring[LEVEL_CLUSTER].list_owned()
1530
1531   @staticmethod
1532   def _contains_BGL(level, names): # pylint: disable=C0103
1533     """Check if the level contains the BGL.
1534
1535     Check if acting on the given level and set of names will change
1536     the status of the Big Ganeti Lock.
1537
1538     """
1539     return level == LEVEL_CLUSTER and (names is None or BGL in names)
1540
1541   def acquire(self, level, names, timeout=None, shared=0, priority=None):
1542     """Acquire a set of resource locks, at the same level.
1543
1544     @type level: member of locking.LEVELS
1545     @param level: the level at which the locks shall be acquired
1546     @type names: list of strings (or string)
1547     @param names: the names of the locks which shall be acquired
1548         (special lock names, or instance/node names)
1549     @type shared: integer (0/1) used as a boolean
1550     @param shared: whether to acquire in shared mode; by default
1551         an exclusive lock will be acquired
1552     @type timeout: float
1553     @param timeout: Maximum time to acquire all locks
1554     @type priority: integer
1555     @param priority: Priority for acquiring lock
1556
1557     """
1558     assert level in LEVELS, "Invalid locking level %s" % level
1559
1560     # Check that we are either acquiring the Big Ganeti Lock or we already own
1561     # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1562     # so even if we've migrated we need to at least share the BGL to be
1563     # compatible with them. Of course if we own the BGL exclusively there's no
1564     # point in acquiring any other lock, unless perhaps we are half way through
1565     # the migration of the current opcode.
1566     assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1567             "You must own the Big Ganeti Lock before acquiring any other")
1568
1569     # Check we don't own locks at the same or upper levels.
1570     assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1571            " while owning some at a greater one")
1572
1573     # Acquire the locks in the set.
1574     return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
1575                                          priority=priority)
1576
1577   def downgrade(self, level, names=None):
1578     """Downgrade a set of resource locks from exclusive to shared mode.
1579
1580     You must have acquired the locks in exclusive mode.
1581
1582     @type level: member of locking.LEVELS
1583     @param level: the level at which the locks shall be downgraded
1584     @type names: list of strings, or None
1585     @param names: the names of the locks which shall be downgraded
1586         (defaults to all the locks acquired at the level)
1587
1588     """
1589     assert level in LEVELS, "Invalid locking level %s" % level
1590
1591     return self.__keyring[level].downgrade(names=names)
1592
1593   def release(self, level, names=None):
1594     """Release a set of resource locks, at the same level.
1595
1596     You must have acquired the locks, either in shared or in exclusive
1597     mode, before releasing them.
1598
1599     @type level: member of locking.LEVELS
1600     @param level: the level at which the locks shall be released
1601     @type names: list of strings, or None
1602     @param names: the names of the locks which shall be released
1603         (defaults to all the locks acquired at that level)
1604
1605     """
1606     assert level in LEVELS, "Invalid locking level %s" % level
1607     assert (not self._contains_BGL(level, names) or
1608             not self._upper_owned(LEVEL_CLUSTER)), (
1609             "Cannot release the Big Ganeti Lock while holding something"
1610             " at upper levels (%r)" %
1611             (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self.list_owned(i))
1612                               for i in self.__keyring.keys()]), ))
1613
1614     # Release will complain if we don't own the locks already
1615     return self.__keyring[level].release(names)
1616
1617   def add(self, level, names, acquired=0, shared=0):
1618     """Add locks at the specified level.
1619
1620     @type level: member of locking.LEVELS_MOD
1621     @param level: the level at which the locks shall be added
1622     @type names: list of strings
1623     @param names: names of the locks to acquire
1624     @type acquired: integer (0/1) used as a boolean
1625     @param acquired: whether to acquire the newly added locks
1626     @type shared: integer (0/1) used as a boolean
1627     @param shared: whether the acquisition will be shared
1628
1629     """
1630     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1631     assert self._BGL_owned(), ("You must own the BGL before performing other"
1632            " operations")
1633     assert not self._upper_owned(level), ("Cannot add locks at a level"
1634            " while owning some at a greater one")
1635     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1636
1637   def remove(self, level, names):
1638     """Remove locks from the specified level.
1639
1640     You must either already own the locks you are trying to remove
1641     exclusively or not own any lock at an upper level.
1642
1643     @type level: member of locking.LEVELS_MOD
1644     @param level: the level at which the locks shall be removed
1645     @type names: list of strings
1646     @param names: the names of the locks which shall be removed
1647         (special lock names, or instance/node names)
1648
1649     """
1650     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1651     assert self._BGL_owned(), ("You must own the BGL before performing other"
1652            " operations")
1653     # Check we either own the level or don't own anything from here
1654     # up. LockSet.remove() will check the case in which we don't own
1655     # all the needed resources, or we have a shared ownership.
1656     assert self.is_owned(level) or not self._upper_owned(level), (
1657            "Cannot remove locks at a level while not owning it or"
1658            " owning some at a greater one")
1659     return self.__keyring[level].remove(names)
1660
1661
1662 def _MonitorSortKey((item, idx, num)):
1663   """Sorting key function.
1664
1665   Sort by name, registration order and then order of information. This provides
1666   a stable sort order over different providers, even if they return the same
1667   name.
1668
1669   """
1670   (name, _, _, _) = item
1671
1672   return (utils.NiceSortKey(name), num, idx)
1673
1674
1675 class LockMonitor(object):
1676   _LOCK_ATTR = "_lock"
1677
1678   def __init__(self):
1679     """Initializes this class.
1680
1681     """
1682     self._lock = SharedLock("LockMonitor")
1683
1684     # Counter for stable sorting
1685     self._counter = itertools.count(0)
1686
1687     # Tracked locks. Weak references are used to avoid issues with circular
1688     # references and deletion.
1689     self._locks = weakref.WeakKeyDictionary()
1690
1691   @ssynchronized(_LOCK_ATTR)
1692   def RegisterLock(self, provider):
1693     """Registers a new lock.
1694
1695     @param provider: Object with a callable method named C{GetLockInfo}, taking
1696       a single C{set} containing the requested information items
1697     @note: It would be nicer to only receive the function generating the
1698       requested information but, as it turns out, weak references to bound
1699       methods (e.g. C{self.GetLockInfo}) are tricky; there are several
1700       workarounds, but none of the ones I found works properly in combination
1701       with a standard C{WeakKeyDictionary}
1702
1703     """
1704     assert provider not in self._locks, "Duplicate registration"
1705
1706     # There used to be a check for duplicate names here. As it turned out, when
1707     # a lock is re-created with the same name in a very short timeframe, the
1708     # previous instance might not yet be removed from the weakref dictionary.
1709     # By keeping track of the order of incoming registrations, a stable sort
1710     # ordering can still be guaranteed.
1711
1712     self._locks[provider] = self._counter.next()
1713
1714   def _GetLockInfo(self, requested):
1715     """Get information from all locks.
1716
1717     """
1718     # Must hold lock while getting consistent list of tracked items
1719     self._lock.acquire(shared=1)
1720     try:
1721       items = self._locks.items()
1722     finally:
1723       self._lock.release()
1724
1725     return [(info, idx, num)
1726             for (provider, num) in items
1727             for (idx, info) in enumerate(provider.GetLockInfo(requested))]
1728
1729   def _Query(self, fields):
1730     """Queries information from all locks.
1731
1732     @type fields: list of strings
1733     @param fields: List of fields to return
1734
1735     """
1736     qobj = query.Query(query.LOCK_FIELDS, fields)
1737
1738     # Get all data with internal lock held and then sort by name and incoming
1739     # order
1740     lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
1741                       key=_MonitorSortKey)
1742
1743     # Extract lock information and build query data
1744     return (qobj, query.LockQueryData(map(compat.fst, lockinfo)))
1745
1746   def QueryLocks(self, fields):
1747     """Queries information from all locks.
1748
1749     @type fields: list of strings
1750     @param fields: List of fields to return
1751
1752     """
1753     (qobj, ctx) = self._Query(fields)
1754
1755     # Prepare query response
1756     return query.GetQueryResponse(qobj, ctx)
1757
1758   def OldStyleQueryLocks(self, fields):
1759     """Queries information from all locks, returning old-style data.
1760
1761     @type fields: list of strings
1762     @param fields: List of fields to return
1763
1764     """
1765     (qobj, ctx) = self._Query(fields)
1766
1767     return qobj.OldStyleQuery(ctx)