Add stricter checks for OpInstanceSetParams.{nics,disks}
[ganeti-local] / lib / locking.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Module implementing the Ganeti locking code."""
22
23 # pylint: disable=W0212
24
25 # W0212 since e.g. LockSet methods use (a lot) the internals of
26 # SharedLock
27
28 import os
29 import select
30 import threading
31 import errno
32 import weakref
33 import logging
34 import heapq
35 import itertools
36
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import compat
40 from ganeti import query
41
42
43 _EXCLUSIVE_TEXT = "exclusive"
44 _SHARED_TEXT = "shared"
45 _DELETED_TEXT = "deleted"
46
47 _DEFAULT_PRIORITY = 0
48
49
50 def ssynchronized(mylock, shared=0):
51   """Shared Synchronization decorator.
52
53   Calls the function holding the given lock, either in exclusive or shared
54   mode. It requires the passed lock to be a SharedLock (or support its
55   semantics).
56
57   @type mylock: lockable object or string
58   @param mylock: lock to acquire or class member name of the lock to acquire
59
60   """
61   def wrap(fn):
62     def sync_function(*args, **kwargs):
63       if isinstance(mylock, basestring):
64         assert args, "cannot ssynchronize on non-class method: self not found"
65         # args[0] is "self"
66         lock = getattr(args[0], mylock)
67       else:
68         lock = mylock
69       lock.acquire(shared=shared)
70       try:
71         return fn(*args, **kwargs)
72       finally:
73         lock.release()
74     return sync_function
75   return wrap
76
77
78 class _SingleNotifyPipeConditionWaiter(object):
79   """Helper class for SingleNotifyPipeCondition
80
81   """
82   __slots__ = [
83     "_fd",
84     "_poller",
85     ]
86
87   def __init__(self, poller, fd):
88     """Constructor for _SingleNotifyPipeConditionWaiter
89
90     @type poller: select.poll
91     @param poller: Poller object
92     @type fd: int
93     @param fd: File descriptor to wait for
94
95     """
96     object.__init__(self)
97     self._poller = poller
98     self._fd = fd
99
100   def __call__(self, timeout):
101     """Wait for something to happen on the pipe.
102
103     @type timeout: float or None
104     @param timeout: Timeout for waiting (can be None)
105
106     """
107     running_timeout = utils.RunningTimeout(timeout, True)
108
109     while True:
110       remaining_time = running_timeout.Remaining()
111
112       if remaining_time is not None:
113         if remaining_time < 0.0:
114           break
115
116         # Our calculation uses seconds, poll() wants milliseconds
117         remaining_time *= 1000
118
119       try:
120         result = self._poller.poll(remaining_time)
121       except EnvironmentError, err:
122         if err.errno != errno.EINTR:
123           raise
124         result = None
125
126       # Check whether we were notified
127       if result and result[0][0] == self._fd:
128         break
129
130
131 class _BaseCondition(object):
132   """Base class containing common code for conditions.
133
134   Some of this code is taken from python's threading module.
135
136   """
137   __slots__ = [
138     "_lock",
139     "acquire",
140     "release",
141     "_is_owned",
142     "_acquire_restore",
143     "_release_save",
144     ]
145
146   def __init__(self, lock):
147     """Constructor for _BaseCondition.
148
149     @type lock: threading.Lock
150     @param lock: condition base lock
151
152     """
153     object.__init__(self)
154
155     try:
156       self._release_save = lock._release_save
157     except AttributeError:
158       self._release_save = self._base_release_save
159     try:
160       self._acquire_restore = lock._acquire_restore
161     except AttributeError:
162       self._acquire_restore = self._base_acquire_restore
163     try:
164       self._is_owned = lock.is_owned
165     except AttributeError:
166       self._is_owned = self._base_is_owned
167
168     self._lock = lock
169
170     # Export the lock's acquire() and release() methods
171     self.acquire = lock.acquire
172     self.release = lock.release
173
174   def _base_is_owned(self):
175     """Check whether lock is owned by current thread.
176
177     """
178     if self._lock.acquire(0):
179       self._lock.release()
180       return False
181     return True
182
183   def _base_release_save(self):
184     self._lock.release()
185
186   def _base_acquire_restore(self, _):
187     self._lock.acquire()
188
189   def _check_owned(self):
190     """Raise an exception if the current thread doesn't own the lock.
191
192     """
193     if not self._is_owned():
194       raise RuntimeError("cannot work with un-aquired lock")
195
196
197 class SingleNotifyPipeCondition(_BaseCondition):
198   """Condition which can only be notified once.
199
200   This condition class uses pipes and poll, internally, to be able to wait for
201   notification with a timeout, without resorting to polling. It is almost
202   compatible with Python's threading.Condition, with the following differences:
203     - notifyAll can only be called once, and no wait can happen after that
204     - notify is not supported, only notifyAll
205
206   """
207
208   __slots__ = [
209     "_poller",
210     "_read_fd",
211     "_write_fd",
212     "_nwaiters",
213     "_notified",
214     ]
215
216   _waiter_class = _SingleNotifyPipeConditionWaiter
217
218   def __init__(self, lock):
219     """Constructor for SingleNotifyPipeCondition
220
221     """
222     _BaseCondition.__init__(self, lock)
223     self._nwaiters = 0
224     self._notified = False
225     self._read_fd = None
226     self._write_fd = None
227     self._poller = None
228
229   def _check_unnotified(self):
230     """Throws an exception if already notified.
231
232     """
233     if self._notified:
234       raise RuntimeError("cannot use already notified condition")
235
236   def _Cleanup(self):
237     """Cleanup open file descriptors, if any.
238
239     """
240     if self._read_fd is not None:
241       os.close(self._read_fd)
242       self._read_fd = None
243
244     if self._write_fd is not None:
245       os.close(self._write_fd)
246       self._write_fd = None
247     self._poller = None
248
249   def wait(self, timeout):
250     """Wait for a notification.
251
252     @type timeout: float or None
253     @param timeout: Waiting timeout (can be None)
254
255     """
256     self._check_owned()
257     self._check_unnotified()
258
259     self._nwaiters += 1
260     try:
261       if self._poller is None:
262         (self._read_fd, self._write_fd) = os.pipe()
263         self._poller = select.poll()
264         self._poller.register(self._read_fd, select.POLLHUP)
265
266       wait_fn = self._waiter_class(self._poller, self._read_fd)
267       state = self._release_save()
268       try:
269         # Wait for notification
270         wait_fn(timeout)
271       finally:
272         # Re-acquire lock
273         self._acquire_restore(state)
274     finally:
275       self._nwaiters -= 1
276       if self._nwaiters == 0:
277         self._Cleanup()
278
279   def notifyAll(self): # pylint: disable=C0103
280     """Close the writing side of the pipe to notify all waiters.
281
282     """
283     self._check_owned()
284     self._check_unnotified()
285     self._notified = True
286     if self._write_fd is not None:
287       os.close(self._write_fd)
288       self._write_fd = None
289
290
291 class PipeCondition(_BaseCondition):
292   """Group-only non-polling condition with counters.
293
294   This condition class uses pipes and poll, internally, to be able to wait for
295   notification with a timeout, without resorting to polling. It is almost
296   compatible with Python's threading.Condition, but only supports notifyAll and
297   non-recursive locks. As an additional features it's able to report whether
298   there are any waiting threads.
299
300   """
301   __slots__ = [
302     "_waiters",
303     "_single_condition",
304     ]
305
306   _single_condition_class = SingleNotifyPipeCondition
307
308   def __init__(self, lock):
309     """Initializes this class.
310
311     """
312     _BaseCondition.__init__(self, lock)
313     self._waiters = set()
314     self._single_condition = self._single_condition_class(self._lock)
315
316   def wait(self, timeout):
317     """Wait for a notification.
318
319     @type timeout: float or None
320     @param timeout: Waiting timeout (can be None)
321
322     """
323     self._check_owned()
324
325     # Keep local reference to the pipe. It could be replaced by another thread
326     # notifying while we're waiting.
327     cond = self._single_condition
328
329     self._waiters.add(threading.currentThread())
330     try:
331       cond.wait(timeout)
332     finally:
333       self._check_owned()
334       self._waiters.remove(threading.currentThread())
335
336   def notifyAll(self): # pylint: disable=C0103
337     """Notify all currently waiting threads.
338
339     """
340     self._check_owned()
341     self._single_condition.notifyAll()
342     self._single_condition = self._single_condition_class(self._lock)
343
344   def get_waiting(self):
345     """Returns a list of all waiting threads.
346
347     """
348     self._check_owned()
349
350     return self._waiters
351
352   def has_waiting(self):
353     """Returns whether there are active waiters.
354
355     """
356     self._check_owned()
357
358     return bool(self._waiters)
359
360   def __repr__(self):
361     return ("<%s.%s waiters=%s at %#x>" %
362             (self.__class__.__module__, self.__class__.__name__,
363              self._waiters, id(self)))
364
365
366 class _PipeConditionWithMode(PipeCondition):
367   __slots__ = [
368     "shared",
369     ]
370
371   def __init__(self, lock, shared):
372     """Initializes this class.
373
374     """
375     self.shared = shared
376     PipeCondition.__init__(self, lock)
377
378
379 class SharedLock(object):
380   """Implements a shared lock.
381
382   Multiple threads can acquire the lock in a shared way by calling
383   C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
384   threads can call C{acquire(shared=0)}.
385
386   Notes on data structures: C{__pending} contains a priority queue (heapq) of
387   all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
388   ...]}. Each per-priority queue contains a normal in-order list of conditions
389   to be notified when the lock can be acquired. Shared locks are grouped
390   together by priority and the condition for them is stored in
391   C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
392   references for the per-priority queues indexed by priority for faster access.
393
394   @type name: string
395   @ivar name: the name of the lock
396
397   """
398   __slots__ = [
399     "__weakref__",
400     "__deleted",
401     "__exc",
402     "__lock",
403     "__pending",
404     "__pending_by_prio",
405     "__pending_shared",
406     "__shr",
407     "name",
408     ]
409
410   __condition_class = _PipeConditionWithMode
411
412   def __init__(self, name, monitor=None):
413     """Construct a new SharedLock.
414
415     @param name: the name of the lock
416     @type monitor: L{LockMonitor}
417     @param monitor: Lock monitor with which to register
418
419     """
420     object.__init__(self)
421
422     self.name = name
423
424     # Internal lock
425     self.__lock = threading.Lock()
426
427     # Queue containing waiting acquires
428     self.__pending = []
429     self.__pending_by_prio = {}
430     self.__pending_shared = {}
431
432     # Current lock holders
433     self.__shr = set()
434     self.__exc = None
435
436     # is this lock in the deleted state?
437     self.__deleted = False
438
439     # Register with lock monitor
440     if monitor:
441       logging.debug("Adding lock %s to monitor", name)
442       monitor.RegisterLock(self)
443
444   def __repr__(self):
445     return ("<%s.%s name=%s at %#x>" %
446             (self.__class__.__module__, self.__class__.__name__,
447              self.name, id(self)))
448
449   def GetLockInfo(self, requested):
450     """Retrieves information for querying locks.
451
452     @type requested: set
453     @param requested: Requested information, see C{query.LQ_*}
454
455     """
456     self.__lock.acquire()
457     try:
458       # Note: to avoid unintentional race conditions, no references to
459       # modifiable objects should be returned unless they were created in this
460       # function.
461       mode = None
462       owner_names = None
463
464       if query.LQ_MODE in requested:
465         if self.__deleted:
466           mode = _DELETED_TEXT
467           assert not (self.__exc or self.__shr)
468         elif self.__exc:
469           mode = _EXCLUSIVE_TEXT
470         elif self.__shr:
471           mode = _SHARED_TEXT
472
473       # Current owner(s) are wanted
474       if query.LQ_OWNER in requested:
475         if self.__exc:
476           owner = [self.__exc]
477         else:
478           owner = self.__shr
479
480         if owner:
481           assert not self.__deleted
482           owner_names = [i.getName() for i in owner]
483
484       # Pending acquires are wanted
485       if query.LQ_PENDING in requested:
486         pending = []
487
488         # Sorting instead of copying and using heaq functions for simplicity
489         for (_, prioqueue) in sorted(self.__pending):
490           for cond in prioqueue:
491             if cond.shared:
492               pendmode = _SHARED_TEXT
493             else:
494               pendmode = _EXCLUSIVE_TEXT
495
496             # List of names will be sorted in L{query._GetLockPending}
497             pending.append((pendmode, [i.getName()
498                                        for i in cond.get_waiting()]))
499       else:
500         pending = None
501
502       return [(self.name, mode, owner_names, pending)]
503     finally:
504       self.__lock.release()
505
506   def __check_deleted(self):
507     """Raises an exception if the lock has been deleted.
508
509     """
510     if self.__deleted:
511       raise errors.LockError("Deleted lock %s" % self.name)
512
513   def __is_sharer(self):
514     """Is the current thread sharing the lock at this time?
515
516     """
517     return threading.currentThread() in self.__shr
518
519   def __is_exclusive(self):
520     """Is the current thread holding the lock exclusively at this time?
521
522     """
523     return threading.currentThread() == self.__exc
524
525   def __is_owned(self, shared=-1):
526     """Is the current thread somehow owning the lock at this time?
527
528     This is a private version of the function, which presumes you're holding
529     the internal lock.
530
531     """
532     if shared < 0:
533       return self.__is_sharer() or self.__is_exclusive()
534     elif shared:
535       return self.__is_sharer()
536     else:
537       return self.__is_exclusive()
538
539   def is_owned(self, shared=-1):
540     """Is the current thread somehow owning the lock at this time?
541
542     @param shared:
543         - < 0: check for any type of ownership (default)
544         - 0: check for exclusive ownership
545         - > 0: check for shared ownership
546
547     """
548     self.__lock.acquire()
549     try:
550       return self.__is_owned(shared=shared)
551     finally:
552       self.__lock.release()
553
554   #: Necessary to remain compatible with threading.Condition, which tries to
555   #: retrieve a locks' "_is_owned" attribute
556   _is_owned = is_owned
557
558   def _count_pending(self):
559     """Returns the number of pending acquires.
560
561     @rtype: int
562
563     """
564     self.__lock.acquire()
565     try:
566       return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
567     finally:
568       self.__lock.release()
569
570   def _check_empty(self):
571     """Checks whether there are any pending acquires.
572
573     @rtype: bool
574
575     """
576     self.__lock.acquire()
577     try:
578       # Order is important: __find_first_pending_queue modifies __pending
579       (_, prioqueue) = self.__find_first_pending_queue()
580
581       return not (prioqueue or
582                   self.__pending or
583                   self.__pending_by_prio or
584                   self.__pending_shared)
585     finally:
586       self.__lock.release()
587
588   def __do_acquire(self, shared):
589     """Actually acquire the lock.
590
591     """
592     if shared:
593       self.__shr.add(threading.currentThread())
594     else:
595       self.__exc = threading.currentThread()
596
597   def __can_acquire(self, shared):
598     """Determine whether lock can be acquired.
599
600     """
601     if shared:
602       return self.__exc is None
603     else:
604       return len(self.__shr) == 0 and self.__exc is None
605
606   def __find_first_pending_queue(self):
607     """Tries to find the topmost queued entry with pending acquires.
608
609     Removes empty entries while going through the list.
610
611     """
612     while self.__pending:
613       (priority, prioqueue) = self.__pending[0]
614
615       if prioqueue:
616         return (priority, prioqueue)
617
618       # Remove empty queue
619       heapq.heappop(self.__pending)
620       del self.__pending_by_prio[priority]
621       assert priority not in self.__pending_shared
622
623     return (None, None)
624
625   def __is_on_top(self, cond):
626     """Checks whether the passed condition is on top of the queue.
627
628     The caller must make sure the queue isn't empty.
629
630     """
631     (_, prioqueue) = self.__find_first_pending_queue()
632
633     return cond == prioqueue[0]
634
635   def __acquire_unlocked(self, shared, timeout, priority):
636     """Acquire a shared lock.
637
638     @param shared: whether to acquire in shared mode; by default an
639         exclusive lock will be acquired
640     @param timeout: maximum waiting time before giving up
641     @type priority: integer
642     @param priority: Priority for acquiring lock
643
644     """
645     self.__check_deleted()
646
647     # We cannot acquire the lock if we already have it
648     assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
649                                    " %s" % self.name)
650
651     # Remove empty entries from queue
652     self.__find_first_pending_queue()
653
654     # Check whether someone else holds the lock or there are pending acquires.
655     if not self.__pending and self.__can_acquire(shared):
656       # Apparently not, can acquire lock directly.
657       self.__do_acquire(shared)
658       return True
659
660     prioqueue = self.__pending_by_prio.get(priority, None)
661
662     if shared:
663       # Try to re-use condition for shared acquire
664       wait_condition = self.__pending_shared.get(priority, None)
665       assert (wait_condition is None or
666               (wait_condition.shared and wait_condition in prioqueue))
667     else:
668       wait_condition = None
669
670     if wait_condition is None:
671       if prioqueue is None:
672         assert priority not in self.__pending_by_prio
673
674         prioqueue = []
675         heapq.heappush(self.__pending, (priority, prioqueue))
676         self.__pending_by_prio[priority] = prioqueue
677
678       wait_condition = self.__condition_class(self.__lock, shared)
679       prioqueue.append(wait_condition)
680
681       if shared:
682         # Keep reference for further shared acquires on same priority. This is
683         # better than trying to find it in the list of pending acquires.
684         assert priority not in self.__pending_shared
685         self.__pending_shared[priority] = wait_condition
686
687     try:
688       # Wait until we become the topmost acquire in the queue or the timeout
689       # expires.
690       # TODO: Decrease timeout with spurious notifications
691       while not (self.__is_on_top(wait_condition) and
692                  self.__can_acquire(shared)):
693         # Wait for notification
694         wait_condition.wait(timeout)
695         self.__check_deleted()
696
697         # A lot of code assumes blocking acquires always succeed. Loop
698         # internally for that case.
699         if timeout is not None:
700           break
701
702       if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
703         self.__do_acquire(shared)
704         return True
705     finally:
706       # Remove condition from queue if there are no more waiters
707       if not wait_condition.has_waiting():
708         prioqueue.remove(wait_condition)
709         if wait_condition.shared:
710           # Remove from list of shared acquires if it wasn't while releasing
711           # (e.g. on lock deletion)
712           self.__pending_shared.pop(priority, None)
713
714     return False
715
716   def acquire(self, shared=0, timeout=None, priority=None,
717               test_notify=None):
718     """Acquire a shared lock.
719
720     @type shared: integer (0/1) used as a boolean
721     @param shared: whether to acquire in shared mode; by default an
722         exclusive lock will be acquired
723     @type timeout: float
724     @param timeout: maximum waiting time before giving up
725     @type priority: integer
726     @param priority: Priority for acquiring lock
727     @type test_notify: callable or None
728     @param test_notify: Special callback function for unittesting
729
730     """
731     if priority is None:
732       priority = _DEFAULT_PRIORITY
733
734     self.__lock.acquire()
735     try:
736       # We already got the lock, notify now
737       if __debug__ and callable(test_notify):
738         test_notify()
739
740       return self.__acquire_unlocked(shared, timeout, priority)
741     finally:
742       self.__lock.release()
743
744   def downgrade(self):
745     """Changes the lock mode from exclusive to shared.
746
747     Pending acquires in shared mode on the same priority will go ahead.
748
749     """
750     self.__lock.acquire()
751     try:
752       assert self.__is_owned(), "Lock must be owned"
753
754       if self.__is_exclusive():
755         # Do nothing if the lock is already acquired in shared mode
756         self.__exc = None
757         self.__do_acquire(1)
758
759         # Important: pending shared acquires should only jump ahead if there
760         # was a transition from exclusive to shared, otherwise an owner of a
761         # shared lock can keep calling this function to push incoming shared
762         # acquires
763         (priority, prioqueue) = self.__find_first_pending_queue()
764         if prioqueue:
765           # Is there a pending shared acquire on this priority?
766           cond = self.__pending_shared.pop(priority, None)
767           if cond:
768             assert cond.shared
769             assert cond in prioqueue
770
771             # Ensure shared acquire is on top of queue
772             if len(prioqueue) > 1:
773               prioqueue.remove(cond)
774               prioqueue.insert(0, cond)
775
776             # Notify
777             cond.notifyAll()
778
779       assert not self.__is_exclusive()
780       assert self.__is_sharer()
781
782       return True
783     finally:
784       self.__lock.release()
785
786   def release(self):
787     """Release a Shared Lock.
788
789     You must have acquired the lock, either in shared or in exclusive mode,
790     before calling this function.
791
792     """
793     self.__lock.acquire()
794     try:
795       assert self.__is_exclusive() or self.__is_sharer(), \
796         "Cannot release non-owned lock"
797
798       # Autodetect release type
799       if self.__is_exclusive():
800         self.__exc = None
801       else:
802         self.__shr.remove(threading.currentThread())
803
804       # Notify topmost condition in queue
805       (priority, prioqueue) = self.__find_first_pending_queue()
806       if prioqueue:
807         cond = prioqueue[0]
808         cond.notifyAll()
809         if cond.shared:
810           # Prevent further shared acquires from sneaking in while waiters are
811           # notified
812           self.__pending_shared.pop(priority, None)
813
814     finally:
815       self.__lock.release()
816
817   def delete(self, timeout=None, priority=None):
818     """Delete a Shared Lock.
819
820     This operation will declare the lock for removal. First the lock will be
821     acquired in exclusive mode if you don't already own it, then the lock
822     will be put in a state where any future and pending acquire() fail.
823
824     @type timeout: float
825     @param timeout: maximum waiting time before giving up
826     @type priority: integer
827     @param priority: Priority for acquiring lock
828
829     """
830     if priority is None:
831       priority = _DEFAULT_PRIORITY
832
833     self.__lock.acquire()
834     try:
835       assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
836
837       self.__check_deleted()
838
839       # The caller is allowed to hold the lock exclusively already.
840       acquired = self.__is_exclusive()
841
842       if not acquired:
843         acquired = self.__acquire_unlocked(0, timeout, priority)
844
845         assert self.__is_exclusive() and not self.__is_sharer(), \
846           "Lock wasn't acquired in exclusive mode"
847
848       if acquired:
849         self.__deleted = True
850         self.__exc = None
851
852         assert not (self.__exc or self.__shr), "Found owner during deletion"
853
854         # Notify all acquires. They'll throw an error.
855         for (_, prioqueue) in self.__pending:
856           for cond in prioqueue:
857             cond.notifyAll()
858
859         assert self.__deleted
860
861       return acquired
862     finally:
863       self.__lock.release()
864
865   def _release_save(self):
866     shared = self.__is_sharer()
867     self.release()
868     return shared
869
870   def _acquire_restore(self, shared):
871     self.acquire(shared=shared)
872
873
874 # Whenever we want to acquire a full LockSet we pass None as the value
875 # to acquire.  Hide this behind this nicely named constant.
876 ALL_SET = None
877
878
879 class _AcquireTimeout(Exception):
880   """Internal exception to abort an acquire on a timeout.
881
882   """
883
884
885 class LockSet:
886   """Implements a set of locks.
887
888   This abstraction implements a set of shared locks for the same resource type,
889   distinguished by name. The user can lock a subset of the resources and the
890   LockSet will take care of acquiring the locks always in the same order, thus
891   preventing deadlock.
892
893   All the locks needed in the same set must be acquired together, though.
894
895   @type name: string
896   @ivar name: the name of the lockset
897
898   """
899   def __init__(self, members, name, monitor=None):
900     """Constructs a new LockSet.
901
902     @type members: list of strings
903     @param members: initial members of the set
904     @type monitor: L{LockMonitor}
905     @param monitor: Lock monitor with which to register member locks
906
907     """
908     assert members is not None, "members parameter is not a list"
909     self.name = name
910
911     # Lock monitor
912     self.__monitor = monitor
913
914     # Used internally to guarantee coherency
915     self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)
916
917     # The lockdict indexes the relationship name -> lock
918     # The order-of-locking is implied by the alphabetical order of names
919     self.__lockdict = {}
920
921     for mname in members:
922       self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
923                                           monitor=monitor)
924
925     # The owner dict contains the set of locks each thread owns. For
926     # performance each thread can access its own key without a global lock on
927     # this structure. It is paramount though that *no* other type of access is
928     # done to this structure (eg. no looping over its keys). *_owner helper
929     # function are defined to guarantee access is correct, but in general never
930     # do anything different than __owners[threading.currentThread()], or there
931     # will be trouble.
932     self.__owners = {}
933
934   def _GetLockName(self, mname):
935     """Returns the name for a member lock.
936
937     """
938     return "%s/%s" % (self.name, mname)
939
940   def _get_lock(self):
941     """Returns the lockset-internal lock.
942
943     """
944     return self.__lock
945
946   def _get_lockdict(self):
947     """Returns the lockset-internal lock dictionary.
948
949     Accessing this structure is only safe in single-thread usage or when the
950     lockset-internal lock is held.
951
952     """
953     return self.__lockdict
954
955   def is_owned(self):
956     """Is the current thread a current level owner?
957
958     @note: Use L{check_owned} to check if a specific lock is held
959
960     """
961     return threading.currentThread() in self.__owners
962
963   def check_owned(self, names, shared=-1):
964     """Check if locks are owned in a specific mode.
965
966     @type names: sequence or string
967     @param names: Lock names (or a single lock name)
968     @param shared: See L{SharedLock.is_owned}
969     @rtype: bool
970     @note: Use L{is_owned} to check if the current thread holds I{any} lock and
971       L{list_owned} to get the names of all owned locks
972
973     """
974     if isinstance(names, basestring):
975       names = [names]
976
977     # Avoid check if no locks are owned anyway
978     if names and self.is_owned():
979       candidates = []
980
981       # Gather references to all locks (in case they're deleted in the meantime)
982       for lname in names:
983         try:
984           lock = self.__lockdict[lname]
985         except KeyError:
986           raise errors.LockError("Non-existing lock '%s' in set '%s' (it may"
987                                  " have been removed)" % (lname, self.name))
988         else:
989           candidates.append(lock)
990
991       return compat.all(lock.is_owned(shared=shared) for lock in candidates)
992     else:
993       return False
994
995   def _add_owned(self, name=None):
996     """Note the current thread owns the given lock"""
997     if name is None:
998       if not self.is_owned():
999         self.__owners[threading.currentThread()] = set()
1000     else:
1001       if self.is_owned():
1002         self.__owners[threading.currentThread()].add(name)
1003       else:
1004         self.__owners[threading.currentThread()] = set([name])
1005
1006   def _del_owned(self, name=None):
1007     """Note the current thread owns the given lock"""
1008
1009     assert not (name is None and self.__lock.is_owned()), \
1010            "Cannot hold internal lock when deleting owner status"
1011
1012     if name is not None:
1013       self.__owners[threading.currentThread()].remove(name)
1014
1015     # Only remove the key if we don't hold the set-lock as well
1016     if (not self.__lock.is_owned() and
1017         not self.__owners[threading.currentThread()]):
1018       del self.__owners[threading.currentThread()]
1019
1020   def list_owned(self):
1021     """Get the set of resource names owned by the current thread"""
1022     if self.is_owned():
1023       return self.__owners[threading.currentThread()].copy()
1024     else:
1025       return set()
1026
1027   def _release_and_delete_owned(self):
1028     """Release and delete all resources owned by the current thread"""
1029     for lname in self.list_owned():
1030       lock = self.__lockdict[lname]
1031       if lock.is_owned():
1032         lock.release()
1033       self._del_owned(name=lname)
1034
1035   def __names(self):
1036     """Return the current set of names.
1037
1038     Only call this function while holding __lock and don't iterate on the
1039     result after releasing the lock.
1040
1041     """
1042     return self.__lockdict.keys()
1043
1044   def _names(self):
1045     """Return a copy of the current set of elements.
1046
1047     Used only for debugging purposes.
1048
1049     """
1050     # If we don't already own the set-level lock acquired
1051     # we'll get it and note we need to release it later.
1052     release_lock = False
1053     if not self.__lock.is_owned():
1054       release_lock = True
1055       self.__lock.acquire(shared=1)
1056     try:
1057       result = self.__names()
1058     finally:
1059       if release_lock:
1060         self.__lock.release()
1061     return set(result)
1062
1063   def acquire(self, names, timeout=None, shared=0, priority=None,
1064               test_notify=None):
1065     """Acquire a set of resource locks.
1066
1067     @type names: list of strings (or string)
1068     @param names: the names of the locks which shall be acquired
1069         (special lock names, or instance/node names)
1070     @type shared: integer (0/1) used as a boolean
1071     @param shared: whether to acquire in shared mode; by default an
1072         exclusive lock will be acquired
1073     @type timeout: float or None
1074     @param timeout: Maximum time to acquire all locks
1075     @type priority: integer
1076     @param priority: Priority for acquiring locks
1077     @type test_notify: callable or None
1078     @param test_notify: Special callback function for unittesting
1079
1080     @return: Set of all locks successfully acquired or None in case of timeout
1081
1082     @raise errors.LockError: when any lock we try to acquire has
1083         been deleted before we succeed. In this case none of the
1084         locks requested will be acquired.
1085
1086     """
1087     assert timeout is None or timeout >= 0.0
1088
1089     # Check we don't already own locks at this level
1090     assert not self.is_owned(), ("Cannot acquire locks in the same set twice"
1091                                  " (lockset %s)" % self.name)
1092
1093     if priority is None:
1094       priority = _DEFAULT_PRIORITY
1095
1096     # We need to keep track of how long we spent waiting for a lock. The
1097     # timeout passed to this function is over all lock acquires.
1098     running_timeout = utils.RunningTimeout(timeout, False)
1099
1100     try:
1101       if names is not None:
1102         # Support passing in a single resource to acquire rather than many
1103         if isinstance(names, basestring):
1104           names = [names]
1105
1106         return self.__acquire_inner(names, False, shared, priority,
1107                                     running_timeout.Remaining, test_notify)
1108
1109       else:
1110         # If no names are given acquire the whole set by not letting new names
1111         # being added before we release, and getting the current list of names.
1112         # Some of them may then be deleted later, but we'll cope with this.
1113         #
1114         # We'd like to acquire this lock in a shared way, as it's nice if
1115         # everybody else can use the instances at the same time. If we are
1116         # acquiring them exclusively though they won't be able to do this
1117         # anyway, though, so we'll get the list lock exclusively as well in
1118         # order to be able to do add() on the set while owning it.
1119         if not self.__lock.acquire(shared=shared, priority=priority,
1120                                    timeout=running_timeout.Remaining()):
1121           raise _AcquireTimeout()
1122         try:
1123           # note we own the set-lock
1124           self._add_owned()
1125
1126           return self.__acquire_inner(self.__names(), True, shared, priority,
1127                                       running_timeout.Remaining, test_notify)
1128         except:
1129           # We shouldn't have problems adding the lock to the owners list, but
1130           # if we did we'll try to release this lock and re-raise exception.
1131           # Of course something is going to be really wrong, after this.
1132           self.__lock.release()
1133           self._del_owned()
1134           raise
1135
1136     except _AcquireTimeout:
1137       return None
1138
1139   def __acquire_inner(self, names, want_all, shared, priority,
1140                       timeout_fn, test_notify):
1141     """Inner logic for acquiring a number of locks.
1142
1143     @param names: Names of the locks to be acquired
1144     @param want_all: Whether all locks in the set should be acquired
1145     @param shared: Whether to acquire in shared mode
1146     @param timeout_fn: Function returning remaining timeout
1147     @param priority: Priority for acquiring locks
1148     @param test_notify: Special callback function for unittesting
1149
1150     """
1151     acquire_list = []
1152
1153     # First we look the locks up on __lockdict. We have no way of being sure
1154     # they will still be there after, but this makes it a lot faster should
1155     # just one of them be the already wrong. Using a sorted sequence to prevent
1156     # deadlocks.
1157     for lname in sorted(utils.UniqueSequence(names)):
1158       try:
1159         lock = self.__lockdict[lname] # raises KeyError if lock is not there
1160       except KeyError:
1161         if want_all:
1162           # We are acquiring all the set, it doesn't matter if this particular
1163           # element is not there anymore.
1164           continue
1165
1166         raise errors.LockError("Non-existing lock %s in set %s (it may have"
1167                                " been removed)" % (lname, self.name))
1168
1169       acquire_list.append((lname, lock))
1170
1171     # This will hold the locknames we effectively acquired.
1172     acquired = set()
1173
1174     try:
1175       # Now acquire_list contains a sorted list of resources and locks we
1176       # want.  In order to get them we loop on this (private) list and
1177       # acquire() them.  We gave no real guarantee they will still exist till
1178       # this is done but .acquire() itself is safe and will alert us if the
1179       # lock gets deleted.
1180       for (lname, lock) in acquire_list:
1181         if __debug__ and callable(test_notify):
1182           test_notify_fn = lambda: test_notify(lname)
1183         else:
1184           test_notify_fn = None
1185
1186         timeout = timeout_fn()
1187
1188         try:
1189           # raises LockError if the lock was deleted
1190           acq_success = lock.acquire(shared=shared, timeout=timeout,
1191                                      priority=priority,
1192                                      test_notify=test_notify_fn)
1193         except errors.LockError:
1194           if want_all:
1195             # We are acquiring all the set, it doesn't matter if this
1196             # particular element is not there anymore.
1197             continue
1198
1199           raise errors.LockError("Non-existing lock %s in set %s (it may"
1200                                  " have been removed)" % (lname, self.name))
1201
1202         if not acq_success:
1203           # Couldn't get lock or timeout occurred
1204           if timeout is None:
1205             # This shouldn't happen as SharedLock.acquire(timeout=None) is
1206             # blocking.
1207             raise errors.LockError("Failed to get lock %s (set %s)" %
1208                                    (lname, self.name))
1209
1210           raise _AcquireTimeout()
1211
1212         try:
1213           # now the lock cannot be deleted, we have it!
1214           self._add_owned(name=lname)
1215           acquired.add(lname)
1216
1217         except:
1218           # We shouldn't have problems adding the lock to the owners list, but
1219           # if we did we'll try to release this lock and re-raise exception.
1220           # Of course something is going to be really wrong after this.
1221           if lock.is_owned():
1222             lock.release()
1223           raise
1224
1225     except:
1226       # Release all owned locks
1227       self._release_and_delete_owned()
1228       raise
1229
1230     return acquired
1231
1232   def downgrade(self, names=None):
1233     """Downgrade a set of resource locks from exclusive to shared mode.
1234
1235     The locks must have been acquired in exclusive mode.
1236
1237     """
1238     assert self.is_owned(), ("downgrade on lockset %s while not owning any"
1239                              " lock" % self.name)
1240
1241     # Support passing in a single resource to downgrade rather than many
1242     if isinstance(names, basestring):
1243       names = [names]
1244
1245     owned = self.list_owned()
1246
1247     if names is None:
1248       names = owned
1249     else:
1250       names = set(names)
1251       assert owned.issuperset(names), \
1252         ("downgrade() on unheld resources %s (set %s)" %
1253          (names.difference(owned), self.name))
1254
1255     for lockname in names:
1256       self.__lockdict[lockname].downgrade()
1257
1258     # Do we own the lockset in exclusive mode?
1259     if self.__lock.is_owned(shared=0):
1260       # Have all locks been downgraded?
1261       if not compat.any(lock.is_owned(shared=0)
1262                         for lock in self.__lockdict.values()):
1263         self.__lock.downgrade()
1264         assert self.__lock.is_owned(shared=1)
1265
1266     return True
1267
1268   def release(self, names=None):
1269     """Release a set of resource locks, at the same level.
1270
1271     You must have acquired the locks, either in shared or in exclusive mode,
1272     before releasing them.
1273
1274     @type names: list of strings, or None
1275     @param names: the names of the locks which shall be released
1276         (defaults to all the locks acquired at that level).
1277
1278     """
1279     assert self.is_owned(), ("release() on lock set %s while not owner" %
1280                              self.name)
1281
1282     # Support passing in a single resource to release rather than many
1283     if isinstance(names, basestring):
1284       names = [names]
1285
1286     if names is None:
1287       names = self.list_owned()
1288     else:
1289       names = set(names)
1290       assert self.list_owned().issuperset(names), (
1291                "release() on unheld resources %s (set %s)" %
1292                (names.difference(self.list_owned()), self.name))
1293
1294     # First of all let's release the "all elements" lock, if set.
1295     # After this 'add' can work again
1296     if self.__lock.is_owned():
1297       self.__lock.release()
1298       self._del_owned()
1299
1300     for lockname in names:
1301       # If we are sure the lock doesn't leave __lockdict without being
1302       # exclusively held we can do this...
1303       self.__lockdict[lockname].release()
1304       self._del_owned(name=lockname)
1305
1306   def add(self, names, acquired=0, shared=0):
1307     """Add a new set of elements to the set
1308
1309     @type names: list of strings
1310     @param names: names of the new elements to add
1311     @type acquired: integer (0/1) used as a boolean
1312     @param acquired: pre-acquire the new resource?
1313     @type shared: integer (0/1) used as a boolean
1314     @param shared: is the pre-acquisition shared?
1315
1316     """
1317     # Check we don't already own locks at this level
1318     assert not self.is_owned() or self.__lock.is_owned(shared=0), \
1319       ("Cannot add locks if the set %s is only partially owned, or shared" %
1320        self.name)
1321
1322     # Support passing in a single resource to add rather than many
1323     if isinstance(names, basestring):
1324       names = [names]
1325
1326     # If we don't already own the set-level lock acquired in an exclusive way
1327     # we'll get it and note we need to release it later.
1328     release_lock = False
1329     if not self.__lock.is_owned():
1330       release_lock = True
1331       self.__lock.acquire()
1332
1333     try:
1334       invalid_names = set(self.__names()).intersection(names)
1335       if invalid_names:
1336         # This must be an explicit raise, not an assert, because assert is
1337         # turned off when using optimization, and this can happen because of
1338         # concurrency even if the user doesn't want it.
1339         raise errors.LockError("duplicate add(%s) on lockset %s" %
1340                                (invalid_names, self.name))
1341
1342       for lockname in names:
1343         lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1344
1345         if acquired:
1346           # No need for priority or timeout here as this lock has just been
1347           # created
1348           lock.acquire(shared=shared)
1349           # now the lock cannot be deleted, we have it!
1350           try:
1351             self._add_owned(name=lockname)
1352           except:
1353             # We shouldn't have problems adding the lock to the owners list,
1354             # but if we did we'll try to release this lock and re-raise
1355             # exception.  Of course something is going to be really wrong,
1356             # after this.  On the other hand the lock hasn't been added to the
1357             # __lockdict yet so no other threads should be pending on it. This
1358             # release is just a safety measure.
1359             lock.release()
1360             raise
1361
1362         self.__lockdict[lockname] = lock
1363
1364     finally:
1365       # Only release __lock if we were not holding it previously.
1366       if release_lock:
1367         self.__lock.release()
1368
1369     return True
1370
1371   def remove(self, names):
1372     """Remove elements from the lock set.
1373
1374     You can either not hold anything in the lockset or already hold a superset
1375     of the elements you want to delete, exclusively.
1376
1377     @type names: list of strings
1378     @param names: names of the resource to remove.
1379
1380     @return: a list of locks which we removed; the list is always
1381         equal to the names list if we were holding all the locks
1382         exclusively
1383
1384     """
1385     # Support passing in a single resource to remove rather than many
1386     if isinstance(names, basestring):
1387       names = [names]
1388
1389     # If we own any subset of this lock it must be a superset of what we want
1390     # to delete. The ownership must also be exclusive, but that will be checked
1391     # by the lock itself.
1392     assert not self.is_owned() or self.list_owned().issuperset(names), (
1393       "remove() on acquired lockset %s while not owning all elements" %
1394       self.name)
1395
1396     removed = []
1397
1398     for lname in names:
1399       # Calling delete() acquires the lock exclusively if we don't already own
1400       # it, and causes all pending and subsequent lock acquires to fail. It's
1401       # fine to call it out of order because delete() also implies release(),
1402       # and the assertion above guarantees that if we either already hold
1403       # everything we want to delete, or we hold none.
1404       try:
1405         self.__lockdict[lname].delete()
1406         removed.append(lname)
1407       except (KeyError, errors.LockError):
1408         # This cannot happen if we were already holding it, verify:
1409         assert not self.is_owned(), ("remove failed while holding lockset %s" %
1410                                      self.name)
1411       else:
1412         # If no LockError was raised we are the ones who deleted the lock.
1413         # This means we can safely remove it from lockdict, as any further or
1414         # pending delete() or acquire() will fail (and nobody can have the lock
1415         # since before our call to delete()).
1416         #
1417         # This is done in an else clause because if the exception was thrown
1418         # it's the job of the one who actually deleted it.
1419         del self.__lockdict[lname]
1420         # And let's remove it from our private list if we owned it.
1421         if self.is_owned():
1422           self._del_owned(name=lname)
1423
1424     return removed
1425
1426
1427 # Locking levels, must be acquired in increasing order.
1428 # Current rules are:
1429 #   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1430 #   acquired before performing any operation, either in shared or in exclusive
1431 #   mode. acquiring the BGL in exclusive mode is discouraged and should be
1432 #   avoided.
1433 #   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1434 #   If you need more than one node, or more than one instance, acquire them at
1435 #   the same time.
1436 LEVEL_CLUSTER = 0
1437 LEVEL_INSTANCE = 1
1438 LEVEL_NODEGROUP = 2
1439 LEVEL_NODE = 3
1440 LEVEL_NODE_RES = 4
1441
1442 LEVELS = [
1443   LEVEL_CLUSTER,
1444   LEVEL_INSTANCE,
1445   LEVEL_NODEGROUP,
1446   LEVEL_NODE,
1447   LEVEL_NODE_RES,
1448   ]
1449
1450 # Lock levels which are modifiable
1451 LEVELS_MOD = frozenset([
1452   LEVEL_NODE_RES,
1453   LEVEL_NODE,
1454   LEVEL_NODEGROUP,
1455   LEVEL_INSTANCE,
1456   ])
1457
1458 #: Lock level names (make sure to use singular form)
1459 LEVEL_NAMES = {
1460   LEVEL_CLUSTER: "cluster",
1461   LEVEL_INSTANCE: "instance",
1462   LEVEL_NODEGROUP: "nodegroup",
1463   LEVEL_NODE: "node",
1464   LEVEL_NODE_RES: "node-res",
1465   }
1466
1467 # Constant for the big ganeti lock
1468 BGL = 'BGL'
1469
1470
1471 class GanetiLockManager:
1472   """The Ganeti Locking Library
1473
1474   The purpose of this small library is to manage locking for ganeti clusters
1475   in a central place, while at the same time doing dynamic checks against
1476   possible deadlocks. It will also make it easier to transition to a different
1477   lock type should we migrate away from python threads.
1478
1479   """
1480   _instance = None
1481
1482   def __init__(self, nodes, nodegroups, instances):
1483     """Constructs a new GanetiLockManager object.
1484
1485     There should be only a GanetiLockManager object at any time, so this
1486     function raises an error if this is not the case.
1487
1488     @param nodes: list of node names
1489     @param nodegroups: list of nodegroup uuids
1490     @param instances: list of instance names
1491
1492     """
1493     assert self.__class__._instance is None, \
1494            "double GanetiLockManager instance"
1495
1496     self.__class__._instance = self
1497
1498     self._monitor = LockMonitor()
1499
1500     # The keyring contains all the locks, at their level and in the correct
1501     # locking order.
1502     self.__keyring = {
1503       LEVEL_CLUSTER: LockSet([BGL], "cluster", monitor=self._monitor),
1504       LEVEL_NODE: LockSet(nodes, "node", monitor=self._monitor),
1505       LEVEL_NODE_RES: LockSet(nodes, "node-res", monitor=self._monitor),
1506       LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroup", monitor=self._monitor),
1507       LEVEL_INSTANCE: LockSet(instances, "instance",
1508                               monitor=self._monitor),
1509       }
1510
1511     assert compat.all(ls.name == LEVEL_NAMES[level]
1512                       for (level, ls) in self.__keyring.items())
1513
1514   def AddToLockMonitor(self, provider):
1515     """Registers a new lock with the monitor.
1516
1517     See L{LockMonitor.RegisterLock}.
1518
1519     """
1520     return self._monitor.RegisterLock(provider)
1521
1522   def QueryLocks(self, fields):
1523     """Queries information from all locks.
1524
1525     See L{LockMonitor.QueryLocks}.
1526
1527     """
1528     return self._monitor.QueryLocks(fields)
1529
1530   def OldStyleQueryLocks(self, fields):
1531     """Queries information from all locks, returning old-style data.
1532
1533     See L{LockMonitor.OldStyleQueryLocks}.
1534
1535     """
1536     return self._monitor.OldStyleQueryLocks(fields)
1537
1538   def _names(self, level):
1539     """List the lock names at the given level.
1540
1541     This can be used for debugging/testing purposes.
1542
1543     @param level: the level whose list of locks to get
1544
1545     """
1546     assert level in LEVELS, "Invalid locking level %s" % level
1547     return self.__keyring[level]._names()
1548
1549   def is_owned(self, level):
1550     """Check whether we are owning locks at the given level
1551
1552     """
1553     return self.__keyring[level].is_owned()
1554
1555   def list_owned(self, level):
1556     """Get the set of owned locks at the given level
1557
1558     """
1559     return self.__keyring[level].list_owned()
1560
1561   def check_owned(self, level, names, shared=-1):
1562     """Check if locks at a certain level are owned in a specific mode.
1563
1564     @see: L{LockSet.check_owned}
1565
1566     """
1567     return self.__keyring[level].check_owned(names, shared=shared)
1568
1569   def _upper_owned(self, level):
1570     """Check that we don't own any lock at a level greater than the given one.
1571
1572     """
1573     # This way of checking only works if LEVELS[i] = i, which we check for in
1574     # the test cases.
1575     return compat.any((self.is_owned(l) for l in LEVELS[level + 1:]))
1576
1577   def _BGL_owned(self): # pylint: disable=C0103
1578     """Check if the current thread owns the BGL.
1579
1580     Both an exclusive or a shared acquisition work.
1581
1582     """
1583     return BGL in self.__keyring[LEVEL_CLUSTER].list_owned()
1584
1585   @staticmethod
1586   def _contains_BGL(level, names): # pylint: disable=C0103
1587     """Check if the level contains the BGL.
1588
1589     Check if acting on the given level and set of names will change
1590     the status of the Big Ganeti Lock.
1591
1592     """
1593     return level == LEVEL_CLUSTER and (names is None or BGL in names)
1594
1595   def acquire(self, level, names, timeout=None, shared=0, priority=None):
1596     """Acquire a set of resource locks, at the same level.
1597
1598     @type level: member of locking.LEVELS
1599     @param level: the level at which the locks shall be acquired
1600     @type names: list of strings (or string)
1601     @param names: the names of the locks which shall be acquired
1602         (special lock names, or instance/node names)
1603     @type shared: integer (0/1) used as a boolean
1604     @param shared: whether to acquire in shared mode; by default
1605         an exclusive lock will be acquired
1606     @type timeout: float
1607     @param timeout: Maximum time to acquire all locks
1608     @type priority: integer
1609     @param priority: Priority for acquiring lock
1610
1611     """
1612     assert level in LEVELS, "Invalid locking level %s" % level
1613
1614     # Check that we are either acquiring the Big Ganeti Lock or we already own
1615     # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1616     # so even if we've migrated we need to at least share the BGL to be
1617     # compatible with them. Of course if we own the BGL exclusively there's no
1618     # point in acquiring any other lock, unless perhaps we are half way through
1619     # the migration of the current opcode.
1620     assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1621             "You must own the Big Ganeti Lock before acquiring any other")
1622
1623     # Check we don't own locks at the same or upper levels.
1624     assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1625            " while owning some at a greater one")
1626
1627     # Acquire the locks in the set.
1628     return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
1629                                          priority=priority)
1630
1631   def downgrade(self, level, names=None):
1632     """Downgrade a set of resource locks from exclusive to shared mode.
1633
1634     You must have acquired the locks in exclusive mode.
1635
1636     @type level: member of locking.LEVELS
1637     @param level: the level at which the locks shall be downgraded
1638     @type names: list of strings, or None
1639     @param names: the names of the locks which shall be downgraded
1640         (defaults to all the locks acquired at the level)
1641
1642     """
1643     assert level in LEVELS, "Invalid locking level %s" % level
1644
1645     return self.__keyring[level].downgrade(names=names)
1646
1647   def release(self, level, names=None):
1648     """Release a set of resource locks, at the same level.
1649
1650     You must have acquired the locks, either in shared or in exclusive
1651     mode, before releasing them.
1652
1653     @type level: member of locking.LEVELS
1654     @param level: the level at which the locks shall be released
1655     @type names: list of strings, or None
1656     @param names: the names of the locks which shall be released
1657         (defaults to all the locks acquired at that level)
1658
1659     """
1660     assert level in LEVELS, "Invalid locking level %s" % level
1661     assert (not self._contains_BGL(level, names) or
1662             not self._upper_owned(LEVEL_CLUSTER)), (
1663             "Cannot release the Big Ganeti Lock while holding something"
1664             " at upper levels (%r)" %
1665             (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self.list_owned(i))
1666                               for i in self.__keyring.keys()]), ))
1667
1668     # Release will complain if we don't own the locks already
1669     return self.__keyring[level].release(names)
1670
1671   def add(self, level, names, acquired=0, shared=0):
1672     """Add locks at the specified level.
1673
1674     @type level: member of locking.LEVELS_MOD
1675     @param level: the level at which the locks shall be added
1676     @type names: list of strings
1677     @param names: names of the locks to acquire
1678     @type acquired: integer (0/1) used as a boolean
1679     @param acquired: whether to acquire the newly added locks
1680     @type shared: integer (0/1) used as a boolean
1681     @param shared: whether the acquisition will be shared
1682
1683     """
1684     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1685     assert self._BGL_owned(), ("You must own the BGL before performing other"
1686            " operations")
1687     assert not self._upper_owned(level), ("Cannot add locks at a level"
1688            " while owning some at a greater one")
1689     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1690
1691   def remove(self, level, names):
1692     """Remove locks from the specified level.
1693
1694     You must either already own the locks you are trying to remove
1695     exclusively or not own any lock at an upper level.
1696
1697     @type level: member of locking.LEVELS_MOD
1698     @param level: the level at which the locks shall be removed
1699     @type names: list of strings
1700     @param names: the names of the locks which shall be removed
1701         (special lock names, or instance/node names)
1702
1703     """
1704     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1705     assert self._BGL_owned(), ("You must own the BGL before performing other"
1706            " operations")
1707     # Check we either own the level or don't own anything from here
1708     # up. LockSet.remove() will check the case in which we don't own
1709     # all the needed resources, or we have a shared ownership.
1710     assert self.is_owned(level) or not self._upper_owned(level), (
1711            "Cannot remove locks at a level while not owning it or"
1712            " owning some at a greater one")
1713     return self.__keyring[level].remove(names)
1714
1715
1716 def _MonitorSortKey((item, idx, num)):
1717   """Sorting key function.
1718
1719   Sort by name, registration order and then order of information. This provides
1720   a stable sort order over different providers, even if they return the same
1721   name.
1722
1723   """
1724   (name, _, _, _) = item
1725
1726   return (utils.NiceSortKey(name), num, idx)
1727
1728
1729 class LockMonitor(object):
1730   _LOCK_ATTR = "_lock"
1731
1732   def __init__(self):
1733     """Initializes this class.
1734
1735     """
1736     self._lock = SharedLock("LockMonitor")
1737
1738     # Counter for stable sorting
1739     self._counter = itertools.count(0)
1740
1741     # Tracked locks. Weak references are used to avoid issues with circular
1742     # references and deletion.
1743     self._locks = weakref.WeakKeyDictionary()
1744
1745   @ssynchronized(_LOCK_ATTR)
1746   def RegisterLock(self, provider):
1747     """Registers a new lock.
1748
1749     @param provider: Object with a callable method named C{GetLockInfo}, taking
1750       a single C{set} containing the requested information items
1751     @note: It would be nicer to only receive the function generating the
1752       requested information but, as it turns out, weak references to bound
1753       methods (e.g. C{self.GetLockInfo}) are tricky; there are several
1754       workarounds, but none of the ones I found works properly in combination
1755       with a standard C{WeakKeyDictionary}
1756
1757     """
1758     assert provider not in self._locks, "Duplicate registration"
1759
1760     # There used to be a check for duplicate names here. As it turned out, when
1761     # a lock is re-created with the same name in a very short timeframe, the
1762     # previous instance might not yet be removed from the weakref dictionary.
1763     # By keeping track of the order of incoming registrations, a stable sort
1764     # ordering can still be guaranteed.
1765
1766     self._locks[provider] = self._counter.next()
1767
1768   def _GetLockInfo(self, requested):
1769     """Get information from all locks.
1770
1771     """
1772     # Must hold lock while getting consistent list of tracked items
1773     self._lock.acquire(shared=1)
1774     try:
1775       items = self._locks.items()
1776     finally:
1777       self._lock.release()
1778
1779     return [(info, idx, num)
1780             for (provider, num) in items
1781             for (idx, info) in enumerate(provider.GetLockInfo(requested))]
1782
1783   def _Query(self, fields):
1784     """Queries information from all locks.
1785
1786     @type fields: list of strings
1787     @param fields: List of fields to return
1788
1789     """
1790     qobj = query.Query(query.LOCK_FIELDS, fields)
1791
1792     # Get all data with internal lock held and then sort by name and incoming
1793     # order
1794     lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
1795                       key=_MonitorSortKey)
1796
1797     # Extract lock information and build query data
1798     return (qobj, query.LockQueryData(map(compat.fst, lockinfo)))
1799
1800   def QueryLocks(self, fields):
1801     """Queries information from all locks.
1802
1803     @type fields: list of strings
1804     @param fields: List of fields to return
1805
1806     """
1807     (qobj, ctx) = self._Query(fields)
1808
1809     # Prepare query response
1810     return query.GetQueryResponse(qobj, ctx)
1811
1812   def OldStyleQueryLocks(self, fields):
1813     """Queries information from all locks, returning old-style data.
1814
1815     @type fields: list of strings
1816     @param fields: List of fields to return
1817
1818     """
1819     (qobj, ctx) = self._Query(fields)
1820
1821     return qobj.OldStyleQuery(ctx)