4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Module implementing the Ganeti locking code."""
23 # pylint: disable=W0212
25 # W0212 since e.g. LockSet methods use (a lot) the internals of
38 from ganeti import errors
39 from ganeti import utils
40 from ganeti import compat
41 from ganeti import query
44 _EXCLUSIVE_TEXT = "exclusive"
45 _SHARED_TEXT = "shared"
46 _DELETED_TEXT = "deleted"
50 #: Minimum timeout required to consider scheduling a pending acquisition
52 _LOCK_ACQUIRE_MIN_TIMEOUT = (1.0 / 1000)
55 def ssynchronized(mylock, shared=0):
56 """Shared Synchronization decorator.
58 Calls the function holding the given lock, either in exclusive or shared
59 mode. It requires the passed lock to be a SharedLock (or support its
62 @type mylock: lockable object or string
63 @param mylock: lock to acquire or class member name of the lock to acquire
67 def sync_function(*args, **kwargs):
68 if isinstance(mylock, basestring):
69 assert args, "cannot ssynchronize on non-class method: self not found"
71 lock = getattr(args[0], mylock)
74 lock.acquire(shared=shared)
76 return fn(*args, **kwargs)
83 class _SingleNotifyPipeConditionWaiter(object):
84 """Helper class for SingleNotifyPipeCondition
92 def __init__(self, poller, fd):
93 """Constructor for _SingleNotifyPipeConditionWaiter
95 @type poller: select.poll
96 @param poller: Poller object
98 @param fd: File descriptor to wait for
101 object.__init__(self)
102 self._poller = poller
105 def __call__(self, timeout):
106 """Wait for something to happen on the pipe.
108 @type timeout: float or None
109 @param timeout: Timeout for waiting (can be None)
112 running_timeout = utils.RunningTimeout(timeout, True)
115 remaining_time = running_timeout.Remaining()
117 if remaining_time is not None:
118 if remaining_time < 0.0:
121 # Our calculation uses seconds, poll() wants milliseconds
122 remaining_time *= 1000
125 result = self._poller.poll(remaining_time)
126 except EnvironmentError, err:
127 if err.errno != errno.EINTR:
131 # Check whether we were notified
132 if result and result[0][0] == self._fd:
136 class _BaseCondition(object):
137 """Base class containing common code for conditions.
139 Some of this code is taken from python's threading module.
151 def __init__(self, lock):
152 """Constructor for _BaseCondition.
154 @type lock: threading.Lock
155 @param lock: condition base lock
158 object.__init__(self)
161 self._release_save = lock._release_save
162 except AttributeError:
163 self._release_save = self._base_release_save
165 self._acquire_restore = lock._acquire_restore
166 except AttributeError:
167 self._acquire_restore = self._base_acquire_restore
169 self._is_owned = lock.is_owned
170 except AttributeError:
171 self._is_owned = self._base_is_owned
175 # Export the lock's acquire() and release() methods
176 self.acquire = lock.acquire
177 self.release = lock.release
179 def _base_is_owned(self):
180 """Check whether lock is owned by current thread.
183 if self._lock.acquire(0):
188 def _base_release_save(self):
191 def _base_acquire_restore(self, _):
194 def _check_owned(self):
195 """Raise an exception if the current thread doesn't own the lock.
198 if not self._is_owned():
199 raise RuntimeError("cannot work with un-aquired lock")
202 class SingleNotifyPipeCondition(_BaseCondition):
203 """Condition which can only be notified once.
205 This condition class uses pipes and poll, internally, to be able to wait for
206 notification with a timeout, without resorting to polling. It is almost
207 compatible with Python's threading.Condition, with the following differences:
208 - notifyAll can only be called once, and no wait can happen after that
209 - notify is not supported, only notifyAll
221 _waiter_class = _SingleNotifyPipeConditionWaiter
223 def __init__(self, lock):
224 """Constructor for SingleNotifyPipeCondition
227 _BaseCondition.__init__(self, lock)
229 self._notified = False
231 self._write_fd = None
234 def _check_unnotified(self):
235 """Throws an exception if already notified.
239 raise RuntimeError("cannot use already notified condition")
242 """Cleanup open file descriptors, if any.
245 if self._read_fd is not None:
246 os.close(self._read_fd)
249 if self._write_fd is not None:
250 os.close(self._write_fd)
251 self._write_fd = None
254 def wait(self, timeout):
255 """Wait for a notification.
257 @type timeout: float or None
258 @param timeout: Waiting timeout (can be None)
262 self._check_unnotified()
266 if self._poller is None:
267 (self._read_fd, self._write_fd) = os.pipe()
268 self._poller = select.poll()
269 self._poller.register(self._read_fd, select.POLLHUP)
271 wait_fn = self._waiter_class(self._poller, self._read_fd)
272 state = self._release_save()
274 # Wait for notification
278 self._acquire_restore(state)
281 if self._nwaiters == 0:
284 def notifyAll(self): # pylint: disable=C0103
285 """Close the writing side of the pipe to notify all waiters.
289 self._check_unnotified()
290 self._notified = True
291 if self._write_fd is not None:
292 os.close(self._write_fd)
293 self._write_fd = None
296 class PipeCondition(_BaseCondition):
297 """Group-only non-polling condition with counters.
299 This condition class uses pipes and poll, internally, to be able to wait for
300 notification with a timeout, without resorting to polling. It is almost
301 compatible with Python's threading.Condition, but only supports notifyAll and
302 non-recursive locks. As an additional features it's able to report whether
303 there are any waiting threads.
311 _single_condition_class = SingleNotifyPipeCondition
313 def __init__(self, lock):
314 """Initializes this class.
317 _BaseCondition.__init__(self, lock)
318 self._waiters = set()
319 self._single_condition = self._single_condition_class(self._lock)
321 def wait(self, timeout):
322 """Wait for a notification.
324 @type timeout: float or None
325 @param timeout: Waiting timeout (can be None)
330 # Keep local reference to the pipe. It could be replaced by another thread
331 # notifying while we're waiting.
332 cond = self._single_condition
334 self._waiters.add(threading.currentThread())
339 self._waiters.remove(threading.currentThread())
341 def notifyAll(self): # pylint: disable=C0103
342 """Notify all currently waiting threads.
346 self._single_condition.notifyAll()
347 self._single_condition = self._single_condition_class(self._lock)
349 def get_waiting(self):
350 """Returns a list of all waiting threads.
357 def has_waiting(self):
358 """Returns whether there are active waiters.
363 return bool(self._waiters)
366 return ("<%s.%s waiters=%s at %#x>" %
367 (self.__class__.__module__, self.__class__.__name__,
368 self._waiters, id(self)))
371 class _PipeConditionWithMode(PipeCondition):
376 def __init__(self, lock, shared):
377 """Initializes this class.
381 PipeCondition.__init__(self, lock)
384 class SharedLock(object):
385 """Implements a shared lock.
387 Multiple threads can acquire the lock in a shared way by calling
388 C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
389 threads can call C{acquire(shared=0)}.
391 Notes on data structures: C{__pending} contains a priority queue (heapq) of
392 all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
393 ...]}. Each per-priority queue contains a normal in-order list of conditions
394 to be notified when the lock can be acquired. Shared locks are grouped
395 together by priority and the condition for them is stored in
396 C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
397 references for the per-priority queues indexed by priority for faster access.
400 @ivar name: the name of the lock
416 __condition_class = _PipeConditionWithMode
418 def __init__(self, name, monitor=None, _time_fn=time.time):
419 """Construct a new SharedLock.
421 @param name: the name of the lock
422 @type monitor: L{LockMonitor}
423 @param monitor: Lock monitor with which to register
426 object.__init__(self)
430 # Used for unittesting
431 self.__time_fn = _time_fn
434 self.__lock = threading.Lock()
436 # Queue containing waiting acquires
438 self.__pending_by_prio = {}
439 self.__pending_shared = {}
441 # Current lock holders
445 # is this lock in the deleted state?
446 self.__deleted = False
448 # Register with lock monitor
450 logging.debug("Adding lock %s to monitor", name)
451 monitor.RegisterLock(self)
454 return ("<%s.%s name=%s at %#x>" %
455 (self.__class__.__module__, self.__class__.__name__,
456 self.name, id(self)))
458 def GetLockInfo(self, requested):
459 """Retrieves information for querying locks.
462 @param requested: Requested information, see C{query.LQ_*}
465 self.__lock.acquire()
467 # Note: to avoid unintentional race conditions, no references to
468 # modifiable objects should be returned unless they were created in this
473 if query.LQ_MODE in requested:
476 assert not (self.__exc or self.__shr)
478 mode = _EXCLUSIVE_TEXT
482 # Current owner(s) are wanted
483 if query.LQ_OWNER in requested:
490 assert not self.__deleted
491 owner_names = [i.getName() for i in owner]
493 # Pending acquires are wanted
494 if query.LQ_PENDING in requested:
497 # Sorting instead of copying and using heaq functions for simplicity
498 for (_, prioqueue) in sorted(self.__pending):
499 for cond in prioqueue:
501 pendmode = _SHARED_TEXT
503 pendmode = _EXCLUSIVE_TEXT
505 # List of names will be sorted in L{query._GetLockPending}
506 pending.append((pendmode, [i.getName()
507 for i in cond.get_waiting()]))
511 return [(self.name, mode, owner_names, pending)]
513 self.__lock.release()
515 def __check_deleted(self):
516 """Raises an exception if the lock has been deleted.
520 raise errors.LockError("Deleted lock %s" % self.name)
522 def __is_sharer(self):
523 """Is the current thread sharing the lock at this time?
526 return threading.currentThread() in self.__shr
528 def __is_exclusive(self):
529 """Is the current thread holding the lock exclusively at this time?
532 return threading.currentThread() == self.__exc
534 def __is_owned(self, shared=-1):
535 """Is the current thread somehow owning the lock at this time?
537 This is a private version of the function, which presumes you're holding
542 return self.__is_sharer() or self.__is_exclusive()
544 return self.__is_sharer()
546 return self.__is_exclusive()
548 def is_owned(self, shared=-1):
549 """Is the current thread somehow owning the lock at this time?
552 - < 0: check for any type of ownership (default)
553 - 0: check for exclusive ownership
554 - > 0: check for shared ownership
557 self.__lock.acquire()
559 return self.__is_owned(shared=shared)
561 self.__lock.release()
563 #: Necessary to remain compatible with threading.Condition, which tries to
564 #: retrieve a locks' "_is_owned" attribute
567 def _count_pending(self):
568 """Returns the number of pending acquires.
573 self.__lock.acquire()
575 return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
577 self.__lock.release()
579 def _check_empty(self):
580 """Checks whether there are any pending acquires.
585 self.__lock.acquire()
587 # Order is important: __find_first_pending_queue modifies __pending
588 (_, prioqueue) = self.__find_first_pending_queue()
590 return not (prioqueue or
592 self.__pending_by_prio or
593 self.__pending_shared)
595 self.__lock.release()
597 def __do_acquire(self, shared):
598 """Actually acquire the lock.
602 self.__shr.add(threading.currentThread())
604 self.__exc = threading.currentThread()
606 def __can_acquire(self, shared):
607 """Determine whether lock can be acquired.
611 return self.__exc is None
613 return len(self.__shr) == 0 and self.__exc is None
615 def __find_first_pending_queue(self):
616 """Tries to find the topmost queued entry with pending acquires.
618 Removes empty entries while going through the list.
621 while self.__pending:
622 (priority, prioqueue) = self.__pending[0]
625 return (priority, prioqueue)
628 heapq.heappop(self.__pending)
629 del self.__pending_by_prio[priority]
630 assert priority not in self.__pending_shared
634 def __is_on_top(self, cond):
635 """Checks whether the passed condition is on top of the queue.
637 The caller must make sure the queue isn't empty.
640 (_, prioqueue) = self.__find_first_pending_queue()
642 return cond == prioqueue[0]
644 def __acquire_unlocked(self, shared, timeout, priority):
645 """Acquire a shared lock.
647 @param shared: whether to acquire in shared mode; by default an
648 exclusive lock will be acquired
649 @param timeout: maximum waiting time before giving up
650 @type priority: integer
651 @param priority: Priority for acquiring lock
654 self.__check_deleted()
656 # We cannot acquire the lock if we already have it
657 assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
660 # Remove empty entries from queue
661 self.__find_first_pending_queue()
663 # Check whether someone else holds the lock or there are pending acquires.
664 if not self.__pending and self.__can_acquire(shared):
665 # Apparently not, can acquire lock directly.
666 self.__do_acquire(shared)
669 # The lock couldn't be acquired right away, so if a timeout is given and is
670 # considered too short, return right away as scheduling a pending
671 # acquisition is quite expensive
672 if timeout is not None and timeout < _LOCK_ACQUIRE_MIN_TIMEOUT:
675 prioqueue = self.__pending_by_prio.get(priority, None)
678 # Try to re-use condition for shared acquire
679 wait_condition = self.__pending_shared.get(priority, None)
680 assert (wait_condition is None or
681 (wait_condition.shared and wait_condition in prioqueue))
683 wait_condition = None
685 if wait_condition is None:
686 if prioqueue is None:
687 assert priority not in self.__pending_by_prio
690 heapq.heappush(self.__pending, (priority, prioqueue))
691 self.__pending_by_prio[priority] = prioqueue
693 wait_condition = self.__condition_class(self.__lock, shared)
694 prioqueue.append(wait_condition)
697 # Keep reference for further shared acquires on same priority. This is
698 # better than trying to find it in the list of pending acquires.
699 assert priority not in self.__pending_shared
700 self.__pending_shared[priority] = wait_condition
702 wait_start = self.__time_fn()
706 # Wait until we become the topmost acquire in the queue or the timeout
709 if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
710 self.__do_acquire(shared)
714 # A lot of code assumes blocking acquires always succeed, therefore we
715 # can never return False for a blocking acquire
716 if (timeout is not None and
717 utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
720 # Wait for notification
721 wait_condition.wait(timeout)
722 self.__check_deleted()
724 # Remove condition from queue if there are no more waiters
725 if not wait_condition.has_waiting():
726 prioqueue.remove(wait_condition)
727 if wait_condition.shared:
728 # Remove from list of shared acquires if it wasn't while releasing
729 # (e.g. on lock deletion)
730 self.__pending_shared.pop(priority, None)
734 def acquire(self, shared=0, timeout=None, priority=None,
736 """Acquire a shared lock.
738 @type shared: integer (0/1) used as a boolean
739 @param shared: whether to acquire in shared mode; by default an
740 exclusive lock will be acquired
742 @param timeout: maximum waiting time before giving up
743 @type priority: integer
744 @param priority: Priority for acquiring lock
745 @type test_notify: callable or None
746 @param test_notify: Special callback function for unittesting
750 priority = _DEFAULT_PRIORITY
752 self.__lock.acquire()
754 # We already got the lock, notify now
755 if __debug__ and callable(test_notify):
758 return self.__acquire_unlocked(shared, timeout, priority)
760 self.__lock.release()
763 """Changes the lock mode from exclusive to shared.
765 Pending acquires in shared mode on the same priority will go ahead.
768 self.__lock.acquire()
770 assert self.__is_owned(), "Lock must be owned"
772 if self.__is_exclusive():
773 # Do nothing if the lock is already acquired in shared mode
777 # Important: pending shared acquires should only jump ahead if there
778 # was a transition from exclusive to shared, otherwise an owner of a
779 # shared lock can keep calling this function to push incoming shared
781 (priority, prioqueue) = self.__find_first_pending_queue()
783 # Is there a pending shared acquire on this priority?
784 cond = self.__pending_shared.pop(priority, None)
787 assert cond in prioqueue
789 # Ensure shared acquire is on top of queue
790 if len(prioqueue) > 1:
791 prioqueue.remove(cond)
792 prioqueue.insert(0, cond)
797 assert not self.__is_exclusive()
798 assert self.__is_sharer()
802 self.__lock.release()
805 """Release a Shared Lock.
807 You must have acquired the lock, either in shared or in exclusive mode,
808 before calling this function.
811 self.__lock.acquire()
813 assert self.__is_exclusive() or self.__is_sharer(), \
814 "Cannot release non-owned lock"
816 # Autodetect release type
817 if self.__is_exclusive():
821 self.__shr.remove(threading.currentThread())
822 notify = not self.__shr
824 # Notify topmost condition in queue if there are no owners left (for
827 self.__notify_topmost()
829 self.__lock.release()
831 def __notify_topmost(self):
832 """Notifies topmost condition in queue of pending acquires.
835 (priority, prioqueue) = self.__find_first_pending_queue()
840 # Prevent further shared acquires from sneaking in while waiters are
842 self.__pending_shared.pop(priority, None)
844 def _notify_topmost(self):
845 """Exported version of L{__notify_topmost}.
848 self.__lock.acquire()
850 return self.__notify_topmost()
852 self.__lock.release()
854 def delete(self, timeout=None, priority=None):
855 """Delete a Shared Lock.
857 This operation will declare the lock for removal. First the lock will be
858 acquired in exclusive mode if you don't already own it, then the lock
859 will be put in a state where any future and pending acquire() fail.
862 @param timeout: maximum waiting time before giving up
863 @type priority: integer
864 @param priority: Priority for acquiring lock
868 priority = _DEFAULT_PRIORITY
870 self.__lock.acquire()
872 assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
874 self.__check_deleted()
876 # The caller is allowed to hold the lock exclusively already.
877 acquired = self.__is_exclusive()
880 acquired = self.__acquire_unlocked(0, timeout, priority)
883 assert self.__is_exclusive() and not self.__is_sharer(), \
884 "Lock wasn't acquired in exclusive mode"
886 self.__deleted = True
889 assert not (self.__exc or self.__shr), "Found owner during deletion"
891 # Notify all acquires. They'll throw an error.
892 for (_, prioqueue) in self.__pending:
893 for cond in prioqueue:
896 assert self.__deleted
900 self.__lock.release()
902 def _release_save(self):
903 shared = self.__is_sharer()
907 def _acquire_restore(self, shared):
908 self.acquire(shared=shared)
911 # Whenever we want to acquire a full LockSet we pass None as the value
912 # to acquire. Hide this behind this nicely named constant.
916 class _AcquireTimeout(Exception):
917 """Internal exception to abort an acquire on a timeout.
923 """Implements a set of locks.
925 This abstraction implements a set of shared locks for the same resource type,
926 distinguished by name. The user can lock a subset of the resources and the
927 LockSet will take care of acquiring the locks always in the same order, thus
930 All the locks needed in the same set must be acquired together, though.
933 @ivar name: the name of the lockset
936 def __init__(self, members, name, monitor=None):
937 """Constructs a new LockSet.
939 @type members: list of strings
940 @param members: initial members of the set
941 @type monitor: L{LockMonitor}
942 @param monitor: Lock monitor with which to register member locks
945 assert members is not None, "members parameter is not a list"
949 self.__monitor = monitor
951 # Used internally to guarantee coherency
952 self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)
954 # The lockdict indexes the relationship name -> lock
955 # The order-of-locking is implied by the alphabetical order of names
958 for mname in members:
959 self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
962 # The owner dict contains the set of locks each thread owns. For
963 # performance each thread can access its own key without a global lock on
964 # this structure. It is paramount though that *no* other type of access is
965 # done to this structure (eg. no looping over its keys). *_owner helper
966 # function are defined to guarantee access is correct, but in general never
967 # do anything different than __owners[threading.currentThread()], or there
971 def _GetLockName(self, mname):
972 """Returns the name for a member lock.
975 return "%s/%s" % (self.name, mname)
978 """Returns the lockset-internal lock.
983 def _get_lockdict(self):
984 """Returns the lockset-internal lock dictionary.
986 Accessing this structure is only safe in single-thread usage or when the
987 lockset-internal lock is held.
990 return self.__lockdict
993 """Is the current thread a current level owner?
995 @note: Use L{check_owned} to check if a specific lock is held
998 return threading.currentThread() in self.__owners
1000 def check_owned(self, names, shared=-1):
1001 """Check if locks are owned in a specific mode.
1003 @type names: sequence or string
1004 @param names: Lock names (or a single lock name)
1005 @param shared: See L{SharedLock.is_owned}
1007 @note: Use L{is_owned} to check if the current thread holds I{any} lock and
1008 L{list_owned} to get the names of all owned locks
1011 if isinstance(names, basestring):
1014 # Avoid check if no locks are owned anyway
1015 if names and self.is_owned():
1018 # Gather references to all locks (in case they're deleted in the meantime)
1021 lock = self.__lockdict[lname]
1023 raise errors.LockError("Non-existing lock '%s' in set '%s' (it may"
1024 " have been removed)" % (lname, self.name))
1026 candidates.append(lock)
1028 return compat.all(lock.is_owned(shared=shared) for lock in candidates)
1032 def _add_owned(self, name=None):
1033 """Note the current thread owns the given lock"""
1035 if not self.is_owned():
1036 self.__owners[threading.currentThread()] = set()
1039 self.__owners[threading.currentThread()].add(name)
1041 self.__owners[threading.currentThread()] = set([name])
1043 def _del_owned(self, name=None):
1044 """Note the current thread owns the given lock"""
1046 assert not (name is None and self.__lock.is_owned()), \
1047 "Cannot hold internal lock when deleting owner status"
1049 if name is not None:
1050 self.__owners[threading.currentThread()].remove(name)
1052 # Only remove the key if we don't hold the set-lock as well
1053 if not (self.__lock.is_owned() or
1054 self.__owners[threading.currentThread()]):
1055 del self.__owners[threading.currentThread()]
1057 def list_owned(self):
1058 """Get the set of resource names owned by the current thread"""
1060 return self.__owners[threading.currentThread()].copy()
1064 def _release_and_delete_owned(self):
1065 """Release and delete all resources owned by the current thread"""
1066 for lname in self.list_owned():
1067 lock = self.__lockdict[lname]
1070 self._del_owned(name=lname)
1073 """Return the current set of names.
1075 Only call this function while holding __lock and don't iterate on the
1076 result after releasing the lock.
1079 return self.__lockdict.keys()
1082 """Return a copy of the current set of elements.
1084 Used only for debugging purposes.
1087 # If we don't already own the set-level lock acquired
1088 # we'll get it and note we need to release it later.
1089 release_lock = False
1090 if not self.__lock.is_owned():
1092 self.__lock.acquire(shared=1)
1094 result = self.__names()
1097 self.__lock.release()
1100 def acquire(self, names, timeout=None, shared=0, priority=None,
1102 """Acquire a set of resource locks.
1104 @type names: list of strings (or string)
1105 @param names: the names of the locks which shall be acquired
1106 (special lock names, or instance/node names)
1107 @type shared: integer (0/1) used as a boolean
1108 @param shared: whether to acquire in shared mode; by default an
1109 exclusive lock will be acquired
1110 @type timeout: float or None
1111 @param timeout: Maximum time to acquire all locks
1112 @type priority: integer
1113 @param priority: Priority for acquiring locks
1114 @type test_notify: callable or None
1115 @param test_notify: Special callback function for unittesting
1117 @return: Set of all locks successfully acquired or None in case of timeout
1119 @raise errors.LockError: when any lock we try to acquire has
1120 been deleted before we succeed. In this case none of the
1121 locks requested will be acquired.
1124 assert timeout is None or timeout >= 0.0
1126 # Check we don't already own locks at this level
1127 assert not self.is_owned(), ("Cannot acquire locks in the same set twice"
1128 " (lockset %s)" % self.name)
1130 if priority is None:
1131 priority = _DEFAULT_PRIORITY
1133 # We need to keep track of how long we spent waiting for a lock. The
1134 # timeout passed to this function is over all lock acquires.
1135 running_timeout = utils.RunningTimeout(timeout, False)
1138 if names is not None:
1139 # Support passing in a single resource to acquire rather than many
1140 if isinstance(names, basestring):
1143 return self.__acquire_inner(names, False, shared, priority,
1144 running_timeout.Remaining, test_notify)
1147 # If no names are given acquire the whole set by not letting new names
1148 # being added before we release, and getting the current list of names.
1149 # Some of them may then be deleted later, but we'll cope with this.
1151 # We'd like to acquire this lock in a shared way, as it's nice if
1152 # everybody else can use the instances at the same time. If we are
1153 # acquiring them exclusively though they won't be able to do this
1154 # anyway, though, so we'll get the list lock exclusively as well in
1155 # order to be able to do add() on the set while owning it.
1156 if not self.__lock.acquire(shared=shared, priority=priority,
1157 timeout=running_timeout.Remaining()):
1158 raise _AcquireTimeout()
1160 # note we own the set-lock
1163 return self.__acquire_inner(self.__names(), True, shared, priority,
1164 running_timeout.Remaining, test_notify)
1166 # We shouldn't have problems adding the lock to the owners list, but
1167 # if we did we'll try to release this lock and re-raise exception.
1168 # Of course something is going to be really wrong, after this.
1169 self.__lock.release()
1173 except _AcquireTimeout:
1176 def __acquire_inner(self, names, want_all, shared, priority,
1177 timeout_fn, test_notify):
1178 """Inner logic for acquiring a number of locks.
1180 @param names: Names of the locks to be acquired
1181 @param want_all: Whether all locks in the set should be acquired
1182 @param shared: Whether to acquire in shared mode
1183 @param timeout_fn: Function returning remaining timeout
1184 @param priority: Priority for acquiring locks
1185 @param test_notify: Special callback function for unittesting
1190 # First we look the locks up on __lockdict. We have no way of being sure
1191 # they will still be there after, but this makes it a lot faster should
1192 # just one of them be the already wrong. Using a sorted sequence to prevent
1194 for lname in sorted(utils.UniqueSequence(names)):
1196 lock = self.__lockdict[lname] # raises KeyError if lock is not there
1199 # We are acquiring all the set, it doesn't matter if this particular
1200 # element is not there anymore.
1203 raise errors.LockError("Non-existing lock %s in set %s (it may have"
1204 " been removed)" % (lname, self.name))
1206 acquire_list.append((lname, lock))
1208 # This will hold the locknames we effectively acquired.
1212 # Now acquire_list contains a sorted list of resources and locks we
1213 # want. In order to get them we loop on this (private) list and
1214 # acquire() them. We gave no real guarantee they will still exist till
1215 # this is done but .acquire() itself is safe and will alert us if the
1216 # lock gets deleted.
1217 for (lname, lock) in acquire_list:
1218 if __debug__ and callable(test_notify):
1219 test_notify_fn = lambda: test_notify(lname)
1221 test_notify_fn = None
1223 timeout = timeout_fn()
1226 # raises LockError if the lock was deleted
1227 acq_success = lock.acquire(shared=shared, timeout=timeout,
1229 test_notify=test_notify_fn)
1230 except errors.LockError:
1232 # We are acquiring all the set, it doesn't matter if this
1233 # particular element is not there anymore.
1236 raise errors.LockError("Non-existing lock %s in set %s (it may"
1237 " have been removed)" % (lname, self.name))
1240 # Couldn't get lock or timeout occurred
1242 # This shouldn't happen as SharedLock.acquire(timeout=None) is
1244 raise errors.LockError("Failed to get lock %s (set %s)" %
1247 raise _AcquireTimeout()
1250 # now the lock cannot be deleted, we have it!
1251 self._add_owned(name=lname)
1255 # We shouldn't have problems adding the lock to the owners list, but
1256 # if we did we'll try to release this lock and re-raise exception.
1257 # Of course something is going to be really wrong after this.
1263 # Release all owned locks
1264 self._release_and_delete_owned()
1269 def downgrade(self, names=None):
1270 """Downgrade a set of resource locks from exclusive to shared mode.
1272 The locks must have been acquired in exclusive mode.
1275 assert self.is_owned(), ("downgrade on lockset %s while not owning any"
1276 " lock" % self.name)
1278 # Support passing in a single resource to downgrade rather than many
1279 if isinstance(names, basestring):
1282 owned = self.list_owned()
1288 assert owned.issuperset(names), \
1289 ("downgrade() on unheld resources %s (set %s)" %
1290 (names.difference(owned), self.name))
1292 for lockname in names:
1293 self.__lockdict[lockname].downgrade()
1295 # Do we own the lockset in exclusive mode?
1296 if self.__lock.is_owned(shared=0):
1297 # Have all locks been downgraded?
1298 if not compat.any(lock.is_owned(shared=0)
1299 for lock in self.__lockdict.values()):
1300 self.__lock.downgrade()
1301 assert self.__lock.is_owned(shared=1)
def release(self, names=None):
  """Give back resource locks held in this set.

  The locks may be held in shared or exclusive mode, but the caller must
  actually hold them.

  @type names: string, list of strings, or None
  @param names: name(s) of the locks to release; C{None} (the default)
      releases every lock currently owned in this set

  """
  assert self.is_owned(), ("release() on lock set %s while not owner" %
                           self.name)

  # A bare string designates a single resource
  if isinstance(names, basestring):
    names = [names]

  if names is not None:
    names = set(names)
    assert self.list_owned().issuperset(names), (
      "release() on unheld resources %s (set %s)" %
      (names.difference(self.list_owned()), self.name))
  else:
    names = self.list_owned()

  # The set-level ("all elements") lock goes first, if we hold it; once it
  # is released, add() can proceed again.
  if self.__lock.is_owned():
    self.__lock.release()
    self._del_owned()

  for name in names:
    # Releasing is safe here only because a lock never leaves __lockdict
    # without being held exclusively
    self.__lockdict[name].release()
    self._del_owned(name=name)
