4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Module implementing the Ganeti locking code."""
23 # pylint: disable=W0212
25 # W0212 since e.g. LockSet methods use (a lot) the internals of
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import compat
40 from ganeti import query
43 _EXCLUSIVE_TEXT = "exclusive"
44 _SHARED_TEXT = "shared"
45 _DELETED_TEXT = "deleted"
50 def ssynchronized(mylock, shared=0):
51 """Shared Synchronization decorator.
53 Calls the function holding the given lock, either in exclusive or shared
54 mode. It requires the passed lock to be a SharedLock (or support its
57 @type mylock: lockable object or string
58 @param mylock: lock to acquire or class member name of the lock to acquire
62 def sync_function(*args, **kwargs):
63 if isinstance(mylock, basestring):
64 assert args, "cannot ssynchronize on non-class method: self not found"
66 lock = getattr(args[0], mylock)
69 lock.acquire(shared=shared)
71 return fn(*args, **kwargs)
78 class _SingleNotifyPipeConditionWaiter(object):
79 """Helper class for SingleNotifyPipeCondition
87 def __init__(self, poller, fd):
88 """Constructor for _SingleNotifyPipeConditionWaiter
90 @type poller: select.poll
91 @param poller: Poller object
93 @param fd: File descriptor to wait for
100 def __call__(self, timeout):
101 """Wait for something to happen on the pipe.
103 @type timeout: float or None
104 @param timeout: Timeout for waiting (can be None)
107 running_timeout = utils.RunningTimeout(timeout, True)
110 remaining_time = running_timeout.Remaining()
112 if remaining_time is not None:
113 if remaining_time < 0.0:
116 # Our calculation uses seconds, poll() wants milliseconds
117 remaining_time *= 1000
120 result = self._poller.poll(remaining_time)
121 except EnvironmentError, err:
122 if err.errno != errno.EINTR:
126 # Check whether we were notified
127 if result and result[0][0] == self._fd:
131 class _BaseCondition(object):
132 """Base class containing common code for conditions.
134 Some of this code is taken from python's threading module.
146 def __init__(self, lock):
147 """Constructor for _BaseCondition.
149 @type lock: threading.Lock
150 @param lock: condition base lock
153 object.__init__(self)
156 self._release_save = lock._release_save
157 except AttributeError:
158 self._release_save = self._base_release_save
160 self._acquire_restore = lock._acquire_restore
161 except AttributeError:
162 self._acquire_restore = self._base_acquire_restore
164 self._is_owned = lock._is_owned
165 except AttributeError:
166 self._is_owned = self._base_is_owned
170 # Export the lock's acquire() and release() methods
171 self.acquire = lock.acquire
172 self.release = lock.release
174 def _base_is_owned(self):
175 """Check whether lock is owned by current thread.
178 if self._lock.acquire(0):
183 def _base_release_save(self):
186 def _base_acquire_restore(self, _):
189 def _check_owned(self):
190 """Raise an exception if the current thread doesn't own the lock.
193 if not self._is_owned():
194 raise RuntimeError("cannot work with un-aquired lock")
197 class SingleNotifyPipeCondition(_BaseCondition):
198 """Condition which can only be notified once.
200 This condition class uses pipes and poll, internally, to be able to wait for
201 notification with a timeout, without resorting to polling. It is almost
202 compatible with Python's threading.Condition, with the following differences:
203 - notifyAll can only be called once, and no wait can happen after that
204 - notify is not supported, only notifyAll
216 _waiter_class = _SingleNotifyPipeConditionWaiter
218 def __init__(self, lock):
219 """Constructor for SingleNotifyPipeCondition
222 _BaseCondition.__init__(self, lock)
224 self._notified = False
226 self._write_fd = None
229 def _check_unnotified(self):
230 """Throws an exception if already notified.
234 raise RuntimeError("cannot use already notified condition")
237 """Cleanup open file descriptors, if any.
240 if self._read_fd is not None:
241 os.close(self._read_fd)
244 if self._write_fd is not None:
245 os.close(self._write_fd)
246 self._write_fd = None
249 def wait(self, timeout):
250 """Wait for a notification.
252 @type timeout: float or None
253 @param timeout: Waiting timeout (can be None)
257 self._check_unnotified()
261 if self._poller is None:
262 (self._read_fd, self._write_fd) = os.pipe()
263 self._poller = select.poll()
264 self._poller.register(self._read_fd, select.POLLHUP)
266 wait_fn = self._waiter_class(self._poller, self._read_fd)
267 state = self._release_save()
269 # Wait for notification
273 self._acquire_restore(state)
276 if self._nwaiters == 0:
279 def notifyAll(self): # pylint: disable=C0103
280 """Close the writing side of the pipe to notify all waiters.
284 self._check_unnotified()
285 self._notified = True
286 if self._write_fd is not None:
287 os.close(self._write_fd)
288 self._write_fd = None
291 class PipeCondition(_BaseCondition):
292 """Group-only non-polling condition with counters.
294 This condition class uses pipes and poll, internally, to be able to wait for
295 notification with a timeout, without resorting to polling. It is almost
296 compatible with Python's threading.Condition, but only supports notifyAll and
297 non-recursive locks. As an additional feature, it's able to report whether
298 there are any waiting threads.
306 _single_condition_class = SingleNotifyPipeCondition
308 def __init__(self, lock):
309 """Initializes this class.
312 _BaseCondition.__init__(self, lock)
313 self._waiters = set()
314 self._single_condition = self._single_condition_class(self._lock)
316 def wait(self, timeout):
317 """Wait for a notification.
319 @type timeout: float or None
320 @param timeout: Waiting timeout (can be None)
325 # Keep local reference to the pipe. It could be replaced by another thread
326 # notifying while we're waiting.
327 cond = self._single_condition
329 self._waiters.add(threading.currentThread())
334 self._waiters.remove(threading.currentThread())
336 def notifyAll(self): # pylint: disable=C0103
337 """Notify all currently waiting threads.
341 self._single_condition.notifyAll()
342 self._single_condition = self._single_condition_class(self._lock)
344 def get_waiting(self):
345 """Returns a list of all waiting threads.
352 def has_waiting(self):
353 """Returns whether there are active waiters.
358 return bool(self._waiters)
361 return ("<%s.%s waiters=%s at %#x>" %
362 (self.__class__.__module__, self.__class__.__name__,
363 self._waiters, id(self)))
366 class _PipeConditionWithMode(PipeCondition):
371 def __init__(self, lock, shared):
372 """Initializes this class.
376 PipeCondition.__init__(self, lock)
379 class SharedLock(object):
380 """Implements a shared lock.
382 Multiple threads can acquire the lock in a shared way by calling
383 C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
384 threads can call C{acquire(shared=0)}.
386 Notes on data structures: C{__pending} contains a priority queue (heapq) of
387 all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
388 ...]}. Each per-priority queue contains a normal in-order list of conditions
389 to be notified when the lock can be acquired. Shared locks are grouped
390 together by priority and the condition for them is stored in
391 C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
392 references for the per-priority queues indexed by priority for faster access.
395 @ivar name: the name of the lock
410 __condition_class = _PipeConditionWithMode
412 def __init__(self, name, monitor=None):
413 """Construct a new SharedLock.
415 @param name: the name of the lock
416 @type monitor: L{LockMonitor}
417 @param monitor: Lock monitor with which to register
420 object.__init__(self)
425 self.__lock = threading.Lock()
427 # Queue containing waiting acquires
429 self.__pending_by_prio = {}
430 self.__pending_shared = {}
432 # Current lock holders
436 # is this lock in the deleted state?
437 self.__deleted = False
439 # Register with lock monitor
441 logging.debug("Adding lock %s to monitor", name)
442 monitor.RegisterLock(self)
445 return ("<%s.%s name=%s at %#x>" %
446 (self.__class__.__module__, self.__class__.__name__,
447 self.name, id(self)))
449 def GetLockInfo(self, requested):
450 """Retrieves information for querying locks.
453 @param requested: Requested information, see C{query.LQ_*}
456 self.__lock.acquire()
458 # Note: to avoid unintentional race conditions, no references to
459 # modifiable objects should be returned unless they were created in this
464 if query.LQ_MODE in requested:
467 assert not (self.__exc or self.__shr)
469 mode = _EXCLUSIVE_TEXT
473 # Current owner(s) are wanted
474 if query.LQ_OWNER in requested:
481 assert not self.__deleted
482 owner_names = [i.getName() for i in owner]
484 # Pending acquires are wanted
485 if query.LQ_PENDING in requested:
488 # Sorting instead of copying and using heapq functions for simplicity
489 for (_, prioqueue) in sorted(self.__pending):
490 for cond in prioqueue:
492 pendmode = _SHARED_TEXT
494 pendmode = _EXCLUSIVE_TEXT
496 # List of names will be sorted in L{query._GetLockPending}
497 pending.append((pendmode, [i.getName()
498 for i in cond.get_waiting()]))
502 return [(self.name, mode, owner_names, pending)]
504 self.__lock.release()
506 def __check_deleted(self):
507 """Raises an exception if the lock has been deleted.
511 raise errors.LockError("Deleted lock %s" % self.name)
513 def __is_sharer(self):
514 """Is the current thread sharing the lock at this time?
517 return threading.currentThread() in self.__shr
519 def __is_exclusive(self):
520 """Is the current thread holding the lock exclusively at this time?
523 return threading.currentThread() == self.__exc
525 def __is_owned(self, shared=-1):
526 """Is the current thread somehow owning the lock at this time?
528 This is a private version of the function, which presumes you're holding
533 return self.__is_sharer() or self.__is_exclusive()
535 return self.__is_sharer()
537 return self.__is_exclusive()
539 def _is_owned(self, shared=-1):
540 """Is the current thread somehow owning the lock at this time?
543 - < 0: check for any type of ownership (default)
544 - 0: check for exclusive ownership
545 - > 0: check for shared ownership
548 self.__lock.acquire()
550 return self.__is_owned(shared=shared)
552 self.__lock.release()
556 def _count_pending(self):
557 """Returns the number of pending acquires.
562 self.__lock.acquire()
564 return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
566 self.__lock.release()
568 def _check_empty(self):
569 """Checks whether there are any pending acquires.
574 self.__lock.acquire()
576 # Order is important: __find_first_pending_queue modifies __pending
577 (_, prioqueue) = self.__find_first_pending_queue()
579 return not (prioqueue or
581 self.__pending_by_prio or
582 self.__pending_shared)
584 self.__lock.release()
586 def __do_acquire(self, shared):
587 """Actually acquire the lock.
591 self.__shr.add(threading.currentThread())
593 self.__exc = threading.currentThread()
595 def __can_acquire(self, shared):
596 """Determine whether lock can be acquired.
600 return self.__exc is None
602 return len(self.__shr) == 0 and self.__exc is None
604 def __find_first_pending_queue(self):
605 """Tries to find the topmost queued entry with pending acquires.
607 Removes empty entries while going through the list.
610 while self.__pending:
611 (priority, prioqueue) = self.__pending[0]
614 return (priority, prioqueue)
617 heapq.heappop(self.__pending)
618 del self.__pending_by_prio[priority]
619 assert priority not in self.__pending_shared
623 def __is_on_top(self, cond):
624 """Checks whether the passed condition is on top of the queue.
626 The caller must make sure the queue isn't empty.
629 (_, prioqueue) = self.__find_first_pending_queue()
631 return cond == prioqueue[0]
633 def __acquire_unlocked(self, shared, timeout, priority):
634 """Acquire a shared lock.
636 @param shared: whether to acquire in shared mode; by default an
637 exclusive lock will be acquired
638 @param timeout: maximum waiting time before giving up
639 @type priority: integer
640 @param priority: Priority for acquiring lock
643 self.__check_deleted()
645 # We cannot acquire the lock if we already have it
646 assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
649 # Remove empty entries from queue
650 self.__find_first_pending_queue()
652 # Check whether someone else holds the lock or there are pending acquires.
653 if not self.__pending and self.__can_acquire(shared):
654 # Apparently not, can acquire lock directly.
655 self.__do_acquire(shared)
658 prioqueue = self.__pending_by_prio.get(priority, None)
661 # Try to re-use condition for shared acquire
662 wait_condition = self.__pending_shared.get(priority, None)
663 assert (wait_condition is None or
664 (wait_condition.shared and wait_condition in prioqueue))
666 wait_condition = None
668 if wait_condition is None:
669 if prioqueue is None:
670 assert priority not in self.__pending_by_prio
673 heapq.heappush(self.__pending, (priority, prioqueue))
674 self.__pending_by_prio[priority] = prioqueue
676 wait_condition = self.__condition_class(self.__lock, shared)
677 prioqueue.append(wait_condition)
680 # Keep reference for further shared acquires on same priority. This is
681 # better than trying to find it in the list of pending acquires.
682 assert priority not in self.__pending_shared
683 self.__pending_shared[priority] = wait_condition
686 # Wait until we become the topmost acquire in the queue or the timeout
688 # TODO: Decrease timeout with spurious notifications
689 while not (self.__is_on_top(wait_condition) and
690 self.__can_acquire(shared)):
691 # Wait for notification
692 wait_condition.wait(timeout)
693 self.__check_deleted()
695 # A lot of code assumes blocking acquires always succeed. Loop
696 # internally for that case.
697 if timeout is not None:
700 if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
701 self.__do_acquire(shared)
704 # Remove condition from queue if there are no more waiters
705 if not wait_condition.has_waiting():
706 prioqueue.remove(wait_condition)
707 if wait_condition.shared:
708 # Remove from list of shared acquires if it wasn't while releasing
709 # (e.g. on lock deletion)
710 self.__pending_shared.pop(priority, None)
714 def acquire(self, shared=0, timeout=None, priority=None,
716 """Acquire a shared lock.
718 @type shared: integer (0/1) used as a boolean
719 @param shared: whether to acquire in shared mode; by default an
720 exclusive lock will be acquired
722 @param timeout: maximum waiting time before giving up
723 @type priority: integer
724 @param priority: Priority for acquiring lock
725 @type test_notify: callable or None
726 @param test_notify: Special callback function for unittesting
730 priority = _DEFAULT_PRIORITY
732 self.__lock.acquire()
734 # We already got the lock, notify now
735 if __debug__ and callable(test_notify):
738 return self.__acquire_unlocked(shared, timeout, priority)
740 self.__lock.release()
743 """Changes the lock mode from exclusive to shared.
745 Pending acquires in shared mode on the same priority will go ahead.
748 self.__lock.acquire()
750 assert self.__is_owned(), "Lock must be owned"
752 if self.__is_exclusive():
753 # Do nothing if the lock is already acquired in shared mode
757 # Important: pending shared acquires should only jump ahead if there
758 # was a transition from exclusive to shared, otherwise an owner of a
759 # shared lock can keep calling this function to push incoming shared
761 (priority, prioqueue) = self.__find_first_pending_queue()
763 # Is there a pending shared acquire on this priority?
764 cond = self.__pending_shared.pop(priority, None)
767 assert cond in prioqueue
769 # Ensure shared acquire is on top of queue
770 if len(prioqueue) > 1:
771 prioqueue.remove(cond)
772 prioqueue.insert(0, cond)
777 assert not self.__is_exclusive()
778 assert self.__is_sharer()
782 self.__lock.release()
785 """Release a Shared Lock.
787 You must have acquired the lock, either in shared or in exclusive mode,
788 before calling this function.
791 self.__lock.acquire()
793 assert self.__is_exclusive() or self.__is_sharer(), \
794 "Cannot release non-owned lock"
796 # Autodetect release type
797 if self.__is_exclusive():
800 self.__shr.remove(threading.currentThread())
802 # Notify topmost condition in queue
803 (priority, prioqueue) = self.__find_first_pending_queue()
808 # Prevent further shared acquires from sneaking in while waiters are
810 self.__pending_shared.pop(priority, None)
813 self.__lock.release()
815 def delete(self, timeout=None, priority=None):
816 """Delete a Shared Lock.
818 This operation will declare the lock for removal. First the lock will be
819 acquired in exclusive mode if you don't already own it, then the lock
820 will be put in a state where any future and pending acquire() fail.
823 @param timeout: maximum waiting time before giving up
824 @type priority: integer
825 @param priority: Priority for acquiring lock
829 priority = _DEFAULT_PRIORITY
831 self.__lock.acquire()
833 assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
835 self.__check_deleted()
837 # The caller is allowed to hold the lock exclusively already.
838 acquired = self.__is_exclusive()
841 acquired = self.__acquire_unlocked(0, timeout, priority)
843 assert self.__is_exclusive() and not self.__is_sharer(), \
844 "Lock wasn't acquired in exclusive mode"
847 self.__deleted = True
850 assert not (self.__exc or self.__shr), "Found owner during deletion"
852 # Notify all acquires. They'll throw an error.
853 for (_, prioqueue) in self.__pending:
854 for cond in prioqueue:
857 assert self.__deleted
861 self.__lock.release()
863 def _release_save(self):
864 shared = self.__is_sharer()
868 def _acquire_restore(self, shared):
869 self.acquire(shared=shared)
872 # Whenever we want to acquire a full LockSet we pass None as the value
873 # to acquire. Hide this behind this nicely named constant.
877 class _AcquireTimeout(Exception):
878 """Internal exception to abort an acquire on a timeout.
884 """Implements a set of locks.
886 This abstraction implements a set of shared locks for the same resource type,
887 distinguished by name. The user can lock a subset of the resources and the
888 LockSet will take care of acquiring the locks always in the same order, thus
891 All the locks needed in the same set must be acquired together, though.
894 @ivar name: the name of the lockset
897 def __init__(self, members, name, monitor=None):
898 """Constructs a new LockSet.
900 @type members: list of strings
901 @param members: initial members of the set
902 @type monitor: L{LockMonitor}
903 @param monitor: Lock monitor with which to register member locks
906 assert members is not None, "members parameter is not a list"
910 self.__monitor = monitor
912 # Used internally to guarantee coherency
913 self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)
915 # The lockdict indexes the relationship name -> lock
916 # The order-of-locking is implied by the alphabetical order of names
919 for mname in members:
920 self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
923 # The owner dict contains the set of locks each thread owns. For
924 # performance each thread can access its own key without a global lock on
925 # this structure. It is paramount though that *no* other type of access is
926 # done to this structure (eg. no looping over its keys). *_owned helper
927 # functions are defined to guarantee access is correct, but in general never
927 # function are defined to guarantee access is correct, but in general never
928 # do anything different than __owners[threading.currentThread()], or there
932 def _GetLockName(self, mname):
933 """Returns the name for a member lock.
936 return "%s/%s" % (self.name, mname)
939 """Returns the lockset-internal lock.
944 def _get_lockdict(self):
945 """Returns the lockset-internal lock dictionary.
947 Accessing this structure is only safe in single-thread usage or when the
948 lockset-internal lock is held.
951 return self.__lockdict
954 """Is the current thread a current level owner?"""
955 return threading.currentThread() in self.__owners
957 def _add_owned(self, name=None):
958 """Note the current thread owns the given lock"""
960 if not self._is_owned():
961 self.__owners[threading.currentThread()] = set()
964 self.__owners[threading.currentThread()].add(name)
966 self.__owners[threading.currentThread()] = set([name])
968 def _del_owned(self, name=None):
969 """Note the current thread no longer owns the given lock"""
971 assert not (name is None and self.__lock._is_owned()), \
972 "Cannot hold internal lock when deleting owner status"
975 self.__owners[threading.currentThread()].remove(name)
977 # Only remove the key if we don't hold the set-lock as well
978 if (not self.__lock._is_owned() and
979 not self.__owners[threading.currentThread()]):
980 del self.__owners[threading.currentThread()]
982 def _list_owned(self):
983 """Get the set of resource names owned by the current thread"""
985 return self.__owners[threading.currentThread()].copy()
989 def _release_and_delete_owned(self):
990 """Release and delete all resources owned by the current thread"""
991 for lname in self._list_owned():
992 lock = self.__lockdict[lname]
995 self._del_owned(name=lname)
998 """Return the current set of names.
1000 Only call this function while holding __lock and don't iterate on the
1001 result after releasing the lock.
1004 return self.__lockdict.keys()
1007 """Return a copy of the current set of elements.
1009 Used only for debugging purposes.
1012 # If we don't already own the set-level lock
1013 # we'll get it and note we need to release it later.
1014 release_lock = False
1015 if not self.__lock._is_owned():
1017 self.__lock.acquire(shared=1)
1019 result = self.__names()
1022 self.__lock.release()
1025 def acquire(self, names, timeout=None, shared=0, priority=None,
1027 """Acquire a set of resource locks.
1029 @type names: list of strings (or string)
1030 @param names: the names of the locks which shall be acquired
1031 (special lock names, or instance/node names)
1032 @type shared: integer (0/1) used as a boolean
1033 @param shared: whether to acquire in shared mode; by default an
1034 exclusive lock will be acquired
1035 @type timeout: float or None
1036 @param timeout: Maximum time to acquire all locks
1037 @type priority: integer
1038 @param priority: Priority for acquiring locks
1039 @type test_notify: callable or None
1040 @param test_notify: Special callback function for unittesting
1042 @return: Set of all locks successfully acquired or None in case of timeout
1044 @raise errors.LockError: when any lock we try to acquire has
1045 been deleted before we succeed. In this case none of the
1046 locks requested will be acquired.
1049 assert timeout is None or timeout >= 0.0
1051 # Check we don't already own locks at this level
1052 assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
1053 " (lockset %s)" % self.name)
1055 if priority is None:
1056 priority = _DEFAULT_PRIORITY
1058 # We need to keep track of how long we spent waiting for a lock. The
1059 # timeout passed to this function is over all lock acquires.
1060 running_timeout = utils.RunningTimeout(timeout, False)
1063 if names is not None:
1064 # Support passing in a single resource to acquire rather than many
1065 if isinstance(names, basestring):
1068 return self.__acquire_inner(names, False, shared, priority,
1069 running_timeout.Remaining, test_notify)
1072 # If no names are given acquire the whole set by not letting new names
1073 # be added before we release, and getting the current list of names.
1074 # Some of them may then be deleted later, but we'll cope with this.
1076 # We'd like to acquire this lock in a shared way, as it's nice if
1077 # everybody else can use the instances at the same time. If we are
1078 # acquiring them exclusively though they won't be able to do this
1079 # anyway, so we'll get the list lock exclusively as well in
1080 # order to be able to do add() on the set while owning it.
1081 if not self.__lock.acquire(shared=shared, priority=priority,
1082 timeout=running_timeout.Remaining()):
1083 raise _AcquireTimeout()
1085 # note we own the set-lock
1088 return self.__acquire_inner(self.__names(), True, shared, priority,
1089 running_timeout.Remaining, test_notify)
1091 # We shouldn't have problems adding the lock to the owners list, but
1092 # if we did we'll try to release this lock and re-raise exception.
1093 # Of course something is going to be really wrong, after this.
1094 self.__lock.release()
1098 except _AcquireTimeout:
1101 def __acquire_inner(self, names, want_all, shared, priority,
1102 timeout_fn, test_notify):
1103 """Inner logic for acquiring a number of locks.
1105 @param names: Names of the locks to be acquired
1106 @param want_all: Whether all locks in the set should be acquired
1107 @param shared: Whether to acquire in shared mode
1108 @param timeout_fn: Function returning remaining timeout
1109 @param priority: Priority for acquiring locks
1110 @param test_notify: Special callback function for unittesting
1115 # First we look the locks up on __lockdict. We have no way of being sure
1116 # they will still be there after, but this makes it a lot faster should
1117 # just one of them already be wrong. Using a sorted sequence to prevent
1119 for lname in sorted(utils.UniqueSequence(names)):
1121 lock = self.__lockdict[lname] # raises KeyError if lock is not there
1124 # We are acquiring all the set, it doesn't matter if this particular
1125 # element is not there anymore.
1128 raise errors.LockError("Non-existing lock %s in set %s (it may have"
1129 " been removed)" % (lname, self.name))
1131 acquire_list.append((lname, lock))
1133 # This will hold the locknames we effectively acquired.
1137 # Now acquire_list contains a sorted list of resources and locks we
1138 # want. In order to get them we loop on this (private) list and
1139 # acquire() them. We have no real guarantee they will still exist till
1140 # this is done but .acquire() itself is safe and will alert us if the
1141 # lock gets deleted.
1142 for (lname, lock) in acquire_list:
1143 if __debug__ and callable(test_notify):
1144 test_notify_fn = lambda: test_notify(lname)
1146 test_notify_fn = None
1148 timeout = timeout_fn()
1151 # raises LockError if the lock was deleted
1152 acq_success = lock.acquire(shared=shared, timeout=timeout,
1154 test_notify=test_notify_fn)
1155 except errors.LockError:
1157 # We are acquiring all the set, it doesn't matter if this
1158 # particular element is not there anymore.
1161 raise errors.LockError("Non-existing lock %s in set %s (it may"
1162 " have been removed)" % (lname, self.name))
1165 # Couldn't get lock or timeout occurred
1167 # This shouldn't happen as SharedLock.acquire(timeout=None) is
1169 raise errors.LockError("Failed to get lock %s (set %s)" %
1172 raise _AcquireTimeout()
1175 # now the lock cannot be deleted, we have it!
1176 self._add_owned(name=lname)
1180 # We shouldn't have problems adding the lock to the owners list, but
1181 # if we did we'll try to release this lock and re-raise exception.
1182 # Of course something is going to be really wrong after this.
1183 if lock._is_owned():
1188 # Release all owned locks
1189 self._release_and_delete_owned()
1194 def downgrade(self, names=None):
1195 """Downgrade a set of resource locks from exclusive to shared mode.
1197 The locks must have been acquired in exclusive mode.
1200 assert self._is_owned(), ("downgrade on lockset %s while not owning any"
1201 " lock" % self.name)
1203 # Support passing in a single resource to downgrade rather than many
1204 if isinstance(names, basestring):
1207 owned = self._list_owned()
1213 assert owned.issuperset(names), \
1214 ("downgrade() on unheld resources %s (set %s)" %
1215 (names.difference(owned), self.name))
1217 for lockname in names:
1218 self.__lockdict[lockname].downgrade()
1220 # Do we own the lockset in exclusive mode?
1221 if self.__lock._is_owned(shared=0):
1222 # Have all locks been downgraded?
1223 if not compat.any(lock._is_owned(shared=0)
1224 for lock in self.__lockdict.values()):
1225 self.__lock.downgrade()
1226 assert self.__lock._is_owned(shared=1)
1230 def release(self, names=None):
1231 """Release a set of resource locks, at the same level.
1233 You must have acquired the locks, either in shared or in exclusive mode,
1234 before releasing them.
1236 @type names: list of strings, or None
1237 @param names: the names of the locks which shall be released
1238 (defaults to all the locks acquired at that level).
1241 assert self._is_owned(), ("release() on lock set %s while not owner" %
1244 # Support passing in a single resource to release rather than many
1245 if isinstance(names, basestring):
1249 names = self._list_owned()
1252 assert self._list_owned().issuperset(names), (
1253 "release() on unheld resources %s (set %s)" %
1254 (names.difference(self._list_owned()), self.name))
1256 # First of all let's release the "all elements" lock, if set.
1257 # After this 'add' can work again
1258 if self.__lock._is_owned():
1259 self.__lock.release()
1262 for lockname in names:
1263 # If we are sure the lock doesn't leave __lockdict without being
1264 # exclusively held we can do this...
1265 self.__lockdict[lockname].release()
1266 self._del_owned(name=lockname)
def add(self, names, acquired=0, shared=0):
  """Add a new set of elements to the set

  If the calling thread does not already own the set-level lock, it is
  acquired for the duration of the call and released again before
  returning (also when an exception is raised).

  @type names: list of strings (or a single string)
  @param names: names of the new elements to add
  @type acquired: integer (0/1) used as a boolean
  @param acquired: pre-acquire the new resource?
  @type shared: integer (0/1) used as a boolean
  @param shared: is the pre-acquisition shared?
  @raise errors.LockError: if any of the names is already part of the set
  @return: True once all locks have been added

  """
  # Check we don't already own locks at this level
  assert not self._is_owned() or self.__lock._is_owned(shared=0), \
    ("Cannot add locks if the set %s is only partially owned, or shared" %
     self.name)

  # Support passing in a single resource to add rather than many
  if isinstance(names, basestring):
    names = [names]

  # If we don't already own the set-level lock acquired in an exclusive way
  # we'll get it and note we need to release it later.
  release_lock = False
  if not self.__lock._is_owned():
    release_lock = True
    self.__lock.acquire()

  try:
    invalid_names = set(self.__names()).intersection(names)
    if invalid_names:
      # This must be an explicit raise, not an assert, because assert is
      # turned off when using optimization, and this can happen because of
      # concurrency even if the user doesn't want it.
      raise errors.LockError("duplicate add(%s) on lockset %s" %
                             (invalid_names, self.name))

    for lockname in names:
      lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)

      if acquired:
        # No need for priority or timeout here as this lock has just been
        # created
        lock.acquire(shared=shared)
        # now the lock cannot be deleted, we have it!
        try:
          self._add_owned(name=lockname)
        except:
          # We shouldn't have problems adding the lock to the owners list,
          # but if we did we'll try to release this lock and re-raise
          # exception. Of course something is going to be really wrong,
          # after this. On the other hand the lock hasn't been added to the
          # __lockdict yet so no other threads should be pending on it. This
          # release is just a safety measure.
          # The bare except is deliberate: whatever was raised is re-raised
          # unchanged after the cleanup.
          lock.release()
          raise

      # Publish the lock only after the optional pre-acquisition succeeded
      self.__lockdict[lockname] = lock

  finally:
    # Only release __lock if we were not holding it previously.
    if release_lock:
      self.__lock.release()

  return True
def remove(self, names):
  """Remove elements from the lock set.

  You can either not hold anything in the lockset or already hold a superset
  of the elements you want to delete, exclusively.

  Names which are unknown, or whose lock could not be deleted, are skipped
  and simply not reported in the result.

  @type names: list of strings (or a single string)
  @param names: names of the resource to remove.

  @return: a list of locks which we removed; the list is always
      equal to the names list if we were holding all the locks
      exclusively

  """
  # Support passing in a single resource to remove rather than many
  if isinstance(names, basestring):
    names = [names]

  # If we own any subset of this lock it must be a superset of what we want
  # to delete. The ownership must also be exclusive, but that will be checked
  # by the lock itself.
  assert not self._is_owned() or self._list_owned().issuperset(names), (
    "remove() on acquired lockset %s while not owning all elements" %
    self.name)

  removed = []

  for lname in names:
    # Calling delete() acquires the lock exclusively if we don't already own
    # it, and causes all pending and subsequent lock acquires to fail. It's
    # fine to call it out of order because delete() also implies release(),
    # and the assertion above guarantees that if we either already hold
    # everything we want to delete, or we hold none.
    try:
      self.__lockdict[lname].delete()
      removed.append(lname)
    except (KeyError, errors.LockError):
      # This cannot happen if we were already holding it, verify:
      assert not self._is_owned(), ("remove failed while holding lockset %s"
                                    % self.name)
    else:
      # If no LockError was raised we are the ones who deleted the lock.
      # This means we can safely remove it from lockdict, as any further or
      # pending delete() or acquire() will fail (and nobody can have the lock
      # since before our call to delete()).
      #
      # This is done in an else clause because if the exception was thrown
      # it's the job of the one who actually deleted it.
      del self.__lockdict[lname]
      # And let's remove it from our private list if we owned it.
      if self._is_owned():
        self._del_owned(name=lname)

  return removed
# Locking levels, must be acquired in increasing order.
# Current rules are:
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
#   acquired before performing any operation, either in shared or in exclusive
#   mode. Acquiring the BGL in exclusive mode is discouraged and should be
#   avoided.
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
#   If you need more than one node, or more than one instance, acquire them at
#   the same time.
#
# The numeric value of each level must equal its index in LEVELS; the
# "upper level" check in GanetiLockManager slices LEVELS by level number.
LEVEL_CLUSTER = 0
LEVEL_INSTANCE = 1
LEVEL_NODEGROUP = 2
LEVEL_NODE = 3

LEVELS = [LEVEL_CLUSTER,
          LEVEL_INSTANCE,
          LEVEL_NODEGROUP,
          LEVEL_NODE]

# Lock levels which are modifiable
LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]

# Human-readable name of each level, used in diagnostic messages
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODEGROUP: "nodegroup",
  LEVEL_NODE: "node",
  }

# Constant for the big ganeti lock
BGL = "BGL"
class GanetiLockManager:
  """The Ganeti Locking Library

  The purpose of this small library is to manage locking for ganeti clusters
  in a central place, while at the same time doing dynamic checks against
  possible deadlocks. It will also make it easier to transition to a different
  lock type should we migrate away from python threads.

  Every level in L{LEVELS} is backed by one L{LockSet}; all sets share a
  single L{LockMonitor}.

  """
  # The one instance created so far, if any (checked by the constructor)
  _instance = None

  def __init__(self, nodes, nodegroups, instances):
    """Constructs a new GanetiLockManager object.

    There should be only a GanetiLockManager object at any time, so this
    function raises an error if this is not the case.

    @param nodes: list of node names
    @param nodegroups: list of nodegroup uuids
    @param instances: list of instance names

    """
    assert self.__class__._instance is None, \
      "double GanetiLockManager instance"

    self.__class__._instance = self

    self._monitor = LockMonitor()

    # The keyring contains all the locks, at their level and in the correct
    # locking order.
    self.__keyring = {
      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor),
      LEVEL_INSTANCE: LockSet(instances, "instances",
                              monitor=self._monitor),
      }

  def AddToLockMonitor(self, provider):
    """Registers a new lock with the monitor.

    See L{LockMonitor.RegisterLock}.

    """
    return self._monitor.RegisterLock(provider)

  def QueryLocks(self, fields):
    """Queries information from all locks.

    See L{LockMonitor.QueryLocks}.

    """
    return self._monitor.QueryLocks(fields)

  def OldStyleQueryLocks(self, fields):
    """Queries information from all locks, returning old-style data.

    See L{LockMonitor.OldStyleQueryLocks}.

    """
    return self._monitor.OldStyleQueryLocks(fields)

  def _names(self, level):
    """List the lock names at the given level.

    This can be used for debugging/testing purposes.

    @param level: the level whose list of locks to get

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    return self.__keyring[level]._names()

  def _is_owned(self, level):
    """Check whether we are owning locks at the given level

    @param level: the level to check (member of L{LEVELS})

    """
    return self.__keyring[level]._is_owned()

  # Public alias
  is_owned = _is_owned

  def _list_owned(self, level):
    """Get the set of owned locks at the given level

    @param level: the level to inspect (member of L{LEVELS})

    """
    return self.__keyring[level]._list_owned()

  # Public alias
  list_owned = _list_owned

  def _upper_owned(self, level):
    """Check whether we own any lock at a level greater than the given one.

    @param level: the reference level
    @return: true if a lock is owned at any level after C{level} in
        L{LEVELS}, false otherwise

    """
    # This way of checking only works if LEVELS[i] = i, which we check for in
    # the test cases.
    return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))

  def _BGL_owned(self): # pylint: disable=C0103
    """Check if the current thread owns the BGL.

    Both an exclusive or a shared acquisition work.

    """
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()

  @staticmethod
  def _contains_BGL(level, names): # pylint: disable=C0103
    """Check if the level contains the BGL.

    Check if acting on the given level and set of names will change
    the status of the Big Ganeti Lock.

    @param level: the level being acted upon
    @param names: the lock names being acted upon; C{None} stands for the
        whole level and therefore includes the BGL at L{LEVEL_CLUSTER}

    """
    return level == LEVEL_CLUSTER and (names is None or BGL in names)

  def acquire(self, level, names, timeout=None, shared=0, priority=None):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring lock
    @return: whatever the level's L{LockSet.acquire} returns

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Check that we are either acquiring the Big Ganeti Lock or we already own
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
    # so even if we've migrated we need to at least share the BGL to be
    # compatible with them. Of course if we own the BGL exclusively there's no
    # point in acquiring any other lock, unless perhaps we are half way through
    # the migration of the current opcode.
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
      "You must own the Big Ganeti Lock before acquiring any other")

    # Check we don't own locks at the same or upper levels.
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
                                          " while owning some at a greater one")

    # Acquire the locks in the set.
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
                                         priority=priority)

  def downgrade(self, level, names=None):
    """Downgrade a set of resource locks from exclusive to shared mode.

    You must have acquired the locks in exclusive mode.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be downgraded
    @type names: list of strings, or None
    @param names: the names of the locks which shall be downgraded
        (defaults to all the locks acquired at the level)
    @return: whatever the level's L{LockSet.downgrade} returns

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    return self.__keyring[level].downgrade(names=names)

  def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive
    mode, before releasing them.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)
    @return: whatever the level's L{LockSet.release} returns

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    # The BGL must be the last lock to go: refuse to drop it while anything
    # is still held at a level above LEVEL_CLUSTER.
    assert (not self._contains_BGL(level, names) or
            not self._upper_owned(LEVEL_CLUSTER)), (
      "Cannot release the Big Ganeti Lock while holding something"
      " at upper levels (%r)" %
      (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
                        for i in self.__keyring.keys()]), ))

    # Release will complain if we don't own the locks already
    return self.__keyring[level].release(names)

  def add(self, level, names, acquired=0, shared=0):
    """Add locks at the specified level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to acquire
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared
    @return: whatever the level's L{LockSet.add} returns

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
                               " operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
                                          " while owning some at a greater one")
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)

  def remove(self, level, names):
    """Remove locks from the specified level.

    You must either already own the locks you are trying to remove
    exclusively or not own any lock at an upper level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)
    @return: whatever the level's L{LockSet.remove} returns

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
                               " operations")
    # Check we either own the level or don't own anything from here
    # up. LockSet.remove() will check the case in which we don't own
    # all the needed resources, or we have a shared ownership.
    assert self._is_owned(level) or not self._upper_owned(level), (
      "Cannot remove locks at a level while not owning it or"
      " owning some at a greater one")
    return self.__keyring[level].remove(names)
def _MonitorSortKey(entry):
  """Sorting key function.

  Sort by name, registration order and then order of information. This provides
  a stable sort order over different providers, even if they return the same
  name.

  @type entry: tuple
  @param entry: C{(item, idx, num)} where C{item} is a four-element lock
      information tuple whose first element is the lock name, C{idx} is the
      position of the item within its provider's output and C{num} is the
      provider's registration counter
  @return: tuple C{(nice sort key of the name, num, idx)}

  """
  # Unpack inside the body: tuple parameters in the signature are not valid
  # syntax on newer Python versions, while this form works everywhere.
  (item, idx, num) = entry
  (name, _, _, _) = item

  return (utils.NiceSortKey(name), num, idx)
class LockMonitor(object):
  """Registry of lock information providers which can be queried.

  Providers are held through weak references, each tagged with a
  monotonically increasing registration number used for stable sorting.

  """
  # Name of the attribute holding the lock used by L{ssynchronized}
  _LOCK_ATTR = "_lock"

  def __init__(self):
    """Initializes this class.

    """
    self._lock = SharedLock("LockMonitor")

    # Counter for stable sorting
    self._counter = itertools.count(0)

    # Tracked locks. Weak references are used to avoid issues with circular
    # references and deletion.
    self._locks = weakref.WeakKeyDictionary()

  @ssynchronized(_LOCK_ATTR)
  def RegisterLock(self, provider):
    """Registers a new lock.

    @param provider: Object with a callable method named C{GetLockInfo}, taking
        a single C{set} containing the requested information items
    @note: It would be nicer to only receive the function generating the
        requested information but, as it turns out, weak references to bound
        methods (e.g. C{self.GetLockInfo}) are tricky; there are several
        workarounds, but none of the ones I found works properly in combination
        with a standard C{WeakKeyDictionary}

    """
    assert provider not in self._locks, "Duplicate registration"

    # There used to be a check for duplicate names here. As it turned out, when
    # a lock is re-created with the same name in a very short timeframe, the
    # previous instance might not yet be removed from the weakref dictionary.
    # By keeping track of the order of incoming registrations, a stable sort
    # ordering can still be guaranteed.

    self._locks[provider] = self._counter.next()

  def _GetLockInfo(self, requested):
    """Get information from all locks.

    @param requested: passed unchanged to every provider's C{GetLockInfo}
    @return: list of C{(info, idx, num)} tuples, where C{info} is one item
        returned by a provider, C{idx} its position in that provider's output
        and C{num} the provider's registration number

    """
    # Must hold lock while getting consistent list of tracked items
    self._lock.acquire(shared=1)
    try:
      items = self._locks.items()
    finally:
      self._lock.release()

    # The providers themselves are queried after the monitor lock was released
    return [(info, idx, num)
            for (provider, num) in items
            for (idx, info) in enumerate(provider.GetLockInfo(requested))]

  def _Query(self, fields):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return
    @return: tuple of the query object and the query data built from the
        sorted lock information

    """
    qobj = query.Query(query.LOCK_FIELDS, fields)

    # Get all data with internal lock held and then sort by name and incoming
    # order
    lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
                      key=_MonitorSortKey)

    # Extract lock information and build query data
    return (qobj, query.LockQueryData(map(compat.fst, lockinfo)))

  def QueryLocks(self, fields):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return
    @return: the result of C{query.GetQueryResponse} for the collected data

    """
    (qobj, ctx) = self._Query(fields)

    # Prepare query response
    return query.GetQueryResponse(qobj, ctx)

  def OldStyleQueryLocks(self, fields):
    """Queries information from all locks, returning old-style data.

    @type fields: list of strings
    @param fields: List of fields to return
    @return: the result of the query object's C{OldStyleQuery} for the
        collected data

    """
    (qobj, ctx) = self._Query(fields)

    return qobj.OldStyleQuery(ctx)