diff --git a/lib/cli.py b/lib/cli.py
index 924208d..b7722e2 100644
--- a/lib/cli.py
+++ b/lib/cli.py
@@ -36,6 +36,8 @@ from ganeti import opcodes
 from ganeti import luxi
 from ganeti import ssconf
 from ganeti import rpc
+from ganeti import ssh
+from ganeti import compat
 
 from optparse import (OptionParser, TitledHelpFormatter,
                       Option, OptionValueError)
@@ -43,11 +45,14 @@ from optparse import (OptionParser, TitledHelpFormatter,
 
 __all__ = [
   # Command line options
+  "ADD_UIDS_OPT",
   "ALLOCATABLE_OPT",
   "ALL_OPT",
+  "AUTO_PROMOTE_OPT",
   "AUTO_REPLACE_OPT",
   "BACKEND_OPT",
   "CLEANUP_OPT",
+  "CLUSTER_DOMAIN_SECRET_OPT",
   "CONFIRM_OPT",
   "CP_SIZE_OPT",
   "DEBUG_OPT",
@@ -56,6 +61,7 @@ __all__ = [
   "DISK_OPT",
   "DISK_TEMPLATE_OPT",
   "DRAINED_OPT",
+  "EARLY_RELEASE_OPT",
   "ENABLED_HV_OPT",
   "ERROR_CODES_OPT",
   "FIELDS_OPT",
@@ -68,20 +74,28 @@ __all__ = [
   "HVOPTS_OPT",
   "HYPERVISOR_OPT",
   "IALLOCATOR_OPT",
+  "IDENTIFY_DEFAULTS_OPT",
   "IGNORE_CONSIST_OPT",
   "IGNORE_FAILURES_OPT",
+  "IGNORE_REMOVE_FAILURES_OPT",
   "IGNORE_SECONDARIES_OPT",
   "IGNORE_SIZE_OPT",
   "MAC_PREFIX_OPT",
+  "MAINTAIN_NODE_HEALTH_OPT",
   "MASTER_NETDEV_OPT",
   "MC_OPT",
   "NET_OPT",
+  "NEW_CLUSTER_CERT_OPT",
+  "NEW_CLUSTER_DOMAIN_SECRET_OPT",
+  "NEW_CONFD_HMAC_KEY_OPT",
+  "NEW_RAPI_CERT_OPT",
   "NEW_SECONDARY_OPT",
   "NIC_PARAMS_OPT",
   "NODE_LIST_OPT",
   "NODE_PLACEMENT_OPT",
   "NOHDR_OPT",
   "NOIPCHECK_OPT",
+  "NO_INSTALL_OPT",
   "NONAMECHECK_OPT",
   "NOLVM_STORAGE_OPT",
   "NOMODIFY_ETCHOSTS_OPT",
@@ -99,8 +113,11 @@ __all__ = [
   "OFFLINE_OPT",
   "OS_OPT",
   "OS_SIZE_OPT",
+  "RAPI_CERT_OPT",
   "READD_OPT",
   "REBOOT_TYPE_OPT",
+  "REMOVE_INSTANCE_OPT",
+  "REMOVE_UIDS_OPT",
   "SECONDARY_IP_OPT",
   "SELECT_OS_OPT",
   "SEP_OPT",
@@ -114,7 +131,9 @@ __all__ = [
   "SYNC_OPT",
   "TAG_SRC_OPT",
   "TIMEOUT_OPT",
+  "UIDPOOL_OPT",
   "USEUNITS_OPT",
+  "USE_REPL_NET_OPT",
   "VERBOSE_OPT",
   "VG_NAME_OPT",
   "YES_DOIT_OPT",
@@ -126,6 +145,7 @@ __all__ = [
   "JobExecutor",
   "JobSubmittedException",
   "ParseTimespec",
+  "RunWhileClusterStopped",
   "SubmitOpCode",
   "SubmitOrSend",
   "UsesRPC",
@@ -145,6 +165,7 @@ __all__ = [
   "ARGS_NONE",
   "ARGS_ONE_INSTANCE",
   "ARGS_ONE_NODE",
+  "ARGS_ONE_OS",
   "ArgChoice",
   "ArgCommand",
   "ArgFile",
@@ -152,6 +173,7 @@ __all__ = [
   "ArgInstance",
   "ArgJobId",
   "ArgNode",
+  "ArgOs",
   "ArgSuggest",
   "ArgUnknown",
   "OPT_COMPL_INST_ADD_NODES",
@@ -245,12 +267,18 @@ class ArgHost(_Argument):
   """
 
 
+class ArgOs(_Argument):
+  """OS argument.
+
+  """
+
+
 ARGS_NONE = []
 ARGS_MANY_INSTANCES = [ArgInstance()]
 ARGS_MANY_NODES = [ArgNode()]
 ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
 ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
-
+ARGS_ONE_OS = [ArgOs(min=1, max=1)]
 
 
 def _ExtractTagsObject(opts, args):
@@ -313,8 +341,8 @@ def ListTags(opts, args):
 
   """
   kind, name = _ExtractTagsObject(opts, args)
-  op = opcodes.OpGetTags(kind=kind, name=name)
-  result = SubmitOpCode(op)
+  cl = GetClient()
+  result = cl.QueryTags(kind, name)
   result = list(result)
   result.sort()
   for tag in result:
@@ -385,7 +413,7 @@ def _SplitKeyVal(opt, data):
   """
   kv_dict = {}
   if data:
-    for elem in data.split(","):
+    for elem in utils.UnescapeAndSplit(data, sep=","):
       if "=" in elem:
         key, val = elem.split("=", 1)
       else:
@@ -439,6 +467,21 @@ def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
   return _SplitKeyVal(opt, value)
 
 
+def check_bool(option, opt, value): # pylint: disable-msg=W0613
+  """Custom parser for yes/no options.
+
+  This will store the parsed value as either True or False.
+
+  """
+  value = value.lower()
+  if value == constants.VALUE_FALSE or value == "no":
+    return False
+  elif value == constants.VALUE_TRUE or value == "yes":
+    return True
+  else:
+    raise errors.ParameterError("Invalid boolean value '%s'" % value)
+
+
 # completion_suggestion is normally a list. Using numeric values not evaluating
 # to False for dynamic completion.
 (OPT_COMPL_MANY_NODES,
@@ -469,23 +512,23 @@ class CliOption(Option):
     "identkeyval",
     "keyval",
     "unit",
+    "bool",
     )
   TYPE_CHECKER = Option.TYPE_CHECKER.copy()
   TYPE_CHECKER["identkeyval"] = check_ident_key_val
   TYPE_CHECKER["keyval"] = check_key_val
   TYPE_CHECKER["unit"] = check_unit
+  TYPE_CHECKER["bool"] = check_bool
 
 
 # optparse.py sets make_option, so we do it for our own option class, too
 cli_option = CliOption
 
 
-_YESNO = ("yes", "no")
 _YORNO = "yes|no"
 
-DEBUG_OPT = cli_option("-d", "--debug", default=False,
-                       action="store_true",
-                       help="Turn debugging on")
+DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
+                       help="Increase debugging level")
 
 NOHDR_OPT = cli_option("--no-headers", default=False,
                        action="store_true", dest="no_headers",
@@ -576,6 +619,11 @@ FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
                                action="store_true", default=False,
                                help="Force an unknown variant")
 
+NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
+                            action="store_true", default=False,
+                            help="Do not install the OS (will"
+                            " enable no-start)")
+
 BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
                          type="keyval", default={},
                          help="Backend parameters")
@@ -685,6 +733,18 @@ IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
                                  " configuration even if there are failures"
                                  " during the removal process")
 
+IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
+                                        dest="ignore_remove_failures",
+                                        action="store_true", default=False,
+                                        help="Remove the instance from the"
+                                        " cluster configuration even if there"
+                                        " are failures during the removal"
+                                        " process")
+
+REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
+                                 action="store_true", default=False,
+                                 help="Remove the instance from the cluster")
+
 NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
                                help="Specifies the new secondary node",
                                metavar="NODE", default=None,
@@ -700,6 +760,11 @@ ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
                               help="Replace the disk(s) on the secondary"
                               " node (only for the drbd template)")
 
+AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
+                              default=False, action="store_true",
+                              help="Lock all nodes and auto-promote as needed"
+                              " to MC status")
+
 AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
                               default=False, action="store_true",
                               help="Automatically replace faulty disks"
@@ -732,19 +797,19 @@ NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
 
 MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
-                    choices=_YESNO, default=None, metavar=_YORNO,
+                    type="bool", default=None, metavar=_YORNO,
                     help="Set the master_candidate flag on the node")
 
 OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
-                         choices=_YESNO, default=None,
+                         type="bool", default=None,
                          help="Set the offline flag on the node")
 
 DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
-                         choices=_YESNO, default=None,
+                         type="bool", default=None,
                          help="Set the drained flag on the node")
 
 ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
-                             choices=_YESNO, default=None, metavar=_YORNO,
+                             type="bool", default=None, metavar=_YORNO,
                              help="Set the allocatable flag on a volume")
 
 NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
@@ -790,7 +855,6 @@ MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
                                metavar="NETDEV",
                                default=constants.DEFAULT_BRIDGE)
 
-
 GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                                 help="Specify the default directory (cluster-"
                                 "wide) for storing the file-based disks [%s]" %
@@ -838,6 +902,81 @@ SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                          help="Maximum time to wait for instance shutdown")
 
+EARLY_RELEASE_OPT = cli_option("--early-release",
+                               dest="early_release", default=False,
+                               action="store_true",
+                               help="Release the locks on the secondary"
+                               " node(s) early")
+
+NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
+                                  dest="new_cluster_cert",
+                                  default=False, action="store_true",
+                                  help="Generate a new cluster certificate")
+
+RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
+                           default=None,
+                           help="File containing new RAPI certificate")
+
+NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
+                               default=None, action="store_true",
+                               help=("Generate a new self-signed RAPI"
+                                     " certificate"))
+
+NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
+                                    dest="new_confd_hmac_key",
+                                    default=False, action="store_true",
+                                    help=("Create a new HMAC key for %s" %
+                                          constants.CONFD))
+
+CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
+                                       dest="cluster_domain_secret",
+                                       default=None,
+                                       help=("Load new cluster domain"
+                                             " secret from file"))
+
+NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
+                                           dest="new_cluster_domain_secret",
+                                           default=False, action="store_true",
+                                           help=("Create a new cluster domain"
+                                                 " secret"))
+
+USE_REPL_NET_OPT = cli_option("--use-replication-network",
+                              dest="use_replication_network",
+                              help="Whether to use the replication network"
+                              " for talking to the nodes",
+                              action="store_true", default=False)
+
+MAINTAIN_NODE_HEALTH_OPT = \
+    cli_option("--maintain-node-health", dest="maintain_node_health",
+               metavar=_YORNO, default=None, type="bool",
+               help="Configure the cluster to automatically maintain node"
+               " health, by shutting down unknown instances, shutting down"
+               " unknown DRBD devices, etc.")
+
+IDENTIFY_DEFAULTS_OPT = \
+    cli_option("--identify-defaults", dest="identify_defaults",
+               default=False, action="store_true",
+               help="Identify which saved instance parameters are equal to"
+               " the current cluster defaults and set them as such, instead"
+               " of marking them as overridden")
+
+UIDPOOL_OPT = cli_option("--uid-pool", default=None,
+                         action="store", dest="uid_pool",
+                         help=("A list of user-ids or user-id"
+                               " ranges separated by commas"))
+
+ADD_UIDS_OPT = cli_option("--add-uids", default=None,
+                          action="store", dest="add_uids",
+                          help=("A list of user-ids or user-id"
+                                " ranges separated by commas, to be"
+                                " added to the user-id pool"))
+
+REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
+                             action="store", dest="remove_uids",
+                             help=("A list of user-ids or user-id"
+                                   " ranges separated by commas, to be"
+                                   " removed from the user-id pool"))
+
 
 def _ParseArgs(argv, commands, aliases):
   """Parser for the command line arguments.
@@ -1127,12 +1266,28 @@ def PollJob(job_id, cl=None, feedback_fn=None):
   prev_job_info = None
   prev_logmsg_serial = None
 
+  status = None
+
+  notified_queued = False
+  notified_waitlock = False
+
   while True:
-    result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
-                                 prev_logmsg_serial)
+    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
+                                     prev_logmsg_serial)
 
     if not result:
       # job not found, go away!
       raise errors.JobLost("Job with id %s lost" % job_id)
+    elif result == constants.JOB_NOTCHANGED:
+      if status is not None and not callable(feedback_fn):
+        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
+          ToStderr("Job %s is waiting in queue", job_id)
+          notified_queued = True
+        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
+          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
+          notified_waitlock = True
+
+      # Wait again
+      continue
 
     # Split result, a tuple of (field values, log entries)
     (job_info, log_entries) = result
@@ -1183,7 +1338,7 @@ def PollJob(job_id, cl=None, feedback_fn=None):
   raise errors.OpExecError(result)
 
 
-def SubmitOpCode(op, cl=None, feedback_fn=None):
+def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
   """Legacy function to submit an opcode.
 
   This is just a simple wrapper over the construction of the processor
@@ -1194,6 +1349,8 @@ def SubmitOpCode(op, cl=None, feedback_fn=None):
   if cl is None:
     cl = GetClient()
 
+  SetGenericOpcodeOpts([op], opts)
+
   job_id = SendJob([op], cl)
 
   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
@@ -1209,16 +1366,35 @@ def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
   whether to just send the job and print its identifier. It is used in
   order to simplify the implementation of the '--submit' option.
 
-  It will also add the dry-run parameter from the options passed, if true.
+  It will also process the opcodes if we're sending them via SendJob
+  (otherwise SubmitOpCode does it).
 
   """
-  if opts and opts.dry_run:
-    op.dry_run = opts.dry_run
   if opts and opts.submit_only:
-    job_id = SendJob([op], cl=cl)
+    job = [op]
+    SetGenericOpcodeOpts(job, opts)
+    job_id = SendJob(job, cl=cl)
     raise JobSubmittedException(job_id)
   else:
-    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
+    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
+
+
+def SetGenericOpcodeOpts(opcode_list, options):
+  """Processor for generic options.
+
+  This function updates the given opcodes based on generic command
+  line options (like debug, dry-run, etc.).
+ + @param opcode_list: list of opcodes + @param options: command line options or None + @return: None (in-place modification) + + """ + if not options: + return + for op in opcode_list: + op.dry_run = options.dry_run + op.debug_level = options.debug def GetClient(): @@ -1416,9 +1592,12 @@ def GenericInstanceCreate(mode, opts, args): elif opts.no_nics: # no nics nics = [] - else: + elif mode == constants.INSTANCE_CREATE: # default of one nic, all auto nics = [{}] + else: + # mode == import + nics = [] if opts.disk_template == constants.DT_DISKLESS: if opts.disks or opts.sd_size is not None: @@ -1426,30 +1605,45 @@ def GenericInstanceCreate(mode, opts, args): " information passed") disks = [] else: - if not opts.disks and not opts.sd_size: + if (not opts.disks and not opts.sd_size + and mode == constants.INSTANCE_CREATE): raise errors.OpPrereqError("No disk information specified") if opts.disks and opts.sd_size is not None: raise errors.OpPrereqError("Please use either the '--disk' or" " '-s' option") if opts.sd_size is not None: opts.disks = [(0, {"size": opts.sd_size})] - try: - disk_max = max(int(didx[0]) + 1 for didx in opts.disks) - except ValueError, err: - raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err)) - disks = [{}] * disk_max + + if opts.disks: + try: + disk_max = max(int(didx[0]) + 1 for didx in opts.disks) + except ValueError, err: + raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err)) + disks = [{}] * disk_max + else: + disks = [] for didx, ddict in opts.disks: didx = int(didx) if not isinstance(ddict, dict): msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict) raise errors.OpPrereqError(msg) - elif "size" not in ddict: - raise errors.OpPrereqError("Missing size for disk %d" % didx) - try: - ddict["size"] = utils.ParseUnit(ddict["size"]) - except ValueError, err: - raise errors.OpPrereqError("Invalid disk size for disk %d: %s" % - (didx, err)) + elif "size" in ddict: + if "adopt" in ddict: + raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed" + " (disk %d)" % didx) + try: + ddict["size"] = utils.ParseUnit(ddict["size"]) + except ValueError, err: + raise errors.OpPrereqError("Invalid disk size for disk %d: %s" % + (didx, err)) + elif "adopt" in ddict: + if mode == constants.INSTANCE_IMPORT: + raise errors.OpPrereqError("Disk adoption not allowed for instance" + " import") + ddict["size"] = 0 + else: + raise errors.OpPrereqError("Missing size or adoption source for" + " disk %d" % didx) disks[didx] = ddict utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES) @@ -1460,11 +1654,15 @@ def GenericInstanceCreate(mode, opts, args): os_type = opts.os src_node = None src_path = None + no_install = opts.no_install + identify_defaults = False elif mode == constants.INSTANCE_IMPORT: start = False os_type = None src_node = opts.src_node src_path = opts.src_dir + no_install = None + identify_defaults = opts.identify_defaults else: raise errors.ProgrammerError("Invalid creation mode %s" % mode) @@ -1486,12 +1684,137 @@ def GenericInstanceCreate(mode, opts, args): start=start, os_type=os_type, src_node=src_node, - src_path=src_path) + src_path=src_path, + no_install=no_install, + identify_defaults=identify_defaults) SubmitOrSend(op, opts) return 0 +class _RunWhileClusterStoppedHelper: + """Helper class for L{RunWhileClusterStopped} to simplify state management + + """ + def __init__(self, feedback_fn, cluster_name, master_node, online_nodes): + """Initializes this class. 
+
+    @type feedback_fn: callable
+    @param feedback_fn: Feedback function
+    @type cluster_name: string
+    @param cluster_name: Cluster name
+    @type master_node: string
+    @param master_node: Master node name
+    @type online_nodes: list
+    @param online_nodes: List of names of online nodes
+
+    """
+    self.feedback_fn = feedback_fn
+    self.cluster_name = cluster_name
+    self.master_node = master_node
+    self.online_nodes = online_nodes
+
+    self.ssh = ssh.SshRunner(self.cluster_name)
+
+    self.nonmaster_nodes = [name for name in online_nodes
+                            if name != master_node]
+
+    assert self.master_node not in self.nonmaster_nodes
+
+  def _RunCmd(self, node_name, cmd):
+    """Runs a command on the local or a remote machine.
+
+    @type node_name: string
+    @param node_name: Machine name
+    @type cmd: list
+    @param cmd: Command
+
+    """
+    if node_name is None or node_name == self.master_node:
+      # No need to use SSH
+      result = utils.RunCmd(cmd)
+    else:
+      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
+
+    if result.failed:
+      errmsg = ["Failed to run command %s" % result.cmd]
+      if node_name:
+        errmsg.append("on node %s" % node_name)
+      errmsg.append(": exitcode %s and error %s" %
+                    (result.exit_code, result.output))
+      raise errors.OpExecError(" ".join(errmsg))
+
+  def Call(self, fn, *args):
+    """Call function while all daemons are stopped.
+
+    @type fn: callable
+    @param fn: Function to be called
+
+    """
+    # Pause watcher by acquiring an exclusive lock on watcher state file
+    self.feedback_fn("Blocking watcher")
+    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
+    try:
+      # TODO: Currently, this just blocks. There's no timeout.
+      # TODO: Should it be a shared lock?
+      watcher_block.Exclusive(blocking=True)
+
+      # Stop master daemons, so that no new jobs can come in and all running
+      # ones are finished
+      self.feedback_fn("Stopping master daemons")
+      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
+      try:
+        # Stop daemons on all nodes
+        for node_name in self.online_nodes:
+          self.feedback_fn("Stopping daemons on %s" % node_name)
+          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
+
+        # All daemons are shut down now
+        try:
+          return fn(self, *args)
+        except Exception, err:
+          _, errmsg = FormatError(err)
+          logging.exception("Caught exception")
+          self.feedback_fn(errmsg)
+          raise
+      finally:
+        # Start cluster again, master node last
+        for node_name in self.nonmaster_nodes + [self.master_node]:
+          self.feedback_fn("Starting daemons on %s" % node_name)
+          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
+    finally:
+      # Resume watcher
+      watcher_block.Close()
+
+
+def RunWhileClusterStopped(feedback_fn, fn, *args):
+  """Calls a function while all cluster daemons are stopped.
+
+  @type feedback_fn: callable
+  @param feedback_fn: Feedback function
+  @type fn: callable
+  @param fn: Function to be called when daemons are stopped
+
+  """
+  feedback_fn("Gathering cluster information")
+
+  # This ensures we're running on the master daemon
+  cl = GetClient()
+
+  (cluster_name, master_node) = \
+    cl.QueryConfigValues(["cluster_name", "master_node"])
+
+  online_nodes = GetOnlineNodes([], cl=cl)
+
+  # Don't keep a reference to the client. The master daemon will go away.
+ del cl + + assert master_node in online_nodes + + return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node, + online_nodes).Call(fn, *args) + + def GenerateTable(headers, fields, separator, data, numfields=None, unitfields=None, units=None): @@ -1562,7 +1885,7 @@ def GenerateTable(headers, fields, separator, data, if unitfields.Matches(fields[idx]): try: val = int(val) - except ValueError: + except (TypeError, ValueError): pass else: val = row[idx] = utils.FormatUnit(val, units) @@ -1581,6 +1904,12 @@ def GenerateTable(headers, fields, separator, data, args.append(hdr) result.append(format % tuple(args)) + if separator is None: + assert len(mlens) == len(fields) + + if fields and not numfields.Matches(fields[-1]): + mlens[-1] = 0 + for line in data: args = [] if line is None: @@ -1637,7 +1966,7 @@ def ParseTimespec(value): if value[-1] not in suffix_map: try: value = int(value) - except ValueError: + except (TypeError, ValueError): raise errors.OpPrereqError("Invalid time specification '%s'" % value) else: multiplier = suffix_map[value[-1]] @@ -1647,12 +1976,13 @@ def ParseTimespec(value): " suffix passed)") try: value = int(value) * multiplier - except ValueError: + except (TypeError, ValueError): raise errors.OpPrereqError("Invalid time specification '%s'" % value) return value -def GetOnlineNodes(nodes, cl=None, nowarn=False): +def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False, + filter_master=False): """Returns the names of online nodes. This function will also log a warning on stderr with the names of @@ -1665,17 +1995,36 @@ def GetOnlineNodes(nodes, cl=None, nowarn=False): @param nowarn: by default, this function will output a note with the offline nodes that are skipped; if this parameter is True the note is not displayed + @type secondary_ips: boolean + @param secondary_ips: if True, return the secondary IPs instead of the + names, useful for doing network traffic over the replication interface + (if any) + @type filter_master: boolean + @param filter_master: if True, do not return the master node in the list + (useful in coordination with secondary_ips where we cannot check our + node name against the list) """ if cl is None: cl = GetClient() - result = cl.QueryNodes(names=nodes, fields=["name", "offline"], + if secondary_ips: + name_idx = 2 + else: + name_idx = 0 + + if filter_master: + master_node = cl.QueryConfigValues(["master_node"])[0] + filter_fn = lambda x: x != master_node + else: + filter_fn = lambda _: True + + result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"], use_locking=False) offline = [row[0] for row in result if row[1]] if offline and not nowarn: ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline)) - return [row[0] for row in result if not row[1]] + return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])] def _ToStream(stream, txt, *args): @@ -1727,13 +2076,15 @@ class JobExecutor(object): GetResults() calls. """ - def __init__(self, cl=None, verbose=True): + def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None): self.queue = [] if cl is None: cl = GetClient() self.cl = cl self.verbose = verbose self.jobs = [] + self.opts = opts + self.feedback_fn = feedback_fn def QueueJob(self, name, *ops): """Record a job for later submit. 
@@ -1741,6 +2092,7 @@ class JobExecutor(object): @type name: string @param name: a description of the job, will be used in WaitJobSet """ + SetGenericOpcodeOpts(ops, self.opts) self.queue.append((name, ops)) def SubmitPending(self): @@ -1748,8 +2100,31 @@ class JobExecutor(object): """ results = self.cl.SubmitManyJobs([row[1] for row in self.queue]) - for ((status, data), (name, _)) in zip(results, self.queue): - self.jobs.append((status, data, name)) + for (idx, ((status, data), (name, _))) in enumerate(zip(results, + self.queue)): + self.jobs.append((idx, status, data, name)) + + def _ChooseJob(self): + """Choose a non-waiting/queued job to poll next. + + """ + assert self.jobs, "_ChooseJob called with empty job list" + + result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"]) + assert result + + for job_data, status in zip(self.jobs, result): + if status[0] in (constants.JOB_STATUS_QUEUED, + constants.JOB_STATUS_WAITLOCK, + constants.JOB_STATUS_CANCELING): + # job is still waiting + continue + # good candidate found + self.jobs.remove(job_data) + return job_data + + # no job found + return self.jobs.pop(0) def GetResults(self): """Wait for and return the results of all jobs. @@ -1764,18 +2139,21 @@ class JobExecutor(object): self.SubmitPending() results = [] if self.verbose: - ok_jobs = [row[1] for row in self.jobs if row[0]] + ok_jobs = [row[2] for row in self.jobs if row[1]] if ok_jobs: ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs)) - for submit_status, jid, name in self.jobs: - if not submit_status: - ToStderr("Failed to submit job for %s: %s", name, jid) - results.append((False, jid)) - continue - if self.verbose: - ToStdout("Waiting for job %s for %s...", jid, name) + + # first, remove any non-submitted jobs + self.jobs, failures = compat.partition(self.jobs, lambda x: x[1]) + for idx, _, jid, name in failures: + ToStderr("Failed to submit job for %s: %s", name, jid) + results.append((idx, False, jid)) + + while self.jobs: + (idx, _, jid, name) = self._ChooseJob() + ToStdout("Waiting for job %s for %s...", jid, name) try: - job_result = PollJob(jid, cl=self.cl) + job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn) success = True except (errors.GenericError, luxi.ProtocolError), err: _, job_result = FormatError(err) @@ -1783,7 +2161,12 @@ class JobExecutor(object): # the error message will always be shown, verbose or not ToStderr("Job %s for %s has failed: %s", jid, name, job_result) - results.append((success, job_result)) + results.append((idx, success, job_result)) + + # sort based on the index, then drop it + results.sort() + results = [i[1:] for i in results] + return results def WaitOrShow(self, wait): @@ -1798,7 +2181,7 @@ class JobExecutor(object): else: if not self.jobs: self.SubmitPending() - for status, result, name in self.jobs: + for _, status, result, name in self.jobs: if status: ToStdout("%s: %s", result, name) else:
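
The hunks above register a custom "bool" type with optparse so that flags like --master-candidate and --maintain-node-health take explicit yes/no values instead of a choices tuple. Below is a minimal standalone sketch of the same technique; the literal "true"/"false" strings stand in for constants.VALUE_TRUE/VALUE_FALSE, and the failure is reported through optparse's own OptionValueError rather than errors.ParameterError, so treat it as an approximation of cli.py, not a copy.

from optparse import Option, OptionParser, OptionValueError

def check_bool(option, opt, value):
  # Accept the same spellings the patched cli.py accepts: yes/no plus
  # the cluster constants (assumed here to be "true"/"false").
  value = value.lower()
  if value in ("false", "no"):
    return False
  elif value in ("true", "yes"):
    return True
  raise OptionValueError("option %s: invalid boolean value %r" % (opt, value))

class BoolOption(Option):
  TYPES = Option.TYPES + ("bool",)
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
  TYPE_CHECKER["bool"] = check_bool

parser = OptionParser(option_class=BoolOption)
parser.add_option("-C", "--master-candidate", dest="master_candidate",
                  type="bool", default=None, metavar="yes|no")
(opts, _) = parser.parse_args(["-C", "yes"])
assert opts.master_candidate is True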
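
_SplitKeyVal now splits its input with utils.UnescapeAndSplit instead of a plain data.split(","), so a value in an ident=value list may contain an escaped separator. The helper below is a from-scratch approximation of that behaviour, written for this note; it is not the ganeti.utils implementation.

def unescape_and_split(text, sep=","):
  # Split on sep, but treat a backslash-escaped sep (or backslash) as a
  # literal character belonging to the current field.
  fields = []
  current = ""
  i = 0
  while i < len(text):
    if text[i] == "\\" and i + 1 < len(text) and text[i + 1] in (sep, "\\"):
      current += text[i + 1]
      i += 2
    elif text[i] == sep:
      fields.append(current)
      current = ""
      i += 1
    else:
      current += text[i]
      i += 1
  fields.append(current)
  return fields

assert unescape_and_split("mem=512,features=a\\,b") == \
  ["mem=512", "features=a,b"]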
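
-d/--debug has become a counting option, and the new SetGenericOpcodeOpts copies the resulting level, together with dry_run, onto every opcode before submission. A sketch of that flow with invented stand-ins (FakeOpCode and FakeOptions are not Ganeti types):

class FakeOpCode(object):
  # Stand-in for an opcodes.OpCode; only the two generic slots matter here.
  dry_run = None
  debug_level = None

class FakeOptions(object):
  dry_run = True
  debug = 2  # what "-d -d" produces with action="count"

def set_generic_opcode_opts(opcode_list, options):
  # Mirrors the SetGenericOpcodeOpts added above.
  if not options:
    return
  for op in opcode_list:
    op.dry_run = options.dry_run
    op.debug_level = options.debug

ops = [FakeOpCode(), FakeOpCode()]
set_generic_opcode_opts(ops, FakeOptions())
assert all(op.dry_run and op.debug_level == 2 for op in ops)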
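
GetOnlineNodes grows two flags: secondary_ips switches the returned column from node names to secondary IPs, and filter_master drops the master node, which matters together with secondary_ips because the caller can no longer recognise its own entry by name. A pure-data sketch of the selection logic, using invented sample rows:

# Each row mimics a QueryNodes result: (name, offline, sip).
rows = [("node1.example.com", False, "192.0.2.1"),
        ("node2.example.com", True, "192.0.2.2"),
        ("node3.example.com", False, "192.0.2.3")]
master_node = "node1.example.com"

secondary_ips = True
filter_master = True

name_idx = 2 if secondary_ips else 0
if filter_master:
  filter_fn = lambda name: name != master_node
else:
  filter_fn = lambda _: True

# node1 is the master (filtered out), node2 is offline, so only
# node3's secondary IP survives.
online = [row[name_idx] for row in rows if not row[1] and filter_fn(row[0])]
assert online == ["192.0.2.3"]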
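
JobExecutor now tags every job with its submission index, separates failed submissions with compat.partition, polls the remaining jobs in completion order via _ChooseJob, and finally sorts the collected (index, success, result) tuples so callers still receive results in submission order. A self-contained sketch of that bookkeeping follows; partition() approximates ganeti.compat.partition and the sample data is invented.

def partition(seq, pred):
  # Approximation of compat.partition: split seq into (matching, rest).
  good, bad = [], []
  for item in seq:
    if pred(item):
      good.append(item)
    else:
      bad.append(item)
  return (good, bad)

# (idx, submit_status, jobid_or_error, name), as built by SubmitPending().
jobs = [(0, True, "job-11", "inst1"),
        (1, False, "queue full", "inst2"),
        (2, True, "job-12", "inst3")]

jobs, failures = partition(jobs, lambda x: x[1])

results = [(idx, False, msg) for (idx, _, msg, _) in failures]

# Pretend job-12 finished before job-11, as _ChooseJob would discover.
for (idx, _, jid, _) in (jobs[1], jobs[0]):
  results.append((idx, True, "result of %s" % jid))

results.sort()                       # back to submission order
results = [r[1:] for r in results]   # drop the index again
assert results == [(True, "result of job-11"), (False, "queue full"),
                   (True, "result of job-12")]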