def add(self, names, acquired=0, shared=0):
  """Create locks for new resources and insert them into this set.

  @type names: string or list of strings
  @param names: name(s) of the new elements
  @type acquired: integer (0/1) used as a boolean
  @param acquired: whether the caller should own the new locks right away
  @type shared: integer (0/1) used as a boolean
  @param shared: when pre-acquiring, whether to do so in shared mode
  @rtype: bool
  @return: C{True}
  @raise errors.LockError: if any of the names is already in the set

  """
  # The caller must either own nothing at this level or own the whole set
  # exclusively
  assert not self.is_owned() or self.__lock.is_owned(shared=0), \
    ("Cannot add locks if the set %s is only partially owned, or shared" %
     self.name)

  # A bare string designates a single resource
  if isinstance(names, basestring):
    names = [names]

  # Grab the set-level lock unless we already have it, and remember that
  # it has to be given back afterwards.
  release_lock = not self.__lock.is_owned()
  if release_lock:
    self.__lock.acquire()

  try:
    duplicates = set(self.__names()).intersection(names)
    if duplicates:
      # An explicit raise, not an assert: asserts vanish under optimization,
      # and concurrent callers can trigger this regardless of intent.
      raise errors.LockError("duplicate add(%s) on lockset %s" %
                             (duplicates, self.name))

    for name in names:
      new_lock = SharedLock(self._GetLockName(name), monitor=self.__monitor)

      if acquired:
        # The lock is brand new, so neither priority nor timeout is needed
        new_lock.acquire(shared=shared)
        # From here on the lock cannot be deleted by anybody else
        try:
          self._add_owned(name=name)
        except:
          # Recording ownership should never fail; if it does, undo the
          # acquisition and propagate. The lock is not in __lockdict yet, so
          # no other thread can be waiting on it -- this is a safety net.
          new_lock.release()
          raise

      self.__lockdict[name] = new_lock

  finally:
    # Only give back the set-level lock if this call took it
    if release_lock:
      self.__lock.release()

  return True
