X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/edd49f9bfeec2f7a8a896027a7db82908979b695..4a78c361a6de3bcbf98f02abfe41ae3b11de2b00:/lib/query.py diff --git a/lib/query.py b/lib/query.py index 2091190..ffd4d20 100644 --- a/lib/query.py +++ b/lib/query.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2010, 2011 Google Inc. +# Copyright (C) 2010, 2011, 2012 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -62,6 +62,7 @@ from ganeti import utils from ganeti import compat from ganeti import objects from ganeti import ht +from ganeti import runtime from ganeti import qlang from ganeti.constants import (QFT_UNKNOWN, QFT_TEXT, QFT_BOOL, QFT_NUMBER, @@ -92,7 +93,12 @@ from ganeti.constants import (QFT_UNKNOWN, QFT_TEXT, QFT_BOOL, QFT_NUMBER, (GQ_CONFIG, GQ_NODE, - GQ_INST) = range(200, 203) + GQ_INST, + GQ_DISKPARAMS) = range(200, 204) + +(CQ_CONFIG, + CQ_QUEUE_DRAINED, + CQ_WATCHER_PAUSE) = range(300, 303) # Query field flags QFF_HOSTNAME = 0x01 @@ -136,6 +142,12 @@ _VTToQFT = { _SERIAL_NO_DOC = "%s object serial number, incremented on each modification" +# TODO: Consider moving titles closer to constants +NDP_TITLE = { + constants.ND_OOB_PROGRAM: "OutOfBandProgram", + constants.ND_SPINDLE_COUNT: "SpindleCount", + } + def _GetUnknownField(ctx, item): # pylint: disable=W0613 """Gets the contents of an unknown field. @@ -868,6 +880,20 @@ def _MakeField(name, title, kind, doc): doc=doc) +def _StaticValueInner(value, ctx, _): # pylint: disable=W0613 + """Returns a static value. + + """ + return value + + +def _StaticValue(value): + """Prepares a function to return a static value. + + """ + return compat.partial(_StaticValueInner, value) + + def _GetNodeRole(node, master_name): """Determine node role. @@ -899,6 +925,34 @@ def _GetItemAttr(attr): return lambda _, item: getter(item) +def _GetNDParam(name): + """Return a field function to return an ND parameter out of the context. + + """ + def _helper(ctx, _): + if ctx.ndparams is None: + return _FS_UNAVAIL + else: + return ctx.ndparams.get(name, None) + return _helper + + +def _BuildNDFields(is_group): + """Builds all the ndparam fields. + + @param is_group: whether this is called at group or node level + + """ + if is_group: + field_kind = GQ_CONFIG + else: + field_kind = NQ_GROUP + return [(_MakeField("ndp/%s" % name, NDP_TITLE.get(name, "ndp/%s" % name), + _VTToQFT[kind], "The \"%s\" node parameter" % name), + field_kind, 0, _GetNDParam(name)) + for name, kind in constants.NDS_PARAMETER_TYPES.items()] + + def _ConvWrapInner(convert, fn, ctx, item): """Wrapper for converting values. @@ -982,6 +1036,7 @@ class NodeQueryData: # Used for individual rows self.curlive_data = None + self.ndparams = None def __iter__(self): """Iterate over all nodes. @@ -991,6 +1046,11 @@ class NodeQueryData: """ for node in self.nodes: + group = self.groups.get(node.group, None) + if group is None: + self.ndparams = None + else: + self.ndparams = self.cluster.FillND(node, group) if self.live_data: self.curlive_data = self.live_data.get(node.name, None) else: @@ -1198,6 +1258,8 @@ def _BuildNodeFields(): NQ_CONFIG, 0, _GetNodeDiskState), ] + fields.extend(_BuildNDFields(False)) + # Node role role_values = (constants.NR_MASTER, constants.NR_MCANDIDATE, constants.NR_REGULAR, constants.NR_DRAINED, @@ -1941,7 +2003,8 @@ class GroupQueryData: """Data container for node group data queries. """ - def __init__(self, cluster, groups, group_to_nodes, group_to_instances): + def __init__(self, cluster, groups, group_to_nodes, group_to_instances, + want_diskparams): """Initializes this class. @param cluster: Cluster object @@ -1950,15 +2013,20 @@ class GroupQueryData: @param group_to_nodes: Per-group list of nodes @type group_to_instances: dict; group UUID as key @param group_to_instances: Per-group list of (primary) instances + @type want_diskparams: bool + @param want_diskparams: Whether diskparamters should be calculated """ self.groups = groups self.group_to_nodes = group_to_nodes self.group_to_instances = group_to_instances self.cluster = cluster + self.want_diskparams = want_diskparams # Used for individual rows self.group_ipolicy = None + self.ndparams = None + self.group_dp = None def __iter__(self): """Iterate over all node groups. @@ -1969,6 +2037,11 @@ class GroupQueryData: """ for group in self.groups: self.group_ipolicy = self.cluster.SimpleFillIPolicy(group.ipolicy) + self.ndparams = self.cluster.SimpleFillND(group.ndparams) + if self.want_diskparams: + self.group_dp = self.cluster.SimpleFillDP(group.diskparams) + else: + self.group_dp = None yield group @@ -1977,7 +2050,6 @@ _GROUP_SIMPLE_FIELDS = { "name": ("Group", QFT_TEXT, "Group name"), "serial_no": ("SerialNo", QFT_NUMBER, _SERIAL_NO_DOC % "Group"), "uuid": ("UUID", QFT_TEXT, "Group UUID"), - "ndparams": ("NDParams", QFT_OTHER, "Node parameters"), } @@ -2027,8 +2099,23 @@ def _BuildGroupFields(): (_MakeField("custom_ipolicy", "CustomInstancePolicy", QFT_OTHER, "Custom instance policy limitations"), GQ_CONFIG, 0, _GetItemAttr("ipolicy")), + (_MakeField("custom_ndparams", "CustomNDParams", QFT_OTHER, + "Custom node parameters"), + GQ_CONFIG, 0, _GetItemAttr("ndparams")), + (_MakeField("ndparams", "NDParams", QFT_OTHER, + "Node parameters"), + GQ_CONFIG, 0, lambda ctx, _: ctx.ndparams), + (_MakeField("diskparams", "DiskParameters", QFT_OTHER, + "Disk parameters (merged)"), + GQ_DISKPARAMS, 0, lambda ctx, _: ctx.group_dp), + (_MakeField("custom_diskparams", "CustomDiskParameters", QFT_OTHER, + "Custom disk parameters"), + GQ_CONFIG, 0, _GetItemAttr("diskparams")), ]) + # ND parameters + fields.extend(_BuildNDFields(True)) + fields.extend(_GetItemTimestampFields(GQ_CONFIG)) return _PrepareFieldList(fields, []) @@ -2081,6 +2168,230 @@ def _BuildOsFields(): return _PrepareFieldList(fields, []) +def _JobUnavailInner(fn, ctx, (job_id, job)): # pylint: disable=W0613 + """Return L{_FS_UNAVAIL} if job is None. + + When listing specifc jobs (e.g. "gnt-job list 1 2 3"), a job may not be + found, in which case this function converts it to L{_FS_UNAVAIL}. + + """ + if job is None: + return _FS_UNAVAIL + else: + return fn(job) + + +def _JobUnavail(inner): + """Wrapper for L{_JobUnavailInner}. + + """ + return compat.partial(_JobUnavailInner, inner) + + +def _PerJobOpInner(fn, job): + """Executes a function per opcode in a job. + + """ + return map(fn, job.ops) + + +def _PerJobOp(fn): + """Wrapper for L{_PerJobOpInner}. + + """ + return _JobUnavail(compat.partial(_PerJobOpInner, fn)) + + +def _JobTimestampInner(fn, job): + """Converts unavailable timestamp to L{_FS_UNAVAIL}. + + """ + timestamp = fn(job) + + if timestamp is None: + return _FS_UNAVAIL + else: + return timestamp + + +def _JobTimestamp(fn): + """Wrapper for L{_JobTimestampInner}. + + """ + return _JobUnavail(compat.partial(_JobTimestampInner, fn)) + + +def _BuildJobFields(): + """Builds list of fields for job queries. + + """ + fields = [ + (_MakeField("id", "ID", QFT_TEXT, "Job ID"), + None, 0, lambda _, (job_id, job): job_id), + (_MakeField("status", "Status", QFT_TEXT, "Job status"), + None, 0, _JobUnavail(lambda job: job.CalcStatus())), + (_MakeField("priority", "Priority", QFT_NUMBER, + ("Current job priority (%s to %s)" % + (constants.OP_PRIO_LOWEST, constants.OP_PRIO_HIGHEST))), + None, 0, _JobUnavail(lambda job: job.CalcPriority())), + (_MakeField("ops", "OpCodes", QFT_OTHER, "List of all opcodes"), + None, 0, _PerJobOp(lambda op: op.input.__getstate__())), + (_MakeField("opresult", "OpCode_result", QFT_OTHER, + "List of opcodes results"), + None, 0, _PerJobOp(operator.attrgetter("result"))), + (_MakeField("opstatus", "OpCode_status", QFT_OTHER, + "List of opcodes status"), + None, 0, _PerJobOp(operator.attrgetter("status"))), + (_MakeField("oplog", "OpCode_log", QFT_OTHER, + "List of opcode output logs"), + None, 0, _PerJobOp(operator.attrgetter("log"))), + (_MakeField("opstart", "OpCode_start", QFT_OTHER, + "List of opcode start timestamps (before acquiring locks)"), + None, 0, _PerJobOp(operator.attrgetter("start_timestamp"))), + (_MakeField("opexec", "OpCode_exec", QFT_OTHER, + "List of opcode execution start timestamps (after acquiring" + " locks)"), + None, 0, _PerJobOp(operator.attrgetter("exec_timestamp"))), + (_MakeField("opend", "OpCode_end", QFT_OTHER, + "List of opcode execution end timestamps"), + None, 0, _PerJobOp(operator.attrgetter("end_timestamp"))), + (_MakeField("oppriority", "OpCode_prio", QFT_OTHER, + "List of opcode priorities"), + None, 0, _PerJobOp(operator.attrgetter("priority"))), + (_MakeField("received_ts", "Received", QFT_OTHER, + "Timestamp of when job was received"), + None, 0, _JobTimestamp(operator.attrgetter("received_timestamp"))), + (_MakeField("start_ts", "Start", QFT_OTHER, + "Timestamp of job start"), + None, 0, _JobTimestamp(operator.attrgetter("start_timestamp"))), + (_MakeField("end_ts", "End", QFT_OTHER, + "Timestamp of job end"), + None, 0, _JobTimestamp(operator.attrgetter("end_timestamp"))), + (_MakeField("summary", "Summary", QFT_OTHER, + "List of per-opcode summaries"), + None, 0, _PerJobOp(lambda op: op.input.Summary())), + ] + + return _PrepareFieldList(fields, []) + + +def _GetExportName(_, (node_name, expname)): # pylint: disable=W0613 + """Returns an export name if available. + + """ + if expname is None: + return _FS_UNAVAIL + else: + return expname + + +def _BuildExportFields(): + """Builds list of fields for exports. + + """ + fields = [ + (_MakeField("node", "Node", QFT_TEXT, "Node name"), + None, QFF_HOSTNAME, lambda _, (node_name, expname): node_name), + (_MakeField("export", "Export", QFT_TEXT, "Export name"), + None, 0, _GetExportName), + ] + + return _PrepareFieldList(fields, []) + + +_CLUSTER_VERSION_FIELDS = { + "software_version": ("SoftwareVersion", QFT_TEXT, constants.RELEASE_VERSION, + "Software version"), + "protocol_version": ("ProtocolVersion", QFT_NUMBER, + constants.PROTOCOL_VERSION, + "RPC protocol version"), + "config_version": ("ConfigVersion", QFT_NUMBER, constants.CONFIG_VERSION, + "Configuration format version"), + "os_api_version": ("OsApiVersion", QFT_NUMBER, max(constants.OS_API_VERSIONS), + "API version for OS template scripts"), + "export_version": ("ExportVersion", QFT_NUMBER, constants.EXPORT_VERSION, + "Import/export file format version"), + } + + +_CLUSTER_SIMPLE_FIELDS = { + "cluster_name": ("Name", QFT_TEXT, QFF_HOSTNAME, "Cluster name"), + "master_node": ("Master", QFT_TEXT, QFF_HOSTNAME, "Master node name"), + "volume_group_name": ("VgName", QFT_TEXT, 0, "LVM volume group name"), + } + + +class ClusterQueryData: + def __init__(self, cluster, drain_flag, watcher_pause): + """Initializes this class. + + @type cluster: L{objects.Cluster} + @param cluster: Instance of cluster object + @type drain_flag: bool + @param drain_flag: Whether job queue is drained + @type watcher_pause: number + @param watcher_pause: Until when watcher is paused (Unix timestamp) + + """ + self._cluster = cluster + self.drain_flag = drain_flag + self.watcher_pause = watcher_pause + + def __iter__(self): + return iter([self._cluster]) + + +def _ClusterWatcherPause(ctx, _): + """Returns until when watcher is paused (if available). + + """ + if ctx.watcher_pause is None: + return _FS_UNAVAIL + else: + return ctx.watcher_pause + + +def _BuildClusterFields(): + """Builds list of fields for cluster information. + + """ + fields = [ + (_MakeField("tags", "Tags", QFT_OTHER, "Tags"), CQ_CONFIG, 0, + lambda ctx, cluster: list(cluster.GetTags())), + (_MakeField("architecture", "ArchInfo", QFT_OTHER, + "Architecture information"), None, 0, + lambda ctx, _: runtime.GetArchInfo()), + (_MakeField("drain_flag", "QueueDrained", QFT_BOOL, + "Flag whether job queue is drained"), CQ_QUEUE_DRAINED, 0, + lambda ctx, _: ctx.drain_flag), + (_MakeField("watcher_pause", "WatcherPause", QFT_TIMESTAMP, + "Until when watcher is paused"), CQ_WATCHER_PAUSE, 0, + _ClusterWatcherPause), + ] + + # Simple fields + fields.extend([ + (_MakeField(name, title, kind, doc), CQ_CONFIG, flags, _GetItemAttr(name)) + for (name, (title, kind, flags, doc)) in _CLUSTER_SIMPLE_FIELDS.items() + ]) + + # Version fields + fields.extend([ + (_MakeField(name, title, kind, doc), None, 0, _StaticValue(value)) + for (name, (title, kind, value, doc)) in _CLUSTER_VERSION_FIELDS.items() + ]) + + # Add timestamps + fields.extend(_GetItemTimestampFields(CQ_CONFIG)) + + return _PrepareFieldList(fields, [ + ("name", "cluster_name"), + ]) + + +#: Fields for cluster information +CLUSTER_FIELDS = _BuildClusterFields() + #: Fields available for node queries NODE_FIELDS = _BuildNodeFields() @@ -2096,13 +2407,22 @@ GROUP_FIELDS = _BuildGroupFields() #: Fields available for operating system queries OS_FIELDS = _BuildOsFields() +#: Fields available for job queries +JOB_FIELDS = _BuildJobFields() + +#: Fields available for exports +EXPORT_FIELDS = _BuildExportFields() + #: All available resources ALL_FIELDS = { + constants.QR_CLUSTER: CLUSTER_FIELDS, constants.QR_INSTANCE: INSTANCE_FIELDS, constants.QR_NODE: NODE_FIELDS, constants.QR_LOCK: LOCK_FIELDS, constants.QR_GROUP: GROUP_FIELDS, constants.QR_OS: OS_FIELDS, + constants.QR_JOB: JOB_FIELDS, + constants.QR_EXPORT: EXPORT_FIELDS, } #: All available field lists