import os
import select
import threading
-import time
import errno
import weakref
import logging
import heapq
+import operator
from ganeti import errors
from ganeti import utils
from ganeti import compat
+from ganeti import query
_EXCLUSIVE_TEXT = "exclusive"
_SHARED_TEXT = "shared"
+_DELETED_TEXT = "deleted"
_DEFAULT_PRIORITY = 0
return wrap
-class RunningTimeout(object):
- """Class to calculate remaining timeout when doing several operations.
-
- """
- __slots__ = [
- "_allow_negative",
- "_start_time",
- "_time_fn",
- "_timeout",
- ]
-
- def __init__(self, timeout, allow_negative, _time_fn=time.time):
- """Initializes this class.
-
- @type timeout: float
- @param timeout: Timeout duration
- @type allow_negative: bool
- @param allow_negative: Whether to return values below zero
- @param _time_fn: Time function for unittests
-
- """
- object.__init__(self)
-
- if timeout is not None and timeout < 0.0:
- raise ValueError("Timeout must not be negative")
-
- self._timeout = timeout
- self._allow_negative = allow_negative
- self._time_fn = _time_fn
-
- self._start_time = None
-
- def Remaining(self):
- """Returns the remaining timeout.
-
- """
- if self._timeout is None:
- return None
-
- # Get start time on first calculation
- if self._start_time is None:
- self._start_time = self._time_fn()
-
- # Calculate remaining time
- remaining_timeout = self._start_time + self._timeout - self._time_fn()
-
- if not self._allow_negative:
- # Ensure timeout is always >= 0
- return max(0.0, remaining_timeout)
-
- return remaining_timeout
-
-
class _SingleNotifyPipeConditionWaiter(object):
"""Helper class for SingleNotifyPipeCondition
@param timeout: Timeout for waiting (can be None)
"""
- running_timeout = RunningTimeout(timeout, True)
+ running_timeout = utils.RunningTimeout(timeout, True)
while True:
remaining_time = running_timeout.Remaining()
if monitor:
monitor.RegisterLock(self)
- def GetInfo(self, fields):
+ def GetInfo(self, requested):
"""Retrieves information for querying locks.
- @type fields: list of strings
- @param fields: List of fields to return
+ @type requested: set
+ @param requested: Requested information, see C{query.LQ_*}
"""
self.__lock.acquire()
try:
- info = []
-
# Note: to avoid unintentional race conditions, no references to
# modifiable objects should be returned unless they were created in this
# function.
- for fname in fields:
- if fname == "name":
- info.append(self.name)
- elif fname == "mode":
- if self.__deleted:
- info.append("deleted")
- assert not (self.__exc or self.__shr)
- elif self.__exc:
- info.append(_EXCLUSIVE_TEXT)
- elif self.__shr:
- info.append(_SHARED_TEXT)
- else:
- info.append(None)
- elif fname == "owner":
- if self.__exc:
- owner = [self.__exc]
- else:
- owner = self.__shr
-
- if owner:
- assert not self.__deleted
- info.append([i.getName() for i in owner])
- else:
- info.append(None)
- elif fname == "pending":
- data = []
-
- # Sorting instead of copying and using heaq functions for simplicity
- for (_, prioqueue) in sorted(self.__pending):
- for cond in prioqueue:
- if cond.shared:
- mode = _SHARED_TEXT
- else:
- mode = _EXCLUSIVE_TEXT
-
- # This function should be fast as it runs with the lock held.
- # Hence not using utils.NiceSort.
- data.append((mode, sorted(i.getName()
- for i in cond.get_waiting())))
-
- info.append(data)
+ mode = None
+ owner_names = None
+
+ if query.LQ_MODE in requested:
+ if self.__deleted:
+ mode = _DELETED_TEXT
+ assert not (self.__exc or self.__shr)
+ elif self.__exc:
+ mode = _EXCLUSIVE_TEXT
+ elif self.__shr:
+ mode = _SHARED_TEXT
+
+ # Current owner(s) are wanted
+ if query.LQ_OWNER in requested:
+ if self.__exc:
+ owner = [self.__exc]
else:
- raise errors.OpExecError("Invalid query field '%s'" % fname)
+ owner = self.__shr
+
+ if owner:
+ assert not self.__deleted
+ owner_names = [i.getName() for i in owner]
- return info
+ # Pending acquires are wanted
+ if query.LQ_PENDING in requested:
+ pending = []
+
+ # Sorting instead of copying and using heapq functions for simplicity
+ for (_, prioqueue) in sorted(self.__pending):
+ for cond in prioqueue:
+ if cond.shared:
+ pendmode = _SHARED_TEXT
+ else:
+ pendmode = _EXCLUSIVE_TEXT
+
+ # List of names will be sorted in L{query._GetLockPending}
+ pending.append((pendmode, [i.getName()
+ for i in cond.get_waiting()]))
+ else:
+ pending = None
+
+ return (self.name, mode, owner_names, pending)
finally:
self.__lock.release()
# We need to keep track of how long we spent waiting for a lock. The
# timeout passed to this function is over all lock acquires.
- running_timeout = RunningTimeout(timeout, False)
+ running_timeout = utils.RunningTimeout(timeout, False)
try:
if names is not None:
# element is not there anymore.
continue
- raise errors.LockError("Non-existing lock %s in set %s" %
- (lname, self.name))
+ raise errors.LockError("Non-existing lock %s in set %s (it may have"
+ " been removed)" % (lname, self.name))
acquire_list.append((lname, lock))
# particular element is not there anymore.
continue
- raise errors.LockError("Non-existing lock %s in set %s" %
- (lname, self.name))
+ raise errors.LockError("Non-existing lock %s in set %s (it may"
+ " have been removed)" % (lname, self.name))
if not acq_success:
# Couldn't get lock or timeout occurred
# the same time.
LEVEL_CLUSTER = 0
LEVEL_INSTANCE = 1
-LEVEL_NODE = 2
+LEVEL_NODEGROUP = 2
+LEVEL_NODE = 3
LEVELS = [LEVEL_CLUSTER,
LEVEL_INSTANCE,
+ LEVEL_NODEGROUP,
LEVEL_NODE]
# Lock levels which are modifiable
-LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
+LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]
LEVEL_NAMES = {
LEVEL_CLUSTER: "cluster",
LEVEL_INSTANCE: "instance",
+ LEVEL_NODEGROUP: "nodegroup",
LEVEL_NODE: "node",
}
"""
_instance = None
- def __init__(self, nodes=None, instances=None):
+ def __init__(self, nodes, nodegroups, instances):
"""Constructs a new GanetiLockManager object.
There should be only a GanetiLockManager object at any time, so this
function raises an error if this is not the case.
@param nodes: list of node names
+ @param nodegroups: list of nodegroup uuids
@param instances: list of instance names
"""
self.__keyring = {
LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
+ LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor),
LEVEL_INSTANCE: LockSet(instances, "instances",
monitor=self._monitor),
}
- def QueryLocks(self, fields, sync):
+ def QueryLocks(self, fields):
"""Queries information from all locks.
See L{LockMonitor.QueryLocks}.
"""
- return self._monitor.QueryLocks(fields, sync)
+ return self._monitor.QueryLocks(fields)
+
+ def OldStyleQueryLocks(self, fields):
+ """Queries information from all locks, returning old-style data.
+
+ See L{LockMonitor.OldStyleQueryLocks}.
+
+ """
+ return self._monitor.OldStyleQueryLocks(fields)
def _names(self, level):
"""List the lock names at the given level.
"""
return level == LEVEL_CLUSTER and (names is None or BGL in names)
- def acquire(self, level, names, timeout=None, shared=0):
+ def acquire(self, level, names, timeout=None, shared=0, priority=None):
"""Acquire a set of resource locks, at the same level.
@type level: member of locking.LEVELS
an exclusive lock will be acquired
@type timeout: float
@param timeout: Maximum time to acquire all locks
+ @type priority: integer
+ @param priority: Priority for acquiring lock
"""
assert level in LEVELS, "Invalid locking level %s" % level
" while owning some at a greater one")
# Acquire the locks in the set.
- return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
+ return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
+ priority=priority)
def release(self, level, names=None):
"""Release a set of resource locks, at the same level.
self._locks[lock] = None
@ssynchronized(_LOCK_ATTR)
- def _GetLockInfo(self, fields):
+ def _GetLockInfo(self, requested):
"""Get information from all locks while the monitor lock is held.
"""
- result = {}
+ return [lock.GetInfo(requested) for lock in self._locks.keys()]
- for lock in self._locks.keys():
- assert lock.name not in result, "Found duplicate lock name"
- result[lock.name] = lock.GetInfo(fields)
+ def _Query(self, fields):
+ """Queries information from all locks.
- return result
+ @type fields: list of strings
+ @param fields: List of fields to return
+
+ """
+ qobj = query.Query(query.LOCK_FIELDS, fields)
+
+ # Get all data and sort by name
+ lockinfo = utils.NiceSort(self._GetLockInfo(qobj.RequestedData()),
+ key=operator.itemgetter(0))
+
+ return (qobj, query.LockQueryData(lockinfo))
- def QueryLocks(self, fields, sync):
+ def QueryLocks(self, fields):
"""Queries information from all locks.
@type fields: list of strings
@param fields: List of fields to return
- @type sync: boolean
- @param sync: Whether to operate in synchronous mode
"""
- if sync:
- raise NotImplementedError("Synchronous queries are not implemented")
+ (qobj, ctx) = self._Query(fields)
- # Get all data without sorting
- result = self._GetLockInfo(fields)
+ # Prepare query response
+ return query.GetQueryResponse(qobj, ctx)
+
+ def OldStyleQueryLocks(self, fields):
+ """Queries information from all locks, returning old-style data.
+
+ @type fields: list of strings
+ @param fields: List of fields to return
+
+ """
+ (qobj, ctx) = self._Query(fields)
- # Sort by name
- return [result[name] for name in utils.NiceSort(result.keys())]
+ return qobj.OldStyleQuery(ctx)