def remove(self, names):
  """Delete elements (and their locks) from this set.

  The caller must either hold nothing in this set, or hold exclusively a
  superset of the elements being removed.

  @type names: string or list of strings
  @param names: name(s) of the resources to remove
  @rtype: list
  @return: the names whose locks this call deleted; equal to C{names} when
      the caller held all of them exclusively

  """
  # A bare string designates a single resource
  if isinstance(names, basestring):
    names = [names]

  # Partial ownership must cover everything being removed. Exclusiveness is
  # enforced by the locks themselves.
  assert not self.is_owned() or self.list_owned().issuperset(names), (
    "remove() on acquired lockset %s while not owning all elements" %
    self.name)

  removed = []

  for name in names:
    # delete() takes the lock exclusively if we do not already own it and
    # makes all pending and later acquisitions fail. Order does not matter:
    # delete() implies release(), and the assertion above guarantees we hold
    # either everything to be deleted or nothing.
    try:
      self.__lockdict[name].delete()
    except (KeyError, errors.LockError):
      # Unknown name, or somebody else deleted it first. Impossible if we
      # were already holding it:
      assert not self.is_owned(), ("remove failed while holding lockset %s"
                                   % self.name)
    else:
      removed.append(name)
      # We performed the delete(), so removing the entry is our job; any
      # further or pending delete()/acquire() on it fails, and nobody can
      # have held it since our call. Had delete() raised, the thread that
      # actually deleted it would do this instead.
      del self.__lockdict[name]
      # Forget our ownership of it, if we had any
      if self.is_owned():
        self._del_owned(name=name)

  return removed
