format_override=None, verbose=False):
"""Generic implementation for listing all items of a resource.
- @param resource: One of L{constants.QR_OP_LUXI}
+ @param resource: One of L{constants.QR_VIA_LUXI}
@type fields: list of strings
@param fields: List of fields to query for
@type names: list of strings
def GenericListFields(resource, fields, separator, header, cl=None):
"""Generic implementation for listing fields for a resource.
- @param resource: One of L{constants.QR_OP_LUXI}
+ @param resource: One of L{constants.QR_VIA_LUXI}
@type fields: list of strings
@param fields: List of fields to query for
@type separator: string or None
constants.QR_OS: _OsQuery,
}
-assert set(_QUERY_IMPL.keys()) == constants.QR_OP_QUERY
+assert set(_QUERY_IMPL.keys()) == constants.QR_VIA_OP
def _GetQueryImplementation(name):
"""Returns the implemtnation for a query type.
- @param name: Query type, must be one of L{constants.QR_OP_QUERY}
+ @param name: Query type, must be one of L{constants.QR_VIA_OP}
"""
try:
QR_OS = "os"
#: List of resources which can be queried using L{opcodes.OpQuery}
-QR_OP_QUERY = frozenset([QR_INSTANCE, QR_NODE, QR_GROUP, QR_OS])
+QR_VIA_OP = frozenset([QR_INSTANCE, QR_NODE, QR_GROUP, QR_OS])
#: List of resources which can be queried using Local UniX Interface
-QR_OP_LUXI = QR_OP_QUERY.union([
+QR_VIA_LUXI = QR_VIA_OP.union([
QR_LOCK,
])
+#: List of resources which can be queried using RAPI
+QR_VIA_RAPI = QR_VIA_LUXI
+
# Query field types
QFT_UNKNOWN = "unknown"
QFT_TEXT = "text"
def Query(self, what, fields, filter_):
"""Query for resources/items.
- @param what: One of L{constants.QR_OP_LUXI}
+ @param what: One of L{constants.QR_VIA_LUXI}
@type fields: List of strings
@param fields: List of requested fields
@type filter_: None or list
def QueryFields(self, what, fields):
"""Query for available fields.
- @param what: One of L{constants.QR_OP_LUXI}
+ @param what: One of L{constants.QR_VIA_LUXI}
@type fields: None or list of strings
@param fields: List of requested fields
@rtype: L{objects.QueryFieldsResponse}
_PGroupNodeParams = ("ndparams", None, ht.TMaybeDict,
"Default node parameters for group")
-_PQueryWhat = ("what", ht.NoDefault, ht.TElemOf(constants.QR_OP_QUERY),
+_PQueryWhat = ("what", ht.NoDefault, ht.TElemOf(constants.QR_VIA_OP),
"Resource(s) to query for")
_PIpCheckDoc = "Whether to ensure instance's IP address is inactive"
class OpQuery(OpCode):
"""Query for resources/items.
- @ivar what: Resources to query for, must be one of L{constants.QR_OP_QUERY}
+ @ivar what: Resources to query for, must be one of L{constants.QR_VIA_OP}
@ivar fields: List of fields to retrieve
@ivar filter: Query filter
class OpQueryFields(OpCode):
"""Query for available resource/item fields.
- @ivar what: Resources to query for, must be one of L{constants.QR_OP_QUERY}
+ @ivar what: Resources to query for, must be one of L{constants.QR_VIA_OP}
@ivar fields: List of fields to retrieve
"""
elif method == luxi.REQ_QUERY:
req = objects.QueryRequest.FromDict(args)
- if req.what in constants.QR_OP_QUERY:
+ if req.what in constants.QR_VIA_OP:
result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
filter=req.filter))
elif req.what == constants.QR_LOCK:
if req.filter is not None:
raise errors.OpPrereqError("Lock queries can't be filtered")
return self.server.context.glm.QueryLocks(req.fields)
- elif req.what in constants.QR_OP_LUXI:
+ elif req.what in constants.QR_VIA_LUXI:
raise NotImplementedError
else:
raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
elif method == luxi.REQ_QUERY_FIELDS:
req = objects.QueryFieldsRequest.FromDict(args)
- if req.what in constants.QR_OP_QUERY:
+ if req.what in constants.QR_VIA_OP:
result = self._Query(opcodes.OpQueryFields(what=req.what,
fields=req.fields))
elif req.what == constants.QR_LOCK:
return query.QueryFields(query.LOCK_FIELDS, req.fields)
- elif req.what in constants.QR_OP_LUXI:
+ elif req.what in constants.QR_VIA_LUXI:
raise NotImplementedError
else:
raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
class TestLUQuery(unittest.TestCase):
def test(self):
self.assertEqual(sorted(cmdlib._QUERY_IMPL.keys()),
- sorted(constants.QR_OP_QUERY))
+ sorted(constants.QR_VIA_OP))
- assert constants.QR_NODE in constants.QR_OP_QUERY
- assert constants.QR_INSTANCE in constants.QR_OP_QUERY
+ assert constants.QR_NODE in constants.QR_VIA_OP
+ assert constants.QR_INSTANCE in constants.QR_VIA_OP
- for i in constants.QR_OP_QUERY:
+ for i in constants.QR_VIA_OP:
self.assert_(cmdlib._GetQueryImplementation(i))
self.assertRaises(errors.OpPrereqError, cmdlib._GetQueryImplementation, "")