4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Module implementing the Ganeti locking code."""
23 # pylint: disable-msg=W0212
25 # W0212 since e.g. LockSet methods use (a lot) the internals of
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import compat
# Human-readable lock mode names, reported by SharedLock.GetInfo for the
# "mode" field.
(_EXCLUSIVE_TEXT, _SHARED_TEXT) = ("exclusive", "shared")
48 def ssynchronized(mylock, shared=0):
49 """Shared Synchronization decorator.
51 Calls the function holding the given lock, either in exclusive or shared
52 mode. It requires the passed lock to be a SharedLock (or support its
55 @type mylock: lockable object or string
56 @param mylock: lock to acquire or class member name of the lock to acquire
60 def sync_function(*args, **kwargs):
61 if isinstance(mylock, basestring):
62 assert args, "cannot ssynchronize on non-class method: self not found"
64 lock = getattr(args[0], mylock)
67 lock.acquire(shared=shared)
69 return fn(*args, **kwargs)
76 class RunningTimeout(object):
77 """Class to calculate remaining timeout when doing several operations.
87 def __init__(self, timeout, allow_negative, _time_fn=time.time):
88 """Initializes this class.
91 @param timeout: Timeout duration
92 @type allow_negative: bool
93 @param allow_negative: Whether to return values below zero
94 @param _time_fn: Time function for unittests
99 if timeout is not None and timeout < 0.0:
100 raise ValueError("Timeout must not be negative")
102 self._timeout = timeout
103 self._allow_negative = allow_negative
104 self._time_fn = _time_fn
106 self._start_time = None
109 """Returns the remaining timeout.
112 if self._timeout is None:
115 # Get start time on first calculation
116 if self._start_time is None:
117 self._start_time = self._time_fn()
119 # Calculate remaining time
120 remaining_timeout = self._start_time + self._timeout - self._time_fn()
122 if not self._allow_negative:
123 # Ensure timeout is always >= 0
124 return max(0.0, remaining_timeout)
126 return remaining_timeout
129 class _SingleNotifyPipeConditionWaiter(object):
130 """Helper class for SingleNotifyPipeCondition
138 def __init__(self, poller, fd):
139 """Constructor for _SingleNotifyPipeConditionWaiter
141 @type poller: select.poll
142 @param poller: Poller object
144 @param fd: File descriptor to wait for
147 object.__init__(self)
148 self._poller = poller
151 def __call__(self, timeout):
152 """Wait for something to happen on the pipe.
154 @type timeout: float or None
155 @param timeout: Timeout for waiting (can be None)
158 running_timeout = RunningTimeout(timeout, True)
161 remaining_time = running_timeout.Remaining()
163 if remaining_time is not None:
164 if remaining_time < 0.0:
167 # Our calculation uses seconds, poll() wants milliseconds
168 remaining_time *= 1000
171 result = self._poller.poll(remaining_time)
172 except EnvironmentError, err:
173 if err.errno != errno.EINTR:
177 # Check whether we were notified
178 if result and result[0][0] == self._fd:
182 class _BaseCondition(object):
183 """Base class containing common code for conditions.
185 Some of this code is taken from python's threading module.
197 def __init__(self, lock):
198 """Constructor for _BaseCondition.
200 @type lock: threading.Lock
201 @param lock: condition base lock
204 object.__init__(self)
207 self._release_save = lock._release_save
208 except AttributeError:
209 self._release_save = self._base_release_save
211 self._acquire_restore = lock._acquire_restore
212 except AttributeError:
213 self._acquire_restore = self._base_acquire_restore
215 self._is_owned = lock._is_owned
216 except AttributeError:
217 self._is_owned = self._base_is_owned
221 # Export the lock's acquire() and release() methods
222 self.acquire = lock.acquire
223 self.release = lock.release
225 def _base_is_owned(self):
226 """Check whether lock is owned by current thread.
229 if self._lock.acquire(0):
234 def _base_release_save(self):
237 def _base_acquire_restore(self, _):
def _check_owned(self):
    """Raise an exception if the current thread doesn't own the lock.

    Ownership is determined by C{self._is_owned()}. That is either the
    underlying lock's own C{_is_owned} or the C{_base_is_owned} fallback
    chosen in the constructor.

    @raise RuntimeError: if the calling thread does not hold the lock

    """
    if not self._is_owned():
        # Spelling fixed ("un-aquired" -> "un-acquired"); same exception type
        raise RuntimeError("cannot work with un-acquired lock")
248 class SingleNotifyPipeCondition(_BaseCondition):
249 """Condition which can only be notified once.
251 This condition class uses pipes and poll, internally, to be able to wait for
252 notification with a timeout, without resorting to polling. It is almost
253 compatible with Python's threading.Condition, with the following differences:
254 - notifyAll can only be called once, and no wait can happen after that
255 - notify is not supported, only notifyAll
267 _waiter_class = _SingleNotifyPipeConditionWaiter
269 def __init__(self, lock):
270 """Constructor for SingleNotifyPipeCondition
273 _BaseCondition.__init__(self, lock)
275 self._notified = False
277 self._write_fd = None
280 def _check_unnotified(self):
281 """Throws an exception if already notified.
285 raise RuntimeError("cannot use already notified condition")
288 """Cleanup open file descriptors, if any.
291 if self._read_fd is not None:
292 os.close(self._read_fd)
295 if self._write_fd is not None:
296 os.close(self._write_fd)
297 self._write_fd = None
300 def wait(self, timeout=None):
301 """Wait for a notification.
303 @type timeout: float or None
304 @param timeout: Waiting timeout (can be None)
308 self._check_unnotified()
312 if self._poller is None:
313 (self._read_fd, self._write_fd) = os.pipe()
314 self._poller = select.poll()
315 self._poller.register(self._read_fd, select.POLLHUP)
317 wait_fn = self._waiter_class(self._poller, self._read_fd)
318 state = self._release_save()
320 # Wait for notification
324 self._acquire_restore(state)
327 if self._nwaiters == 0:
330 def notifyAll(self): # pylint: disable-msg=C0103
331 """Close the writing side of the pipe to notify all waiters.
335 self._check_unnotified()
336 self._notified = True
337 if self._write_fd is not None:
338 os.close(self._write_fd)
339 self._write_fd = None
342 class PipeCondition(_BaseCondition):
343 """Group-only non-polling condition with counters.
345 This condition class uses pipes and poll, internally, to be able to wait for
346 notification with a timeout, without resorting to polling. It is almost
347 compatible with Python's threading.Condition, but only supports notifyAll and
348 non-recursive locks. As an additional features it's able to report whether
349 there are any waiting threads.
357 _single_condition_class = SingleNotifyPipeCondition
359 def __init__(self, lock):
360 """Initializes this class.
363 _BaseCondition.__init__(self, lock)
364 self._waiters = set()
365 self._single_condition = self._single_condition_class(self._lock)
367 def wait(self, timeout=None):
368 """Wait for a notification.
370 @type timeout: float or None
371 @param timeout: Waiting timeout (can be None)
376 # Keep local reference to the pipe. It could be replaced by another thread
377 # notifying while we're waiting.
378 cond = self._single_condition
380 self._waiters.add(threading.currentThread())
385 self._waiters.remove(threading.currentThread())
387 def notifyAll(self): # pylint: disable-msg=C0103
388 """Notify all currently waiting threads.
392 self._single_condition.notifyAll()
393 self._single_condition = self._single_condition_class(self._lock)
395 def get_waiting(self):
396 """Returns a list of all waiting threads.
403 def has_waiting(self):
404 """Returns whether there are active waiters.
409 return bool(self._waiters)
412 class _PipeConditionWithMode(PipeCondition):
417 def __init__(self, lock, shared):
418 """Initializes this class.
422 PipeCondition.__init__(self, lock)
425 class SharedLock(object):
426 """Implements a shared lock.
428 Multiple threads can acquire the lock in a shared way by calling
429 C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
430 threads can call C{acquire(shared=0)}.
432 Notes on data structures: C{__pending} contains a priority queue (heapq) of
433 all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
434 ...]}. Each per-priority queue contains a normal in-order list of conditions
435 to be notified when the lock can be acquired. Shared locks are grouped
436 together by priority and the condition for them is stored in
437 C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
438 references for the per-priority queues indexed by priority for faster access.
441 @ivar name: the name of the lock
456 __condition_class = _PipeConditionWithMode
458 def __init__(self, name, monitor=None):
459 """Construct a new SharedLock.
461 @param name: the name of the lock
462 @type monitor: L{LockMonitor}
463 @param monitor: Lock monitor with which to register
466 object.__init__(self)
471 self.__lock = threading.Lock()
473 # Queue containing waiting acquires
475 self.__pending_by_prio = {}
476 self.__pending_shared = {}
478 # Current lock holders
482 # is this lock in the deleted state?
483 self.__deleted = False
485 # Register with lock monitor
487 monitor.RegisterLock(self)
489 def GetInfo(self, fields):
490 """Retrieves information for querying locks.
492 @type fields: list of strings
493 @param fields: List of fields to return
496 self.__lock.acquire()
500 # Note: to avoid unintentional race conditions, no references to
501 # modifiable objects should be returned unless they were created in this
505 info.append(self.name)
506 elif fname == "mode":
508 info.append("deleted")
509 assert not (self.__exc or self.__shr)
511 info.append(_EXCLUSIVE_TEXT)
513 info.append(_SHARED_TEXT)
516 elif fname == "owner":
523 assert not self.__deleted
524 info.append([i.getName() for i in owner])
527 elif fname == "pending":
530 # Sorting instead of copying and using heaq functions for simplicity
531 for (_, prioqueue) in sorted(self.__pending):
532 for cond in prioqueue:
536 mode = _EXCLUSIVE_TEXT
538 # This function should be fast as it runs with the lock held.
539 # Hence not using utils.NiceSort.
540 data.append((mode, sorted(i.getName()
541 for i in cond.get_waiting())))
545 raise errors.OpExecError("Invalid query field '%s'" % fname)
549 self.__lock.release()
551 def __check_deleted(self):
552 """Raises an exception if the lock has been deleted.
556 raise errors.LockError("Deleted lock %s" % self.name)
def __is_sharer(self):
    """Tell whether the calling thread currently holds the lock in shared mode.

    @rtype: bool
    @return: True if the current thread is in the set of shared holders

    """
    caller = threading.currentThread()
    return caller in self.__shr
def __is_exclusive(self):
    """Tell whether the calling thread currently holds the lock exclusively.

    When nobody holds the lock exclusively, the stored owner is None and the
    comparison yields False.

    @rtype: bool

    """
    caller = threading.currentThread()
    return caller == self.__exc
570 def __is_owned(self, shared=-1):
571 """Is the current thread somehow owning the lock at this time?
573 This is a private version of the function, which presumes you're holding
578 return self.__is_sharer() or self.__is_exclusive()
580 return self.__is_sharer()
582 return self.__is_exclusive()
584 def _is_owned(self, shared=-1):
585 """Is the current thread somehow owning the lock at this time?
588 - < 0: check for any type of ownership (default)
589 - 0: check for exclusive ownership
590 - > 0: check for shared ownership
593 self.__lock.acquire()
595 return self.__is_owned(shared=shared)
597 self.__lock.release()
599 def _count_pending(self):
600 """Returns the number of pending acquires.
605 self.__lock.acquire()
607 return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
609 self.__lock.release()
611 def _check_empty(self):
612 """Checks whether there are any pending acquires.
617 self.__lock.acquire()
619 # Order is important: __find_first_pending_queue modifies __pending
620 return not (self.__find_first_pending_queue() or
622 self.__pending_by_prio or
623 self.__pending_shared)
625 self.__lock.release()
627 def __do_acquire(self, shared):
628 """Actually acquire the lock.
632 self.__shr.add(threading.currentThread())
634 self.__exc = threading.currentThread()
636 def __can_acquire(self, shared):
637 """Determine whether lock can be acquired.
641 return self.__exc is None
643 return len(self.__shr) == 0 and self.__exc is None
645 def __find_first_pending_queue(self):
646 """Tries to find the topmost queued entry with pending acquires.
648 Removes empty entries while going through the list.
651 while self.__pending:
652 (priority, prioqueue) = self.__pending[0]
655 heapq.heappop(self.__pending)
656 del self.__pending_by_prio[priority]
657 assert priority not in self.__pending_shared
def __is_on_top(self, cond):
    """Checks whether the passed condition is on top of the queue.

    It compares C{cond} with the first entry of the topmost non-empty
    per-priority queue. Looking that queue up via
    C{__find_first_pending_queue} also removes empty entries from the
    pending list.

    The caller must make sure the queue isn't empty.

    @rtype: bool

    """
    topmost = self.__find_first_pending_queue()
    first_waiter = topmost[0]
    return cond == first_waiter
673 def __acquire_unlocked(self, shared, timeout, priority):
674 """Acquire a shared lock.
676 @param shared: whether to acquire in shared mode; by default an
677 exclusive lock will be acquired
678 @param timeout: maximum waiting time before giving up
679 @type priority: integer
680 @param priority: Priority for acquiring lock
683 self.__check_deleted()
685 # We cannot acquire the lock if we already have it
686 assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
689 # Remove empty entries from queue
690 self.__find_first_pending_queue()
692 # Check whether someone else holds the lock or there are pending acquires.
693 if not self.__pending and self.__can_acquire(shared):
694 # Apparently not, can acquire lock directly.
695 self.__do_acquire(shared)
698 prioqueue = self.__pending_by_prio.get(priority, None)
701 # Try to re-use condition for shared acquire
702 wait_condition = self.__pending_shared.get(priority, None)
703 assert (wait_condition is None or
704 (wait_condition.shared and wait_condition in prioqueue))
706 wait_condition = None
708 if wait_condition is None:
709 if prioqueue is None:
710 assert priority not in self.__pending_by_prio
713 heapq.heappush(self.__pending, (priority, prioqueue))
714 self.__pending_by_prio[priority] = prioqueue
716 wait_condition = self.__condition_class(self.__lock, shared)
717 prioqueue.append(wait_condition)
720 # Keep reference for further shared acquires on same priority. This is
721 # better than trying to find it in the list of pending acquires.
722 assert priority not in self.__pending_shared
723 self.__pending_shared[priority] = wait_condition
726 # Wait until we become the topmost acquire in the queue or the timeout
728 # TODO: Decrease timeout with spurious notifications
729 while not (self.__is_on_top(wait_condition) and
730 self.__can_acquire(shared)):
731 # Wait for notification
732 wait_condition.wait(timeout)
733 self.__check_deleted()
735 # A lot of code assumes blocking acquires always succeed. Loop
736 # internally for that case.
737 if timeout is not None:
740 if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
741 self.__do_acquire(shared)
744 # Remove condition from queue if there are no more waiters
745 if not wait_condition.has_waiting():
746 prioqueue.remove(wait_condition)
747 if wait_condition.shared:
748 del self.__pending_shared[priority]
752 def acquire(self, shared=0, timeout=None, priority=None,
754 """Acquire a shared lock.
756 @type shared: integer (0/1) used as a boolean
757 @param shared: whether to acquire in shared mode; by default an
758 exclusive lock will be acquired
760 @param timeout: maximum waiting time before giving up
761 @type priority: integer
762 @param priority: Priority for acquiring lock
763 @type test_notify: callable or None
764 @param test_notify: Special callback function for unittesting
768 priority = _DEFAULT_PRIORITY
770 self.__lock.acquire()
772 # We already got the lock, notify now
773 if __debug__ and callable(test_notify):
776 return self.__acquire_unlocked(shared, timeout, priority)
778 self.__lock.release()
781 """Release a Shared Lock.
783 You must have acquired the lock, either in shared or in exclusive mode,
784 before calling this function.
787 self.__lock.acquire()
789 assert self.__is_exclusive() or self.__is_sharer(), \
790 "Cannot release non-owned lock"
792 # Autodetect release type
793 if self.__is_exclusive():
796 self.__shr.remove(threading.currentThread())
798 # Notify topmost condition in queue
799 prioqueue = self.__find_first_pending_queue()
801 prioqueue[0].notifyAll()
804 self.__lock.release()
806 def delete(self, timeout=None, priority=None):
807 """Delete a Shared Lock.
809 This operation will declare the lock for removal. First the lock will be
810 acquired in exclusive mode if you don't already own it, then the lock
811 will be put in a state where any future and pending acquire() fail.
814 @param timeout: maximum waiting time before giving up
815 @type priority: integer
816 @param priority: Priority for acquiring lock
820 priority = _DEFAULT_PRIORITY
822 self.__lock.acquire()
824 assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
826 self.__check_deleted()
828 # The caller is allowed to hold the lock exclusively already.
829 acquired = self.__is_exclusive()
832 acquired = self.__acquire_unlocked(0, timeout, priority)
834 assert self.__is_exclusive() and not self.__is_sharer(), \
835 "Lock wasn't acquired in exclusive mode"
838 self.__deleted = True
841 assert not (self.__exc or self.__shr), "Found owner during deletion"
843 # Notify all acquires. They'll throw an error.
844 for (_, prioqueue) in self.__pending:
845 for cond in prioqueue:
848 assert self.__deleted
852 self.__lock.release()
854 def _release_save(self):
855 shared = self.__is_sharer()
def _acquire_restore(self, shared):
    """Re-acquire the lock in the given mode after a condition wait.

    Condition objects call this with the state obtained from
    C{_release_save}. The result of C{acquire} is deliberately discarded.

    @param shared: mode to re-acquire in (passed to C{acquire} as C{shared})

    """
    self.acquire(shared=shared)
    return None
863 # Whenever we want to acquire a full LockSet we pass None as the value
864 # to acquire. Hide this behind this nicely named constant.
868 class _AcquireTimeout(Exception):
869 """Internal exception to abort an acquire on a timeout.
875 """Implements a set of locks.
877 This abstraction implements a set of shared locks for the same resource type,
878 distinguished by name. The user can lock a subset of the resources and the
879 LockSet will take care of acquiring the locks always in the same order, thus
882 All the locks needed in the same set must be acquired together, though.
885 @ivar name: the name of the lockset
888 def __init__(self, members, name, monitor=None):
889 """Constructs a new LockSet.
891 @type members: list of strings
892 @param members: initial members of the set
893 @type monitor: L{LockMonitor}
894 @param monitor: Lock monitor with which to register member locks
897 assert members is not None, "members parameter is not a list"
901 self.__monitor = monitor
903 # Used internally to guarantee coherency.
904 self.__lock = SharedLock(name)
906 # The lockdict indexes the relationship name -> lock
907 # The order-of-locking is implied by the alphabetical order of names
910 for mname in members:
911 self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
914 # The owner dict contains the set of locks each thread owns. For
915 # performance each thread can access its own key without a global lock on
916 # this structure. It is paramount though that *no* other type of access is
917 # done to this structure (eg. no looping over its keys). *_owner helper
918 # function are defined to guarantee access is correct, but in general never
919 # do anything different than __owners[threading.currentThread()], or there
def _GetLockName(self, mname):
    """Build the full name of member lock C{mname} within this lock set.

    @param mname: member name (formatted with C{%s})
    @rtype: string
    @return: C{"<set name>/<member name>"}

    """
    name_parts = (self.name, mname)
    return "%s/%s" % name_parts
930 """Is the current thread a current level owner?"""
931 return threading.currentThread() in self.__owners
933 def _add_owned(self, name=None):
934 """Note the current thread owns the given lock"""
936 if not self._is_owned():
937 self.__owners[threading.currentThread()] = set()
940 self.__owners[threading.currentThread()].add(name)
942 self.__owners[threading.currentThread()] = set([name])
944 def _del_owned(self, name=None):
945 """Note the current thread owns the given lock"""
947 assert not (name is None and self.__lock._is_owned()), \
948 "Cannot hold internal lock when deleting owner status"
951 self.__owners[threading.currentThread()].remove(name)
953 # Only remove the key if we don't hold the set-lock as well
954 if (not self.__lock._is_owned() and
955 not self.__owners[threading.currentThread()]):
956 del self.__owners[threading.currentThread()]
958 def _list_owned(self):
959 """Get the set of resource names owned by the current thread"""
961 return self.__owners[threading.currentThread()].copy()
965 def _release_and_delete_owned(self):
966 """Release and delete all resources owned by the current thread"""
967 for lname in self._list_owned():
968 lock = self.__lockdict[lname]
971 self._del_owned(name=lname)
974 """Return the current set of names.
976 Only call this function while holding __lock and don't iterate on the
977 result after releasing the lock.
980 return self.__lockdict.keys()
983 """Return a copy of the current set of elements.
985 Used only for debugging purposes.
988 # If we don't already own the set-level lock acquired
989 # we'll get it and note we need to release it later.
991 if not self.__lock._is_owned():
993 self.__lock.acquire(shared=1)
995 result = self.__names()
998 self.__lock.release()
1001 def acquire(self, names, timeout=None, shared=0, priority=None,
1003 """Acquire a set of resource locks.
1005 @type names: list of strings (or string)
1006 @param names: the names of the locks which shall be acquired
1007 (special lock names, or instance/node names)
1008 @type shared: integer (0/1) used as a boolean
1009 @param shared: whether to acquire in shared mode; by default an
1010 exclusive lock will be acquired
1011 @type timeout: float or None
1012 @param timeout: Maximum time to acquire all locks
1013 @type priority: integer
1014 @param priority: Priority for acquiring locks
1015 @type test_notify: callable or None
1016 @param test_notify: Special callback function for unittesting
1018 @return: Set of all locks successfully acquired or None in case of timeout
1020 @raise errors.LockError: when any lock we try to acquire has
1021 been deleted before we succeed. In this case none of the
1022 locks requested will be acquired.
1025 assert timeout is None or timeout >= 0.0
1027 # Check we don't already own locks at this level
1028 assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
1029 " (lockset %s)" % self.name)
1031 if priority is None:
1032 priority = _DEFAULT_PRIORITY
1034 # We need to keep track of how long we spent waiting for a lock. The
1035 # timeout passed to this function is over all lock acquires.
1036 running_timeout = RunningTimeout(timeout, False)
1039 if names is not None:
1040 # Support passing in a single resource to acquire rather than many
1041 if isinstance(names, basestring):
1044 return self.__acquire_inner(names, False, shared, priority,
1045 running_timeout.Remaining, test_notify)
1048 # If no names are given acquire the whole set by not letting new names
1049 # being added before we release, and getting the current list of names.
1050 # Some of them may then be deleted later, but we'll cope with this.
1052 # We'd like to acquire this lock in a shared way, as it's nice if
1053 # everybody else can use the instances at the same time. If we are
1054 # acquiring them exclusively though they won't be able to do this
1055 # anyway, though, so we'll get the list lock exclusively as well in
1056 # order to be able to do add() on the set while owning it.
1057 if not self.__lock.acquire(shared=shared, priority=priority,
1058 timeout=running_timeout.Remaining()):
1059 raise _AcquireTimeout()
1061 # note we own the set-lock
1064 return self.__acquire_inner(self.__names(), True, shared, priority,
1065 running_timeout.Remaining, test_notify)
1067 # We shouldn't have problems adding the lock to the owners list, but
1068 # if we did we'll try to release this lock and re-raise exception.
1069 # Of course something is going to be really wrong, after this.
1070 self.__lock.release()
1074 except _AcquireTimeout:
1077 def __acquire_inner(self, names, want_all, shared, priority,
1078 timeout_fn, test_notify):
1079 """Inner logic for acquiring a number of locks.
1081 @param names: Names of the locks to be acquired
1082 @param want_all: Whether all locks in the set should be acquired
1083 @param shared: Whether to acquire in shared mode
1084 @param timeout_fn: Function returning remaining timeout
1085 @param priority: Priority for acquiring locks
1086 @param test_notify: Special callback function for unittesting
1091 # First we look the locks up on __lockdict. We have no way of being sure
1092 # they will still be there after, but this makes it a lot faster should
1093 # just one of them be the already wrong. Using a sorted sequence to prevent
1095 for lname in sorted(utils.UniqueSequence(names)):
1097 lock = self.__lockdict[lname] # raises KeyError if lock is not there
1100 # We are acquiring all the set, it doesn't matter if this particular
1101 # element is not there anymore.
1104 raise errors.LockError("Non-existing lock %s in set %s (it may have"
1105 " been removed)" % (lname, self.name))
1107 acquire_list.append((lname, lock))
1109 # This will hold the locknames we effectively acquired.
1113 # Now acquire_list contains a sorted list of resources and locks we
1114 # want. In order to get them we loop on this (private) list and
1115 # acquire() them. We gave no real guarantee they will still exist till
1116 # this is done but .acquire() itself is safe and will alert us if the
1117 # lock gets deleted.
1118 for (lname, lock) in acquire_list:
1119 if __debug__ and callable(test_notify):
1120 test_notify_fn = lambda: test_notify(lname)
1122 test_notify_fn = None
1124 timeout = timeout_fn()
1127 # raises LockError if the lock was deleted
1128 acq_success = lock.acquire(shared=shared, timeout=timeout,
1130 test_notify=test_notify_fn)
1131 except errors.LockError:
1133 # We are acquiring all the set, it doesn't matter if this
1134 # particular element is not there anymore.
1137 raise errors.LockError("Non-existing lock %s in set %s (it may"
1138 " have been removed)" % (lname, self.name))
1141 # Couldn't get lock or timeout occurred
1143 # This shouldn't happen as SharedLock.acquire(timeout=None) is
1145 raise errors.LockError("Failed to get lock %s (set %s)" %
1148 raise _AcquireTimeout()
1151 # now the lock cannot be deleted, we have it!
1152 self._add_owned(name=lname)
1156 # We shouldn't have problems adding the lock to the owners list, but
1157 # if we did we'll try to release this lock and re-raise exception.
1158 # Of course something is going to be really wrong after this.
1159 if lock._is_owned():
1164 # Release all owned locks
1165 self._release_and_delete_owned()
1170 def release(self, names=None):
1171 """Release a set of resource locks, at the same level.
1173 You must have acquired the locks, either in shared or in exclusive mode,
1174 before releasing them.
1176 @type names: list of strings, or None
1177 @param names: the names of the locks which shall be released
1178 (defaults to all the locks acquired at that level).
1181 assert self._is_owned(), ("release() on lock set %s while not owner" %
1184 # Support passing in a single resource to release rather than many
1185 if isinstance(names, basestring):
1189 names = self._list_owned()
1192 assert self._list_owned().issuperset(names), (
1193 "release() on unheld resources %s (set %s)" %
1194 (names.difference(self._list_owned()), self.name))
1196 # First of all let's release the "all elements" lock, if set.
1197 # After this 'add' can work again
1198 if self.__lock._is_owned():
1199 self.__lock.release()
1202 for lockname in names:
1203 # If we are sure the lock doesn't leave __lockdict without being
1204 # exclusively held we can do this...
1205 self.__lockdict[lockname].release()
1206 self._del_owned(name=lockname)
1208 def add(self, names, acquired=0, shared=0):
1209 """Add a new set of elements to the set
1211 @type names: list of strings
1212 @param names: names of the new elements to add
1213 @type acquired: integer (0/1) used as a boolean
1214 @param acquired: pre-acquire the new resource?
1215 @type shared: integer (0/1) used as a boolean
1216 @param shared: is the pre-acquisition shared?
1219 # Check we don't already own locks at this level
1220 assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1221 ("Cannot add locks if the set %s is only partially owned, or shared" %
1224 # Support passing in a single resource to add rather than many
1225 if isinstance(names, basestring):
1228 # If we don't already own the set-level lock acquired in an exclusive way
1229 # we'll get it and note we need to release it later.
1230 release_lock = False
1231 if not self.__lock._is_owned():
1233 self.__lock.acquire()
1236 invalid_names = set(self.__names()).intersection(names)
1238 # This must be an explicit raise, not an assert, because assert is
1239 # turned off when using optimization, and this can happen because of
1240 # concurrency even if the user doesn't want it.
1241 raise errors.LockError("duplicate add(%s) on lockset %s" %
1242 (invalid_names, self.name))
1244 for lockname in names:
1245 lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1248 # No need for priority or timeout here as this lock has just been
1250 lock.acquire(shared=shared)
1251 # now the lock cannot be deleted, we have it!
1253 self._add_owned(name=lockname)
1255 # We shouldn't have problems adding the lock to the owners list,
1256 # but if we did we'll try to release this lock and re-raise
1257 # exception. Of course something is going to be really wrong,
1258 # after this. On the other hand the lock hasn't been added to the
1259 # __lockdict yet so no other threads should be pending on it. This
1260 # release is just a safety measure.
1264 self.__lockdict[lockname] = lock
1267 # Only release __lock if we were not holding it previously.
1269 self.__lock.release()
def remove(self, names):
  """Remove elements from the lock set.

  You can either not hold anything in the lockset or already hold a superset
  of the elements you want to delete, exclusively.

  @type names: list of strings (or a single string)
  @param names: names of the resources to remove; a single string is treated
      as a one-element list, and any other iterable is read exactly once
  @return: a list of locks which we removed; the list is always
      equal to the names list if we were holding all the locks
      exclusively

  """
  # Support passing in a single resource to remove rather than many
  if isinstance(names, basestring):
    names = [names]
  else:
    # Materialize the names once: the ownership assertion below iterates over
    # them, which would exhaust a one-shot iterable (e.g. a generator) and
    # leave nothing for the removal loop -- and only when assertions are
    # enabled, making the behaviour depend on "python -O".
    names = list(names)

  # If we own any subset of this lock it must be a superset of what we want
  # to delete. The ownership must also be exclusive, but that will be checked
  # by the lock itself.
  assert not self._is_owned() or self._list_owned().issuperset(names), (
    "remove() on acquired lockset %s while not owning all elements" %
    self.name)

  removed = []

  for lname in names:
    # Calling delete() acquires the lock exclusively if we don't already own
    # it, and causes all pending and subsequent lock acquires to fail. It's
    # fine to call it out of order because delete() also implies release(),
    # and the assertion above guarantees that we either already hold
    # everything we want to delete, or we hold none.
    try:
      self.__lockdict[lname].delete()
      removed.append(lname)
    except (KeyError, errors.LockError):
      # This cannot happen if we were already holding it, verify:
      assert not self._is_owned(), ("remove failed while holding lockset %s"
                                    % self.name)
    else:
      # If no LockError was raised we are the ones who deleted the lock.
      # This means we can safely remove it from lockdict, as any further or
      # pending delete() or acquire() will fail (and nobody can have the lock
      # since before our call to delete()).
      #
      # This is done in an else clause because if the exception was thrown
      # it's the job of the one who actually deleted it.
      del self.__lockdict[lname]
      # And let's remove it from our private list if we owned it.
      if self._is_owned():
        self._del_owned(name=lname)

  return removed
# Locking levels, which must be acquired in increasing order.
# Current rules are:
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL), which must be
#     acquired before performing any operation, either in shared or in
#     exclusive mode. Acquiring the BGL in exclusive mode is discouraged and
#     should be avoided.
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
#     If you need more than one node, or more than one instance, acquire them
#     at the same time.
#
# Each level constant equals its position in LEVELS.
LEVEL_CLUSTER = 0
LEVEL_INSTANCE = 1
LEVEL_NODE = 2

LEVELS = [LEVEL_CLUSTER, LEVEL_INSTANCE, LEVEL_NODE]

# Lock levels whose lock sets may be modified (locks added or removed)
LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]

# Human-readable name of each locking level
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODE: "node",
  }

# Name of the single lock living at LEVEL_CLUSTER: the Big Ganeti Lock
BGL = 'BGL'
class GanetiLockManager:
  """The Ganeti Locking Library

  The purpose of this small library is to manage locking for ganeti clusters
  in a central place, while at the same time doing dynamic checks against
  possible deadlocks. It will also make it easier to transition to a different
  lock type should we migrate away from python threads.

  """
  # The single live instance; the constructor asserts that none exists yet
  _instance = None

  def __init__(self, nodes, instances):
    """Constructs a new GanetiLockManager object.

    There should be only a GanetiLockManager object at any time; this is
    enforced with an assertion in this constructor.

    @param nodes: list of node names
    @param instances: list of instance names

    """
    assert self.__class__._instance is None, \
           "double GanetiLockManager instance"

    self.__class__._instance = self

    self._monitor = LockMonitor()

    # The keyring contains all the locks, at their level and in the correct
    # locking order.
    self.__keyring = {
      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
      LEVEL_INSTANCE: LockSet(instances, "instances",
                              monitor=self._monitor),
      }

  def QueryLocks(self, fields, sync):
    """Queries information from all locks.

    See L{LockMonitor.QueryLocks}.

    """
    return self._monitor.QueryLocks(fields, sync)

  def _names(self, level):
    """List the lock names at the given level.

    This can be used for debugging/testing purposes.

    @param level: the level whose list of locks to get

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    return self.__keyring[level]._names()

  def _is_owned(self, level):
    """Check whether we are owning locks at the given level

    """
    return self.__keyring[level]._is_owned()

  is_owned = _is_owned

  def _list_owned(self, level):
    """Get the set of owned locks at the given level

    """
    return self.__keyring[level]._list_owned()

  def _upper_owned(self, level):
    """Check whether we own any lock at a level greater than the given one.

    acquire() and add() require this to be False, so that locks are always
    taken in increasing level order.

    @type level: member of locking.LEVELS
    @param level: the reference level
    @rtype: bool
    @return: True if any lock at a level greater than C{level} is owned

    """
    # Compare level values instead of slicing LEVELS by position, so this does
    # not silently depend on LEVELS[i] == i.
    return compat.any(self._is_owned(upper) for upper in LEVELS
                      if upper > level)

  def _BGL_owned(self): # pylint: disable-msg=C0103
    """Check if the current thread owns the BGL.

    Either an exclusive or a shared acquisition counts.

    """
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()

  @staticmethod
  def _contains_BGL(level, names): # pylint: disable-msg=C0103
    """Check if the level contains the BGL.

    Check if acting on the given level and set of names will change
    the status of the Big Ganeti Lock. C{names} being None means "all the
    locks at that level".

    """
    return level == LEVEL_CLUSTER and (names is None or BGL in names)

  def acquire(self, level, names, timeout=None, shared=0, priority=None):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring lock
    @return: whatever the level's LockSet.acquire returns

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Check that we are either acquiring the Big Ganeti Lock or we already own
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
    # so even if we've migrated we need to at least share the BGL to be
    # compatible with them. Of course if we own the BGL exclusively there's no
    # point in acquiring any other lock, unless perhaps we are half way through
    # the migration of the current opcode.
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
            "You must own the Big Ganeti Lock before acquiring any other")

    # Check we don't own locks at greater (upper) levels.
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
           " while owning some at a greater one")

    # Acquire the locks in the set.
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
                                         priority=priority)

  def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive
    mode, before releasing them.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    # Releasing the BGL is only allowed once nothing above it is held; the
    # message lists everything currently owned, per level.
    assert (not self._contains_BGL(level, names) or
            not self._upper_owned(LEVEL_CLUSTER)), (
            "Cannot release the Big Ganeti Lock while holding something"
            " at upper levels (%r)" %
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
                              for i in self.__keyring.keys()]), ))

    # Release will complain if we don't own the locks already
    return self.__keyring[level].release(names)

  def add(self, level, names, acquired=0, shared=0):
    """Add locks at the specified level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
           " while owning some at a greater one")
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)

  def remove(self, level, names):
    """Remove locks from the specified level.

    You must either already own the locks you are trying to remove
    exclusively or not own any lock at an upper level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    # Check we either own the level or don't own anything from here
    # up. LockSet.remove() will check the case in which we don't own
    # all the needed resources, or we have a shared ownership.
    assert self._is_owned(level) or not self._upper_owned(level), (
           "Cannot remove locks at a level while not owning it or"
           " owning some at a greater one")
    return self.__keyring[level].remove(names)
class LockMonitor(object):
  # Attribute name under which the monitor keeps its own lock;
  # ssynchronized fetches it from the instance via getattr()
  _LOCK_ATTR = "_lock"

  def __init__(self):
    """Initializes this class.

    """
    self._lock = SharedLock("LockMonitor")

    # Registered locks, keyed through weak references so that tracking a
    # lock here neither creates circular references nor prevents its deletion
    self._locks = weakref.WeakKeyDictionary()

  @ssynchronized(_LOCK_ATTR)
  def RegisterLock(self, lock):
    """Registers a new lock.

    Runs with the monitor lock held (exclusive, the ssynchronized default).

    @param lock: lock object to track; its C{name} must differ from the
        names of all locks registered so far

    """
    logging.debug("Registering lock %s", lock.name)
    assert lock not in self._locks, "Duplicate lock registration"
    assert not compat.any(lock.name == other.name
                          for other in self._locks.keys()), \
           "Found duplicate lock name"
    self._locks[lock] = None

  @ssynchronized(_LOCK_ATTR)
  def _GetLockInfo(self, fields):
    """Get information from all locks while the monitor lock is held.

    @return: dictionary mapping each lock name to C{lock.GetInfo(fields)}

    """
    info = {}

    for tracked in self._locks.keys():
      assert tracked.name not in info, "Found duplicate lock name"
      info[tracked.name] = tracked.GetInfo(fields)

    return info

  def QueryLocks(self, fields, sync):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return
    @type sync: boolean
    @param sync: Whether to operate in synchronous mode; a true value is
        not supported and raises NotImplementedError
    @return: list of per-lock information, ordered by lock name (NiceSort)

    """
    if sync:
      raise NotImplementedError("Synchronous queries are not implemented")

    # Collect everything (unsorted) under the monitor lock, then order it
    by_name = self._GetLockInfo(fields)
    ordered_names = utils.NiceSort(by_name.keys())

    return [by_name[lock_name] for lock_name in ordered_names]