#
#
-# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
import logging
import heapq
import itertools
+import time
from ganeti import errors
from ganeti import utils
_DEFAULT_PRIORITY = 0
+#: Minimum timeout required to consider scheduling a pending acquisition
+#: (seconds)
+_LOCK_ACQUIRE_MIN_TIMEOUT = (1.0 / 1000)
+
+# Internal lock acquisition modes for L{LockSet}
+(_LS_ACQUIRE_EXACT,
+ _LS_ACQUIRE_ALL,
+ _LS_ACQUIRE_OPPORTUNISTIC) = range(1, 4)
+
+_LS_ACQUIRE_MODES = compat.UniqueFrozenset([
+ _LS_ACQUIRE_EXACT,
+ _LS_ACQUIRE_ALL,
+ _LS_ACQUIRE_OPPORTUNISTIC,
+ ])
+
def ssynchronized(mylock, shared=0):
"""Shared Synchronization decorator.
except AttributeError:
self._acquire_restore = self._base_acquire_restore
try:
- self._is_owned = lock._is_owned
+ self._is_owned = lock.is_owned
except AttributeError:
self._is_owned = self._base_is_owned
return bool(self._waiters)
+ def __repr__(self):
+ return ("<%s.%s waiters=%s at %#x>" %
+ (self.__class__.__module__, self.__class__.__name__,
+ self._waiters, id(self)))
+
class _PipeConditionWithMode(PipeCondition):
__slots__ = [
"__pending_by_prio",
"__pending_shared",
"__shr",
+ "__time_fn",
"name",
]
__condition_class = _PipeConditionWithMode
- def __init__(self, name, monitor=None):
+ def __init__(self, name, monitor=None, _time_fn=time.time):
"""Construct a new SharedLock.
@param name: the name of the lock
self.name = name
+ # Used for unittesting
+ self.__time_fn = _time_fn
+
# Internal lock
self.__lock = threading.Lock()
logging.debug("Adding lock %s to monitor", name)
monitor.RegisterLock(self)
+ def __repr__(self):
+ return ("<%s.%s name=%s at %#x>" %
+ (self.__class__.__module__, self.__class__.__name__,
+ self.name, id(self)))
+
def GetLockInfo(self, requested):
"""Retrieves information for querying locks.
else:
return self.__is_exclusive()
- def _is_owned(self, shared=-1):
+ def is_owned(self, shared=-1):
"""Is the current thread somehow owning the lock at this time?
@param shared:
finally:
self.__lock.release()
- is_owned = _is_owned
+ #: Necessary to remain compatible with threading.Condition, which tries to
+ #: retrieve a lock's "_is_owned" attribute
+ _is_owned = is_owned
def _count_pending(self):
"""Returns the number of pending acquires.
self.__do_acquire(shared)
return True
+ # The lock couldn't be acquired right away, so if a timeout is given and is
+ # considered too short, return right away as scheduling a pending
+ # acquisition is quite expensive
+ if timeout is not None and timeout < _LOCK_ACQUIRE_MIN_TIMEOUT:
+ return False
+
prioqueue = self.__pending_by_prio.get(priority, None)
if shared:
assert priority not in self.__pending_shared
self.__pending_shared[priority] = wait_condition
+ wait_start = self.__time_fn()
+ acquired = False
+
try:
# Wait until we become the topmost acquire in the queue or the timeout
# expires.
- # TODO: Decrease timeout with spurious notifications
- while not (self.__is_on_top(wait_condition) and
- self.__can_acquire(shared)):
- # Wait for notification
- wait_condition.wait(timeout)
- self.__check_deleted()
+ while True:
+ if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
+ self.__do_acquire(shared)
+ acquired = True
+ break
- # A lot of code assumes blocking acquires always succeed. Loop
- # internally for that case.
- if timeout is not None:
+ # A lot of code assumes blocking acquires always succeed, therefore we
+ # can never return False for a blocking acquire
+ if (timeout is not None and
+ utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
break
- if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
- self.__do_acquire(shared)
- return True
+ # Wait for notification
+ wait_condition.wait(timeout)
+ self.__check_deleted()
finally:
# Remove condition from queue if there are no more waiters
if not wait_condition.has_waiting():
# (e.g. on lock deletion)
self.__pending_shared.pop(priority, None)
- return False
+ return acquired
def acquire(self, shared=0, timeout=None, priority=None,
test_notify=None):
# Autodetect release type
if self.__is_exclusive():
self.__exc = None
+ notify = True
else:
self.__shr.remove(threading.currentThread())
+ notify = not self.__shr
- # Notify topmost condition in queue
- (priority, prioqueue) = self.__find_first_pending_queue()
- if prioqueue:
- cond = prioqueue[0]
- cond.notifyAll()
- if cond.shared:
- # Prevent further shared acquires from sneaking in while waiters are
- # notified
- self.__pending_shared.pop(priority, None)
+ # Notify topmost condition in queue if there are no owners left (for
+ # shared locks)
+ if notify:
+ self.__notify_topmost()
+ finally:
+ self.__lock.release()
+
+ def __notify_topmost(self):
+ """Notifies topmost condition in queue of pending acquires.
+
+ """
+ (priority, prioqueue) = self.__find_first_pending_queue()
+ if prioqueue:
+ cond = prioqueue[0]
+ cond.notifyAll()
+ if cond.shared:
+ # Prevent further shared acquires from sneaking in while waiters are
+ # notified
+ self.__pending_shared.pop(priority, None)
+
+ def _notify_topmost(self):
+ """Exported version of L{__notify_topmost}.
+ """
+ self.__lock.acquire()
+ try:
+ return self.__notify_topmost()
finally:
self.__lock.release()
if not acquired:
acquired = self.__acquire_unlocked(0, timeout, priority)
+ if acquired:
assert self.__is_exclusive() and not self.__is_sharer(), \
"Lock wasn't acquired in exclusive mode"
- if acquired:
self.__deleted = True
self.__exc = None
ALL_SET = None
+def _TimeoutZero():
+ """Returns the number zero.
+
+ """
+ return 0
+
+
+def _GetLsAcquireModeAndTimeouts(want_all, timeout, opportunistic):
+ """Determines modes and timeouts for L{LockSet.acquire}.
+
+ @type want_all: boolean
+ @param want_all: Whether all locks in set should be acquired
+ @param timeout: Timeout in seconds or C{None}
+ @param opportunistic: Whether locks should be acquired opportunistically
+ @rtype: tuple
+ @return: Tuple containing mode to be passed to L{LockSet.__acquire_inner}
+ (one of L{_LS_ACQUIRE_MODES}), a function to calculate timeout for
+ acquiring the lockset-internal lock (might be C{None}) and a function to
+ calculate the timeout for acquiring individual locks
+
+ """
+ # Short circuit when no running timeout is needed
+ if opportunistic and not want_all:
+ assert timeout is None, "Got timeout for an opportunistic acquisition"
+ return (_LS_ACQUIRE_OPPORTUNISTIC, None, _TimeoutZero)
+
+ # We need to keep track of how long we spent waiting for a lock. The
+ # timeout passed to this function is over all lock acquisitions.
+ running_timeout = utils.RunningTimeout(timeout, False)
+
+ if want_all:
+ mode = _LS_ACQUIRE_ALL
+ ls_timeout_fn = running_timeout.Remaining
+ else:
+ mode = _LS_ACQUIRE_EXACT
+ ls_timeout_fn = None
+
+ if opportunistic:
+ mode = _LS_ACQUIRE_OPPORTUNISTIC
+ timeout_fn = _TimeoutZero
+ else:
+ timeout_fn = running_timeout.Remaining
+
+ return (mode, ls_timeout_fn, timeout_fn)
+
+
class _AcquireTimeout(Exception):
"""Internal exception to abort an acquire on a timeout.
"""
return self.__lockdict
- def _is_owned(self):
- """Is the current thread a current level owner?"""
+ def is_owned(self):
+ """Is the current thread a current level owner?
+
+ @note: Use L{check_owned} to check if a specific lock is held
+
+ """
return threading.currentThread() in self.__owners
+ def check_owned(self, names, shared=-1):
+ """Check if locks are owned in a specific mode.
+
+ @type names: sequence or string
+ @param names: Lock names (or a single lock name)
+ @param shared: See L{SharedLock.is_owned}
+ @rtype: bool
+ @note: Use L{is_owned} to check if the current thread holds I{any} lock and
+ L{list_owned} to get the names of all owned locks
+
+ """
+ if isinstance(names, basestring):
+ names = [names]
+
+ # Avoid check if no locks are owned anyway
+ if names and self.is_owned():
+ candidates = []
+
+ # Gather references to all locks (in case they're deleted in the meantime)
+ for lname in names:
+ try:
+ lock = self.__lockdict[lname]
+ except KeyError:
+ raise errors.LockError("Non-existing lock '%s' in set '%s' (it may"
+ " have been removed)" % (lname, self.name))
+ else:
+ candidates.append(lock)
+
+ return compat.all(lock.is_owned(shared=shared) for lock in candidates)
+ else:
+ return False
+
+ def owning_all(self):
+ """Checks whether current thread owns internal lock.
+
+ Holding the internal lock is equivalent to holding all locks in the set
+ (the opposite does not necessarily hold as it can not be easily
+ determined). L{add} and L{remove} require the internal lock.
+
+ @rtype: boolean
+
+ """
+ return self.__lock.is_owned()
+
def _add_owned(self, name=None):
"""Note the current thread owns the given lock"""
if name is None:
- if not self._is_owned():
+ if not self.is_owned():
self.__owners[threading.currentThread()] = set()
else:
- if self._is_owned():
+ if self.is_owned():
self.__owners[threading.currentThread()].add(name)
else:
self.__owners[threading.currentThread()] = set([name])
def _del_owned(self, name=None):
"""Note the current thread owns the given lock"""
- assert not (name is None and self.__lock._is_owned()), \
+ assert not (name is None and self.__lock.is_owned()), \
"Cannot hold internal lock when deleting owner status"
if name is not None:
self.__owners[threading.currentThread()].remove(name)
# Only remove the key if we don't hold the set-lock as well
- if (not self.__lock._is_owned() and
- not self.__owners[threading.currentThread()]):
+ if not (self.__lock.is_owned() or
+ self.__owners[threading.currentThread()]):
del self.__owners[threading.currentThread()]
- def _list_owned(self):
+ def list_owned(self):
"""Get the set of resource names owned by the current thread"""
- if self._is_owned():
+ if self.is_owned():
return self.__owners[threading.currentThread()].copy()
else:
return set()
def _release_and_delete_owned(self):
"""Release and delete all resources owned by the current thread"""
- for lname in self._list_owned():
+ for lname in self.list_owned():
lock = self.__lockdict[lname]
- if lock._is_owned():
+ if lock.is_owned():
lock.release()
self._del_owned(name=lname)
# If we don't already own the set-level lock acquired
# we'll get it and note we need to release it later.
release_lock = False
- if not self.__lock._is_owned():
+ if not self.__lock.is_owned():
release_lock = True
self.__lock.acquire(shared=1)
try:
return set(result)
def acquire(self, names, timeout=None, shared=0, priority=None,
- test_notify=None):
+ opportunistic=False, test_notify=None):
"""Acquire a set of resource locks.
+ @note: When acquiring locks opportunistically, any number of locks might
+ actually be acquired, even zero.
+
@type names: list of strings (or string)
@param names: the names of the locks which shall be acquired
(special lock names, or instance/node names)
@param shared: whether to acquire in shared mode; by default an
exclusive lock will be acquired
@type timeout: float or None
- @param timeout: Maximum time to acquire all locks
+ @param timeout: Maximum time to acquire all locks; for opportunistic
+ acquisitions, a timeout can only be given when C{names} is C{None}, in
+ which case it is exclusively used for acquiring the L{LockSet}-internal
+ lock; opportunistic acquisitions don't use a timeout for acquiring
+ individual locks
@type priority: integer
@param priority: Priority for acquiring locks
+ @type opportunistic: boolean
+ @param opportunistic: Acquire locks opportunistically; use the return value
+ to determine which locks were actually acquired
@type test_notify: callable or None
@param test_notify: Special callback function for unittesting
assert timeout is None or timeout >= 0.0
# Check we don't already own locks at this level
- assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
- " (lockset %s)" % self.name)
+ assert not self.is_owned(), ("Cannot acquire locks in the same set twice"
+ " (lockset %s)" % self.name)
if priority is None:
priority = _DEFAULT_PRIORITY
- # We need to keep track of how long we spent waiting for a lock. The
- # timeout passed to this function is over all lock acquires.
- running_timeout = utils.RunningTimeout(timeout, False)
-
try:
if names is not None:
+ assert timeout is None or not opportunistic, \
+ ("Opportunistic acquisitions can only use a timeout if no"
+ " names are given; see docstring for details")
+
# Support passing in a single resource to acquire rather than many
if isinstance(names, basestring):
names = [names]
- return self.__acquire_inner(names, False, shared, priority,
- running_timeout.Remaining, test_notify)
+ (mode, _, timeout_fn) = \
+ _GetLsAcquireModeAndTimeouts(False, timeout, opportunistic)
+
+ return self.__acquire_inner(names, mode, shared, priority,
+ timeout_fn, test_notify)
else:
+ (mode, ls_timeout_fn, timeout_fn) = \
+ _GetLsAcquireModeAndTimeouts(True, timeout, opportunistic)
+
# If no names are given acquire the whole set by not letting new names
# being added before we release, and getting the current list of names.
# Some of them may then be deleted later, but we'll cope with this.
# anyway, though, so we'll get the list lock exclusively as well in
# order to be able to do add() on the set while owning it.
if not self.__lock.acquire(shared=shared, priority=priority,
- timeout=running_timeout.Remaining()):
+ timeout=ls_timeout_fn()):
raise _AcquireTimeout()
+
try:
# note we own the set-lock
self._add_owned()
- return self.__acquire_inner(self.__names(), True, shared, priority,
- running_timeout.Remaining, test_notify)
+ return self.__acquire_inner(self.__names(), mode, shared,
+ priority, timeout_fn, test_notify)
except:
# We shouldn't have problems adding the lock to the owners list, but
# if we did we'll try to release this lock and re-raise exception.
except _AcquireTimeout:
return None
- def __acquire_inner(self, names, want_all, shared, priority,
+ def __acquire_inner(self, names, mode, shared, priority,
timeout_fn, test_notify):
"""Inner logic for acquiring a number of locks.
+ Acquisition modes:
+
+ - C{_LS_ACQUIRE_ALL}: C{names} contains names of all locks in set, but
+ deleted locks can be ignored as the whole set is being acquired with
+ its internal lock held
+ - C{_LS_ACQUIRE_EXACT}: The names listed in C{names} must be acquired;
+ timeouts and deleted locks are fatal
+ - C{_LS_ACQUIRE_OPPORTUNISTIC}: C{names} lists names of locks (potentially
+ all within the set) which should be acquired opportunistically, that is
+ failures are ignored
+
@param names: Names of the locks to be acquired
- @param want_all: Whether all locks in the set should be acquired
+ @param mode: Lock acquisition mode (one of L{_LS_ACQUIRE_MODES})
@param shared: Whether to acquire in shared mode
- @param timeout_fn: Function returning remaining timeout
+ @param timeout_fn: Function returning remaining timeout (always returns
+ zero for opportunistic acquisitions)
@param priority: Priority for acquiring locks
@param test_notify: Special callback function for unittesting
"""
+ assert mode in _LS_ACQUIRE_MODES
+
acquire_list = []
# First we look the locks up on __lockdict. We have no way of being sure
# they will still be there after, but this makes it a lot faster should
# just one of them be the already wrong. Using a sorted sequence to prevent
# deadlocks.
- for lname in sorted(utils.UniqueSequence(names)):
+ for lname in sorted(frozenset(names)):
try:
lock = self.__lockdict[lname] # raises KeyError if lock is not there
except KeyError:
- if want_all:
- # We are acquiring all the set, it doesn't matter if this particular
- # element is not there anymore.
- continue
-
- raise errors.LockError("Non-existing lock %s in set %s (it may have"
- " been removed)" % (lname, self.name))
-
- acquire_list.append((lname, lock))
+ # We are acquiring the whole set, it doesn't matter if this particular
+ # element is not there anymore. If, however, only certain names should
+ # be acquired, not finding a lock is an error.
+ if mode == _LS_ACQUIRE_EXACT:
+ raise errors.LockError("Lock '%s' not found in set '%s' (it may have"
+ " been removed)" % (lname, self.name))
+ else:
+ acquire_list.append((lname, lock))
# This will hold the locknames we effectively acquired.
acquired = set()
priority=priority,
test_notify=test_notify_fn)
except errors.LockError:
- if want_all:
- # We are acquiring all the set, it doesn't matter if this
+ if mode in (_LS_ACQUIRE_ALL, _LS_ACQUIRE_OPPORTUNISTIC):
+ # We are acquiring the whole set, it doesn't matter if this
# particular element is not there anymore.
continue
- raise errors.LockError("Non-existing lock %s in set %s (it may"
- " have been removed)" % (lname, self.name))
+ raise errors.LockError("Lock '%s' not found in set '%s' (it may have"
+ " been removed)" % (lname, self.name))
if not acq_success:
# Couldn't get lock or timeout occurred
+ if mode == _LS_ACQUIRE_OPPORTUNISTIC:
+ # Ignore timeouts on opportunistic acquisitions
+ continue
+
if timeout is None:
# This shouldn't happen as SharedLock.acquire(timeout=None) is
# blocking.
# We shouldn't have problems adding the lock to the owners list, but
# if we did we'll try to release this lock and re-raise exception.
# Of course something is going to be really wrong after this.
- if lock._is_owned():
+ if lock.is_owned():
lock.release()
raise
The locks must have been acquired in exclusive mode.
"""
- assert self._is_owned(), ("downgrade on lockset %s while not owning any"
- " lock" % self.name)
+ assert self.is_owned(), ("downgrade on lockset %s while not owning any"
+ " lock" % self.name)
# Support passing in a single resource to downgrade rather than many
if isinstance(names, basestring):
names = [names]
- owned = self._list_owned()
+ owned = self.list_owned()
if names is None:
names = owned
self.__lockdict[lockname].downgrade()
# Do we own the lockset in exclusive mode?
- if self.__lock._is_owned(shared=0):
+ if self.__lock.is_owned(shared=0):
# Have all locks been downgraded?
- if not compat.any(lock._is_owned(shared=0)
+ if not compat.any(lock.is_owned(shared=0)
for lock in self.__lockdict.values()):
self.__lock.downgrade()
- assert self.__lock._is_owned(shared=1)
+ assert self.__lock.is_owned(shared=1)
return True
(defaults to all the locks acquired at that level).
"""
- assert self._is_owned(), ("release() on lock set %s while not owner" %
- self.name)
+ assert self.is_owned(), ("release() on lock set %s while not owner" %
+ self.name)
# Support passing in a single resource to release rather than many
if isinstance(names, basestring):
names = [names]
if names is None:
- names = self._list_owned()
+ names = self.list_owned()
else:
names = set(names)
- assert self._list_owned().issuperset(names), (
+ assert self.list_owned().issuperset(names), (
"release() on unheld resources %s (set %s)" %
- (names.difference(self._list_owned()), self.name))
+ (names.difference(self.list_owned()), self.name))
# First of all let's release the "all elements" lock, if set.
# After this 'add' can work again
- if self.__lock._is_owned():
+ if self.__lock.is_owned():
self.__lock.release()
self._del_owned()
"""
# Check we don't already own locks at this level
- assert not self._is_owned() or self.__lock._is_owned(shared=0), \
+ assert not self.is_owned() or self.__lock.is_owned(shared=0), \
("Cannot add locks if the set %s is only partially owned, or shared" %
self.name)
# If we don't already own the set-level lock acquired in an exclusive way
# we'll get it and note we need to release it later.
release_lock = False
- if not self.__lock._is_owned():
+ if not self.__lock.is_owned():
release_lock = True
self.__lock.acquire()
# If we own any subset of this lock it must be a superset of what we want
# to delete. The ownership must also be exclusive, but that will be checked
# by the lock itself.
- assert not self._is_owned() or self._list_owned().issuperset(names), (
+ assert not self.is_owned() or self.list_owned().issuperset(names), (
"remove() on acquired lockset %s while not owning all elements" %
self.name)
removed.append(lname)
except (KeyError, errors.LockError):
# This cannot happen if we were already holding it, verify:
- assert not self._is_owned(), ("remove failed while holding lockset %s"
- % self.name)
+ assert not self.is_owned(), ("remove failed while holding lockset %s" %
+ self.name)
else:
# If no LockError was raised we are the ones who deleted the lock.
# This means we can safely remove it from lockdict, as any further or
# it's the job of the one who actually deleted it.
del self.__lockdict[lname]
# And let's remove it from our private list if we owned it.
- if self._is_owned():
+ if self.is_owned():
self._del_owned(name=lname)
return removed
-# Locking levels, must be acquired in increasing order.
-# Current rules are:
-# - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
-# acquired before performing any operation, either in shared or in exclusive
-# mode. acquiring the BGL in exclusive mode is discouraged and should be
-# avoided.
-# - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
-# If you need more than one node, or more than one instance, acquire them at
-# the same time.
-LEVEL_CLUSTER = 0
-LEVEL_INSTANCE = 1
-LEVEL_NODEGROUP = 2
-LEVEL_NODE = 3
-LEVEL_NODE_RES = 4
+# Locking levels, must be acquired in increasing order. Current rules are:
+# - At level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
+# acquired before performing any operation, either in shared or exclusive
+# mode. Acquiring the BGL in exclusive mode is discouraged and should be
+# avoided.
+# - At levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks. If
+# you need more than one node, or more than one instance, acquire them at the
+# same time.
+# - LEVEL_NODE_RES is for node resources and should be used by operations with
+# possibly high impact on the node's disks.
+# - LEVEL_NODE_ALLOC blocks instance allocations for the whole cluster
+# ("NAL" is the only lock at this level). It should be acquired in shared
+# mode when an opcode blocks all or a significant amount of a cluster's
+# locks. Opcodes doing instance allocations should acquire in exclusive mode.
+# Once the set of acquired locks for an opcode has been reduced to the working
+# set, the NAL should be released as well to allow allocations to proceed.
+(LEVEL_CLUSTER,
+ LEVEL_INSTANCE,
+ LEVEL_NODE_ALLOC,
+ LEVEL_NODEGROUP,
+ LEVEL_NODE,
+ LEVEL_NODE_RES,
+ LEVEL_NETWORK) = range(0, 7)
LEVELS = [
LEVEL_CLUSTER,
LEVEL_INSTANCE,
+ LEVEL_NODE_ALLOC,
LEVEL_NODEGROUP,
LEVEL_NODE,
LEVEL_NODE_RES,
+ LEVEL_NETWORK,
]
# Lock levels which are modifiable
-LEVELS_MOD = frozenset([
+LEVELS_MOD = compat.UniqueFrozenset([
LEVEL_NODE_RES,
LEVEL_NODE,
LEVEL_NODEGROUP,
LEVEL_INSTANCE,
+ LEVEL_NETWORK,
])
#: Lock level names (make sure to use singular form)
LEVEL_NAMES = {
LEVEL_CLUSTER: "cluster",
LEVEL_INSTANCE: "instance",
+ LEVEL_NODE_ALLOC: "node-alloc",
LEVEL_NODEGROUP: "nodegroup",
LEVEL_NODE: "node",
LEVEL_NODE_RES: "node-res",
+ LEVEL_NETWORK: "network",
}
# Constant for the big ganeti lock
-BGL = 'BGL'
+BGL = "BGL"
+
+#: Node allocation lock
+NAL = "NAL"
class GanetiLockManager:
"""
_instance = None
- def __init__(self, nodes, nodegroups, instances):
+ def __init__(self, node_uuids, nodegroups, instance_names, networks):
"""Constructs a new GanetiLockManager object.
There should be only a GanetiLockManager object at any time, so this
function raises an error if this is not the case.
- @param nodes: list of node names
+ @param node_uuids: list of node UUIDs
@param nodegroups: list of nodegroup uuids
- @param instances: list of instance names
+ @param instance_names: list of instance names
+ @param networks: list of network lock names
"""
assert self.__class__._instance is None, \
# locking order.
self.__keyring = {
LEVEL_CLUSTER: LockSet([BGL], "cluster", monitor=self._monitor),
- LEVEL_NODE: LockSet(nodes, "node", monitor=self._monitor),
- LEVEL_NODE_RES: LockSet(nodes, "node-res", monitor=self._monitor),
+ LEVEL_NODE: LockSet(node_uuids, "node", monitor=self._monitor),
+ LEVEL_NODE_RES: LockSet(node_uuids, "node-res", monitor=self._monitor),
LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroup", monitor=self._monitor),
- LEVEL_INSTANCE: LockSet(instances, "instance",
+ LEVEL_INSTANCE: LockSet(instance_names, "instance",
monitor=self._monitor),
+ LEVEL_NETWORK: LockSet(networks, "network", monitor=self._monitor),
+ LEVEL_NODE_ALLOC: LockSet([NAL], "node-alloc", monitor=self._monitor),
}
assert compat.all(ls.name == LEVEL_NAMES[level]
- for (level, ls) in self.__keyring.items())
+ for (level, ls) in self.__keyring.items()), \
+ "Keyring name mismatch"
def AddToLockMonitor(self, provider):
"""Registers a new lock with the monitor.
"""
return self._monitor.QueryLocks(fields)
- def OldStyleQueryLocks(self, fields):
- """Queries information from all locks, returning old-style data.
-
- See L{LockMonitor.OldStyleQueryLocks}.
-
- """
- return self._monitor.OldStyleQueryLocks(fields)
-
def _names(self, level):
"""List the lock names at the given level.
assert level in LEVELS, "Invalid locking level %s" % level
return self.__keyring[level]._names()
- def _is_owned(self, level):
+ def is_owned(self, level):
"""Check whether we are owning locks at the given level
"""
- return self.__keyring[level]._is_owned()
+ return self.__keyring[level].is_owned()
- is_owned = _is_owned
-
- def _list_owned(self, level):
+ def list_owned(self, level):
"""Get the set of owned locks at the given level
"""
- return self.__keyring[level]._list_owned()
+ return self.__keyring[level].list_owned()
+
+ def check_owned(self, level, names, shared=-1):
+ """Check if locks at a certain level are owned in a specific mode.
+
+ @see: L{LockSet.check_owned}
+
+ """
+ return self.__keyring[level].check_owned(names, shared=shared)
- list_owned = _list_owned
+ def owning_all(self, level):
+ """Checks whether current thread owns all locks at a certain level.
+
+ @see: L{LockSet.owning_all}
+
+ """
+ return self.__keyring[level].owning_all()
def _upper_owned(self, level):
"""Check that we don't own any lock at a level greater than the given one.
"""
# This way of checking only works if LEVELS[i] = i, which we check for in
# the test cases.
- return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
+ return compat.any((self.is_owned(l) for l in LEVELS[level + 1:]))
def _BGL_owned(self): # pylint: disable=C0103
"""Check if the current thread owns the BGL.
Both an exclusive or a shared acquisition work.
"""
- return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
+ return BGL in self.__keyring[LEVEL_CLUSTER].list_owned()
@staticmethod
def _contains_BGL(level, names): # pylint: disable=C0103
"""
return level == LEVEL_CLUSTER and (names is None or BGL in names)
- def acquire(self, level, names, timeout=None, shared=0, priority=None):
+ def acquire(self, level, names, timeout=None, shared=0, priority=None,
+ opportunistic=False):
"""Acquire a set of resource locks, at the same level.
@type level: member of locking.LEVELS
@param timeout: Maximum time to acquire all locks
@type priority: integer
@param priority: Priority for acquiring lock
+ @type opportunistic: boolean
+ @param opportunistic: Acquire locks opportunistically; use the return value
+ to determine which locks were actually acquired
"""
assert level in LEVELS, "Invalid locking level %s" % level
# point in acquiring any other lock, unless perhaps we are half way through
# the migration of the current opcode.
assert (self._contains_BGL(level, names) or self._BGL_owned()), (
- "You must own the Big Ganeti Lock before acquiring any other")
+ "You must own the Big Ganeti Lock before acquiring any other")
# Check we don't own locks at the same or upper levels.
assert not self._upper_owned(level), ("Cannot acquire locks at a level"
- " while owning some at a greater one")
+ " while owning some at a greater one")
# Acquire the locks in the set.
return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
- priority=priority)
+ priority=priority,
+ opportunistic=opportunistic)
def downgrade(self, level, names=None):
"""Downgrade a set of resource locks from exclusive to shared mode.
assert level in LEVELS, "Invalid locking level %s" % level
assert (not self._contains_BGL(level, names) or
not self._upper_owned(LEVEL_CLUSTER)), (
- "Cannot release the Big Ganeti Lock while holding something"
- " at upper levels (%r)" %
- (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
- for i in self.__keyring.keys()]), ))
+ "Cannot release the Big Ganeti Lock while holding something"
+ " at upper levels (%r)" %
+ (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self.list_owned(i))
+ for i in self.__keyring.keys()]), ))
# Release will complain if we don't own the locks already
return self.__keyring[level].release(names)
"""
assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
assert self._BGL_owned(), ("You must own the BGL before performing other"
- " operations")
+ " operations")
assert not self._upper_owned(level), ("Cannot add locks at a level"
- " while owning some at a greater one")
+ " while owning some at a greater one")
return self.__keyring[level].add(names, acquired=acquired, shared=shared)
def remove(self, level, names):
"""
assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
assert self._BGL_owned(), ("You must own the BGL before performing other"
- " operations")
+ " operations")
# Check we either own the level or don't own anything from here
# up. LockSet.remove() will check the case in which we don't own
# all the needed resources, or we have a shared ownership.
- assert self._is_owned(level) or not self._upper_owned(level), (
+ assert self.is_owned(level) or not self._upper_owned(level), (
"Cannot remove locks at a level while not owning it or"
" owning some at a greater one")
return self.__keyring[level].remove(names)
# Prepare query response
return query.GetQueryResponse(qobj, ctx)
-
- def OldStyleQueryLocks(self, fields):
- """Queries information from all locks, returning old-style data.
-
- @type fields: list of strings
- @param fields: List of fields to return
-
- """
- (qobj, ctx) = self._Query(fields)
-
- return qobj.OldStyleQuery(ctx)