import logging
import heapq
import itertools
+import time
from ganeti import errors
from ganeti import utils
return bool(self._waiters)
+ def __repr__(self):
+ """Returns a debug representation of this condition.
+
+ The string contains the module and name of the object's class, the current
+ value of C{self._waiters} and the object's C{id} in hexadecimal.
+
+ """
+ return ("<%s.%s waiters=%s at %#x>" %
+ (self.__class__.__module__, self.__class__.__name__,
+ self._waiters, id(self)))
+
class _PipeConditionWithMode(PipeCondition):
__slots__ = [
"__pending_by_prio",
"__pending_shared",
"__shr",
+ "__time_fn",
"name",
]
__condition_class = _PipeConditionWithMode
- def __init__(self, name, monitor=None):
+ def __init__(self, name, monitor=None, _time_fn=time.time):
"""Construct a new SharedLock.
@param name: the name of the lock
self.name = name
+ # Used for unit testing
+ self.__time_fn = _time_fn
+
# Internal lock
self.__lock = threading.Lock()
logging.debug("Adding lock %s to monitor", name)
monitor.RegisterLock(self)
+ def __repr__(self):
+ """Returns a debug representation of this lock.
+
+ The string contains the module and name of the object's class, the lock's
+ C{name} and the object's C{id} in hexadecimal.
+
+ """
+ return ("<%s.%s name=%s at %#x>" %
+ (self.__class__.__module__, self.__class__.__name__,
+ self.name, id(self)))
+
def GetLockInfo(self, requested):
"""Retrieves information for querying locks.
assert priority not in self.__pending_shared
self.__pending_shared[priority] = wait_condition
+ wait_start = self.__time_fn()
+ acquired = False
+
try:
# Wait until we become the topmost acquire in the queue or the timeout
# expires.
- # TODO: Decrease timeout with spurious notifications
- while not (self.__is_on_top(wait_condition) and
- self.__can_acquire(shared)):
- # Wait for notification
- wait_condition.wait(timeout)
- self.__check_deleted()
+ while True:
+ if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
+ self.__do_acquire(shared)
+ acquired = True
+ break
- # A lot of code assumes blocking acquires always succeed. Loop
- # internally for that case.
- if timeout is not None:
+ # A lot of code assumes blocking acquires always succeed, therefore we
+ # can never return False for a blocking acquire
+ if (timeout is not None and
+ utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
break
- if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
- self.__do_acquire(shared)
- return True
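+ # Wait for notification
+ # NOTE(review): the full C{timeout} is passed on every iteration, so a
+ # wakeup that doesn't lead to an acquire can extend the total wait beyond
+ # C{timeout} -- TODO confirm and pass the remaining time instead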
+ wait_condition.wait(timeout)
+ self.__check_deleted()
finally:
# Remove condition from queue if there are no more waiters
if not wait_condition.has_waiting():
# (e.g. on lock deletion)
self.__pending_shared.pop(priority, None)
- return False
+ return acquired
def acquire(self, shared=0, timeout=None, priority=None,
test_notify=None):
# Autodetect release type
if self.__is_exclusive():
self.__exc = None
+ notify = True
else:
self.__shr.remove(threading.currentThread())
+ notify = not self.__shr
- # Notify topmost condition in queue
- (priority, prioqueue) = self.__find_first_pending_queue()
- if prioqueue:
- cond = prioqueue[0]
- cond.notifyAll()
- if cond.shared:
- # Prevent further shared acquires from sneaking in while waiters are
- # notified
- self.__pending_shared.pop(priority, None)
+ # Notify topmost condition in queue if there are no owners left (for
+ # shared locks)
+ if notify:
+ self.__notify_topmost()
+ finally:
+ self.__lock.release()
+
+ def __notify_topmost(self):
+ """Notifies topmost condition in queue of pending acquires.
+
+ Looks up the first pending queue via C{__find_first_pending_queue} and, if
+ that queue is not empty, calls C{notifyAll} on its first condition. If the
+ condition is a shared one, the entry for its priority is also removed from
+ C{self.__pending_shared}.
+
+ @note: This method does not take C{self.__lock} itself; L{_notify_topmost}
+ is the variant which acquires it around the call
+
+ """
+ (priority, prioqueue) = self.__find_first_pending_queue()
+ if prioqueue:
+ cond = prioqueue[0]
+ cond.notifyAll()
+ if cond.shared:
+ # Prevent further shared acquires from sneaking in while waiters are
+ # notified
+ self.__pending_shared.pop(priority, None)
+ def _notify_topmost(self):
+ """Exported version of L{__notify_topmost}.
+
+ Calls L{__notify_topmost} with C{self.__lock} held. The lock is released in
+ a C{finally} clause, i.e. also when the call raises an exception.
+
+ @return: Whatever L{__notify_topmost} returns
+
+ """
+ self.__lock.acquire()
+ try:
+ return self.__notify_topmost()
finally:
self.__lock.release()
if not acquired:
acquired = self.__acquire_unlocked(0, timeout, priority)
+ if acquired:
assert self.__is_exclusive() and not self.__is_sharer(), \
"Lock wasn't acquired in exclusive mode"
- if acquired:
self.__deleted = True
self.__exc = None
return self.__lockdict
def is_owned(self):
- """Is the current thread a current level owner?"""
+ """Is the current thread a current level owner?
+
+ @rtype: bool
+ @return: Whether C{threading.currentThread()} is contained in
+ C{self.__owners}
+ @note: Use L{check_owned} to check if a specific lock is held
+
+ """
return threading.currentThread() in self.__owners
+ def check_owned(self, names, shared=-1):
+ """Check if locks are owned in a specific mode.
+
+ @type names: sequence or string
+ @param names: Lock names (or a single lock name)
+ @param shared: See L{SharedLock.is_owned}
+ @rtype: bool
+ @return: C{False} if C{names} is empty or L{is_owned} reports that the
+ current thread owns nothing here; otherwise whether C{is_owned} of every
+ named lock returns a true value for the given C{shared} argument
+ @raise errors.LockError: If one of the names is not found in the lock
+ dictionary (only checked when C{names} is not empty and L{is_owned} is
+ true)
+ @note: Use L{is_owned} to check if the current thread holds I{any} lock and
+ L{list_owned} to get the names of all owned locks
+
+ """
+ # A single name is wrapped into a list so both forms are handled alike
+ if isinstance(names, basestring):
+ names = [names]
+
+ # Avoid check if no locks are owned anyway
+ if names and self.is_owned():
+ candidates = []
+
+ # Gather references to all locks (in case they're deleted in the meantime)
+ for lname in names:
+ try:
+ lock = self.__lockdict[lname]
+ except KeyError:
+ raise errors.LockError("Non-existing lock '%s' in set '%s' (it may"
+ " have been removed)" % (lname, self.name))
+ else:
+ candidates.append(lock)
+
+ # Ownership is only evaluated after all names were resolved successfully
+ return compat.all(lock.is_owned(shared=shared) for lock in candidates)
+ else:
+ return False
+
def _add_owned(self, name=None):
"""Note the current thread owns the given lock"""
if name is None:
}
# Constant for the big ganeti lock
-BGL = 'BGL'
+BGL = "BGL"
class GanetiLockManager:
"""
return self._monitor.QueryLocks(fields)
- def OldStyleQueryLocks(self, fields):
- """Queries information from all locks, returning old-style data.
-
- See L{LockMonitor.OldStyleQueryLocks}.
-
- """
- return self._monitor.OldStyleQueryLocks(fields)
-
def _names(self, level):
"""List the lock names at the given level.
"""
return self.__keyring[level].list_owned()
+ def check_owned(self, level, names, shared=-1):
+ """Check if locks at a certain level are owned in a specific mode.
+
+ Thin wrapper which forwards C{names} and C{shared} to the C{check_owned}
+ method of the keyring entry for C{level}.
+
+ @param level: Key used to look up the entry in C{self.__keyring}
+ @param names: Passed through unchanged as C{names}
+ @param shared: Passed through unchanged as C{shared}
+ @return: The result of the forwarded C{check_owned} call
+ @see: L{LockSet.check_owned}
+
+ """
+ return self.__keyring[level].check_owned(names, shared=shared)
+
def _upper_owned(self, level):
"""Check that we don't own any lock at a level greater than the given one.
# Prepare query response
return query.GetQueryResponse(qobj, ctx)
-
- def OldStyleQueryLocks(self, fields):
- """Queries information from all locks, returning old-style data.
-
- @type fields: list of strings
- @param fields: List of fields to return
-
- """
- (qobj, ctx) = self._Query(fields)
-
- return qobj.OldStyleQuery(ctx)