Break line longer than 80 chars in configure.ac
[ganeti-local] / lib / locking.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Module implementing the Ganeti locking code."""
22
23 # pylint: disable=W0212
24
25 # W0212 since e.g. LockSet methods use (a lot) the internals of
26 # SharedLock
27
28 import os
29 import select
30 import threading
31 import errno
32 import weakref
33 import logging
34 import heapq
35 import itertools
36 import time
37
38 from ganeti import errors
39 from ganeti import utils
40 from ganeti import compat
41 from ganeti import query
42
43
44 _EXCLUSIVE_TEXT = "exclusive"
45 _SHARED_TEXT = "shared"
46 _DELETED_TEXT = "deleted"
47
48 _DEFAULT_PRIORITY = 0
49
50 #: Minimum timeout required to consider scheduling a pending acquisition
51 #: (seconds)
52 _LOCK_ACQUIRE_MIN_TIMEOUT = (1.0 / 1000)
53
54 # Internal lock acquisition modes for L{LockSet}
55 (_LS_ACQUIRE_EXACT,
56  _LS_ACQUIRE_ALL,
57  _LS_ACQUIRE_OPPORTUNISTIC) = range(1, 4)
58
59 _LS_ACQUIRE_MODES = compat.UniqueFrozenset([
60   _LS_ACQUIRE_EXACT,
61   _LS_ACQUIRE_ALL,
62   _LS_ACQUIRE_OPPORTUNISTIC,
63   ])
64
65
66 def ssynchronized(mylock, shared=0):
67   """Shared Synchronization decorator.
68
69   Calls the function holding the given lock, either in exclusive or shared
70   mode. It requires the passed lock to be a SharedLock (or support its
71   semantics).
72
73   @type mylock: lockable object or string
74   @param mylock: lock to acquire or class member name of the lock to acquire
75
76   """
77   def wrap(fn):
78     def sync_function(*args, **kwargs):
79       if isinstance(mylock, basestring):
80         assert args, "cannot ssynchronize on non-class method: self not found"
81         # args[0] is "self"
82         lock = getattr(args[0], mylock)
83       else:
84         lock = mylock
85       lock.acquire(shared=shared)
86       try:
87         return fn(*args, **kwargs)
88       finally:
89         lock.release()
90     return sync_function
91   return wrap
92
93
94 class _SingleNotifyPipeConditionWaiter(object):
95   """Helper class for SingleNotifyPipeCondition
96
97   """
98   __slots__ = [
99     "_fd",
100     ]
101
102   def __init__(self, fd):
103     """Constructor for _SingleNotifyPipeConditionWaiter
104
105     @type fd: int
106     @param fd: File descriptor to wait for
107
108     """
109     object.__init__(self)
110     self._fd = fd
111
112   def __call__(self, timeout):
113     """Wait for something to happen on the pipe.
114
115     @type timeout: float or None
116     @param timeout: Timeout for waiting (can be None)
117
118     """
119     running_timeout = utils.RunningTimeout(timeout, True)
120     poller = select.poll()
121     poller.register(self._fd, select.POLLHUP)
122
123     while True:
124       remaining_time = running_timeout.Remaining()
125
126       if remaining_time is not None:
127         if remaining_time < 0.0:
128           break
129
130         # Our calculation uses seconds, poll() wants milliseconds
131         remaining_time *= 1000
132
133       try:
134         result = poller.poll(remaining_time)
135       except EnvironmentError, err:
136         if err.errno != errno.EINTR:
137           raise
138         result = None
139
140       # Check whether we were notified
141       if result and result[0][0] == self._fd:
142         break
143
144
145 class _BaseCondition(object):
146   """Base class containing common code for conditions.
147
148   Some of this code is taken from python's threading module.
149
150   """
151   __slots__ = [
152     "_lock",
153     "acquire",
154     "release",
155     "_is_owned",
156     "_acquire_restore",
157     "_release_save",
158     ]
159
160   def __init__(self, lock):
161     """Constructor for _BaseCondition.
162
163     @type lock: threading.Lock
164     @param lock: condition base lock
165
166     """
167     object.__init__(self)
168
169     try:
170       self._release_save = lock._release_save
171     except AttributeError:
172       self._release_save = self._base_release_save
173     try:
174       self._acquire_restore = lock._acquire_restore
175     except AttributeError:
176       self._acquire_restore = self._base_acquire_restore
177     try:
178       self._is_owned = lock.is_owned
179     except AttributeError:
180       self._is_owned = self._base_is_owned
181
182     self._lock = lock
183
184     # Export the lock's acquire() and release() methods
185     self.acquire = lock.acquire
186     self.release = lock.release
187
188   def _base_is_owned(self):
189     """Check whether lock is owned by current thread.
190
191     """
192     if self._lock.acquire(0):
193       self._lock.release()
194       return False
195     return True
196
197   def _base_release_save(self):
198     self._lock.release()
199
200   def _base_acquire_restore(self, _):
201     self._lock.acquire()
202
203   def _check_owned(self):
204     """Raise an exception if the current thread doesn't own the lock.
205
206     """
207     if not self._is_owned():
208       raise RuntimeError("cannot work with un-aquired lock")
209
210
211 class SingleNotifyPipeCondition(_BaseCondition):
212   """Condition which can only be notified once.
213
214   This condition class uses pipes and poll, internally, to be able to wait for
215   notification with a timeout, without resorting to polling. It is almost
216   compatible with Python's threading.Condition, with the following differences:
217     - notifyAll can only be called once, and no wait can happen after that
218     - notify is not supported, only notifyAll
219
220   """
221
222   __slots__ = [
223     "_read_fd",
224     "_write_fd",
225     "_nwaiters",
226     "_notified",
227     ]
228
229   _waiter_class = _SingleNotifyPipeConditionWaiter
230
231   def __init__(self, lock):
232     """Constructor for SingleNotifyPipeCondition
233
234     """
235     _BaseCondition.__init__(self, lock)
236     self._nwaiters = 0
237     self._notified = False
238     self._read_fd = None
239     self._write_fd = None
240
241   def _check_unnotified(self):
242     """Throws an exception if already notified.
243
244     """
245     if self._notified:
246       raise RuntimeError("cannot use already notified condition")
247
248   def _Cleanup(self):
249     """Cleanup open file descriptors, if any.
250
251     """
252     if self._read_fd is not None:
253       os.close(self._read_fd)
254       self._read_fd = None
255
256     if self._write_fd is not None:
257       os.close(self._write_fd)
258       self._write_fd = None
259
260   def wait(self, timeout):
261     """Wait for a notification.
262
263     @type timeout: float or None
264     @param timeout: Waiting timeout (can be None)
265
266     """
267     self._check_owned()
268     self._check_unnotified()
269
270     self._nwaiters += 1
271     try:
272       if self._read_fd is None:
273         (self._read_fd, self._write_fd) = os.pipe()
274
275       wait_fn = self._waiter_class(self._read_fd)
276       state = self._release_save()
277       try:
278         # Wait for notification
279         wait_fn(timeout)
280       finally:
281         # Re-acquire lock
282         self._acquire_restore(state)
283     finally:
284       self._nwaiters -= 1
285       if self._nwaiters == 0:
286         self._Cleanup()
287
288   def notifyAll(self): # pylint: disable=C0103
289     """Close the writing side of the pipe to notify all waiters.
290
291     """
292     self._check_owned()
293     self._check_unnotified()
294     self._notified = True
295     if self._write_fd is not None:
296       os.close(self._write_fd)
297       self._write_fd = None
298
299
300 class PipeCondition(_BaseCondition):
301   """Group-only non-polling condition with counters.
302
303   This condition class uses pipes and poll, internally, to be able to wait for
304   notification with a timeout, without resorting to polling. It is almost
305   compatible with Python's threading.Condition, but only supports notifyAll and
306   non-recursive locks. As an additional features it's able to report whether
307   there are any waiting threads.
308
309   """
310   __slots__ = [
311     "_waiters",
312     "_single_condition",
313     ]
314
315   _single_condition_class = SingleNotifyPipeCondition
316
317   def __init__(self, lock):
318     """Initializes this class.
319
320     """
321     _BaseCondition.__init__(self, lock)
322     self._waiters = set()
323     self._single_condition = self._single_condition_class(self._lock)
324
325   def wait(self, timeout):
326     """Wait for a notification.
327
328     @type timeout: float or None
329     @param timeout: Waiting timeout (can be None)
330
331     """
332     self._check_owned()
333
334     # Keep local reference to the pipe. It could be replaced by another thread
335     # notifying while we're waiting.
336     cond = self._single_condition
337
338     self._waiters.add(threading.currentThread())
339     try:
340       cond.wait(timeout)
341     finally:
342       self._check_owned()
343       self._waiters.remove(threading.currentThread())
344
345   def notifyAll(self): # pylint: disable=C0103
346     """Notify all currently waiting threads.
347
348     """
349     self._check_owned()
350     self._single_condition.notifyAll()
351     self._single_condition = self._single_condition_class(self._lock)
352
353   def get_waiting(self):
354     """Returns a list of all waiting threads.
355
356     """
357     self._check_owned()
358
359     return self._waiters
360
361   def has_waiting(self):
362     """Returns whether there are active waiters.
363
364     """
365     self._check_owned()
366
367     return bool(self._waiters)
368
369   def __repr__(self):
370     return ("<%s.%s waiters=%s at %#x>" %
371             (self.__class__.__module__, self.__class__.__name__,
372              self._waiters, id(self)))
373
374
375 class _PipeConditionWithMode(PipeCondition):
376   __slots__ = [
377     "shared",
378     ]
379
380   def __init__(self, lock, shared):
381     """Initializes this class.
382
383     """
384     self.shared = shared
385     PipeCondition.__init__(self, lock)
386
387
388 class SharedLock(object):
389   """Implements a shared lock.
390
391   Multiple threads can acquire the lock in a shared way by calling
392   C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
393   threads can call C{acquire(shared=0)}.
394
395   Notes on data structures: C{__pending} contains a priority queue (heapq) of
396   all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
397   ...]}. Each per-priority queue contains a normal in-order list of conditions
398   to be notified when the lock can be acquired. Shared locks are grouped
399   together by priority and the condition for them is stored in
400   C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
401   references for the per-priority queues indexed by priority for faster access.
402
403   @type name: string
404   @ivar name: the name of the lock
405
406   """
407   __slots__ = [
408     "__weakref__",
409     "__deleted",
410     "__exc",
411     "__lock",
412     "__pending",
413     "__pending_by_prio",
414     "__pending_shared",
415     "__shr",
416     "__time_fn",
417     "name",
418     ]
419
420   __condition_class = _PipeConditionWithMode
421
422   def __init__(self, name, monitor=None, _time_fn=time.time):
423     """Construct a new SharedLock.
424
425     @param name: the name of the lock
426     @type monitor: L{LockMonitor}
427     @param monitor: Lock monitor with which to register
428
429     """
430     object.__init__(self)
431
432     self.name = name
433
434     # Used for unittesting
435     self.__time_fn = _time_fn
436
437     # Internal lock
438     self.__lock = threading.Lock()
439
440     # Queue containing waiting acquires
441     self.__pending = []
442     self.__pending_by_prio = {}
443     self.__pending_shared = {}
444
445     # Current lock holders
446     self.__shr = set()
447     self.__exc = None
448
449     # is this lock in the deleted state?
450     self.__deleted = False
451
452     # Register with lock monitor
453     if monitor:
454       logging.debug("Adding lock %s to monitor", name)
455       monitor.RegisterLock(self)
456
457   def __repr__(self):
458     return ("<%s.%s name=%s at %#x>" %
459             (self.__class__.__module__, self.__class__.__name__,
460              self.name, id(self)))
461
462   def GetLockInfo(self, requested):
463     """Retrieves information for querying locks.
464
465     @type requested: set
466     @param requested: Requested information, see C{query.LQ_*}
467
468     """
469     self.__lock.acquire()
470     try:
471       # Note: to avoid unintentional race conditions, no references to
472       # modifiable objects should be returned unless they were created in this
473       # function.
474       mode = None
475       owner_names = None
476
477       if query.LQ_MODE in requested:
478         if self.__deleted:
479           mode = _DELETED_TEXT
480           assert not (self.__exc or self.__shr)
481         elif self.__exc:
482           mode = _EXCLUSIVE_TEXT
483         elif self.__shr:
484           mode = _SHARED_TEXT
485
486       # Current owner(s) are wanted
487       if query.LQ_OWNER in requested:
488         if self.__exc:
489           owner = [self.__exc]
490         else:
491           owner = self.__shr
492
493         if owner:
494           assert not self.__deleted
495           owner_names = [i.getName() for i in owner]
496
497       # Pending acquires are wanted
498       if query.LQ_PENDING in requested:
499         pending = []
500
501         # Sorting instead of copying and using heaq functions for simplicity
502         for (_, prioqueue) in sorted(self.__pending):
503           for cond in prioqueue:
504             if cond.shared:
505               pendmode = _SHARED_TEXT
506             else:
507               pendmode = _EXCLUSIVE_TEXT
508
509             # List of names will be sorted in L{query._GetLockPending}
510             pending.append((pendmode, [i.getName()
511                                        for i in cond.get_waiting()]))
512       else:
513         pending = None
514
515       return [(self.name, mode, owner_names, pending)]
516     finally:
517       self.__lock.release()
518
519   def __check_deleted(self):
520     """Raises an exception if the lock has been deleted.
521
522     """
523     if self.__deleted:
524       raise errors.LockError("Deleted lock %s" % self.name)
525
526   def __is_sharer(self):
527     """Is the current thread sharing the lock at this time?
528
529     """
530     return threading.currentThread() in self.__shr
531
532   def __is_exclusive(self):
533     """Is the current thread holding the lock exclusively at this time?
534
535     """
536     return threading.currentThread() == self.__exc
537
538   def __is_owned(self, shared=-1):
539     """Is the current thread somehow owning the lock at this time?
540
541     This is a private version of the function, which presumes you're holding
542     the internal lock.
543
544     """
545     if shared < 0:
546       return self.__is_sharer() or self.__is_exclusive()
547     elif shared:
548       return self.__is_sharer()
549     else:
550       return self.__is_exclusive()
551
552   def is_owned(self, shared=-1):
553     """Is the current thread somehow owning the lock at this time?
554
555     @param shared:
556         - < 0: check for any type of ownership (default)
557         - 0: check for exclusive ownership
558         - > 0: check for shared ownership
559
560     """
561     self.__lock.acquire()
562     try:
563       return self.__is_owned(shared=shared)
564     finally:
565       self.__lock.release()
566
567   #: Necessary to remain compatible with threading.Condition, which tries to
568   #: retrieve a locks' "_is_owned" attribute
569   _is_owned = is_owned
570
571   def _count_pending(self):
572     """Returns the number of pending acquires.
573
574     @rtype: int
575
576     """
577     self.__lock.acquire()
578     try:
579       return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
580     finally:
581       self.__lock.release()
582
583   def _check_empty(self):
584     """Checks whether there are any pending acquires.
585
586     @rtype: bool
587
588     """
589     self.__lock.acquire()
590     try:
591       # Order is important: __find_first_pending_queue modifies __pending
592       (_, prioqueue) = self.__find_first_pending_queue()
593
594       return not (prioqueue or
595                   self.__pending or
596                   self.__pending_by_prio or
597                   self.__pending_shared)
598     finally:
599       self.__lock.release()
600
601   def __do_acquire(self, shared):
602     """Actually acquire the lock.
603
604     """
605     if shared:
606       self.__shr.add(threading.currentThread())
607     else:
608       self.__exc = threading.currentThread()
609
610   def __can_acquire(self, shared):
611     """Determine whether lock can be acquired.
612
613     """
614     if shared:
615       return self.__exc is None
616     else:
617       return len(self.__shr) == 0 and self.__exc is None
618
619   def __find_first_pending_queue(self):
620     """Tries to find the topmost queued entry with pending acquires.
621
622     Removes empty entries while going through the list.
623
624     """
625     while self.__pending:
626       (priority, prioqueue) = self.__pending[0]
627
628       if prioqueue:
629         return (priority, prioqueue)
630
631       # Remove empty queue
632       heapq.heappop(self.__pending)
633       del self.__pending_by_prio[priority]
634       assert priority not in self.__pending_shared
635
636     return (None, None)
637
638   def __is_on_top(self, cond):
639     """Checks whether the passed condition is on top of the queue.
640
641     The caller must make sure the queue isn't empty.
642
643     """
644     (_, prioqueue) = self.__find_first_pending_queue()
645
646     return cond == prioqueue[0]
647
648   def __acquire_unlocked(self, shared, timeout, priority):
649     """Acquire a shared lock.
650
651     @param shared: whether to acquire in shared mode; by default an
652         exclusive lock will be acquired
653     @param timeout: maximum waiting time before giving up
654     @type priority: integer
655     @param priority: Priority for acquiring lock
656
657     """
658     self.__check_deleted()
659
660     # We cannot acquire the lock if we already have it
661     assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
662                                    " %s" % self.name)
663
664     # Remove empty entries from queue
665     self.__find_first_pending_queue()
666
667     # Check whether someone else holds the lock or there are pending acquires.
668     if not self.__pending and self.__can_acquire(shared):
669       # Apparently not, can acquire lock directly.
670       self.__do_acquire(shared)
671       return True
672
673     # The lock couldn't be acquired right away, so if a timeout is given and is
674     # considered too short, return right away as scheduling a pending
675     # acquisition is quite expensive
676     if timeout is not None and timeout < _LOCK_ACQUIRE_MIN_TIMEOUT:
677       return False
678
679     prioqueue = self.__pending_by_prio.get(priority, None)
680
681     if shared:
682       # Try to re-use condition for shared acquire
683       wait_condition = self.__pending_shared.get(priority, None)
684       assert (wait_condition is None or
685               (wait_condition.shared and wait_condition in prioqueue))
686     else:
687       wait_condition = None
688
689     if wait_condition is None:
690       if prioqueue is None:
691         assert priority not in self.__pending_by_prio
692
693         prioqueue = []
694         heapq.heappush(self.__pending, (priority, prioqueue))
695         self.__pending_by_prio[priority] = prioqueue
696
697       wait_condition = self.__condition_class(self.__lock, shared)
698       prioqueue.append(wait_condition)
699
700       if shared:
701         # Keep reference for further shared acquires on same priority. This is
702         # better than trying to find it in the list of pending acquires.
703         assert priority not in self.__pending_shared
704         self.__pending_shared[priority] = wait_condition
705
706     wait_start = self.__time_fn()
707     acquired = False
708
709     try:
710       # Wait until we become the topmost acquire in the queue or the timeout
711       # expires.
712       while True:
713         if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
714           self.__do_acquire(shared)
715           acquired = True
716           break
717
718         # A lot of code assumes blocking acquires always succeed, therefore we
719         # can never return False for a blocking acquire
720         if (timeout is not None and
721             utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
722           break
723
724         # Wait for notification
725         wait_condition.wait(timeout)
726         self.__check_deleted()
727     finally:
728       # Remove condition from queue if there are no more waiters
729       if not wait_condition.has_waiting():
730         prioqueue.remove(wait_condition)
731         if wait_condition.shared:
732           # Remove from list of shared acquires if it wasn't while releasing
733           # (e.g. on lock deletion)
734           self.__pending_shared.pop(priority, None)
735
736     return acquired
737
738   def acquire(self, shared=0, timeout=None, priority=None,
739               test_notify=None):
740     """Acquire a shared lock.
741
742     @type shared: integer (0/1) used as a boolean
743     @param shared: whether to acquire in shared mode; by default an
744         exclusive lock will be acquired
745     @type timeout: float
746     @param timeout: maximum waiting time before giving up
747     @type priority: integer
748     @param priority: Priority for acquiring lock
749     @type test_notify: callable or None
750     @param test_notify: Special callback function for unittesting
751
752     """
753     if priority is None:
754       priority = _DEFAULT_PRIORITY
755
756     self.__lock.acquire()
757     try:
758       # We already got the lock, notify now
759       if __debug__ and callable(test_notify):
760         test_notify()
761
762       return self.__acquire_unlocked(shared, timeout, priority)
763     finally:
764       self.__lock.release()
765
766   def downgrade(self):
767     """Changes the lock mode from exclusive to shared.
768
769     Pending acquires in shared mode on the same priority will go ahead.
770
771     """
772     self.__lock.acquire()
773     try:
774       assert self.__is_owned(), "Lock must be owned"
775
776       if self.__is_exclusive():
777         # Do nothing if the lock is already acquired in shared mode
778         self.__exc = None
779         self.__do_acquire(1)
780
781         # Important: pending shared acquires should only jump ahead if there
782         # was a transition from exclusive to shared, otherwise an owner of a
783         # shared lock can keep calling this function to push incoming shared
784         # acquires
785         (priority, prioqueue) = self.__find_first_pending_queue()
786         if prioqueue:
787           # Is there a pending shared acquire on this priority?
788           cond = self.__pending_shared.pop(priority, None)
789           if cond:
790             assert cond.shared
791             assert cond in prioqueue
792
793             # Ensure shared acquire is on top of queue
794             if len(prioqueue) > 1:
795               prioqueue.remove(cond)
796               prioqueue.insert(0, cond)
797
798             # Notify
799             cond.notifyAll()
800
801       assert not self.__is_exclusive()
802       assert self.__is_sharer()
803
804       return True
805     finally:
806       self.__lock.release()
807
808   def release(self):
809     """Release a Shared Lock.
810
811     You must have acquired the lock, either in shared or in exclusive mode,
812     before calling this function.
813
814     """
815     self.__lock.acquire()
816     try:
817       assert self.__is_exclusive() or self.__is_sharer(), \
818         "Cannot release non-owned lock"
819
820       # Autodetect release type
821       if self.__is_exclusive():
822         self.__exc = None
823         notify = True
824       else:
825         self.__shr.remove(threading.currentThread())
826         notify = not self.__shr
827
828       # Notify topmost condition in queue if there are no owners left (for
829       # shared locks)
830       if notify:
831         self.__notify_topmost()
832     finally:
833       self.__lock.release()
834
835   def __notify_topmost(self):
836     """Notifies topmost condition in queue of pending acquires.
837
838     """
839     (priority, prioqueue) = self.__find_first_pending_queue()
840     if prioqueue:
841       cond = prioqueue[0]
842       cond.notifyAll()
843       if cond.shared:
844         # Prevent further shared acquires from sneaking in while waiters are
845         # notified
846         self.__pending_shared.pop(priority, None)
847
848   def _notify_topmost(self):
849     """Exported version of L{__notify_topmost}.
850
851     """
852     self.__lock.acquire()
853     try:
854       return self.__notify_topmost()
855     finally:
856       self.__lock.release()
857
858   def delete(self, timeout=None, priority=None):
859     """Delete a Shared Lock.
860
861     This operation will declare the lock for removal. First the lock will be
862     acquired in exclusive mode if you don't already own it, then the lock
863     will be put in a state where any future and pending acquire() fail.
864
865     @type timeout: float
866     @param timeout: maximum waiting time before giving up
867     @type priority: integer
868     @param priority: Priority for acquiring lock
869
870     """
871     if priority is None:
872       priority = _DEFAULT_PRIORITY
873
874     self.__lock.acquire()
875     try:
876       assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
877
878       self.__check_deleted()
879
880       # The caller is allowed to hold the lock exclusively already.
881       acquired = self.__is_exclusive()
882
883       if not acquired:
884         acquired = self.__acquire_unlocked(0, timeout, priority)
885
886       if acquired:
887         assert self.__is_exclusive() and not self.__is_sharer(), \
888           "Lock wasn't acquired in exclusive mode"
889
890         self.__deleted = True
891         self.__exc = None
892
893         assert not (self.__exc or self.__shr), "Found owner during deletion"
894
895         # Notify all acquires. They'll throw an error.
896         for (_, prioqueue) in self.__pending:
897           for cond in prioqueue:
898             cond.notifyAll()
899
900         assert self.__deleted
901
902       return acquired
903     finally:
904       self.__lock.release()
905
906   def _release_save(self):
907     shared = self.__is_sharer()
908     self.release()
909     return shared
910
911   def _acquire_restore(self, shared):
912     self.acquire(shared=shared)
913
914
915 # Whenever we want to acquire a full LockSet we pass None as the value
916 # to acquire.  Hide this behind this nicely named constant.
917 ALL_SET = None
918
919
920 def _TimeoutZero():
921   """Returns the number zero.
922
923   """
924   return 0
925
926
927 def _GetLsAcquireModeAndTimeouts(want_all, timeout, opportunistic):
928   """Determines modes and timeouts for L{LockSet.acquire}.
929
930   @type want_all: boolean
931   @param want_all: Whether all locks in set should be acquired
932   @param timeout: Timeout in seconds or C{None}
933   @param opportunistic: Whther locks should be acquired opportunistically
934   @rtype: tuple
935   @return: Tuple containing mode to be passed to L{LockSet.__acquire_inner}
936     (one of L{_LS_ACQUIRE_MODES}), a function to calculate timeout for
937     acquiring the lockset-internal lock (might be C{None}) and a function to
938     calculate the timeout for acquiring individual locks
939
940   """
941   # Short circuit when no running timeout is needed
942   if opportunistic and not want_all:
943     assert timeout is None, "Got timeout for an opportunistic acquisition"
944     return (_LS_ACQUIRE_OPPORTUNISTIC, None, _TimeoutZero)
945
946   # We need to keep track of how long we spent waiting for a lock. The
947   # timeout passed to this function is over all lock acquisitions.
948   running_timeout = utils.RunningTimeout(timeout, False)
949
950   if want_all:
951     mode = _LS_ACQUIRE_ALL
952     ls_timeout_fn = running_timeout.Remaining
953   else:
954     mode = _LS_ACQUIRE_EXACT
955     ls_timeout_fn = None
956
957   if opportunistic:
958     mode = _LS_ACQUIRE_OPPORTUNISTIC
959     timeout_fn = _TimeoutZero
960   else:
961     timeout_fn = running_timeout.Remaining
962
963   return (mode, ls_timeout_fn, timeout_fn)
964
965
966 class _AcquireTimeout(Exception):
967   """Internal exception to abort an acquire on a timeout.
968
969   """
970
971
972 class LockSet:
973   """Implements a set of locks.
974
975   This abstraction implements a set of shared locks for the same resource type,
976   distinguished by name. The user can lock a subset of the resources and the
977   LockSet will take care of acquiring the locks always in the same order, thus
978   preventing deadlock.
979
980   All the locks needed in the same set must be acquired together, though.
981
982   @type name: string
983   @ivar name: the name of the lockset
984
985   """
986   def __init__(self, members, name, monitor=None):
987     """Constructs a new LockSet.
988
989     @type members: list of strings
990     @param members: initial members of the set
991     @type monitor: L{LockMonitor}
992     @param monitor: Lock monitor with which to register member locks
993
994     """
995     assert members is not None, "members parameter is not a list"
996     self.name = name
997
998     # Lock monitor
999     self.__monitor = monitor
1000
1001     # Used internally to guarantee coherency
1002     self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)
1003
1004     # The lockdict indexes the relationship name -> lock
1005     # The order-of-locking is implied by the alphabetical order of names
1006     self.__lockdict = {}
1007
1008     for mname in members:
1009       self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
1010                                           monitor=monitor)
1011
1012     # The owner dict contains the set of locks each thread owns. For
1013     # performance each thread can access its own key without a global lock on
1014     # this structure. It is paramount though that *no* other type of access is
1015     # done to this structure (eg. no looping over its keys). *_owner helper
1016     # function are defined to guarantee access is correct, but in general never
1017     # do anything different than __owners[threading.currentThread()], or there
1018     # will be trouble.
1019     self.__owners = {}
1020
1021   def _GetLockName(self, mname):
1022     """Returns the name for a member lock.
1023
1024     """
1025     return "%s/%s" % (self.name, mname)
1026
1027   def _get_lock(self):
1028     """Returns the lockset-internal lock.
1029
1030     """
1031     return self.__lock
1032
1033   def _get_lockdict(self):
1034     """Returns the lockset-internal lock dictionary.
1035
1036     Accessing this structure is only safe in single-thread usage or when the
1037     lockset-internal lock is held.
1038
1039     """
1040     return self.__lockdict
1041
1042   def is_owned(self):
1043     """Is the current thread a current level owner?
1044
1045     @note: Use L{check_owned} to check if a specific lock is held
1046
1047     """
1048     return threading.currentThread() in self.__owners
1049
1050   def check_owned(self, names, shared=-1):
1051     """Check if locks are owned in a specific mode.
1052
1053     @type names: sequence or string
1054     @param names: Lock names (or a single lock name)
1055     @param shared: See L{SharedLock.is_owned}
1056     @rtype: bool
1057     @note: Use L{is_owned} to check if the current thread holds I{any} lock and
1058       L{list_owned} to get the names of all owned locks
1059
1060     """
1061     if isinstance(names, basestring):
1062       names = [names]
1063
1064     # Avoid check if no locks are owned anyway
1065     if names and self.is_owned():
1066       candidates = []
1067
1068       # Gather references to all locks (in case they're deleted in the meantime)
1069       for lname in names:
1070         try:
1071           lock = self.__lockdict[lname]
1072         except KeyError:
1073           raise errors.LockError("Non-existing lock '%s' in set '%s' (it may"
1074                                  " have been removed)" % (lname, self.name))
1075         else:
1076           candidates.append(lock)
1077
1078       return compat.all(lock.is_owned(shared=shared) for lock in candidates)
1079     else:
1080       return False
1081
1082   def owning_all(self):
1083     """Checks whether current thread owns internal lock.
1084
1085     Holding the internal lock is equivalent with holding all locks in the set
1086     (the opposite does not necessarily hold as it can not be easily
1087     determined). L{add} and L{remove} require the internal lock.
1088
1089     @rtype: boolean
1090
1091     """
1092     return self.__lock.is_owned()
1093
1094   def _add_owned(self, name=None):
1095     """Note the current thread owns the given lock"""
1096     if name is None:
1097       if not self.is_owned():
1098         self.__owners[threading.currentThread()] = set()
1099     else:
1100       if self.is_owned():
1101         self.__owners[threading.currentThread()].add(name)
1102       else:
1103         self.__owners[threading.currentThread()] = set([name])
1104
1105   def _del_owned(self, name=None):
1106     """Note the current thread owns the given lock"""
1107
1108     assert not (name is None and self.__lock.is_owned()), \
1109            "Cannot hold internal lock when deleting owner status"
1110
1111     if name is not None:
1112       self.__owners[threading.currentThread()].remove(name)
1113
1114     # Only remove the key if we don't hold the set-lock as well
1115     if not (self.__lock.is_owned() or
1116             self.__owners[threading.currentThread()]):
1117       del self.__owners[threading.currentThread()]
1118
1119   def list_owned(self):
1120     """Get the set of resource names owned by the current thread"""
1121     if self.is_owned():
1122       return self.__owners[threading.currentThread()].copy()
1123     else:
1124       return set()
1125
1126   def _release_and_delete_owned(self):
1127     """Release and delete all resources owned by the current thread"""
1128     for lname in self.list_owned():
1129       lock = self.__lockdict[lname]
1130       if lock.is_owned():
1131         lock.release()
1132       self._del_owned(name=lname)
1133
1134   def __names(self):
1135     """Return the current set of names.
1136
1137     Only call this function while holding __lock and don't iterate on the
1138     result after releasing the lock.
1139
1140     """
1141     return self.__lockdict.keys()
1142
1143   def _names(self):
1144     """Return a copy of the current set of elements.
1145
1146     Used only for debugging purposes.
1147
1148     """
1149     # If we don't already own the set-level lock acquired
1150     # we'll get it and note we need to release it later.
1151     release_lock = False
1152     if not self.__lock.is_owned():
1153       release_lock = True
1154       self.__lock.acquire(shared=1)
1155     try:
1156       result = self.__names()
1157     finally:
1158       if release_lock:
1159         self.__lock.release()
1160     return set(result)
1161
1162   def acquire(self, names, timeout=None, shared=0, priority=None,
1163               opportunistic=False, test_notify=None):
1164     """Acquire a set of resource locks.
1165
1166     @note: When acquiring locks opportunistically, any number of locks might
1167       actually be acquired, even zero.
1168
1169     @type names: list of strings (or string)
1170     @param names: the names of the locks which shall be acquired
1171         (special lock names, or instance/node names)
1172     @type shared: integer (0/1) used as a boolean
1173     @param shared: whether to acquire in shared mode; by default an
1174         exclusive lock will be acquired
1175     @type timeout: float or None
1176     @param timeout: Maximum time to acquire all locks; for opportunistic
1177       acquisitions, a timeout can only be given when C{names} is C{None}, in
1178       which case it is exclusively used for acquiring the L{LockSet}-internal
1179       lock; opportunistic acquisitions don't use a timeout for acquiring
1180       individual locks
1181     @type priority: integer
1182     @param priority: Priority for acquiring locks
1183     @type opportunistic: boolean
1184     @param opportunistic: Acquire locks opportunistically; use the return value
1185       to determine which locks were actually acquired
1186     @type test_notify: callable or None
1187     @param test_notify: Special callback function for unittesting
1188
1189     @return: Set of all locks successfully acquired or None in case of timeout
1190
1191     @raise errors.LockError: when any lock we try to acquire has
1192         been deleted before we succeed. In this case none of the
1193         locks requested will be acquired.
1194
1195     """
1196     assert timeout is None or timeout >= 0.0
1197
1198     # Check we don't already own locks at this level
1199     assert not self.is_owned(), ("Cannot acquire locks in the same set twice"
1200                                  " (lockset %s)" % self.name)
1201
1202     if priority is None:
1203       priority = _DEFAULT_PRIORITY
1204
1205     try:
1206       if names is not None:
1207         assert timeout is None or not opportunistic, \
1208           ("Opportunistic acquisitions can only use a timeout if no"
1209            " names are given; see docstring for details")
1210
1211         # Support passing in a single resource to acquire rather than many
1212         if isinstance(names, basestring):
1213           names = [names]
1214
1215         (mode, _, timeout_fn) = \
1216           _GetLsAcquireModeAndTimeouts(False, timeout, opportunistic)
1217
1218         return self.__acquire_inner(names, mode, shared, priority,
1219                                     timeout_fn, test_notify)
1220
1221       else:
1222         (mode, ls_timeout_fn, timeout_fn) = \
1223           _GetLsAcquireModeAndTimeouts(True, timeout, opportunistic)
1224
1225         # If no names are given acquire the whole set by not letting new names
1226         # being added before we release, and getting the current list of names.
1227         # Some of them may then be deleted later, but we'll cope with this.
1228         #
1229         # We'd like to acquire this lock in a shared way, as it's nice if
1230         # everybody else can use the instances at the same time. If we are
1231         # acquiring them exclusively though they won't be able to do this
1232         # anyway, though, so we'll get the list lock exclusively as well in
1233         # order to be able to do add() on the set while owning it.
1234         if not self.__lock.acquire(shared=shared, priority=priority,
1235                                    timeout=ls_timeout_fn()):
1236           raise _AcquireTimeout()
1237
1238         try:
1239           # note we own the set-lock
1240           self._add_owned()
1241
1242           return self.__acquire_inner(self.__names(), mode, shared,
1243                                       priority, timeout_fn, test_notify)
1244         except:
1245           # We shouldn't have problems adding the lock to the owners list, but
1246           # if we did we'll try to release this lock and re-raise exception.
1247           # Of course something is going to be really wrong, after this.
1248           self.__lock.release()
1249           self._del_owned()
1250           raise
1251
1252     except _AcquireTimeout:
1253       return None
1254
1255   def __acquire_inner(self, names, mode, shared, priority,
1256                       timeout_fn, test_notify):
1257     """Inner logic for acquiring a number of locks.
1258
1259     Acquisition modes:
1260
1261       - C{_LS_ACQUIRE_ALL}: C{names} contains names of all locks in set, but
1262         deleted locks can be ignored as the whole set is being acquired with
1263         its internal lock held
1264       - C{_LS_ACQUIRE_EXACT}: The names listed in C{names} must be acquired;
1265         timeouts and deleted locks are fatal
1266       - C{_LS_ACQUIRE_OPPORTUNISTIC}: C{names} lists names of locks (potentially
1267         all within the set) which should be acquired opportunistically, that is
1268         failures are ignored
1269
1270     @param names: Names of the locks to be acquired
1271     @param mode: Lock acquisition mode (one of L{_LS_ACQUIRE_MODES})
1272     @param shared: Whether to acquire in shared mode
1273     @param timeout_fn: Function returning remaining timeout (C{None} for
1274       opportunistic acquisitions)
1275     @param priority: Priority for acquiring locks
1276     @param test_notify: Special callback function for unittesting
1277
1278     """
1279     assert mode in _LS_ACQUIRE_MODES
1280
1281     acquire_list = []
1282
1283     # First we look the locks up on __lockdict. We have no way of being sure
1284     # they will still be there after, but this makes it a lot faster should
1285     # just one of them be the already wrong. Using a sorted sequence to prevent
1286     # deadlocks.
1287     for lname in sorted(frozenset(names)):
1288       try:
1289         lock = self.__lockdict[lname] # raises KeyError if lock is not there
1290       except KeyError:
1291         # We are acquiring the whole set, it doesn't matter if this particular
1292         # element is not there anymore. If, however, only certain names should
1293         # be acquired, not finding a lock is an error.
1294         if mode == _LS_ACQUIRE_EXACT:
1295           raise errors.LockError("Lock '%s' not found in set '%s' (it may have"
1296                                  " been removed)" % (lname, self.name))
1297       else:
1298         acquire_list.append((lname, lock))
1299
1300     # This will hold the locknames we effectively acquired.
1301     acquired = set()
1302
1303     try:
1304       # Now acquire_list contains a sorted list of resources and locks we
1305       # want.  In order to get them we loop on this (private) list and
1306       # acquire() them.  We gave no real guarantee they will still exist till
1307       # this is done but .acquire() itself is safe and will alert us if the
1308       # lock gets deleted.
1309       for (lname, lock) in acquire_list:
1310         if __debug__ and callable(test_notify):
1311           test_notify_fn = lambda: test_notify(lname)
1312         else:
1313           test_notify_fn = None
1314
1315         timeout = timeout_fn()
1316
1317         try:
1318           # raises LockError if the lock was deleted
1319           acq_success = lock.acquire(shared=shared, timeout=timeout,
1320                                      priority=priority,
1321                                      test_notify=test_notify_fn)
1322         except errors.LockError:
1323           if mode in (_LS_ACQUIRE_ALL, _LS_ACQUIRE_OPPORTUNISTIC):
1324             # We are acquiring the whole set, it doesn't matter if this
1325             # particular element is not there anymore.
1326             continue
1327
1328           raise errors.LockError("Lock '%s' not found in set '%s' (it may have"
1329                                  " been removed)" % (lname, self.name))
1330
1331         if not acq_success:
1332           # Couldn't get lock or timeout occurred
1333           if mode == _LS_ACQUIRE_OPPORTUNISTIC:
1334             # Ignore timeouts on opportunistic acquisitions
1335             continue
1336
1337           if timeout is None:
1338             # This shouldn't happen as SharedLock.acquire(timeout=None) is
1339             # blocking.
1340             raise errors.LockError("Failed to get lock %s (set %s)" %
1341                                    (lname, self.name))
1342
1343           raise _AcquireTimeout()
1344
1345         try:
1346           # now the lock cannot be deleted, we have it!
1347           self._add_owned(name=lname)
1348           acquired.add(lname)
1349
1350         except:
1351           # We shouldn't have problems adding the lock to the owners list, but
1352           # if we did we'll try to release this lock and re-raise exception.
1353           # Of course something is going to be really wrong after this.
1354           if lock.is_owned():
1355             lock.release()
1356           raise
1357
1358     except:
1359       # Release all owned locks
1360       self._release_and_delete_owned()
1361       raise
1362
1363     return acquired
1364
1365   def downgrade(self, names=None):
1366     """Downgrade a set of resource locks from exclusive to shared mode.
1367
1368     The locks must have been acquired in exclusive mode.
1369
1370     """
1371     assert self.is_owned(), ("downgrade on lockset %s while not owning any"
1372                              " lock" % self.name)
1373
1374     # Support passing in a single resource to downgrade rather than many
1375     if isinstance(names, basestring):
1376       names = [names]
1377
1378     owned = self.list_owned()
1379
1380     if names is None:
1381       names = owned
1382     else:
1383       names = set(names)
1384       assert owned.issuperset(names), \
1385         ("downgrade() on unheld resources %s (set %s)" %
1386          (names.difference(owned), self.name))
1387
1388     for lockname in names:
1389       self.__lockdict[lockname].downgrade()
1390
1391     # Do we own the lockset in exclusive mode?
1392     if self.__lock.is_owned(shared=0):
1393       # Have all locks been downgraded?
1394       if not compat.any(lock.is_owned(shared=0)
1395                         for lock in self.__lockdict.values()):
1396         self.__lock.downgrade()
1397         assert self.__lock.is_owned(shared=1)
1398
1399     return True
1400
1401   def release(self, names=None):
1402     """Release a set of resource locks, at the same level.
1403
1404     You must have acquired the locks, either in shared or in exclusive mode,
1405     before releasing them.
1406
1407     @type names: list of strings, or None
1408     @param names: the names of the locks which shall be released
1409         (defaults to all the locks acquired at that level).
1410
1411     """
1412     assert self.is_owned(), ("release() on lock set %s while not owner" %
1413                              self.name)
1414
1415     # Support passing in a single resource to release rather than many
1416     if isinstance(names, basestring):
1417       names = [names]
1418
1419     if names is None:
1420       names = self.list_owned()
1421     else:
1422       names = set(names)
1423       assert self.list_owned().issuperset(names), (
1424                "release() on unheld resources %s (set %s)" %
1425                (names.difference(self.list_owned()), self.name))
1426
1427     # First of all let's release the "all elements" lock, if set.
1428     # After this 'add' can work again
1429     if self.__lock.is_owned():
1430       self.__lock.release()
1431       self._del_owned()
1432
1433     for lockname in names:
1434       # If we are sure the lock doesn't leave __lockdict without being
1435       # exclusively held we can do this...
1436       self.__lockdict[lockname].release()
1437       self._del_owned(name=lockname)
1438
1439   def add(self, names, acquired=0, shared=0):
1440     """Add a new set of elements to the set
1441
1442     @type names: list of strings
1443     @param names: names of the new elements to add
1444     @type acquired: integer (0/1) used as a boolean
1445     @param acquired: pre-acquire the new resource?
1446     @type shared: integer (0/1) used as a boolean
1447     @param shared: is the pre-acquisition shared?
1448
1449     """
1450     # Check we don't already own locks at this level
1451     assert not self.is_owned() or self.__lock.is_owned(shared=0), \
1452       ("Cannot add locks if the set %s is only partially owned, or shared" %
1453        self.name)
1454
1455     # Support passing in a single resource to add rather than many
1456     if isinstance(names, basestring):
1457       names = [names]
1458
1459     # If we don't already own the set-level lock acquired in an exclusive way
1460     # we'll get it and note we need to release it later.
1461     release_lock = False
1462     if not self.__lock.is_owned():
1463       release_lock = True
1464       self.__lock.acquire()
1465
1466     try:
1467       invalid_names = set(self.__names()).intersection(names)
1468       if invalid_names:
1469         # This must be an explicit raise, not an assert, because assert is
1470         # turned off when using optimization, and this can happen because of
1471         # concurrency even if the user doesn't want it.
1472         raise errors.LockError("duplicate add(%s) on lockset %s" %
1473                                (invalid_names, self.name))
1474
1475       for lockname in names:
1476         lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1477
1478         if acquired:
1479           # No need for priority or timeout here as this lock has just been
1480           # created
1481           lock.acquire(shared=shared)
1482           # now the lock cannot be deleted, we have it!
1483           try:
1484             self._add_owned(name=lockname)
1485           except:
1486             # We shouldn't have problems adding the lock to the owners list,
1487             # but if we did we'll try to release this lock and re-raise
1488             # exception.  Of course something is going to be really wrong,
1489             # after this.  On the other hand the lock hasn't been added to the
1490             # __lockdict yet so no other threads should be pending on it. This
1491             # release is just a safety measure.
1492             lock.release()
1493             raise
1494
1495         self.__lockdict[lockname] = lock
1496
1497     finally:
1498       # Only release __lock if we were not holding it previously.
1499       if release_lock:
1500         self.__lock.release()
1501
1502     return True
1503
1504   def remove(self, names):
1505     """Remove elements from the lock set.
1506
1507     You can either not hold anything in the lockset or already hold a superset
1508     of the elements you want to delete, exclusively.
1509
1510     @type names: list of strings
1511     @param names: names of the resource to remove.
1512
1513     @return: a list of locks which we removed; the list is always
1514         equal to the names list if we were holding all the locks
1515         exclusively
1516
1517     """
1518     # Support passing in a single resource to remove rather than many
1519     if isinstance(names, basestring):
1520       names = [names]
1521
1522     # If we own any subset of this lock it must be a superset of what we want
1523     # to delete. The ownership must also be exclusive, but that will be checked
1524     # by the lock itself.
1525     assert not self.is_owned() or self.list_owned().issuperset(names), (
1526       "remove() on acquired lockset %s while not owning all elements" %
1527       self.name)
1528
1529     removed = []
1530
1531     for lname in names:
1532       # Calling delete() acquires the lock exclusively if we don't already own
1533       # it, and causes all pending and subsequent lock acquires to fail. It's
1534       # fine to call it out of order because delete() also implies release(),
1535       # and the assertion above guarantees that if we either already hold
1536       # everything we want to delete, or we hold none.
1537       try:
1538         self.__lockdict[lname].delete()
1539         removed.append(lname)
1540       except (KeyError, errors.LockError):
1541         # This cannot happen if we were already holding it, verify:
1542         assert not self.is_owned(), ("remove failed while holding lockset %s" %
1543                                      self.name)
1544       else:
1545         # If no LockError was raised we are the ones who deleted the lock.
1546         # This means we can safely remove it from lockdict, as any further or
1547         # pending delete() or acquire() will fail (and nobody can have the lock
1548         # since before our call to delete()).
1549         #
1550         # This is done in an else clause because if the exception was thrown
1551         # it's the job of the one who actually deleted it.
1552         del self.__lockdict[lname]
1553         # And let's remove it from our private list if we owned it.
1554         if self.is_owned():
1555           self._del_owned(name=lname)
1556
1557     return removed
1558
1559
1560 # Locking levels, must be acquired in increasing order. Current rules are:
1561 # - At level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1562 #   acquired before performing any operation, either in shared or exclusive
1563 #   mode. Acquiring the BGL in exclusive mode is discouraged and should be
1564 #   avoided..
1565 # - At levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks. If
1566 #   you need more than one node, or more than one instance, acquire them at the
1567 #   same time.
1568 # - LEVEL_NODE_RES is for node resources and should be used by operations with
1569 #   possibly high impact on the node's disks.
1570 # - LEVEL_NODE_ALLOC blocks instance allocations for the whole cluster
1571 #   ("NAL" is the only lock at this level). It should be acquired in shared
1572 #   mode when an opcode blocks all or a significant amount of a cluster's
1573 #   locks. Opcodes doing instance allocations should acquire in exclusive mode.
1574 #   Once the set of acquired locks for an opcode has been reduced to the working
1575 #   set, the NAL should be released as well to allow allocations to proceed.
1576 (LEVEL_CLUSTER,
1577  LEVEL_INSTANCE,
1578  LEVEL_NODE_ALLOC,
1579  LEVEL_NODEGROUP,
1580  LEVEL_NODE,
1581  LEVEL_NODE_RES,
1582  LEVEL_NETWORK) = range(0, 7)
1583
1584 LEVELS = [
1585   LEVEL_CLUSTER,
1586   LEVEL_INSTANCE,
1587   LEVEL_NODE_ALLOC,
1588   LEVEL_NODEGROUP,
1589   LEVEL_NODE,
1590   LEVEL_NODE_RES,
1591   LEVEL_NETWORK,
1592   ]
1593
1594 # Lock levels which are modifiable
1595 LEVELS_MOD = compat.UniqueFrozenset([
1596   LEVEL_NODE_RES,
1597   LEVEL_NODE,
1598   LEVEL_NODEGROUP,
1599   LEVEL_INSTANCE,
1600   LEVEL_NETWORK,
1601   ])
1602
1603 #: Lock level names (make sure to use singular form)
1604 LEVEL_NAMES = {
1605   LEVEL_CLUSTER: "cluster",
1606   LEVEL_INSTANCE: "instance",
1607   LEVEL_NODE_ALLOC: "node-alloc",
1608   LEVEL_NODEGROUP: "nodegroup",
1609   LEVEL_NODE: "node",
1610   LEVEL_NODE_RES: "node-res",
1611   LEVEL_NETWORK: "network",
1612   }
1613
1614 # Constant for the big ganeti lock
1615 BGL = "BGL"
1616
1617 #: Node allocation lock
1618 NAL = "NAL"
1619
1620
1621 class GanetiLockManager:
1622   """The Ganeti Locking Library
1623
1624   The purpose of this small library is to manage locking for ganeti clusters
1625   in a central place, while at the same time doing dynamic checks against
1626   possible deadlocks. It will also make it easier to transition to a different
1627   lock type should we migrate away from python threads.
1628
1629   """
1630   _instance = None
1631
1632   def __init__(self, node_uuids, nodegroups, instance_names, networks):
1633     """Constructs a new GanetiLockManager object.
1634
1635     There should be only a GanetiLockManager object at any time, so this
1636     function raises an error if this is not the case.
1637
1638     @param node_uuids: list of node UUIDs
1639     @param nodegroups: list of nodegroup uuids
1640     @param instance_names: list of instance names
1641
1642     """
1643     assert self.__class__._instance is None, \
1644            "double GanetiLockManager instance"
1645
1646     self.__class__._instance = self
1647
1648     self._monitor = LockMonitor()
1649
1650     # The keyring contains all the locks, at their level and in the correct
1651     # locking order.
1652     self.__keyring = {
1653       LEVEL_CLUSTER: LockSet([BGL], "cluster", monitor=self._monitor),
1654       LEVEL_NODE: LockSet(node_uuids, "node", monitor=self._monitor),
1655       LEVEL_NODE_RES: LockSet(node_uuids, "node-res", monitor=self._monitor),
1656       LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroup", monitor=self._monitor),
1657       LEVEL_INSTANCE: LockSet(instance_names, "instance",
1658                               monitor=self._monitor),
1659       LEVEL_NETWORK: LockSet(networks, "network", monitor=self._monitor),
1660       LEVEL_NODE_ALLOC: LockSet([NAL], "node-alloc", monitor=self._monitor),
1661       }
1662
1663     assert compat.all(ls.name == LEVEL_NAMES[level]
1664                       for (level, ls) in self.__keyring.items()), \
1665       "Keyring name mismatch"
1666
1667   def AddToLockMonitor(self, provider):
1668     """Registers a new lock with the monitor.
1669
1670     See L{LockMonitor.RegisterLock}.
1671
1672     """
1673     return self._monitor.RegisterLock(provider)
1674
1675   def QueryLocks(self, fields):
1676     """Queries information from all locks.
1677
1678     See L{LockMonitor.QueryLocks}.
1679
1680     """
1681     return self._monitor.QueryLocks(fields)
1682
1683   def _names(self, level):
1684     """List the lock names at the given level.
1685
1686     This can be used for debugging/testing purposes.
1687
1688     @param level: the level whose list of locks to get
1689
1690     """
1691     assert level in LEVELS, "Invalid locking level %s" % level
1692     return self.__keyring[level]._names()
1693
1694   def is_owned(self, level):
1695     """Check whether we are owning locks at the given level
1696
1697     """
1698     return self.__keyring[level].is_owned()
1699
1700   def list_owned(self, level):
1701     """Get the set of owned locks at the given level
1702
1703     """
1704     return self.__keyring[level].list_owned()
1705
1706   def check_owned(self, level, names, shared=-1):
1707     """Check if locks at a certain level are owned in a specific mode.
1708
1709     @see: L{LockSet.check_owned}
1710
1711     """
1712     return self.__keyring[level].check_owned(names, shared=shared)
1713
1714   def owning_all(self, level):
1715     """Checks whether current thread owns all locks at a certain level.
1716
1717     @see: L{LockSet.owning_all}
1718
1719     """
1720     return self.__keyring[level].owning_all()
1721
1722   def _upper_owned(self, level):
1723     """Check that we don't own any lock at a level greater than the given one.
1724
1725     """
1726     # This way of checking only works if LEVELS[i] = i, which we check for in
1727     # the test cases.
1728     return compat.any((self.is_owned(l) for l in LEVELS[level + 1:]))
1729
1730   def _BGL_owned(self): # pylint: disable=C0103
1731     """Check if the current thread owns the BGL.
1732
1733     Both an exclusive or a shared acquisition work.
1734
1735     """
1736     return BGL in self.__keyring[LEVEL_CLUSTER].list_owned()
1737
1738   @staticmethod
1739   def _contains_BGL(level, names): # pylint: disable=C0103
1740     """Check if the level contains the BGL.
1741
1742     Check if acting on the given level and set of names will change
1743     the status of the Big Ganeti Lock.
1744
1745     """
1746     return level == LEVEL_CLUSTER and (names is None or BGL in names)
1747
1748   def acquire(self, level, names, timeout=None, shared=0, priority=None,
1749               opportunistic=False):
1750     """Acquire a set of resource locks, at the same level.
1751
1752     @type level: member of locking.LEVELS
1753     @param level: the level at which the locks shall be acquired
1754     @type names: list of strings (or string)
1755     @param names: the names of the locks which shall be acquired
1756         (special lock names, or instance/node names)
1757     @type shared: integer (0/1) used as a boolean
1758     @param shared: whether to acquire in shared mode; by default
1759         an exclusive lock will be acquired
1760     @type timeout: float
1761     @param timeout: Maximum time to acquire all locks
1762     @type priority: integer
1763     @param priority: Priority for acquiring lock
1764     @type opportunistic: boolean
1765     @param opportunistic: Acquire locks opportunistically; use the return value
1766       to determine which locks were actually acquired
1767
1768     """
1769     assert level in LEVELS, "Invalid locking level %s" % level
1770
1771     # Check that we are either acquiring the Big Ganeti Lock or we already own
1772     # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1773     # so even if we've migrated we need to at least share the BGL to be
1774     # compatible with them. Of course if we own the BGL exclusively there's no
1775     # point in acquiring any other lock, unless perhaps we are half way through
1776     # the migration of the current opcode.
1777     assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1778       "You must own the Big Ganeti Lock before acquiring any other")
1779
1780     # Check we don't own locks at the same or upper levels.
1781     assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1782                                           " while owning some at a greater one")
1783
1784     # Acquire the locks in the set.
1785     return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
1786                                          priority=priority,
1787                                          opportunistic=opportunistic)
1788
1789   def downgrade(self, level, names=None):
1790     """Downgrade a set of resource locks from exclusive to shared mode.
1791
1792     You must have acquired the locks in exclusive mode.
1793
1794     @type level: member of locking.LEVELS
1795     @param level: the level at which the locks shall be downgraded
1796     @type names: list of strings, or None
1797     @param names: the names of the locks which shall be downgraded
1798         (defaults to all the locks acquired at the level)
1799
1800     """
1801     assert level in LEVELS, "Invalid locking level %s" % level
1802
1803     return self.__keyring[level].downgrade(names=names)
1804
1805   def release(self, level, names=None):
1806     """Release a set of resource locks, at the same level.
1807
1808     You must have acquired the locks, either in shared or in exclusive
1809     mode, before releasing them.
1810
1811     @type level: member of locking.LEVELS
1812     @param level: the level at which the locks shall be released
1813     @type names: list of strings, or None
1814     @param names: the names of the locks which shall be released
1815         (defaults to all the locks acquired at that level)
1816
1817     """
1818     assert level in LEVELS, "Invalid locking level %s" % level
1819     assert (not self._contains_BGL(level, names) or
1820             not self._upper_owned(LEVEL_CLUSTER)), (
1821               "Cannot release the Big Ganeti Lock while holding something"
1822               " at upper levels (%r)" %
1823               (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self.list_owned(i))
1824                                 for i in self.__keyring.keys()]), ))
1825
1826     # Release will complain if we don't own the locks already
1827     return self.__keyring[level].release(names)
1828
1829   def add(self, level, names, acquired=0, shared=0):
1830     """Add locks at the specified level.
1831
1832     @type level: member of locking.LEVELS_MOD
1833     @param level: the level at which the locks shall be added
1834     @type names: list of strings
1835     @param names: names of the locks to acquire
1836     @type acquired: integer (0/1) used as a boolean
1837     @param acquired: whether to acquire the newly added locks
1838     @type shared: integer (0/1) used as a boolean
1839     @param shared: whether the acquisition will be shared
1840
1841     """
1842     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1843     assert self._BGL_owned(), ("You must own the BGL before performing other"
1844                                " operations")
1845     assert not self._upper_owned(level), ("Cannot add locks at a level"
1846                                           " while owning some at a greater one")
1847     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1848
1849   def remove(self, level, names):
1850     """Remove locks from the specified level.
1851
1852     You must either already own the locks you are trying to remove
1853     exclusively or not own any lock at an upper level.
1854
1855     @type level: member of locking.LEVELS_MOD
1856     @param level: the level at which the locks shall be removed
1857     @type names: list of strings
1858     @param names: the names of the locks which shall be removed
1859         (special lock names, or instance/node names)
1860
1861     """
1862     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1863     assert self._BGL_owned(), ("You must own the BGL before performing other"
1864                                " operations")
1865     # Check we either own the level or don't own anything from here
1866     # up. LockSet.remove() will check the case in which we don't own
1867     # all the needed resources, or we have a shared ownership.
1868     assert self.is_owned(level) or not self._upper_owned(level), (
1869            "Cannot remove locks at a level while not owning it or"
1870            " owning some at a greater one")
1871     return self.__keyring[level].remove(names)
1872
1873
1874 def _MonitorSortKey((item, idx, num)):
1875   """Sorting key function.
1876
1877   Sort by name, registration order and then order of information. This provides
1878   a stable sort order over different providers, even if they return the same
1879   name.
1880
1881   """
1882   (name, _, _, _) = item
1883
1884   return (utils.NiceSortKey(name), num, idx)
1885
1886
1887 class LockMonitor(object):
1888   _LOCK_ATTR = "_lock"
1889
1890   def __init__(self):
1891     """Initializes this class.
1892
1893     """
1894     self._lock = SharedLock("LockMonitor")
1895
1896     # Counter for stable sorting
1897     self._counter = itertools.count(0)
1898
1899     # Tracked locks. Weak references are used to avoid issues with circular
1900     # references and deletion.
1901     self._locks = weakref.WeakKeyDictionary()
1902
1903   @ssynchronized(_LOCK_ATTR)
1904   def RegisterLock(self, provider):
1905     """Registers a new lock.
1906
1907     @param provider: Object with a callable method named C{GetLockInfo}, taking
1908       a single C{set} containing the requested information items
1909     @note: It would be nicer to only receive the function generating the
1910       requested information but, as it turns out, weak references to bound
1911       methods (e.g. C{self.GetLockInfo}) are tricky; there are several
1912       workarounds, but none of the ones I found works properly in combination
1913       with a standard C{WeakKeyDictionary}
1914
1915     """
1916     assert provider not in self._locks, "Duplicate registration"
1917
1918     # There used to be a check for duplicate names here. As it turned out, when
1919     # a lock is re-created with the same name in a very short timeframe, the
1920     # previous instance might not yet be removed from the weakref dictionary.
1921     # By keeping track of the order of incoming registrations, a stable sort
1922     # ordering can still be guaranteed.
1923
1924     self._locks[provider] = self._counter.next()
1925
1926   def _GetLockInfo(self, requested):
1927     """Get information from all locks.
1928
1929     """
1930     # Must hold lock while getting consistent list of tracked items
1931     self._lock.acquire(shared=1)
1932     try:
1933       items = self._locks.items()
1934     finally:
1935       self._lock.release()
1936
1937     return [(info, idx, num)
1938             for (provider, num) in items
1939             for (idx, info) in enumerate(provider.GetLockInfo(requested))]
1940
1941   def _Query(self, fields):
1942     """Queries information from all locks.
1943
1944     @type fields: list of strings
1945     @param fields: List of fields to return
1946
1947     """
1948     qobj = query.Query(query.LOCK_FIELDS, fields)
1949
1950     # Get all data with internal lock held and then sort by name and incoming
1951     # order
1952     lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
1953                       key=_MonitorSortKey)
1954
1955     # Extract lock information and build query data
1956     return (qobj, query.LockQueryData(map(compat.fst, lockinfo)))
1957
1958   def QueryLocks(self, fields):
1959     """Queries information from all locks.
1960
1961     @type fields: list of strings
1962     @param fields: List of fields to return
1963
1964     """
1965     (qobj, ctx) = self._Query(fields)
1966
1967     # Prepare query response
1968     return query.GetQueryResponse(qobj, ctx)