# Locking levels. Locks must always be acquired in increasing level order:
#  - LEVEL_CLUSTER holds the Big Ganeti Lock (BGL). It has to be acquired,
#    in shared or exclusive mode, before any other operation; exclusive
#    acquisition is discouraged and should be avoided.
#  - LEVEL_NODE and LEVEL_INSTANCE hold node and instance locks. When more
#    than one node, or more than one instance, is needed, acquire them all at
#    the same time.
LEVEL_CLUSTER = 0
LEVEL_INSTANCE = 1
LEVEL_NODEGROUP = 2
LEVEL_NODE = 3
#: Level for node resources, used for operations with possibly high impact on
#: the node's disks.
LEVEL_NODE_RES = 4

#: Every level, in acquisition order. Code relies on LEVELS[i] == i.
LEVELS = [LEVEL_CLUSTER, LEVEL_INSTANCE, LEVEL_NODEGROUP, LEVEL_NODE,
          LEVEL_NODE_RES]

#: Levels whose lock sets may gain or lose members at runtime
LEVELS_MOD = frozenset([LEVEL_NODE_RES, LEVEL_NODE, LEVEL_NODEGROUP,
                        LEVEL_INSTANCE])

#: Lock level names (make sure to use singular form)
LEVEL_NAMES = dict([
  (LEVEL_CLUSTER, "cluster"),
  (LEVEL_INSTANCE, "instance"),
  (LEVEL_NODEGROUP, "nodegroup"),
  (LEVEL_NODE, "node"),
  (LEVEL_NODE_RES, "node-res"),
  ])

