import weakref
import logging
import heapq
+import operator
from ganeti import errors
from ganeti import utils
from ganeti import compat
+from ganeti import query
_EXCLUSIVE_TEXT = "exclusive"
_SHARED_TEXT = "shared"
+_DELETED_TEXT = "deleted"
_DEFAULT_PRIORITY = 0
if monitor:
monitor.RegisterLock(self)
- def GetInfo(self, fields):
+ def GetInfo(self, requested):
"""Retrieves information for querying locks.
- @type fields: list of strings
- @param fields: List of fields to return
+ @type requested: set
+ @param requested: Requested information, see C{query.LQ_*}
"""
self.__lock.acquire()
try:
- info = []
-
# Note: to avoid unintentional race conditions, no references to
# modifiable objects should be returned unless they were created in this
# function.
- for fname in fields:
- if fname == "name":
- info.append(self.name)
- elif fname == "mode":
- if self.__deleted:
- info.append("deleted")
- assert not (self.__exc or self.__shr)
- elif self.__exc:
- info.append(_EXCLUSIVE_TEXT)
- elif self.__shr:
- info.append(_SHARED_TEXT)
- else:
- info.append(None)
- elif fname == "owner":
- if self.__exc:
- owner = [self.__exc]
- else:
- owner = self.__shr
-
- if owner:
- assert not self.__deleted
- info.append([i.getName() for i in owner])
- else:
- info.append(None)
- elif fname == "pending":
- data = []
-
- # Sorting instead of copying and using heaq functions for simplicity
- for (_, prioqueue) in sorted(self.__pending):
- for cond in prioqueue:
- if cond.shared:
- mode = _SHARED_TEXT
- else:
- mode = _EXCLUSIVE_TEXT
-
- # This function should be fast as it runs with the lock held.
- # Hence not using utils.NiceSort.
- data.append((mode, sorted(i.getName()
- for i in cond.get_waiting())))
-
- info.append(data)
+ mode = None
+ owner_names = None
+
+ if query.LQ_MODE in requested:
+ if self.__deleted:
+ mode = _DELETED_TEXT
+ assert not (self.__exc or self.__shr)
+ elif self.__exc:
+ mode = _EXCLUSIVE_TEXT
+ elif self.__shr:
+ mode = _SHARED_TEXT
+
+ # Current owner(s) are wanted
+ if query.LQ_OWNER in requested:
+ if self.__exc:
+ owner = [self.__exc]
else:
- raise errors.OpExecError("Invalid query field '%s'" % fname)
+ owner = self.__shr
+
+ if owner:
+ assert not self.__deleted
+ owner_names = [i.getName() for i in owner]
- return info
+ # Pending acquires are wanted
+ if query.LQ_PENDING in requested:
+ pending = []
+
+ # Sorting instead of copying and using heaq functions for simplicity
+ for (_, prioqueue) in sorted(self.__pending):
+ for cond in prioqueue:
+ if cond.shared:
+ pendmode = _SHARED_TEXT
+ else:
+ pendmode = _EXCLUSIVE_TEXT
+
+ # List of names will be sorted in L{query._GetLockPending}
+ pending.append((pendmode, [i.getName()
+ for i in cond.get_waiting()]))
+ else:
+ pending = None
+
+ return (self.name, mode, owner_names, pending)
finally:
self.__lock.release()
monitor=self._monitor),
}
- def QueryLocks(self, fields, sync):
+ def QueryLocks(self, fields):
"""Queries information from all locks.
See L{LockMonitor.QueryLocks}.
"""
- return self._monitor.QueryLocks(fields, sync)
+ return self._monitor.QueryLocks(fields)
+
+ def OldStyleQueryLocks(self, fields):
+ """Queries information from all locks, returning old-style data.
+
+ See L{LockMonitor.OldStyleQueryLocks}.
+
+ """
+ return self._monitor.OldStyleQueryLocks(fields)
def _names(self, level):
"""List the lock names at the given level.
self._locks[lock] = None
@ssynchronized(_LOCK_ATTR)
- def _GetLockInfo(self, fields):
+ def _GetLockInfo(self, requested):
"""Get information from all locks while the monitor lock is held.
"""
- result = {}
+ return [lock.GetInfo(requested) for lock in self._locks.keys()]
- for lock in self._locks.keys():
- assert lock.name not in result, "Found duplicate lock name"
- result[lock.name] = lock.GetInfo(fields)
+ def _Query(self, fields):
+ """Queries information from all locks.
- return result
+ @type fields: list of strings
+ @param fields: List of fields to return
+
+ """
+ qobj = query.Query(query.LOCK_FIELDS, fields)
+
+ # Get all data and sort by name
+ lockinfo = utils.NiceSort(self._GetLockInfo(qobj.RequestedData()),
+ key=operator.itemgetter(0))
+
+ return (qobj, query.LockQueryData(lockinfo))
- def QueryLocks(self, fields, sync):
+ def QueryLocks(self, fields):
"""Queries information from all locks.
@type fields: list of strings
@param fields: List of fields to return
- @type sync: boolean
- @param sync: Whether to operate in synchronous mode
"""
- if sync:
- raise NotImplementedError("Synchronous queries are not implemented")
+ (qobj, ctx) = self._Query(fields)
- # Get all data without sorting
- result = self._GetLockInfo(fields)
+ # Prepare query response
+ return query.GetQueryResponse(qobj, ctx)
+
+ def OldStyleQueryLocks(self, fields):
+ """Queries information from all locks, returning old-style data.
+
+ @type fields: list of strings
+ @param fields: List of fields to return
+
+ """
+ (qobj, ctx) = self._Query(fields)
- # Sort by name
- return [result[name] for name in utils.NiceSort(result.keys())]
+ return qobj.OldStyleQuery(ctx)
IQ_LIVE,
IQ_DISKUSAGE) = range(100, 103)
+(LQ_MODE,
+ LQ_OWNER,
+ LQ_PENDING) = range(10, 13)
FIELD_NAME_RE = re.compile(r"^[a-z0-9/._]+$")
TITLE_RE = re.compile(r"^[^\s]+$")
return _PrepareFieldList(fields)
+class LockQueryData:
+ """Data container for lock data queries.
+
+ """
+ def __init__(self, lockdata):
+ """Initializes this class.
+
+ """
+ self.lockdata = lockdata
+
+ def __iter__(self):
+ """Iterate over all locks.
+
+ """
+ return iter(self.lockdata)
+
+
+def _GetLockOwners(_, data):
+ """Returns a sorted list of a lock's current owners.
+
+ """
+ (_, _, owners, _) = data
+
+ if owners:
+ owners = utils.NiceSort(owners)
+
+ return (constants.QRFS_NORMAL, owners)
+
+
+def _GetLockPending(_, data):
+ """Returns a sorted list of a lock's pending acquires.
+
+ """
+ (_, _, _, pending) = data
+
+ if pending:
+ pending = [(mode, utils.NiceSort(names))
+ for (mode, names) in pending]
+
+ return (constants.QRFS_NORMAL, pending)
+
+
+def _BuildLockFields():
+ """Builds list of fields for lock queries.
+
+ """
+ return _PrepareFieldList([
+ (_MakeField("name", "Name", constants.QFT_TEXT), None,
+ lambda ctx, (name, mode, owners, pending): (constants.QRFS_NORMAL, name)),
+ (_MakeField("mode", "Mode", constants.QFT_OTHER), LQ_MODE,
+ lambda ctx, (name, mode, owners, pending): (constants.QRFS_NORMAL, mode)),
+ (_MakeField("owner", "Owner", constants.QFT_OTHER), LQ_OWNER,
+ _GetLockOwners),
+ (_MakeField("pending", "Pending", constants.QFT_OTHER), LQ_PENDING,
+ _GetLockPending),
+ ])
+
+
#: Fields available for node queries
NODE_FIELDS = _BuildNodeFields()
#: Fields available for instance queries
INSTANCE_FIELDS = _BuildInstanceFields()
+
+#: Fields available for lock queries
+LOCK_FIELDS = _BuildLockFields()
import random
import itertools
+from ganeti import constants
from ganeti import locking
from ganeti import errors
from ganeti import utils
from ganeti import compat
+from ganeti import objects
+from ganeti import query
import testutils
prev.wait()
# Check lock information
- self.assertEqual(self.sl.GetInfo(["name"]), [self.sl.name])
- self.assertEqual(self.sl.GetInfo(["mode", "owner"]),
- ["exclusive", [threading.currentThread().getName()]])
- self.assertEqual(self.sl.GetInfo(["name", "pending"]),
- [self.sl.name,
- [(["exclusive", "shared"][int(bool(shared))],
- sorted([t.getName() for t in threads]))
- for acquires in [perprio[i]
- for i in sorted(perprio.keys())]
- for (shared, _, threads) in acquires]])
+ self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None))
+ self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
+ (self.sl.name, "exclusive",
+ [threading.currentThread().getName()], None))
+
+ self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio)
# Let threads acquire the lock
self.sl.release()
self.assertRaises(Queue.Empty, self.done.get_nowait)
+ def _VerifyPrioPending(self, (name, mode, owner, pending), perprio):
+ self.assertEqual(name, self.sl.name)
+ self.assert_(mode is None)
+ self.assert_(owner is None)
+
+ self.assertEqual([(pendmode, sorted(waiting))
+ for (pendmode, waiting) in pending],
+ [(["exclusive", "shared"][int(bool(shared))],
+ sorted(t.getName() for t in threads))
+ for acquires in [perprio[i]
+ for i in sorted(perprio.keys())]
+ for (shared, _, threads) in acquires])
+
class TestSharedLockInCondition(_ThreadedTestCase):
"""SharedLock as a condition lock tests"""
locks.append(locking.SharedLock(name, monitor=self.lm))
self.assertEqual(len(self.lm._locks), len(locks))
-
- self.assertEqual(len(self.lm.QueryLocks(["name"], False)),
- 100)
+ result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
+ self.assertEqual(len(result.fields), 1)
+ self.assertEqual(len(result.data), 100)
# Delete all locks
del locks[:]
# Check order in which locks were added
self.assertEqual([i.name for i in locks], expnames)
- # Sync queries are not supported
- self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True)
-
# Check query result
- self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
- False),
- [[name, None, None, []]
+ result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+ self.assert_(isinstance(result, dict))
+ response = objects.QueryResponse.FromDict(result)
+ self.assertEqual(response.data,
+ [[(constants.QRFS_NORMAL, name),
+ (constants.QRFS_NORMAL, None),
+ (constants.QRFS_NORMAL, None),
+ (constants.QRFS_NORMAL, [])]
for name in utils.NiceSort(expnames)])
+ self.assertEqual(len(response.fields), 4)
+ self.assertEqual(["name", "mode", "owner", "pending"],
+ [fdef.name for fdef in response.fields])
# Test exclusive acquire
for tlock in locks[::4]:
try:
def _GetExpResult(name):
if tlock.name == name:
- return [name, "exclusive", [threading.currentThread().getName()],
- []]
- return [name, None, None, []]
-
- self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
- "pending"], False),
+ return [(constants.QRFS_NORMAL, name),
+ (constants.QRFS_NORMAL, "exclusive"),
+ (constants.QRFS_NORMAL,
+ [threading.currentThread().getName()]),
+ (constants.QRFS_NORMAL, [])]
+ return [(constants.QRFS_NORMAL, name),
+ (constants.QRFS_NORMAL, None),
+ (constants.QRFS_NORMAL, None),
+ (constants.QRFS_NORMAL, [])]
+
+ result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
[_GetExpResult(name)
for name in utils.NiceSort(expnames)])
finally:
i.wait()
# Check query result
- for (name, mode, owner) in self.lm.QueryLocks(["name", "mode",
- "owner"], False):
- if name == tlock1.name:
- self.assertEqual(mode, "shared")
- self.assertEqual(set(owner), set(i.getName() for i in tthreads1))
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ response = objects.QueryResponse.FromDict(result)
+ for (name, mode, owner) in response.data:
+ (name_status, name_value) = name
+ (owner_status, owner_value) = owner
+
+ self.assertEqual(name_status, constants.QRFS_NORMAL)
+ self.assertEqual(owner_status, constants.QRFS_NORMAL)
+
+ if name_value == tlock1.name:
+ self.assertEqual(mode, (constants.QRFS_NORMAL, "shared"))
+ self.assertEqual(set(owner_value),
+ set(i.getName() for i in tthreads1))
continue
- if name == tlock2.name:
- self.assertEqual(mode, "shared")
- self.assertEqual(owner, [tthread2.getName()])
+ if name_value == tlock2.name:
+ self.assertEqual(mode, (constants.QRFS_NORMAL, "shared"))
+ self.assertEqual(owner_value, [tthread2.getName()])
continue
- if name == tlock3.name:
- self.assertEqual(mode, "exclusive")
- self.assertEqual(owner, [tthread3.getName()])
+ if name_value == tlock3.name:
+ self.assertEqual(mode, (constants.QRFS_NORMAL, "exclusive"))
+ self.assertEqual(owner_value, [tthread3.getName()])
continue
- self.assert_(name in expnames)
- self.assert_(mode is None)
- self.assert_(owner is None)
+ self.assert_(name_value in expnames)
+ self.assertEqual(mode, (constants.QRFS_NORMAL, None))
+ self.assert_(owner_value is None)
# Release locks again
releaseev.set()
self._waitThreads()
- self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
- [[name, None, None]
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.QRFS_NORMAL, name),
+ (constants.QRFS_NORMAL, None),
+ (constants.QRFS_NORMAL, None)]
for name in utils.NiceSort(expnames)])
def testDelete(self):
lock = locking.SharedLock("TestLock", monitor=self.lm)
self.assertEqual(len(self.lm._locks), 1)
- self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
- [[lock.name, None, None]])
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.QRFS_NORMAL, lock.name),
+ (constants.QRFS_NORMAL, None),
+ (constants.QRFS_NORMAL, None)]])
lock.delete()
- self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
- [[lock.name, "deleted", None]])
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.QRFS_NORMAL, lock.name),
+ (constants.QRFS_NORMAL, "deleted"),
+ (constants.QRFS_NORMAL, None)]])
self.assertEqual(len(self.lm._locks), 1)
def testPending(self):
lock.acquire()
try:
self.assertEqual(len(self.lm._locks), 1)
- self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
- [[lock.name, "exclusive",
- [threading.currentThread().getName()]]])
+ result = self.lm.QueryLocks(["name", "mode", "owner"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.QRFS_NORMAL, lock.name),
+ (constants.QRFS_NORMAL, "exclusive"),
+ (constants.QRFS_NORMAL,
+ [threading.currentThread().getName()])]])
threads = []
# All acquires are waiting now
if shared:
- pending = [("shared", sorted([t.getName() for t in threads]))]
+ pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
else:
pending = [("exclusive", [t.getName()]) for t in threads]
- self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
- "pending"], False),
- [[lock.name, "exclusive",
- [threading.currentThread().getName()],
- pending]])
+ result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.QRFS_NORMAL, lock.name),
+ (constants.QRFS_NORMAL, "exclusive"),
+ (constants.QRFS_NORMAL,
+ [threading.currentThread().getName()]),
+ (constants.QRFS_NORMAL, pending)]])
self.assertEqual(len(self.lm._locks), 1)
finally:
self._waitThreads()
# No pending acquires
- self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
- False),
- [[lock.name, None, None, []]])
+ result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
+ self.assertEqual(objects.QueryResponse.FromDict(result).data,
+ [[(constants.QRFS_NORMAL, lock.name),
+ (constants.QRFS_NORMAL, None),
+ (constants.QRFS_NORMAL, None),
+ (constants.QRFS_NORMAL, [])]])
self.assertEqual(len(self.lm._locks), 1)