4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Module implementing the Ganeti locking code."""
29 from ganeti import errors
30 from ganeti import utils
33 def ssynchronized(lock, shared=0):
34 """Shared Synchronization decorator.
36 Calls the function holding the given lock, either in exclusive or shared
37 mode. It requires the passed lock to be a SharedLock (or support its
42 def sync_function(*args, **kwargs):
43 lock.acquire(shared=shared)
45 return fn(*args, **kwargs)
52 class _SingleActionPipeConditionWaiter(object):
53 """Callable helper class for _SingleActionPipeCondition.
62 def __init__(self, cond, poller, fd):
63 """Initializes this class.
65 @type cond: L{_SingleActionPipeCondition}
66 @param cond: Parent condition
67 @type poller: select.poll
68 @param poller: Poller object
70 @param fd: File descriptor to wait for
def __call__(self, timeout):
  """Wait for something to happen on the pipe.

  Polls until the watched file descriptor is reported by the poller or
  the timeout has elapsed. Polls interrupted by a signal (EINTR) are
  retried with a re-calculated timeout.

  @type timeout: float or None
  @param timeout: Timeout for waiting, in seconds (can be None, in which
      case the wait has no time limit)

  """
  # Function-scope import: "sys" is only needed to fetch the active
  # exception in a way that is valid syntax on every Python version.
  import sys

  start_time = time.time()
  remaining_time = timeout

  while timeout is None or remaining_time > 0:
    # The timeout is tracked in seconds, but poll() wants milliseconds
    if remaining_time is None:
      poll_timeout = None
    else:
      poll_timeout = remaining_time * 1000

    try:
      result = self._poller.poll(poll_timeout)
    except EnvironmentError:
      err = sys.exc_info()[1]
      if err.errno != errno.EINTR:
        raise
      # Interrupted by a signal: behave as if nothing was reported
      result = None

    # Check whether we were notified
    if result and result[0][0] == self._fd:
      break

    # Re-calculate timeout if necessary
    if timeout is not None:
      remaining_time = start_time + timeout - time.time()
106 class _SingleActionPipeCondition(object):
107 """Wrapper around a pipe for usage inside conditions.
109 This class contains a POSIX pipe(2) and a poller to poll it. The pipe is
110 always allocated when constructing the class. Extra care is taken to always
111 close the file descriptors.
113 An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for
116 Warning: This class is designed to be used as the underlying component of a
117 locking condition, but is not by itself thread safe, and needs to be
118 protected by an external lock.
128 _waiter_class = _SingleActionPipeConditionWaiter
131 """Initializes this class.
134 object.__init__(self)
138 # Just assume the unpacking is successful, otherwise error handling gets
140 (self._read_fd, self._write_fd) = os.pipe()
142 # The poller looks for closure of the write side
143 poller = select.poll()
144 poller.register(self._read_fd, select.POLLHUP)
146 self._poller = poller
148 if self._read_fd is not None:
149 os.close(self._read_fd)
150 if self._write_fd is not None:
151 os.close(self._write_fd)
154 # There should be no code here anymore, otherwise the pipe file descriptors
155 # may not be cleaned up properly in case of errors.
157 def StartWaiting(self):
158 """Return function to wait for notification.
160 @rtype: L{_SingleActionPipeConditionWaiter}
161 @return: Function to wait for notification
164 assert self._nwaiters >= 0
166 if self._poller is None:
167 raise RuntimeError("Already cleaned up")
169 # Create waiter function and increase number of waiters
170 wait_fn = self._waiter_class(self, self._poller, self._read_fd)
174 def DoneWaiting(self):
175 """Decrement number of waiters and automatic cleanup.
177 Must be called after waiting for a notification.
180 @return: Whether this was the last waiter
183 assert self._nwaiters > 0
187 if self._nwaiters == 0:
194 """Close the writing side of the pipe to notify all waiters.
197 if self._write_fd is None:
198 raise RuntimeError("Can only notify once")
200 os.close(self._write_fd)
201 self._write_fd = None
204 """Close all file descriptors.
207 if self._read_fd is not None:
208 os.close(self._read_fd)
211 if self._write_fd is not None:
212 os.close(self._write_fd)
213 self._write_fd = None
218 """Called on object deletion.
220 Ensure no file descriptors are left open.
226 class _PipeCondition(object):
227 """Group-only non-polling condition with counters.
229 This condition class uses pipes and poll, internally, to be able to wait for
230 notification with a timeout, without resorting to polling. It is almost
231 compatible with Python's threading.Condition, but only supports notifyAll and
232 non-recursive locks. As an additional feature it's able to report whether
233 there are any waiting threads.
244 _pipe_class = _SingleActionPipeCondition
246 def __init__(self, lock):
247 """Initializes this class.
250 object.__init__(self)
252 # Recursive locks are not supported
253 assert not hasattr(lock, "_acquire_restore")
254 assert not hasattr(lock, "_release_save")
258 # Export the lock's acquire() and release() methods
259 self.acquire = lock.acquire
260 self.release = lock.release
266 """Check whether lock is owned by current thread.
269 if self._lock.acquire(0):
275 def _check_owned(self):
276 """Raise an exception if the current thread doesn't own the lock.
279 if not self._is_owned():
280 raise RuntimeError("cannot work with un-aquired lock")
282 def wait(self, timeout=None):
283 """Wait for a notification.
285 @type timeout: float or None
286 @param timeout: Waiting timeout (can be None)
292 self._pipe = self._pipe_class()
294 # Keep local reference to the pipe. It could be replaced by another thread
295 # notifying while we're waiting.
298 assert self._nwaiters >= 0
301 # Get function to wait on the pipe
302 wait_fn = pipe.StartWaiting()
304 # Release lock while waiting
307 # Wait for notification
313 # Destroy pipe if this was the last waiter and the current pipe is
314 # still the same. The same pipe cannot be reused after cleanup.
315 if pipe.DoneWaiting() and pipe == self._pipe:
318 assert self._nwaiters > 0
322 """Notify all currently waiting threads.
327 # Notify and forget pipe. A new one will be created on the next call to
329 if self._pipe is not None:
330 self._pipe.notifyAll()
333 def has_waiting(self):
334 """Returns whether there are active waiters.
339 return bool(self._nwaiters)
342 class _CountingCondition(object):
343 """Wrapper for Python's built-in threading.Condition class.
345 This wrapper keeps a count of active waiters. We can't access the internal
346 "__waiters" attribute of threading.Condition because it's not thread-safe.
354 def __init__(self, lock):
355 """Initializes this class.
358 object.__init__(self)
359 self._cond = threading.Condition(lock=lock)
363 """Notifies the condition.
366 return self._cond.notifyAll()
368 def wait(self, timeout=None):
369 """Waits for the condition to be notified.
371 @type timeout: float or None
372 @param timeout: Timeout in seconds
375 assert self._nwaiters >= 0
379 return self._cond.wait(timeout=timeout)
383 def has_waiting(self):
384 """Returns whether there are active waiters.
387 return bool(self._nwaiters)
390 class SharedLock(object):
391 """Implements a shared lock.
393 Multiple threads can acquire the lock in a shared way, calling
394 acquire_shared(). In order to acquire the lock in an exclusive way threads
395 can call acquire_exclusive().
397 The lock prevents starvation but does not guarantee that threads will acquire
398 the shared lock in the order they queued for it, just that they will
412 __condition_class = _CountingCondition
415 """Construct a new SharedLock.
418 object.__init__(self)
421 self.__lock = threading.Lock()
423 # Queue containing waiting acquires
426 # Active and inactive conditions for shared locks
427 self.__active_shr_c = self.__condition_class(self.__lock)
428 self.__inactive_shr_c = self.__condition_class(self.__lock)
430 # Current lock holders
434 # is this lock in the deleted state?
435 self.__deleted = False
437 def __check_deleted(self):
438 """Raises an exception if the lock has been deleted.
442 raise errors.LockError("Deleted lock")
444 def __is_sharer(self):
445 """Is the current thread sharing the lock at this time?
448 return threading.currentThread() in self.__shr
450 def __is_exclusive(self):
451 """Is the current thread holding the lock exclusively at this time?
454 return threading.currentThread() == self.__exc
456 def __is_owned(self, shared=-1):
457 """Is the current thread somehow owning the lock at this time?
459 This is a private version of the function, which presumes you're holding
464 return self.__is_sharer() or self.__is_exclusive()
466 return self.__is_sharer()
468 return self.__is_exclusive()
470 def _is_owned(self, shared=-1):
471 """Is the current thread somehow owning the lock at this time?
474 - < 0: check for any type of ownership (default)
475 - 0: check for exclusive ownership
476 - > 0: check for shared ownership
479 self.__lock.acquire()
481 return self.__is_owned(shared=shared)
483 self.__lock.release()
485 def _count_pending(self):
486 """Returns the number of pending acquires.
491 self.__lock.acquire()
493 return len(self.__pending)
495 self.__lock.release()
497 def __do_acquire(self, shared):
498 """Actually acquire the lock.
502 self.__shr.add(threading.currentThread())
504 self.__exc = threading.currentThread()
506 def __can_acquire(self, shared):
507 """Determine whether lock can be acquired.
511 return self.__exc is None
513 return len(self.__shr) == 0 and self.__exc is None
515 def __is_on_top(self, cond):
516 """Checks whether the passed condition is on top of the queue.
518 The caller must make sure the queue isn't empty.
521 return self.__pending[0] == cond
523 def __acquire_unlocked(self, shared=0, timeout=None):
524 """Acquire a shared lock.
526 @param shared: whether to acquire in shared mode; by default an
527 exclusive lock will be acquired
528 @param timeout: maximum waiting time before giving up
531 self.__check_deleted()
533 # We cannot acquire the lock if we already have it
534 assert not self.__is_owned(), "double acquire() on a non-recursive lock"
536 # Check whether someone else holds the lock or there are pending acquires.
537 if not self.__pending and self.__can_acquire(shared):
538 # Apparently not, can acquire lock directly.
539 self.__do_acquire(shared)
543 wait_condition = self.__active_shr_c
545 # Check if we're not yet in the queue
546 if wait_condition not in self.__pending:
547 self.__pending.append(wait_condition)
549 wait_condition = self.__condition_class(self.__lock)
550 # Always add to queue
551 self.__pending.append(wait_condition)
554 # Wait until we become the topmost acquire in the queue or the timeout
556 while not (self.__is_on_top(wait_condition) and
557 self.__can_acquire(shared)):
558 # Wait for notification
559 wait_condition.wait(timeout)
560 self.__check_deleted()
562 # A lot of code assumes blocking acquires always succeed. Loop
563 # internally for that case.
564 if timeout is not None:
567 if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
568 self.__do_acquire(shared)
571 # Remove condition from queue if there are no more waiters
572 if not wait_condition.has_waiting() and not self.__deleted:
573 self.__pending.remove(wait_condition)
577 def acquire(self, shared=0, timeout=None):
578 """Acquire a shared lock.
581 @param shared: whether to acquire in shared mode; by default an
582 exclusive lock will be acquired
584 @param timeout: maximum waiting time before giving up
587 self.__lock.acquire()
589 return self.__acquire_unlocked(shared, timeout)
591 self.__lock.release()
594 """Release a Shared Lock.
596 You must have acquired the lock, either in shared or in exclusive mode,
597 before calling this function.
600 self.__lock.acquire()
602 assert self.__is_exclusive() or self.__is_sharer(), \
603 "Cannot release non-owned lock"
605 # Autodetect release type
606 if self.__is_exclusive():
609 self.__shr.remove(threading.currentThread())
611 # Notify topmost condition in queue
613 first_condition = self.__pending[0]
614 first_condition.notifyAll()
616 if first_condition == self.__active_shr_c:
617 self.__active_shr_c = self.__inactive_shr_c
618 self.__inactive_shr_c = first_condition
621 self.__lock.release()
def delete(self, timeout=None):
  """Delete a Shared Lock.

  This operation will declare the lock for removal. First the lock will be
  acquired in exclusive mode if you don't already own it, then the lock
  will be put in a state where any future and pending acquire() fail.

  @type timeout: float or None
  @param timeout: maximum waiting time before giving up
  @return: the result of the exclusive acquire (true if the caller already
      held the lock exclusively); the lock is only marked as deleted when
      this is true

  """
  self.__lock.acquire()
  try:
    assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

    self.__check_deleted()

    # The caller is allowed to hold the lock exclusively already.
    acquired = self.__is_exclusive()

    if not acquired:
      # Pass both arguments by keyword: the first positional parameter of
      # __acquire_unlocked() is "shared", so handing over the timeout
      # positionally would request a shared acquire without any timeout.
      acquired = self.__acquire_unlocked(shared=0, timeout=timeout)

    if acquired:
      self.__deleted = True
      # delete() also implies release(): drop the exclusive ownership
      self.__exc = None

      # Notify all acquires. They'll throw an error.
      while self.__pending:
        self.__pending.pop().notifyAll()

    return acquired
  finally:
    self.__lock.release()
659 # Whenever we want to acquire a full LockSet we pass None as the value
660 # to acquire. Hide this behind this nicely named constant.
665 """Implements a set of locks.
667 This abstraction implements a set of shared locks for the same resource type,
668 distinguished by name. The user can lock a subset of the resources and the
669 LockSet will take care of acquiring the locks always in the same order, thus
672 All the locks needed in the same set must be acquired together, though.
675 def __init__(self, members=None):
676 """Constructs a new LockSet.
678 @param members: initial members of the set
681 # Used internally to guarantee coherency.
682 self.__lock = SharedLock()
684 # The lockdict indexes the relationship name -> lock
685 # The order-of-locking is implied by the alphabetical order of names
688 if members is not None:
690 self.__lockdict[name] = SharedLock()
692 # The owner dict contains the set of locks each thread owns. For
693 # performance each thread can access its own key without a global lock on
694 # this structure. It is paramount though that *no* other type of access is
695 # done to this structure (eg. no looping over its keys). *_owner helper
696 # function are defined to guarantee access is correct, but in general never
697 # do anything different than __owners[threading.currentThread()], or there
702 """Is the current thread a current level owner?"""
703 return threading.currentThread() in self.__owners
705 def _add_owned(self, name=None):
706 """Note the current thread owns the given lock"""
708 if not self._is_owned():
709 self.__owners[threading.currentThread()] = set()
712 self.__owners[threading.currentThread()].add(name)
714 self.__owners[threading.currentThread()] = set([name])
716 def _del_owned(self, name=None):
717 """Note the current thread no longer owns the given lock"""
720 self.__owners[threading.currentThread()].remove(name)
722 # Only remove the key if we don't hold the set-lock as well
723 if (not self.__lock._is_owned() and
724 not self.__owners[threading.currentThread()]):
725 del self.__owners[threading.currentThread()]
727 def _list_owned(self):
728 """Get the set of resource names owned by the current thread"""
730 return self.__owners[threading.currentThread()].copy()
735 """Return the current set of names.
737 Only call this function while holding __lock and don't iterate on the
738 result after releasing the lock.
741 return self.__lockdict.keys()
744 """Return a copy of the current set of elements.
746 Used only for debugging purposes.
749 # If we don't already own the set-level lock
750 # we'll get it and note we need to release it later.
752 if not self.__lock._is_owned():
754 self.__lock.acquire(shared=1)
756 result = self.__names()
759 self.__lock.release()
762 def acquire(self, names, blocking=1, shared=0):
763 """Acquire a set of resource locks.
765 @param names: the names of the locks which shall be acquired
766 (special lock names, or instance/node names)
767 @param shared: whether to acquire in shared mode; by default an
768 exclusive lock will be acquired
769 @param blocking: whether to block while trying to acquire or to
770 operate in try-lock mode (this locking mode is not supported yet)
772 @return: True when all the locks are successfully acquired
774 @raise errors.LockError: when any lock we try to acquire has
775 been deleted before we succeed. In this case none of the
776 locks requested will be acquired.
780 # We don't have non-blocking mode for now
781 raise NotImplementedError
783 # Check we don't already own locks at this level
784 assert not self._is_owned(), "Cannot acquire locks in the same set twice"
787 # If no names are given acquire the whole set by not letting new names
788 # being added before we release, and getting the current list of names.
789 # Some of them may then be deleted later, but we'll cope with this.
791 # We'd like to acquire this lock in a shared way, as it's nice if
792 # everybody else can use the instances at the same time. If we are
793 # acquiring them exclusively, they won't be able to do this anyway,
794 # so we'll get the list lock exclusively as well in order to be able to
795 # do add() on the set while owning it.
796 self.__lock.acquire(shared=shared)
798 # note we own the set-lock
800 names = self.__names()
802 # We shouldn't have problems adding the lock to the owners list, but
803 # if we did we'll try to release this lock and re-raise exception.
804 # Of course something is going to be really wrong, after this.
805 self.__lock.release()
809 # Support passing in a single resource to acquire rather than many
810 if isinstance(names, basestring):
813 names = sorted(names)
816 # First we look the locks up on __lockdict. We have no way of being sure
817 # they will still be there after, but this makes it a lot faster should
818 # just one of them already be wrong
819 for lname in utils.UniqueSequence(names):
821 lock = self.__lockdict[lname] # raises KeyError if lock is not there
822 acquire_list.append((lname, lock))
824 if self.__lock._is_owned():
825 # We are acquiring all the set, it doesn't matter if this
826 # particular element is not there anymore.
829 raise errors.LockError('non-existing lock in set (%s)' % lname)
831 # This will hold the locknames we effectively acquired.
833 # Now acquire_list contains a sorted list of resources and locks we want.
834 # In order to get them we loop on this (private) list and acquire() them.
835 # We gave no real guarantee they will still exist till this is done but
836 # .acquire() itself is safe and will alert us if the lock gets deleted.
837 for (lname, lock) in acquire_list:
839 lock.acquire(shared=shared) # raises LockError if the lock is deleted
840 # now the lock cannot be deleted, we have it!
841 self._add_owned(name=lname)
843 except (errors.LockError):
844 if self.__lock._is_owned():
845 # We are acquiring all the set, it doesn't matter if this
846 # particular element is not there anymore.
850 for lname in self._list_owned():
851 self.__lockdict[lname].release()
852 self._del_owned(name=lname)
853 raise errors.LockError('non-existing lock in set (%s)' % name_fail)
855 # We shouldn't have problems adding the lock to the owners list, but
856 # if we did we'll try to release this lock and re-raise exception.
857 # Of course something is going to be really wrong, after this.
863 # If something went wrong and we had the set-lock let's release it...
864 if self.__lock._is_owned():
865 self.__lock.release()
870 def release(self, names=None):
871 """Release a set of resource locks, at the same level.
873 You must have acquired the locks, either in shared or in exclusive mode,
874 before releasing them.
876 @param names: the names of the locks which shall be released
877 (defaults to all the locks acquired at that level).
880 assert self._is_owned(), "release() on lock set while not owner"
882 # Support passing in a single resource to release rather than many
883 if isinstance(names, basestring):
887 names = self._list_owned()
890 assert self._list_owned().issuperset(names), (
891 "release() on unheld resources %s" %
892 names.difference(self._list_owned()))
894 # First of all let's release the "all elements" lock, if set.
895 # After this 'add' can work again
896 if self.__lock._is_owned():
897 self.__lock.release()
900 for lockname in names:
901 # If we are sure the lock doesn't leave __lockdict without being
902 # exclusively held we can do this...
903 self.__lockdict[lockname].release()
904 self._del_owned(name=lockname)
906 def add(self, names, acquired=0, shared=0):
907 """Add a new set of elements to the set
909 @param names: names of the new elements to add
910 @param acquired: pre-acquire the new resource?
911 @param shared: is the pre-acquisition shared?
914 # Check we own nothing at this level or hold the set-lock exclusively
915 assert not self._is_owned() or self.__lock._is_owned(shared=0), \
916 "Cannot add locks if the set is only partially owned, or shared"
918 # Support passing in a single resource to add rather than many
919 if isinstance(names, basestring):
922 # If we don't already own the set-level lock acquired in an exclusive way
923 # we'll get it and note we need to release it later.
925 if not self.__lock._is_owned():
927 self.__lock.acquire()
930 invalid_names = set(self.__names()).intersection(names)
932 # This must be an explicit raise, not an assert, because assert is
933 # turned off when using optimization, and this can happen because of
934 # concurrency even if the user doesn't want it.
935 raise errors.LockError("duplicate add() (%s)" % invalid_names)
937 for lockname in names:
941 lock.acquire(shared=shared)
942 # now the lock cannot be deleted, we have it!
944 self._add_owned(name=lockname)
946 # We shouldn't have problems adding the lock to the owners list,
947 # but if we did we'll try to release this lock and re-raise
948 # exception. Of course something is going to be really wrong,
949 # after this. On the other hand the lock hasn't been added to the
950 # __lockdict yet so no other threads should be pending on it. This
951 # release is just a safety measure.
955 self.__lockdict[lockname] = lock
958 # Only release __lock if we were not holding it previously.
960 self.__lock.release()
964 def remove(self, names, blocking=1):
965 """Remove elements from the lock set.
967 You can either not hold anything in the lockset or already hold a superset
968 of the elements you want to delete, exclusively.
970 @param names: names of the resource to remove.
971 @param blocking: whether to block while trying to acquire or to
972 operate in try-lock mode (this locking mode is not supported
973 yet unless you are already holding exclusively the locks)
975 @return: a list of locks which we removed; the list is always
976 equal to the names list if we were holding all the locks
980 if not blocking and not self._is_owned():
981 # We don't have non-blocking mode for now
982 raise NotImplementedError
984 # Support passing in a single resource to remove rather than many
985 if isinstance(names, basestring):
988 # If we own any subset of this lock it must be a superset of what we want
989 # to delete. The ownership must also be exclusive, but that will be checked
990 # by the lock itself.
991 assert not self._is_owned() or self._list_owned().issuperset(names), (
992 "remove() on acquired lockset while not owning all elements")
997 # Calling delete() acquires the lock exclusively if we don't already own
998 # it, and causes all pending and subsequent lock acquires to fail. It's
999 # fine to call it out of order because delete() also implies release(),
1000 # and the assertion above guarantees that we either already hold
1001 # everything we want to delete, or we hold none.
1003 self.__lockdict[lname].delete()
1004 removed.append(lname)
1005 except (KeyError, errors.LockError):
1006 # This cannot happen if we were already holding it, verify:
1007 assert not self._is_owned(), "remove failed while holding lockset"
1009 # If no LockError was raised we are the ones who deleted the lock.
1010 # This means we can safely remove it from lockdict, as any further or
1011 # pending delete() or acquire() will fail (and nobody can have the lock
1012 # since before our call to delete()).
1014 # This is done in an else clause because if the exception was thrown
1015 # it's the job of the one who actually deleted it.
1016 del self.__lockdict[lname]
1017 # And let's remove it from our private list if we owned it.
1018 if self._is_owned():
1019 self._del_owned(name=lname)
1024 # Locking levels, must be acquired in increasing order.
1025 # Current rules are:
1026 # - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1027 # acquired before performing any operation, either in shared or in exclusive
1028 # mode. acquiring the BGL in exclusive mode is discouraged and should be
1030 # - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1031 # If you need more than one node, or more than one instance, acquire them at
1037 LEVELS = [LEVEL_CLUSTER,
1041 # Lock levels which are modifiable
1042 LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1045 LEVEL_CLUSTER: "cluster",
1046 LEVEL_INSTANCE: "instance",
1050 # Constant for the big ganeti lock
1054 class GanetiLockManager:
1055 """The Ganeti Locking Library
1057 The purpose of this small library is to manage locking for ganeti clusters
1058 in a central place, while at the same time doing dynamic checks against
1059 possible deadlocks. It will also make it easier to transition to a different
1060 lock type should we migrate away from python threads.
1065 def __init__(self, nodes=None, instances=None):
1066 """Constructs a new GanetiLockManager object.
1068 There should be only one GanetiLockManager object at any time, so this
1069 function raises an error if this is not the case.
1071 @param nodes: list of node names
1072 @param instances: list of instance names
1075 assert self.__class__._instance is None, \
1076 "double GanetiLockManager instance"
1078 self.__class__._instance = self
1080 # The keyring contains all the locks, at their level and in the correct
1083 LEVEL_CLUSTER: LockSet([BGL]),
1084 LEVEL_NODE: LockSet(nodes),
1085 LEVEL_INSTANCE: LockSet(instances),
1088 def _names(self, level):
1089 """List the lock names at the given level.
1091 This can be used for debugging/testing purposes.
1093 @param level: the level whose list of locks to get
1096 assert level in LEVELS, "Invalid locking level %s" % level
1097 return self.__keyring[level]._names()
1099 def _is_owned(self, level):
1100 """Check whether we are owning locks at the given level
1103 return self.__keyring[level]._is_owned()
1105 is_owned = _is_owned
1107 def _list_owned(self, level):
1108 """Get the set of owned locks at the given level
1111 return self.__keyring[level]._list_owned()
1113 def _upper_owned(self, level):
1114 """Check whether we own any lock at a level greater than the given one.
1117 # This way of checking only works if LEVELS[i] = i, which we check for in
1119 return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1121 def _BGL_owned(self):
1122 """Check if the current thread owns the BGL.
1124 Both an exclusive or a shared acquisition work.
1127 return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1129 def _contains_BGL(self, level, names):
1130 """Check if the level contains the BGL.
1132 Check if acting on the given level and set of names will change
1133 the status of the Big Ganeti Lock.
1136 return level == LEVEL_CLUSTER and (names is None or BGL in names)
1138 def acquire(self, level, names, blocking=1, shared=0):
1139 """Acquire a set of resource locks, at the same level.
1141 @param level: the level at which the locks shall be acquired;
1142 it must be a member of LEVELS.
1143 @param names: the names of the locks which shall be acquired
1144 (special lock names, or instance/node names)
1145 @param shared: whether to acquire in shared mode; by default
1146 an exclusive lock will be acquired
1147 @param blocking: whether to block while trying to acquire or to
1148 operate in try-lock mode (this locking mode is not supported yet)
1151 assert level in LEVELS, "Invalid locking level %s" % level
1153 # Check that we are either acquiring the Big Ganeti Lock or we already own
1154 # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1155 # so even if we've migrated we need to at least share the BGL to be
1156 # compatible with them. Of course if we own the BGL exclusively there's no
1157 # point in acquiring any other lock, unless perhaps we are half way through
1158 # the migration of the current opcode.
1159 assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1160 "You must own the Big Ganeti Lock before acquiring any other")
1162 # Check we don't own locks at upper levels.
1163 assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1164 " while owning some at a greater one")
1166 # Acquire the locks in the set.
1167 return self.__keyring[level].acquire(names, shared=shared,
1170 def release(self, level, names=None):
1171 """Release a set of resource locks, at the same level.
1173 You must have acquired the locks, either in shared or in exclusive
1174 mode, before releasing them.
1176 @param level: the level at which the locks shall be released;
1177 it must be a member of LEVELS
1178 @param names: the names of the locks which shall be released
1179 (defaults to all the locks acquired at that level)
1182 assert level in LEVELS, "Invalid locking level %s" % level
1183 assert (not self._contains_BGL(level, names) or
1184 not self._upper_owned(LEVEL_CLUSTER)), (
1185 "Cannot release the Big Ganeti Lock while holding something"
1188 # Release will complain if we don't own the locks already
1189 return self.__keyring[level].release(names)
1191 def add(self, level, names, acquired=0, shared=0):
1192 """Add locks at the specified level.
1194 @param level: the level at which the locks shall be added;
1195 it must be a member of LEVELS_MOD.
1196 @param names: names of the locks to acquire
1197 @param acquired: whether to acquire the newly added locks
1198 @param shared: whether the acquisition will be shared
1201 assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1202 assert self._BGL_owned(), ("You must own the BGL before performing other"
1204 assert not self._upper_owned(level), ("Cannot add locks at a level"
1205 " while owning some at a greater one")
1206 return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1208 def remove(self, level, names, blocking=1):
1209 """Remove locks from the specified level.
1211 You must either already own the locks you are trying to remove
1212 exclusively or not own any lock at an upper level.
1214 @param level: the level at which the locks shall be removed;
1215 it must be a member of LEVELS_MOD
1216 @param names: the names of the locks which shall be removed
1217 (special lock names, or instance/node names)
1218 @param blocking: whether to block while trying to acquire or to
1219 operate in try-lock mode (this locking mode is not supported yet)
1222 assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1223 assert self._BGL_owned(), ("You must own the BGL before performing other"
1225 # Check we either own the level or don't own anything from here
1226 # up. LockSet.remove() will check the case in which we don't own
1227 # all the needed resources, or we have a shared ownership.
1228 assert self._is_owned(level) or not self._upper_owned(level), (
1229 "Cannot remove locks at a level while not owning it or"
1230 " owning some at a greater one")
1231 return self.__keyring[level].remove(names, blocking=blocking)