#: Name of the big ganeti lock, the only lock at LEVEL_CLUSTER
BGL = "BGL"
class GanetiLockManager:
  """The Ganeti Locking Library

  Keeps all of the cluster's lock sets in one place, one L{LockSet} per
  locking level, and uses assertions to check that callers respect the level
  ordering rules, so that potential deadlocks are caught early. Centralising
  this should also ease a move to a different lock type should python threads
  be abandoned.

  """
  # The one live manager; __init__ refuses (by assertion) to create a second
  _instance = None

  def __init__(self, nodes, nodegroups, instances):
    """Build the manager and one lock set per level.

    Only one GanetiLockManager may exist at any time; building another one
    trips an assertion.

    @param nodes: list of node names
    @param nodegroups: list of nodegroup uuids
    @param instances: list of instance names

    """
    assert self.__class__._instance is None, \
      "double GanetiLockManager instance"

    self.__class__._instance = self

    self._monitor = LockMonitor()
    monitor = self._monitor

    # Level -> lock set. The keyring holds every lock, at its level and in
    # the correct locking order.
    self.__keyring = {
      LEVEL_CLUSTER: LockSet([BGL], "cluster", monitor=monitor),
      LEVEL_NODE: LockSet(nodes, "node", monitor=monitor),
      LEVEL_NODE_RES: LockSet(nodes, "node-res", monitor=monitor),
      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroup", monitor=monitor),
      LEVEL_INSTANCE: LockSet(instances, "instance", monitor=monitor),
      }

    # Every lock set must be named after its level
    assert compat.all(lockset.name == LEVEL_NAMES[lvl]
                      for (lvl, lockset) in self.__keyring.items())

  def AddToLockMonitor(self, provider):
    """Register an additional lock with this manager's monitor.

    See L{LockMonitor.RegisterLock}.

    """
    monitor = self._monitor
    return monitor.RegisterLock(provider)

  def QueryLocks(self, fields):
    """Query information about every lock known to the monitor.

    See L{LockMonitor.QueryLocks}.

    """
    monitor = self._monitor
    return monitor.QueryLocks(fields)

  def _names(self, level):
    """Return the names of all locks at a level (debugging/testing helper).

    @param level: the level whose lock names are wanted

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    lockset = self.__keyring[level]
    return lockset._names()

  def is_owned(self, level):
    """Tell whether the current thread owns any lock at C{level}.

    """
    lockset = self.__keyring[level]
    return lockset.is_owned()

  def list_owned(self, level):
    """Return the set of locks owned at C{level}.

    """
    lockset = self.__keyring[level]
    return lockset.list_owned()

  def check_owned(self, level, names, shared=-1):
    """Tell whether locks at a level are owned in a given mode.

    @see: L{LockSet.check_owned}

    """
    lockset = self.__keyring[level]
    return lockset.check_owned(names, shared=shared)

  def _upper_owned(self, level):
    """Tell whether any lock is owned at a level above C{level}.

    """
    # Slicing LEVELS by level value is only valid because LEVELS[i] == i,
    # which the test cases verify.
    higher_levels = LEVELS[level + 1:]
    return compat.any(self.is_owned(lvl) for lvl in higher_levels)

  def _BGL_owned(self): # pylint: disable=C0103
    """Tell whether the current thread holds the BGL.

    Shared and exclusive ownership both count.

    """
    cluster_owned = self.__keyring[LEVEL_CLUSTER].list_owned()
    return BGL in cluster_owned

  @staticmethod
  def _contains_BGL(level, names): # pylint: disable=C0103
    """Tell whether acting on C{names} at C{level} involves the BGL.

    This is the case at the cluster level when either every lock there
    (C{names} is C{None}) or the BGL explicitly is named.

    """
    if level != LEVEL_CLUSTER:
      return False
    return names is None or BGL in names

  def acquire(self, level, names, timeout=None, shared=0, priority=None):
    """Acquire a set of resource locks, all at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring lock

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Unless the BGL itself is being acquired, the caller must already hold
    # it: some "legacy" opcodes need to run non-concurrently, so even
    # migrated code has to share the BGL to stay compatible with them.
    # Holding it exclusively makes other locks pointless, unless perhaps the
    # current opcode is only half way through its migration.
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
      "You must own the Big Ganeti Lock before acquiring any other")

    # Levels must be taken in increasing order: nothing may be held at the
    # same or a higher level
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
                                          " while owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.acquire(names, shared=shared, timeout=timeout,
                           priority=priority)

  def downgrade(self, level, names=None):
    """Switch a set of resource locks from exclusive to shared mode.

    The locks must have been acquired in exclusive mode.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be downgraded
    @type names: list of strings, or None
    @param names: the names of the locks which shall be downgraded
        (defaults to all the locks acquired at the level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    lockset = self.__keyring[level]
    return lockset.downgrade(names=names)

  def release(self, level, names=None):
    """Release a set of resource locks, all at the same level.

    The locks must have been acquired, either in shared or in exclusive
    mode, before releasing them.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    # The BGL may only be released once nothing is held above it
    assert not (self._contains_BGL(level, names) and
                self._upper_owned(LEVEL_CLUSTER)), (
      "Cannot release the Big Ganeti Lock while holding something"
      " at upper levels (%r)" %
      (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self.list_owned(i))
                        for i in self.__keyring.keys()]), ))

    # The lock set itself complains if the locks are not owned
    lockset = self.__keyring[level]
    return lockset.release(names)

  def add(self, level, names, acquired=0, shared=0):
    """Add locks at a modifiable level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to acquire
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
                               " operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
                                          " while owning some at a greater one")
    lockset = self.__keyring[level]
    return lockset.add(names, acquired=acquired, shared=shared)

  def remove(self, level, names):
    """Remove locks from a modifiable level.

    The caller must either already own, exclusively, the locks being
    removed, or own no lock at a higher level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
                               " operations")
    # Either this level is owned, or nothing is owned from here upwards.
    # LockSet.remove() covers not owning every needed resource and shared
    # ownership.
    assert self.is_owned(level) or not self._upper_owned(level), (
      "Cannot remove locks at a level while not owning it or"
      " owning some at a greater one")
    lockset = self.__keyring[level]
    return lockset.remove(names)
