--select-instances hbal manpage update
[ganeti-local] / lib / locking.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Module implementing the Ganeti locking code."""
22
23 # pylint: disable-msg=W0212
24
25 # W0212 since e.g. LockSet methods use (a lot) the internals of
26 # SharedLock
27
28 import os
29 import select
30 import threading
31 import errno
32 import weakref
33 import logging
34 import heapq
35 import operator
36 import itertools
37
38 from ganeti import errors
39 from ganeti import utils
40 from ganeti import compat
41 from ganeti import query
42
43
44 _EXCLUSIVE_TEXT = "exclusive"
45 _SHARED_TEXT = "shared"
46 _DELETED_TEXT = "deleted"
47
48 _DEFAULT_PRIORITY = 0
49
50
51 def ssynchronized(mylock, shared=0):
52   """Shared Synchronization decorator.
53
54   Calls the function holding the given lock, either in exclusive or shared
55   mode. It requires the passed lock to be a SharedLock (or support its
56   semantics).
57
58   @type mylock: lockable object or string
59   @param mylock: lock to acquire or class member name of the lock to acquire
60
61   """
62   def wrap(fn):
63     def sync_function(*args, **kwargs):
64       if isinstance(mylock, basestring):
65         assert args, "cannot ssynchronize on non-class method: self not found"
66         # args[0] is "self"
67         lock = getattr(args[0], mylock)
68       else:
69         lock = mylock
70       lock.acquire(shared=shared)
71       try:
72         return fn(*args, **kwargs)
73       finally:
74         lock.release()
75     return sync_function
76   return wrap
77
78
79 class _SingleNotifyPipeConditionWaiter(object):
80   """Helper class for SingleNotifyPipeCondition
81
82   """
83   __slots__ = [
84     "_fd",
85     "_poller",
86     ]
87
88   def __init__(self, poller, fd):
89     """Constructor for _SingleNotifyPipeConditionWaiter
90
91     @type poller: select.poll
92     @param poller: Poller object
93     @type fd: int
94     @param fd: File descriptor to wait for
95
96     """
97     object.__init__(self)
98     self._poller = poller
99     self._fd = fd
100
101   def __call__(self, timeout):
102     """Wait for something to happen on the pipe.
103
104     @type timeout: float or None
105     @param timeout: Timeout for waiting (can be None)
106
107     """
108     running_timeout = utils.RunningTimeout(timeout, True)
109
110     while True:
111       remaining_time = running_timeout.Remaining()
112
113       if remaining_time is not None:
114         if remaining_time < 0.0:
115           break
116
117         # Our calculation uses seconds, poll() wants milliseconds
118         remaining_time *= 1000
119
120       try:
121         result = self._poller.poll(remaining_time)
122       except EnvironmentError, err:
123         if err.errno != errno.EINTR:
124           raise
125         result = None
126
127       # Check whether we were notified
128       if result and result[0][0] == self._fd:
129         break
130
131
132 class _BaseCondition(object):
133   """Base class containing common code for conditions.
134
135   Some of this code is taken from python's threading module.
136
137   """
138   __slots__ = [
139     "_lock",
140     "acquire",
141     "release",
142     "_is_owned",
143     "_acquire_restore",
144     "_release_save",
145     ]
146
147   def __init__(self, lock):
148     """Constructor for _BaseCondition.
149
150     @type lock: threading.Lock
151     @param lock: condition base lock
152
153     """
154     object.__init__(self)
155
156     try:
157       self._release_save = lock._release_save
158     except AttributeError:
159       self._release_save = self._base_release_save
160     try:
161       self._acquire_restore = lock._acquire_restore
162     except AttributeError:
163       self._acquire_restore = self._base_acquire_restore
164     try:
165       self._is_owned = lock._is_owned
166     except AttributeError:
167       self._is_owned = self._base_is_owned
168
169     self._lock = lock
170
171     # Export the lock's acquire() and release() methods
172     self.acquire = lock.acquire
173     self.release = lock.release
174
175   def _base_is_owned(self):
176     """Check whether lock is owned by current thread.
177
178     """
179     if self._lock.acquire(0):
180       self._lock.release()
181       return False
182     return True
183
184   def _base_release_save(self):
185     self._lock.release()
186
187   def _base_acquire_restore(self, _):
188     self._lock.acquire()
189
190   def _check_owned(self):
191     """Raise an exception if the current thread doesn't own the lock.
192
193     """
194     if not self._is_owned():
195       raise RuntimeError("cannot work with un-aquired lock")
196
197
198 class SingleNotifyPipeCondition(_BaseCondition):
199   """Condition which can only be notified once.
200
201   This condition class uses pipes and poll, internally, to be able to wait for
202   notification with a timeout, without resorting to polling. It is almost
203   compatible with Python's threading.Condition, with the following differences:
204     - notifyAll can only be called once, and no wait can happen after that
205     - notify is not supported, only notifyAll
206
207   """
208
209   __slots__ = [
210     "_poller",
211     "_read_fd",
212     "_write_fd",
213     "_nwaiters",
214     "_notified",
215     ]
216
217   _waiter_class = _SingleNotifyPipeConditionWaiter
218
219   def __init__(self, lock):
220     """Constructor for SingleNotifyPipeCondition
221
222     """
223     _BaseCondition.__init__(self, lock)
224     self._nwaiters = 0
225     self._notified = False
226     self._read_fd = None
227     self._write_fd = None
228     self._poller = None
229
230   def _check_unnotified(self):
231     """Throws an exception if already notified.
232
233     """
234     if self._notified:
235       raise RuntimeError("cannot use already notified condition")
236
237   def _Cleanup(self):
238     """Cleanup open file descriptors, if any.
239
240     """
241     if self._read_fd is not None:
242       os.close(self._read_fd)
243       self._read_fd = None
244
245     if self._write_fd is not None:
246       os.close(self._write_fd)
247       self._write_fd = None
248     self._poller = None
249
250   def wait(self, timeout):
251     """Wait for a notification.
252
253     @type timeout: float or None
254     @param timeout: Waiting timeout (can be None)
255
256     """
257     self._check_owned()
258     self._check_unnotified()
259
260     self._nwaiters += 1
261     try:
262       if self._poller is None:
263         (self._read_fd, self._write_fd) = os.pipe()
264         self._poller = select.poll()
265         self._poller.register(self._read_fd, select.POLLHUP)
266
267       wait_fn = self._waiter_class(self._poller, self._read_fd)
268       state = self._release_save()
269       try:
270         # Wait for notification
271         wait_fn(timeout)
272       finally:
273         # Re-acquire lock
274         self._acquire_restore(state)
275     finally:
276       self._nwaiters -= 1
277       if self._nwaiters == 0:
278         self._Cleanup()
279
280   def notifyAll(self): # pylint: disable-msg=C0103
281     """Close the writing side of the pipe to notify all waiters.
282
283     """
284     self._check_owned()
285     self._check_unnotified()
286     self._notified = True
287     if self._write_fd is not None:
288       os.close(self._write_fd)
289       self._write_fd = None
290
291
292 class PipeCondition(_BaseCondition):
293   """Group-only non-polling condition with counters.
294
295   This condition class uses pipes and poll, internally, to be able to wait for
296   notification with a timeout, without resorting to polling. It is almost
297   compatible with Python's threading.Condition, but only supports notifyAll and
298   non-recursive locks. As an additional features it's able to report whether
299   there are any waiting threads.
300
301   """
302   __slots__ = [
303     "_waiters",
304     "_single_condition",
305     ]
306
307   _single_condition_class = SingleNotifyPipeCondition
308
309   def __init__(self, lock):
310     """Initializes this class.
311
312     """
313     _BaseCondition.__init__(self, lock)
314     self._waiters = set()
315     self._single_condition = self._single_condition_class(self._lock)
316
317   def wait(self, timeout):
318     """Wait for a notification.
319
320     @type timeout: float or None
321     @param timeout: Waiting timeout (can be None)
322
323     """
324     self._check_owned()
325
326     # Keep local reference to the pipe. It could be replaced by another thread
327     # notifying while we're waiting.
328     cond = self._single_condition
329
330     self._waiters.add(threading.currentThread())
331     try:
332       cond.wait(timeout)
333     finally:
334       self._check_owned()
335       self._waiters.remove(threading.currentThread())
336
337   def notifyAll(self): # pylint: disable-msg=C0103
338     """Notify all currently waiting threads.
339
340     """
341     self._check_owned()
342     self._single_condition.notifyAll()
343     self._single_condition = self._single_condition_class(self._lock)
344
345   def get_waiting(self):
346     """Returns a list of all waiting threads.
347
348     """
349     self._check_owned()
350
351     return self._waiters
352
353   def has_waiting(self):
354     """Returns whether there are active waiters.
355
356     """
357     self._check_owned()
358
359     return bool(self._waiters)
360
361
362 class _PipeConditionWithMode(PipeCondition):
363   __slots__ = [
364     "shared",
365     ]
366
367   def __init__(self, lock, shared):
368     """Initializes this class.
369
370     """
371     self.shared = shared
372     PipeCondition.__init__(self, lock)
373
374
375 class SharedLock(object):
376   """Implements a shared lock.
377
378   Multiple threads can acquire the lock in a shared way by calling
379   C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
380   threads can call C{acquire(shared=0)}.
381
382   Notes on data structures: C{__pending} contains a priority queue (heapq) of
383   all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
384   ...]}. Each per-priority queue contains a normal in-order list of conditions
385   to be notified when the lock can be acquired. Shared locks are grouped
386   together by priority and the condition for them is stored in
387   C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
388   references for the per-priority queues indexed by priority for faster access.
389
390   @type name: string
391   @ivar name: the name of the lock
392
393   """
394   __slots__ = [
395     "__weakref__",
396     "__deleted",
397     "__exc",
398     "__lock",
399     "__pending",
400     "__pending_by_prio",
401     "__pending_shared",
402     "__shr",
403     "name",
404     ]
405
406   __condition_class = _PipeConditionWithMode
407
408   def __init__(self, name, monitor=None):
409     """Construct a new SharedLock.
410
411     @param name: the name of the lock
412     @type monitor: L{LockMonitor}
413     @param monitor: Lock monitor with which to register
414
415     """
416     object.__init__(self)
417
418     self.name = name
419
420     # Internal lock
421     self.__lock = threading.Lock()
422
423     # Queue containing waiting acquires
424     self.__pending = []
425     self.__pending_by_prio = {}
426     self.__pending_shared = {}
427
428     # Current lock holders
429     self.__shr = set()
430     self.__exc = None
431
432     # is this lock in the deleted state?
433     self.__deleted = False
434
435     # Register with lock monitor
436     if monitor:
437       monitor.RegisterLock(self)
438
439   def GetInfo(self, requested):
440     """Retrieves information for querying locks.
441
442     @type requested: set
443     @param requested: Requested information, see C{query.LQ_*}
444
445     """
446     self.__lock.acquire()
447     try:
448       # Note: to avoid unintentional race conditions, no references to
449       # modifiable objects should be returned unless they were created in this
450       # function.
451       mode = None
452       owner_names = None
453
454       if query.LQ_MODE in requested:
455         if self.__deleted:
456           mode = _DELETED_TEXT
457           assert not (self.__exc or self.__shr)
458         elif self.__exc:
459           mode = _EXCLUSIVE_TEXT
460         elif self.__shr:
461           mode = _SHARED_TEXT
462
463       # Current owner(s) are wanted
464       if query.LQ_OWNER in requested:
465         if self.__exc:
466           owner = [self.__exc]
467         else:
468           owner = self.__shr
469
470         if owner:
471           assert not self.__deleted
472           owner_names = [i.getName() for i in owner]
473
474       # Pending acquires are wanted
475       if query.LQ_PENDING in requested:
476         pending = []
477
478         # Sorting instead of copying and using heaq functions for simplicity
479         for (_, prioqueue) in sorted(self.__pending):
480           for cond in prioqueue:
481             if cond.shared:
482               pendmode = _SHARED_TEXT
483             else:
484               pendmode = _EXCLUSIVE_TEXT
485
486             # List of names will be sorted in L{query._GetLockPending}
487             pending.append((pendmode, [i.getName()
488                                        for i in cond.get_waiting()]))
489       else:
490         pending = None
491
492       return (self.name, mode, owner_names, pending)
493     finally:
494       self.__lock.release()
495
496   def __check_deleted(self):
497     """Raises an exception if the lock has been deleted.
498
499     """
500     if self.__deleted:
501       raise errors.LockError("Deleted lock %s" % self.name)
502
503   def __is_sharer(self):
504     """Is the current thread sharing the lock at this time?
505
506     """
507     return threading.currentThread() in self.__shr
508
509   def __is_exclusive(self):
510     """Is the current thread holding the lock exclusively at this time?
511
512     """
513     return threading.currentThread() == self.__exc
514
515   def __is_owned(self, shared=-1):
516     """Is the current thread somehow owning the lock at this time?
517
518     This is a private version of the function, which presumes you're holding
519     the internal lock.
520
521     """
522     if shared < 0:
523       return self.__is_sharer() or self.__is_exclusive()
524     elif shared:
525       return self.__is_sharer()
526     else:
527       return self.__is_exclusive()
528
529   def _is_owned(self, shared=-1):
530     """Is the current thread somehow owning the lock at this time?
531
532     @param shared:
533         - < 0: check for any type of ownership (default)
534         - 0: check for exclusive ownership
535         - > 0: check for shared ownership
536
537     """
538     self.__lock.acquire()
539     try:
540       return self.__is_owned(shared=shared)
541     finally:
542       self.__lock.release()
543
544   def _count_pending(self):
545     """Returns the number of pending acquires.
546
547     @rtype: int
548
549     """
550     self.__lock.acquire()
551     try:
552       return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
553     finally:
554       self.__lock.release()
555
556   def _check_empty(self):
557     """Checks whether there are any pending acquires.
558
559     @rtype: bool
560
561     """
562     self.__lock.acquire()
563     try:
564       # Order is important: __find_first_pending_queue modifies __pending
565       (_, prioqueue) = self.__find_first_pending_queue()
566
567       return not (prioqueue or
568                   self.__pending or
569                   self.__pending_by_prio or
570                   self.__pending_shared)
571     finally:
572       self.__lock.release()
573
574   def __do_acquire(self, shared):
575     """Actually acquire the lock.
576
577     """
578     if shared:
579       self.__shr.add(threading.currentThread())
580     else:
581       self.__exc = threading.currentThread()
582
583   def __can_acquire(self, shared):
584     """Determine whether lock can be acquired.
585
586     """
587     if shared:
588       return self.__exc is None
589     else:
590       return len(self.__shr) == 0 and self.__exc is None
591
592   def __find_first_pending_queue(self):
593     """Tries to find the topmost queued entry with pending acquires.
594
595     Removes empty entries while going through the list.
596
597     """
598     while self.__pending:
599       (priority, prioqueue) = self.__pending[0]
600
601       if prioqueue:
602         return (priority, prioqueue)
603
604       # Remove empty queue
605       heapq.heappop(self.__pending)
606       del self.__pending_by_prio[priority]
607       assert priority not in self.__pending_shared
608
609     return (None, None)
610
611   def __is_on_top(self, cond):
612     """Checks whether the passed condition is on top of the queue.
613
614     The caller must make sure the queue isn't empty.
615
616     """
617     (_, prioqueue) = self.__find_first_pending_queue()
618
619     return cond == prioqueue[0]
620
621   def __acquire_unlocked(self, shared, timeout, priority):
622     """Acquire a shared lock.
623
624     @param shared: whether to acquire in shared mode; by default an
625         exclusive lock will be acquired
626     @param timeout: maximum waiting time before giving up
627     @type priority: integer
628     @param priority: Priority for acquiring lock
629
630     """
631     self.__check_deleted()
632
633     # We cannot acquire the lock if we already have it
634     assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
635                                    " %s" % self.name)
636
637     # Remove empty entries from queue
638     self.__find_first_pending_queue()
639
640     # Check whether someone else holds the lock or there are pending acquires.
641     if not self.__pending and self.__can_acquire(shared):
642       # Apparently not, can acquire lock directly.
643       self.__do_acquire(shared)
644       return True
645
646     prioqueue = self.__pending_by_prio.get(priority, None)
647
648     if shared:
649       # Try to re-use condition for shared acquire
650       wait_condition = self.__pending_shared.get(priority, None)
651       assert (wait_condition is None or
652               (wait_condition.shared and wait_condition in prioqueue))
653     else:
654       wait_condition = None
655
656     if wait_condition is None:
657       if prioqueue is None:
658         assert priority not in self.__pending_by_prio
659
660         prioqueue = []
661         heapq.heappush(self.__pending, (priority, prioqueue))
662         self.__pending_by_prio[priority] = prioqueue
663
664       wait_condition = self.__condition_class(self.__lock, shared)
665       prioqueue.append(wait_condition)
666
667       if shared:
668         # Keep reference for further shared acquires on same priority. This is
669         # better than trying to find it in the list of pending acquires.
670         assert priority not in self.__pending_shared
671         self.__pending_shared[priority] = wait_condition
672
673     try:
674       # Wait until we become the topmost acquire in the queue or the timeout
675       # expires.
676       # TODO: Decrease timeout with spurious notifications
677       while not (self.__is_on_top(wait_condition) and
678                  self.__can_acquire(shared)):
679         # Wait for notification
680         wait_condition.wait(timeout)
681         self.__check_deleted()
682
683         # A lot of code assumes blocking acquires always succeed. Loop
684         # internally for that case.
685         if timeout is not None:
686           break
687
688       if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
689         self.__do_acquire(shared)
690         return True
691     finally:
692       # Remove condition from queue if there are no more waiters
693       if not wait_condition.has_waiting():
694         prioqueue.remove(wait_condition)
695         if wait_condition.shared:
696           # Remove from list of shared acquires if it wasn't while releasing
697           # (e.g. on lock deletion)
698           self.__pending_shared.pop(priority, None)
699
700     return False
701
702   def acquire(self, shared=0, timeout=None, priority=None,
703               test_notify=None):
704     """Acquire a shared lock.
705
706     @type shared: integer (0/1) used as a boolean
707     @param shared: whether to acquire in shared mode; by default an
708         exclusive lock will be acquired
709     @type timeout: float
710     @param timeout: maximum waiting time before giving up
711     @type priority: integer
712     @param priority: Priority for acquiring lock
713     @type test_notify: callable or None
714     @param test_notify: Special callback function for unittesting
715
716     """
717     if priority is None:
718       priority = _DEFAULT_PRIORITY
719
720     self.__lock.acquire()
721     try:
722       # We already got the lock, notify now
723       if __debug__ and callable(test_notify):
724         test_notify()
725
726       return self.__acquire_unlocked(shared, timeout, priority)
727     finally:
728       self.__lock.release()
729
730   def downgrade(self):
731     """Changes the lock mode from exclusive to shared.
732
733     Pending acquires in shared mode on the same priority will go ahead.
734
735     """
736     self.__lock.acquire()
737     try:
738       assert self.__is_owned(), "Lock must be owned"
739
740       if self.__is_exclusive():
741         # Do nothing if the lock is already acquired in shared mode
742         self.__exc = None
743         self.__do_acquire(1)
744
745         # Important: pending shared acquires should only jump ahead if there
746         # was a transition from exclusive to shared, otherwise an owner of a
747         # shared lock can keep calling this function to push incoming shared
748         # acquires
749         (priority, prioqueue) = self.__find_first_pending_queue()
750         if prioqueue:
751           # Is there a pending shared acquire on this priority?
752           cond = self.__pending_shared.pop(priority, None)
753           if cond:
754             assert cond.shared
755             assert cond in prioqueue
756
757             # Ensure shared acquire is on top of queue
758             if len(prioqueue) > 1:
759               prioqueue.remove(cond)
760               prioqueue.insert(0, cond)
761
762             # Notify
763             cond.notifyAll()
764
765       assert not self.__is_exclusive()
766       assert self.__is_sharer()
767
768       return True
769     finally:
770       self.__lock.release()
771
772   def release(self):
773     """Release a Shared Lock.
774
775     You must have acquired the lock, either in shared or in exclusive mode,
776     before calling this function.
777
778     """
779     self.__lock.acquire()
780     try:
781       assert self.__is_exclusive() or self.__is_sharer(), \
782         "Cannot release non-owned lock"
783
784       # Autodetect release type
785       if self.__is_exclusive():
786         self.__exc = None
787       else:
788         self.__shr.remove(threading.currentThread())
789
790       # Notify topmost condition in queue
791       (priority, prioqueue) = self.__find_first_pending_queue()
792       if prioqueue:
793         cond = prioqueue[0]
794         cond.notifyAll()
795         if cond.shared:
796           # Prevent further shared acquires from sneaking in while waiters are
797           # notified
798           self.__pending_shared.pop(priority, None)
799
800     finally:
801       self.__lock.release()
802
803   def delete(self, timeout=None, priority=None):
804     """Delete a Shared Lock.
805
806     This operation will declare the lock for removal. First the lock will be
807     acquired in exclusive mode if you don't already own it, then the lock
808     will be put in a state where any future and pending acquire() fail.
809
810     @type timeout: float
811     @param timeout: maximum waiting time before giving up
812     @type priority: integer
813     @param priority: Priority for acquiring lock
814
815     """
816     if priority is None:
817       priority = _DEFAULT_PRIORITY
818
819     self.__lock.acquire()
820     try:
821       assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
822
823       self.__check_deleted()
824
825       # The caller is allowed to hold the lock exclusively already.
826       acquired = self.__is_exclusive()
827
828       if not acquired:
829         acquired = self.__acquire_unlocked(0, timeout, priority)
830
831         assert self.__is_exclusive() and not self.__is_sharer(), \
832           "Lock wasn't acquired in exclusive mode"
833
834       if acquired:
835         self.__deleted = True
836         self.__exc = None
837
838         assert not (self.__exc or self.__shr), "Found owner during deletion"
839
840         # Notify all acquires. They'll throw an error.
841         for (_, prioqueue) in self.__pending:
842           for cond in prioqueue:
843             cond.notifyAll()
844
845         assert self.__deleted
846
847       return acquired
848     finally:
849       self.__lock.release()
850
851   def _release_save(self):
852     shared = self.__is_sharer()
853     self.release()
854     return shared
855
856   def _acquire_restore(self, shared):
857     self.acquire(shared=shared)
858
859
860 # Whenever we want to acquire a full LockSet we pass None as the value
861 # to acquire.  Hide this behind this nicely named constant.
862 ALL_SET = None
863
864
865 class _AcquireTimeout(Exception):
866   """Internal exception to abort an acquire on a timeout.
867
868   """
869
870
871 class LockSet:
872   """Implements a set of locks.
873
874   This abstraction implements a set of shared locks for the same resource type,
875   distinguished by name. The user can lock a subset of the resources and the
876   LockSet will take care of acquiring the locks always in the same order, thus
877   preventing deadlock.
878
879   All the locks needed in the same set must be acquired together, though.
880
881   @type name: string
882   @ivar name: the name of the lockset
883
884   """
885   def __init__(self, members, name, monitor=None):
886     """Constructs a new LockSet.
887
888     @type members: list of strings
889     @param members: initial members of the set
890     @type monitor: L{LockMonitor}
891     @param monitor: Lock monitor with which to register member locks
892
893     """
894     assert members is not None, "members parameter is not a list"
895     self.name = name
896
897     # Lock monitor
898     self.__monitor = monitor
899
900     # Used internally to guarantee coherency
901     self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)
902
903     # The lockdict indexes the relationship name -> lock
904     # The order-of-locking is implied by the alphabetical order of names
905     self.__lockdict = {}
906
907     for mname in members:
908       self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
909                                           monitor=monitor)
910
911     # The owner dict contains the set of locks each thread owns. For
912     # performance each thread can access its own key without a global lock on
913     # this structure. It is paramount though that *no* other type of access is
914     # done to this structure (eg. no looping over its keys). *_owner helper
915     # function are defined to guarantee access is correct, but in general never
916     # do anything different than __owners[threading.currentThread()], or there
917     # will be trouble.
918     self.__owners = {}
919
920   def _GetLockName(self, mname):
921     """Returns the name for a member lock.
922
923     """
924     return "%s/%s" % (self.name, mname)
925
926   def _get_lock(self):
927     """Returns the lockset-internal lock.
928
929     """
930     return self.__lock
931
932   def _get_lockdict(self):
933     """Returns the lockset-internal lock dictionary.
934
935     Accessing this structure is only safe in single-thread usage or when the
936     lockset-internal lock is held.
937
938     """
939     return self.__lockdict
940
941   def _is_owned(self):
942     """Is the current thread a current level owner?"""
943     return threading.currentThread() in self.__owners
944
945   def _add_owned(self, name=None):
946     """Note the current thread owns the given lock"""
947     if name is None:
948       if not self._is_owned():
949         self.__owners[threading.currentThread()] = set()
950     else:
951       if self._is_owned():
952         self.__owners[threading.currentThread()].add(name)
953       else:
954         self.__owners[threading.currentThread()] = set([name])
955
956   def _del_owned(self, name=None):
957     """Note the current thread owns the given lock"""
958
959     assert not (name is None and self.__lock._is_owned()), \
960            "Cannot hold internal lock when deleting owner status"
961
962     if name is not None:
963       self.__owners[threading.currentThread()].remove(name)
964
965     # Only remove the key if we don't hold the set-lock as well
966     if (not self.__lock._is_owned() and
967         not self.__owners[threading.currentThread()]):
968       del self.__owners[threading.currentThread()]
969
970   def _list_owned(self):
971     """Get the set of resource names owned by the current thread"""
972     if self._is_owned():
973       return self.__owners[threading.currentThread()].copy()
974     else:
975       return set()
976
977   def _release_and_delete_owned(self):
978     """Release and delete all resources owned by the current thread"""
979     for lname in self._list_owned():
980       lock = self.__lockdict[lname]
981       if lock._is_owned():
982         lock.release()
983       self._del_owned(name=lname)
984
985   def __names(self):
986     """Return the current set of names.
987
988     Only call this function while holding __lock and don't iterate on the
989     result after releasing the lock.
990
991     """
992     return self.__lockdict.keys()
993
994   def _names(self):
995     """Return a copy of the current set of elements.
996
997     Used only for debugging purposes.
998
999     """
1000     # If we don't already own the set-level lock acquired
1001     # we'll get it and note we need to release it later.
1002     release_lock = False
1003     if not self.__lock._is_owned():
1004       release_lock = True
1005       self.__lock.acquire(shared=1)
1006     try:
1007       result = self.__names()
1008     finally:
1009       if release_lock:
1010         self.__lock.release()
1011     return set(result)
1012
1013   def acquire(self, names, timeout=None, shared=0, priority=None,
1014               test_notify=None):
1015     """Acquire a set of resource locks.
1016
1017     @type names: list of strings (or string)
1018     @param names: the names of the locks which shall be acquired
1019         (special lock names, or instance/node names)
1020     @type shared: integer (0/1) used as a boolean
1021     @param shared: whether to acquire in shared mode; by default an
1022         exclusive lock will be acquired
1023     @type timeout: float or None
1024     @param timeout: Maximum time to acquire all locks
1025     @type priority: integer
1026     @param priority: Priority for acquiring locks
1027     @type test_notify: callable or None
1028     @param test_notify: Special callback function for unittesting
1029
1030     @return: Set of all locks successfully acquired or None in case of timeout
1031
1032     @raise errors.LockError: when any lock we try to acquire has
1033         been deleted before we succeed. In this case none of the
1034         locks requested will be acquired.
1035
1036     """
1037     assert timeout is None or timeout >= 0.0
1038
1039     # Check we don't already own locks at this level
1040     assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
1041                                   " (lockset %s)" % self.name)
1042
1043     if priority is None:
1044       priority = _DEFAULT_PRIORITY
1045
1046     # We need to keep track of how long we spent waiting for a lock. The
1047     # timeout passed to this function is over all lock acquires.
1048     running_timeout = utils.RunningTimeout(timeout, False)
1049
1050     try:
1051       if names is not None:
1052         # Support passing in a single resource to acquire rather than many
1053         if isinstance(names, basestring):
1054           names = [names]
1055
1056         return self.__acquire_inner(names, False, shared, priority,
1057                                     running_timeout.Remaining, test_notify)
1058
1059       else:
1060         # If no names are given acquire the whole set by not letting new names
1061         # being added before we release, and getting the current list of names.
1062         # Some of them may then be deleted later, but we'll cope with this.
1063         #
1064         # We'd like to acquire this lock in a shared way, as it's nice if
1065         # everybody else can use the instances at the same time. If we are
1066         # acquiring them exclusively though they won't be able to do this
1067         # anyway, though, so we'll get the list lock exclusively as well in
1068         # order to be able to do add() on the set while owning it.
1069         if not self.__lock.acquire(shared=shared, priority=priority,
1070                                    timeout=running_timeout.Remaining()):
1071           raise _AcquireTimeout()
1072         try:
1073           # note we own the set-lock
1074           self._add_owned()
1075
1076           return self.__acquire_inner(self.__names(), True, shared, priority,
1077                                       running_timeout.Remaining, test_notify)
1078         except:
1079           # We shouldn't have problems adding the lock to the owners list, but
1080           # if we did we'll try to release this lock and re-raise exception.
1081           # Of course something is going to be really wrong, after this.
1082           self.__lock.release()
1083           self._del_owned()
1084           raise
1085
1086     except _AcquireTimeout:
1087       return None
1088
1089   def __acquire_inner(self, names, want_all, shared, priority,
1090                       timeout_fn, test_notify):
1091     """Inner logic for acquiring a number of locks.
1092
1093     @param names: Names of the locks to be acquired
1094     @param want_all: Whether all locks in the set should be acquired
1095     @param shared: Whether to acquire in shared mode
1096     @param timeout_fn: Function returning remaining timeout
1097     @param priority: Priority for acquiring locks
1098     @param test_notify: Special callback function for unittesting
1099
1100     """
1101     acquire_list = []
1102
1103     # First we look the locks up on __lockdict. We have no way of being sure
1104     # they will still be there after, but this makes it a lot faster should
1105     # just one of them be the already wrong. Using a sorted sequence to prevent
1106     # deadlocks.
1107     for lname in sorted(utils.UniqueSequence(names)):
1108       try:
1109         lock = self.__lockdict[lname] # raises KeyError if lock is not there
1110       except KeyError:
1111         if want_all:
1112           # We are acquiring all the set, it doesn't matter if this particular
1113           # element is not there anymore.
1114           continue
1115
1116         raise errors.LockError("Non-existing lock %s in set %s (it may have"
1117                                " been removed)" % (lname, self.name))
1118
1119       acquire_list.append((lname, lock))
1120
1121     # This will hold the locknames we effectively acquired.
1122     acquired = set()
1123
1124     try:
1125       # Now acquire_list contains a sorted list of resources and locks we
1126       # want.  In order to get them we loop on this (private) list and
1127       # acquire() them.  We gave no real guarantee they will still exist till
1128       # this is done but .acquire() itself is safe and will alert us if the
1129       # lock gets deleted.
1130       for (lname, lock) in acquire_list:
1131         if __debug__ and callable(test_notify):
1132           test_notify_fn = lambda: test_notify(lname)
1133         else:
1134           test_notify_fn = None
1135
1136         timeout = timeout_fn()
1137
1138         try:
1139           # raises LockError if the lock was deleted
1140           acq_success = lock.acquire(shared=shared, timeout=timeout,
1141                                      priority=priority,
1142                                      test_notify=test_notify_fn)
1143         except errors.LockError:
1144           if want_all:
1145             # We are acquiring all the set, it doesn't matter if this
1146             # particular element is not there anymore.
1147             continue
1148
1149           raise errors.LockError("Non-existing lock %s in set %s (it may"
1150                                  " have been removed)" % (lname, self.name))
1151
1152         if not acq_success:
1153           # Couldn't get lock or timeout occurred
1154           if timeout is None:
1155             # This shouldn't happen as SharedLock.acquire(timeout=None) is
1156             # blocking.
1157             raise errors.LockError("Failed to get lock %s (set %s)" %
1158                                    (lname, self.name))
1159
1160           raise _AcquireTimeout()
1161
1162         try:
1163           # now the lock cannot be deleted, we have it!
1164           self._add_owned(name=lname)
1165           acquired.add(lname)
1166
1167         except:
1168           # We shouldn't have problems adding the lock to the owners list, but
1169           # if we did we'll try to release this lock and re-raise exception.
1170           # Of course something is going to be really wrong after this.
1171           if lock._is_owned():
1172             lock.release()
1173           raise
1174
1175     except:
1176       # Release all owned locks
1177       self._release_and_delete_owned()
1178       raise
1179
1180     return acquired
1181
1182   def downgrade(self, names=None):
1183     """Downgrade a set of resource locks from exclusive to shared mode.
1184
1185     The locks must have been acquired in exclusive mode.
1186
1187     """
1188     assert self._is_owned(), ("downgrade on lockset %s while not owning any"
1189                               " lock" % self.name)
1190
1191     # Support passing in a single resource to downgrade rather than many
1192     if isinstance(names, basestring):
1193       names = [names]
1194
1195     owned = self._list_owned()
1196
1197     if names is None:
1198       names = owned
1199     else:
1200       names = set(names)
1201       assert owned.issuperset(names), \
1202         ("downgrade() on unheld resources %s (set %s)" %
1203          (names.difference(owned), self.name))
1204
1205     for lockname in names:
1206       self.__lockdict[lockname].downgrade()
1207
1208     # Do we own the lockset in exclusive mode?
1209     if self.__lock._is_owned(shared=0):
1210       # Have all locks been downgraded?
1211       if not compat.any(lock._is_owned(shared=0)
1212                         for lock in self.__lockdict.values()):
1213         self.__lock.downgrade()
1214         assert self.__lock._is_owned(shared=1)
1215
1216     return True
1217
1218   def release(self, names=None):
1219     """Release a set of resource locks, at the same level.
1220
1221     You must have acquired the locks, either in shared or in exclusive mode,
1222     before releasing them.
1223
1224     @type names: list of strings, or None
1225     @param names: the names of the locks which shall be released
1226         (defaults to all the locks acquired at that level).
1227
1228     """
1229     assert self._is_owned(), ("release() on lock set %s while not owner" %
1230                               self.name)
1231
1232     # Support passing in a single resource to release rather than many
1233     if isinstance(names, basestring):
1234       names = [names]
1235
1236     if names is None:
1237       names = self._list_owned()
1238     else:
1239       names = set(names)
1240       assert self._list_owned().issuperset(names), (
1241                "release() on unheld resources %s (set %s)" %
1242                (names.difference(self._list_owned()), self.name))
1243
1244     # First of all let's release the "all elements" lock, if set.
1245     # After this 'add' can work again
1246     if self.__lock._is_owned():
1247       self.__lock.release()
1248       self._del_owned()
1249
1250     for lockname in names:
1251       # If we are sure the lock doesn't leave __lockdict without being
1252       # exclusively held we can do this...
1253       self.__lockdict[lockname].release()
1254       self._del_owned(name=lockname)
1255
1256   def add(self, names, acquired=0, shared=0):
1257     """Add a new set of elements to the set
1258
1259     @type names: list of strings
1260     @param names: names of the new elements to add
1261     @type acquired: integer (0/1) used as a boolean
1262     @param acquired: pre-acquire the new resource?
1263     @type shared: integer (0/1) used as a boolean
1264     @param shared: is the pre-acquisition shared?
1265
1266     """
1267     # Check we don't already own locks at this level
1268     assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1269       ("Cannot add locks if the set %s is only partially owned, or shared" %
1270        self.name)
1271
1272     # Support passing in a single resource to add rather than many
1273     if isinstance(names, basestring):
1274       names = [names]
1275
1276     # If we don't already own the set-level lock acquired in an exclusive way
1277     # we'll get it and note we need to release it later.
1278     release_lock = False
1279     if not self.__lock._is_owned():
1280       release_lock = True
1281       self.__lock.acquire()
1282
1283     try:
1284       invalid_names = set(self.__names()).intersection(names)
1285       if invalid_names:
1286         # This must be an explicit raise, not an assert, because assert is
1287         # turned off when using optimization, and this can happen because of
1288         # concurrency even if the user doesn't want it.
1289         raise errors.LockError("duplicate add(%s) on lockset %s" %
1290                                (invalid_names, self.name))
1291
1292       for lockname in names:
1293         lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1294
1295         if acquired:
1296           # No need for priority or timeout here as this lock has just been
1297           # created
1298           lock.acquire(shared=shared)
1299           # now the lock cannot be deleted, we have it!
1300           try:
1301             self._add_owned(name=lockname)
1302           except:
1303             # We shouldn't have problems adding the lock to the owners list,
1304             # but if we did we'll try to release this lock and re-raise
1305             # exception.  Of course something is going to be really wrong,
1306             # after this.  On the other hand the lock hasn't been added to the
1307             # __lockdict yet so no other threads should be pending on it. This
1308             # release is just a safety measure.
1309             lock.release()
1310             raise
1311
1312         self.__lockdict[lockname] = lock
1313
1314     finally:
1315       # Only release __lock if we were not holding it previously.
1316       if release_lock:
1317         self.__lock.release()
1318
1319     return True
1320
1321   def remove(self, names):
1322     """Remove elements from the lock set.
1323
1324     You can either not hold anything in the lockset or already hold a superset
1325     of the elements you want to delete, exclusively.
1326
1327     @type names: list of strings
1328     @param names: names of the resource to remove.
1329
1330     @return: a list of locks which we removed; the list is always
1331         equal to the names list if we were holding all the locks
1332         exclusively
1333
1334     """
1335     # Support passing in a single resource to remove rather than many
1336     if isinstance(names, basestring):
1337       names = [names]
1338
1339     # If we own any subset of this lock it must be a superset of what we want
1340     # to delete. The ownership must also be exclusive, but that will be checked
1341     # by the lock itself.
1342     assert not self._is_owned() or self._list_owned().issuperset(names), (
1343       "remove() on acquired lockset %s while not owning all elements" %
1344       self.name)
1345
1346     removed = []
1347
1348     for lname in names:
1349       # Calling delete() acquires the lock exclusively if we don't already own
1350       # it, and causes all pending and subsequent lock acquires to fail. It's
1351       # fine to call it out of order because delete() also implies release(),
1352       # and the assertion above guarantees that if we either already hold
1353       # everything we want to delete, or we hold none.
1354       try:
1355         self.__lockdict[lname].delete()
1356         removed.append(lname)
1357       except (KeyError, errors.LockError):
1358         # This cannot happen if we were already holding it, verify:
1359         assert not self._is_owned(), ("remove failed while holding lockset %s"
1360                                       % self.name)
1361       else:
1362         # If no LockError was raised we are the ones who deleted the lock.
1363         # This means we can safely remove it from lockdict, as any further or
1364         # pending delete() or acquire() will fail (and nobody can have the lock
1365         # since before our call to delete()).
1366         #
1367         # This is done in an else clause because if the exception was thrown
1368         # it's the job of the one who actually deleted it.
1369         del self.__lockdict[lname]
1370         # And let's remove it from our private list if we owned it.
1371         if self._is_owned():
1372           self._del_owned(name=lname)
1373
1374     return removed
1375
1376
1377 # Locking levels, must be acquired in increasing order.
1378 # Current rules are:
1379 #   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1380 #   acquired before performing any operation, either in shared or in exclusive
1381 #   mode. acquiring the BGL in exclusive mode is discouraged and should be
1382 #   avoided.
1383 #   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1384 #   If you need more than one node, or more than one instance, acquire them at
1385 #   the same time.
1386 LEVEL_CLUSTER = 0
1387 LEVEL_INSTANCE = 1
1388 LEVEL_NODEGROUP = 2
1389 LEVEL_NODE = 3
1390
1391 LEVELS = [LEVEL_CLUSTER,
1392           LEVEL_INSTANCE,
1393           LEVEL_NODEGROUP,
1394           LEVEL_NODE]
1395
1396 # Lock levels which are modifiable
1397 LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]
1398
1399 LEVEL_NAMES = {
1400   LEVEL_CLUSTER: "cluster",
1401   LEVEL_INSTANCE: "instance",
1402   LEVEL_NODEGROUP: "nodegroup",
1403   LEVEL_NODE: "node",
1404   }
1405
1406 # Constant for the big ganeti lock
1407 BGL = 'BGL'
1408
1409
1410 class GanetiLockManager:
1411   """The Ganeti Locking Library
1412
1413   The purpose of this small library is to manage locking for ganeti clusters
1414   in a central place, while at the same time doing dynamic checks against
1415   possible deadlocks. It will also make it easier to transition to a different
1416   lock type should we migrate away from python threads.
1417
1418   """
1419   _instance = None
1420
1421   def __init__(self, nodes, nodegroups, instances):
1422     """Constructs a new GanetiLockManager object.
1423
1424     There should be only a GanetiLockManager object at any time, so this
1425     function raises an error if this is not the case.
1426
1427     @param nodes: list of node names
1428     @param nodegroups: list of nodegroup uuids
1429     @param instances: list of instance names
1430
1431     """
1432     assert self.__class__._instance is None, \
1433            "double GanetiLockManager instance"
1434
1435     self.__class__._instance = self
1436
1437     self._monitor = LockMonitor()
1438
1439     # The keyring contains all the locks, at their level and in the correct
1440     # locking order.
1441     self.__keyring = {
1442       LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
1443       LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
1444       LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor),
1445       LEVEL_INSTANCE: LockSet(instances, "instances",
1446                               monitor=self._monitor),
1447       }
1448
1449   def QueryLocks(self, fields):
1450     """Queries information from all locks.
1451
1452     See L{LockMonitor.QueryLocks}.
1453
1454     """
1455     return self._monitor.QueryLocks(fields)
1456
1457   def OldStyleQueryLocks(self, fields):
1458     """Queries information from all locks, returning old-style data.
1459
1460     See L{LockMonitor.OldStyleQueryLocks}.
1461
1462     """
1463     return self._monitor.OldStyleQueryLocks(fields)
1464
1465   def _names(self, level):
1466     """List the lock names at the given level.
1467
1468     This can be used for debugging/testing purposes.
1469
1470     @param level: the level whose list of locks to get
1471
1472     """
1473     assert level in LEVELS, "Invalid locking level %s" % level
1474     return self.__keyring[level]._names()
1475
1476   def _is_owned(self, level):
1477     """Check whether we are owning locks at the given level
1478
1479     """
1480     return self.__keyring[level]._is_owned()
1481
1482   is_owned = _is_owned
1483
1484   def _list_owned(self, level):
1485     """Get the set of owned locks at the given level
1486
1487     """
1488     return self.__keyring[level]._list_owned()
1489
1490   list_owned = _list_owned
1491
1492   def _upper_owned(self, level):
1493     """Check that we don't own any lock at a level greater than the given one.
1494
1495     """
1496     # This way of checking only works if LEVELS[i] = i, which we check for in
1497     # the test cases.
1498     return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1499
1500   def _BGL_owned(self): # pylint: disable-msg=C0103
1501     """Check if the current thread owns the BGL.
1502
1503     Both an exclusive or a shared acquisition work.
1504
1505     """
1506     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1507
1508   @staticmethod
1509   def _contains_BGL(level, names): # pylint: disable-msg=C0103
1510     """Check if the level contains the BGL.
1511
1512     Check if acting on the given level and set of names will change
1513     the status of the Big Ganeti Lock.
1514
1515     """
1516     return level == LEVEL_CLUSTER and (names is None or BGL in names)
1517
1518   def acquire(self, level, names, timeout=None, shared=0, priority=None):
1519     """Acquire a set of resource locks, at the same level.
1520
1521     @type level: member of locking.LEVELS
1522     @param level: the level at which the locks shall be acquired
1523     @type names: list of strings (or string)
1524     @param names: the names of the locks which shall be acquired
1525         (special lock names, or instance/node names)
1526     @type shared: integer (0/1) used as a boolean
1527     @param shared: whether to acquire in shared mode; by default
1528         an exclusive lock will be acquired
1529     @type timeout: float
1530     @param timeout: Maximum time to acquire all locks
1531     @type priority: integer
1532     @param priority: Priority for acquiring lock
1533
1534     """
1535     assert level in LEVELS, "Invalid locking level %s" % level
1536
1537     # Check that we are either acquiring the Big Ganeti Lock or we already own
1538     # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1539     # so even if we've migrated we need to at least share the BGL to be
1540     # compatible with them. Of course if we own the BGL exclusively there's no
1541     # point in acquiring any other lock, unless perhaps we are half way through
1542     # the migration of the current opcode.
1543     assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1544             "You must own the Big Ganeti Lock before acquiring any other")
1545
1546     # Check we don't own locks at the same or upper levels.
1547     assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1548            " while owning some at a greater one")
1549
1550     # Acquire the locks in the set.
1551     return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
1552                                          priority=priority)
1553
1554   def downgrade(self, level, names=None):
1555     """Downgrade a set of resource locks from exclusive to shared mode.
1556
1557     You must have acquired the locks in exclusive mode.
1558
1559     @type level: member of locking.LEVELS
1560     @param level: the level at which the locks shall be downgraded
1561     @type names: list of strings, or None
1562     @param names: the names of the locks which shall be downgraded
1563         (defaults to all the locks acquired at the level)
1564
1565     """
1566     assert level in LEVELS, "Invalid locking level %s" % level
1567
1568     return self.__keyring[level].downgrade(names=names)
1569
1570   def release(self, level, names=None):
1571     """Release a set of resource locks, at the same level.
1572
1573     You must have acquired the locks, either in shared or in exclusive
1574     mode, before releasing them.
1575
1576     @type level: member of locking.LEVELS
1577     @param level: the level at which the locks shall be released
1578     @type names: list of strings, or None
1579     @param names: the names of the locks which shall be released
1580         (defaults to all the locks acquired at that level)
1581
1582     """
1583     assert level in LEVELS, "Invalid locking level %s" % level
1584     assert (not self._contains_BGL(level, names) or
1585             not self._upper_owned(LEVEL_CLUSTER)), (
1586             "Cannot release the Big Ganeti Lock while holding something"
1587             " at upper levels (%r)" %
1588             (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
1589                               for i in self.__keyring.keys()]), ))
1590
1591     # Release will complain if we don't own the locks already
1592     return self.__keyring[level].release(names)
1593
1594   def add(self, level, names, acquired=0, shared=0):
1595     """Add locks at the specified level.
1596
1597     @type level: member of locking.LEVELS_MOD
1598     @param level: the level at which the locks shall be added
1599     @type names: list of strings
1600     @param names: names of the locks to acquire
1601     @type acquired: integer (0/1) used as a boolean
1602     @param acquired: whether to acquire the newly added locks
1603     @type shared: integer (0/1) used as a boolean
1604     @param shared: whether the acquisition will be shared
1605
1606     """
1607     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1608     assert self._BGL_owned(), ("You must own the BGL before performing other"
1609            " operations")
1610     assert not self._upper_owned(level), ("Cannot add locks at a level"
1611            " while owning some at a greater one")
1612     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1613
1614   def remove(self, level, names):
1615     """Remove locks from the specified level.
1616
1617     You must either already own the locks you are trying to remove
1618     exclusively or not own any lock at an upper level.
1619
1620     @type level: member of locking.LEVELS_MOD
1621     @param level: the level at which the locks shall be removed
1622     @type names: list of strings
1623     @param names: the names of the locks which shall be removed
1624         (special lock names, or instance/node names)
1625
1626     """
1627     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1628     assert self._BGL_owned(), ("You must own the BGL before performing other"
1629            " operations")
1630     # Check we either own the level or don't own anything from here
1631     # up. LockSet.remove() will check the case in which we don't own
1632     # all the needed resources, or we have a shared ownership.
1633     assert self._is_owned(level) or not self._upper_owned(level), (
1634            "Cannot remove locks at a level while not owning it or"
1635            " owning some at a greater one")
1636     return self.__keyring[level].remove(names)
1637
1638
1639 def _MonitorSortKey((num, item)):
1640   """Sorting key function.
1641
1642   Sort by name, then by incoming order.
1643
1644   """
1645   (name, _, _, _) = item
1646
1647   return (utils.NiceSortKey(name), num)
1648
1649
1650 class LockMonitor(object):
1651   _LOCK_ATTR = "_lock"
1652
1653   def __init__(self):
1654     """Initializes this class.
1655
1656     """
1657     self._lock = SharedLock("LockMonitor")
1658
1659     # Counter for stable sorting
1660     self._counter = itertools.count(0)
1661
1662     # Tracked locks. Weak references are used to avoid issues with circular
1663     # references and deletion.
1664     self._locks = weakref.WeakKeyDictionary()
1665
1666   @ssynchronized(_LOCK_ATTR)
1667   def RegisterLock(self, lock):
1668     """Registers a new lock.
1669
1670     """
1671     logging.debug("Registering lock %s", lock.name)
1672     assert lock not in self._locks, "Duplicate lock registration"
1673
1674     # There used to be a check for duplicate names here. As it turned out, when
1675     # a lock is re-created with the same name in a very short timeframe, the
1676     # previous instance might not yet be removed from the weakref dictionary.
1677     # By keeping track of the order of incoming registrations, a stable sort
1678     # ordering can still be guaranteed.
1679
1680     self._locks[lock] = self._counter.next()
1681
1682   @ssynchronized(_LOCK_ATTR)
1683   def _GetLockInfo(self, requested):
1684     """Get information from all locks while the monitor lock is held.
1685
1686     """
1687     return [(num, lock.GetInfo(requested)) for lock, num in self._locks.items()]
1688
1689   def _Query(self, fields):
1690     """Queries information from all locks.
1691
1692     @type fields: list of strings
1693     @param fields: List of fields to return
1694
1695     """
1696     qobj = query.Query(query.LOCK_FIELDS, fields)
1697
1698     # Get all data with internal lock held and then sort by name and incoming
1699     # order
1700     lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
1701                       key=_MonitorSortKey)
1702
1703     # Extract lock information and build query data
1704     return (qobj, query.LockQueryData(map(operator.itemgetter(1), lockinfo)))
1705
1706   def QueryLocks(self, fields):
1707     """Queries information from all locks.
1708
1709     @type fields: list of strings
1710     @param fields: List of fields to return
1711
1712     """
1713     (qobj, ctx) = self._Query(fields)
1714
1715     # Prepare query response
1716     return query.GetQueryResponse(qobj, ctx)
1717
1718   def OldStyleQueryLocks(self, fields):
1719     """Queries information from all locks, returning old-style data.
1720
1721     @type fields: list of strings
1722     @param fields: List of fields to return
1723
1724     """
1725     (qobj, ctx) = self._Query(fields)
1726
1727     return qobj.OldStyleQuery(ctx)