def _MonitorSortKey(entry):
  """Sort key for one C{(info, idx, num)} entry gathered by the lock monitor.

  Entries are ordered by lock name (natural sort), then by the provider's
  registration number, then by the position of the entry within that
  provider's output. This gives a stable order across providers even when
  several of them report the same name.

  @type entry: tuple
  @param entry: C{(info, idx, num)}; C{info} must be a 4-tuple whose first
      item is the lock name

  """
  (info, idx, num) = entry
  # Unpacking all four fields also checks the shape of the information tuple
  (name, _, _, _) = info
  return (utils.NiceSortKey(name), num, idx)
class LockMonitor(object):
  # Attribute name of the internal lock, as expected by ssynchronized
  _LOCK_ATTR = "_lock"

  def __init__(self):
    """Set up an empty monitor.

    """
    self._lock = SharedLock("LockMonitor")

    # Registration numbers, used to keep the sort order stable
    self._counter = itertools.count(0)

    # Provider -> registration number. Weak references avoid circular
    # references and do not prevent deletion of the tracked locks.
    self._locks = weakref.WeakKeyDictionary()

  @ssynchronized(_LOCK_ATTR)
  def RegisterLock(self, provider):
    """Start tracking a new lock provider.

    @param provider: Object with a callable method named C{GetLockInfo}, taking
      a single C{set} containing the requested information items
    @note: It would be nicer to only receive the function generating the
      requested information but, as it turns out, weak references to bound
      methods (e.g. C{self.GetLockInfo}) are tricky; there are several
      workarounds, but none of the ones I found works properly in combination
      with a standard C{WeakKeyDictionary}

    """
    assert provider not in self._locks, "Duplicate registration"

    # Duplicate names are deliberately not rejected: a lock re-created under
    # the same name shortly after its predecessor died may still find the old
    # entry in the weakref dictionary. The registration number still yields a
    # stable sort order.
    self._locks[provider] = next(self._counter)

  def _GetLockInfo(self, requested):
    """Collect the requested information from every registered provider.

    @param requested: set of requested information items, handed to each
        provider's C{GetLockInfo}
    @return: list of C{(info, idx, num)} tuples, where C{idx} is the position
        within the provider's output and C{num} its registration number

    """
    # Take a consistent snapshot of the registrations under the internal
    # lock; the providers themselves are queried after releasing it.
    self._lock.acquire(shared=1)
    try:
      registrations = self._locks.items()
    finally:
      self._lock.release()

    result = []
    for (provider, num) in registrations:
      for (idx, info) in enumerate(provider.GetLockInfo(requested)):
        result.append((info, idx, num))
    return result

  def _Query(self, fields):
    """Gather and order lock information for a query.

    @type fields: list of strings
    @param fields: List of fields to return
    @return: tuple of the query object and the query data

    """
    qobj = query.Query(query.LOCK_FIELDS, fields)

    # Gather everything, then order by name and registration
    ordered = sorted(self._GetLockInfo(qobj.RequestedData()),
                     key=_MonitorSortKey)

    # Keep only the lock information itself for the query data
    infos = [compat.fst(entry) for entry in ordered]
    return (qobj, query.LockQueryData(infos))

  def QueryLocks(self, fields):
    """Query information about all registered locks.

    @type fields: list of strings
    @param fields: List of fields to return

    """
    (qobj, ctx) = self._Query(fields)

    # Build the response from the query object and gathered data
    return query.GetQueryResponse(qobj, ctx)