diff --git a/lib/cli.py b/lib/cli.py index d4dfee6..61c0e14 100644 --- a/lib/cli.py +++ b/lib/cli.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007 Google Inc. +# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -25,9 +25,9 @@ import sys import textwrap import os.path -import copy import time import logging +import errno from cStringIO import StringIO from ganeti import utils @@ -37,6 +37,10 @@ from ganeti import opcodes from ganeti import luxi from ganeti import ssconf from ganeti import rpc +from ganeti import ssh +from ganeti import compat +from ganeti import netutils +from ganeti import qlang from optparse import (OptionParser, TitledHelpFormatter, Option, OptionValueError) @@ -44,11 +48,18 @@ from optparse import (OptionParser, TitledHelpFormatter, __all__ = [ # Command line options + "ADD_UIDS_OPT", "ALLOCATABLE_OPT", + "ALLOC_POLICY_OPT", "ALL_OPT", + "AUTO_PROMOTE_OPT", "AUTO_REPLACE_OPT", "BACKEND_OPT", + "BLK_OS_OPT", + "CAPAB_MASTER_OPT", + "CAPAB_VM_OPT", "CLEANUP_OPT", + "CLUSTER_DOMAIN_SECRET_OPT", "CONFIRM_OPT", "CP_SIZE_OPT", "DEBUG_OPT", @@ -57,33 +68,57 @@ __all__ = [ "DISK_OPT", "DISK_TEMPLATE_OPT", "DRAINED_OPT", + "DRY_RUN_OPT", + "DRBD_HELPER_OPT", + "EARLY_RELEASE_OPT", "ENABLED_HV_OPT", "ERROR_CODES_OPT", "FIELDS_OPT", "FILESTORE_DIR_OPT", "FILESTORE_DRIVER_OPT", + "FORCE_OPT", + "FORCE_VARIANT_OPT", "GLOBAL_FILEDIR_OPT", + "HID_OS_OPT", "HVLIST_OPT", "HVOPTS_OPT", "HYPERVISOR_OPT", "IALLOCATOR_OPT", + "DEFAULT_IALLOCATOR_OPT", + "IDENTIFY_DEFAULTS_OPT", "IGNORE_CONSIST_OPT", "IGNORE_FAILURES_OPT", + "IGNORE_OFFLINE_OPT", + "IGNORE_REMOVE_FAILURES_OPT", "IGNORE_SECONDARIES_OPT", "IGNORE_SIZE_OPT", - "FORCE_OPT", + "INTERVAL_OPT", "MAC_PREFIX_OPT", + "MAINTAIN_NODE_HEALTH_OPT", "MASTER_NETDEV_OPT", "MC_OPT", + "MIGRATION_MODE_OPT", "NET_OPT", + "NEW_CLUSTER_CERT_OPT", + "NEW_CLUSTER_DOMAIN_SECRET_OPT", + "NEW_CONFD_HMAC_KEY_OPT", + "NEW_RAPI_CERT_OPT", "NEW_SECONDARY_OPT", "NIC_PARAMS_OPT", + "NODE_FORCE_JOIN_OPT", "NODE_LIST_OPT", "NODE_PLACEMENT_OPT", + "NODEGROUP_OPT", + "NODE_PARAMS_OPT", + "NODE_POWERED_OPT", + "NODRBD_STORAGE_OPT", "NOHDR_OPT", "NOIPCHECK_OPT", + "NO_INSTALL_OPT", + "NONAMECHECK_OPT", "NOLVM_STORAGE_OPT", "NOMODIFY_ETCHOSTS_OPT", + "NOMODIFY_SSH_SETUP_OPT", "NONICS_OPT", "NONLIVE_OPT", "NONPLUS1_OPT", @@ -95,14 +130,24 @@ __all__ = [ "ON_PRIMARY_OPT", "ON_SECONDARY_OPT", "OFFLINE_OPT", + "OSPARAMS_OPT", "OS_OPT", "OS_SIZE_OPT", + "PREALLOC_WIPE_DISKS_OPT", + "PRIMARY_IP_VERSION_OPT", + "PRIORITY_OPT", + "RAPI_CERT_OPT", "READD_OPT", "REBOOT_TYPE_OPT", + "REMOVE_INSTANCE_OPT", + "REMOVE_UIDS_OPT", + "RESERVED_LVS_OPT", + "ROMAN_OPT", "SECONDARY_IP_OPT", "SELECT_OS_OPT", "SEP_OPT", "SHOWCMD_OPT", + "SHUTDOWN_TIMEOUT_OPT", "SINGLE_NODE_OPT", "SRC_DIR_OPT", "SRC_NODE_OPT", @@ -110,27 +155,36 @@ __all__ = [ "STATIC_OPT", "SYNC_OPT", "TAG_SRC_OPT", + "TIMEOUT_OPT", + "UIDPOOL_OPT", "USEUNITS_OPT", + "USE_REPL_NET_OPT", "VERBOSE_OPT", "VG_NAME_OPT", "YES_DOIT_OPT", # Generic functions for CLI programs "GenericMain", "GenericInstanceCreate", + "GenericList", + "GenericListFields", "GetClient", "GetOnlineNodes", "JobExecutor", "JobSubmittedException", "ParseTimespec", + "RunWhileClusterStopped", "SubmitOpCode", "SubmitOrSend", "UsesRPC", # 
Formatting functions "ToStderr", "ToStdout", "FormatError", + "FormatQueryResult", + "FormatParameterDict", "GenerateTable", "AskUser", "FormatTimestamp", + "FormatLogMessage", # Tags functions "ListTags", "AddTags", @@ -138,16 +192,21 @@ __all__ = [ # command line options support infrastructure "ARGS_MANY_INSTANCES", "ARGS_MANY_NODES", + "ARGS_MANY_GROUPS", "ARGS_NONE", "ARGS_ONE_INSTANCE", "ARGS_ONE_NODE", + "ARGS_ONE_GROUP", + "ARGS_ONE_OS", "ArgChoice", "ArgCommand", "ArgFile", + "ArgGroup", "ArgHost", "ArgInstance", "ArgJobId", "ArgNode", + "ArgOs", "ArgSuggest", "ArgUnknown", "OPT_COMPL_INST_ADD_NODES", @@ -155,18 +214,38 @@ __all__ = [ "OPT_COMPL_ONE_IALLOCATOR", "OPT_COMPL_ONE_INSTANCE", "OPT_COMPL_ONE_NODE", + "OPT_COMPL_ONE_NODEGROUP", "OPT_COMPL_ONE_OS", "cli_option", "SplitNodeOption", "CalculateOSNames", + "ParseFields", + "COMMON_CREATE_OPTS", ] NO_PREFIX = "no_" UN_PREFIX = "-" +#: Priorities (sorted) +_PRIORITY_NAMES = [ + ("low", constants.OP_PRIO_LOW), + ("normal", constants.OP_PRIO_NORMAL), + ("high", constants.OP_PRIO_HIGH), + ] + +#: Priority dictionary for easier lookup +# TODO: Replace this and _PRIORITY_NAMES with a single sorted dictionary once +# we migrate to Python 2.6 +_PRIONAME_TO_VALUE = dict(_PRIORITY_NAMES) + +# Query result status for clients +(QR_NORMAL, + QR_UNKNOWN, + QR_INCOMPLETE) = range(3) + class _Argument: - def __init__(self, min=0, max=None): + def __init__(self, min=0, max=None): # pylint: disable-msg=W0622 self.min = min self.max = max @@ -181,6 +260,7 @@ class ArgSuggest(_Argument): Value can be any of the ones passed to the constructor. """ + # pylint: disable-msg=W0622 def __init__(self, min=0, max=None, choices=None): _Argument.__init__(self, min=min, max=max) self.choices = choices @@ -216,6 +296,13 @@ class ArgNode(_Argument): """ + +class ArgGroup(_Argument): + """Node group argument. + + """ + + class ArgJobId(_Argument): """Job ID argument. @@ -240,12 +327,21 @@ class ArgHost(_Argument): """ +class ArgOs(_Argument): + """OS argument. + + """ + + ARGS_NONE = [] ARGS_MANY_INSTANCES = [ArgInstance()] ARGS_MANY_NODES = [ArgNode()] +ARGS_MANY_GROUPS = [ArgGroup()] ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)] ARGS_ONE_NODE = [ArgNode(min=1, max=1)] - +# TODO +ARGS_ONE_GROUP = [ArgGroup(min=1, max=1)] +ARGS_ONE_OS = [ArgOs(min=1, max=1)] def _ExtractTagsObject(opts, args): @@ -308,8 +404,8 @@ def ListTags(opts, args): """ kind, name = _ExtractTagsObject(opts, args) - op = opcodes.OpGetTags(kind=kind, name=name) - result = SubmitOpCode(op) + cl = GetClient() + result = cl.QueryTags(kind, name) result = list(result) result.sort() for tag in result: @@ -329,8 +425,8 @@ def AddTags(opts, args): _ExtendTags(opts, args) if not args: raise errors.OpPrereqError("No tags to be added") - op = opcodes.OpAddTags(kind=kind, name=name, tags=args) - SubmitOpCode(op) + op = opcodes.OpTagsSet(kind=kind, name=name, tags=args) + SubmitOpCode(op, opts=opts) def RemoveTags(opts, args): @@ -346,11 +442,11 @@ def RemoveTags(opts, args): _ExtendTags(opts, args) if not args: raise errors.OpPrereqError("No tags to be removed") - op = opcodes.OpDelTags(kind=kind, name=name, tags=args) - SubmitOpCode(op) + op = opcodes.OpTagsDel(kind=kind, name=name, tags=args) + SubmitOpCode(op, opts=opts) -def check_unit(option, opt, value): +def check_unit(option, opt, value): # pylint: disable-msg=W0613 """OptParsers custom converter for units. 
""" @@ -380,7 +476,7 @@ def _SplitKeyVal(opt, data): """ kv_dict = {} if data: - for elem in data.split(","): + for elem in utils.UnescapeAndSplit(data, sep=","): if "=" in elem: key, val = elem.split("=", 1) else: @@ -397,7 +493,7 @@ def _SplitKeyVal(opt, data): return kv_dict -def check_ident_key_val(option, opt, value): +def check_ident_key_val(option, opt, value): # pylint: disable-msg=W0613 """Custom parser for ident:key=val,key=val options. This will store the parsed values as a tuple (ident, {key: val}). As such, @@ -425,7 +521,7 @@ def check_ident_key_val(option, opt, value): return retval -def check_key_val(option, opt, value): +def check_key_val(option, opt, value): # pylint: disable-msg=W0613 """Custom parser class for key=val,key=val options. This will store the parsed values as a dict {key: val}. @@ -434,6 +530,21 @@ def check_key_val(option, opt, value): return _SplitKeyVal(opt, value) +def check_bool(option, opt, value): # pylint: disable-msg=W0613 + """Custom parser for yes/no options. + + This will store the parsed value as either True or False. + + """ + value = value.lower() + if value == constants.VALUE_FALSE or value == "no": + return False + elif value == constants.VALUE_TRUE or value == "yes": + return True + else: + raise errors.ParameterError("Invalid boolean value '%s'" % value) + + # completion_suggestion is normally a list. Using numeric values not evaluating # to False for dynamic completion. (OPT_COMPL_MANY_NODES, @@ -441,7 +552,8 @@ def check_key_val(option, opt, value): OPT_COMPL_ONE_INSTANCE, OPT_COMPL_ONE_OS, OPT_COMPL_ONE_IALLOCATOR, - OPT_COMPL_INST_ADD_NODES) = range(100, 106) + OPT_COMPL_INST_ADD_NODES, + OPT_COMPL_ONE_NODEGROUP) = range(100, 107) OPT_COMPL_ALL = frozenset([ OPT_COMPL_MANY_NODES, @@ -450,6 +562,7 @@ OPT_COMPL_ALL = frozenset([ OPT_COMPL_ONE_OS, OPT_COMPL_ONE_IALLOCATOR, OPT_COMPL_INST_ADD_NODES, + OPT_COMPL_ONE_NODEGROUP, ]) @@ -464,23 +577,23 @@ class CliOption(Option): "identkeyval", "keyval", "unit", + "bool", ) TYPE_CHECKER = Option.TYPE_CHECKER.copy() TYPE_CHECKER["identkeyval"] = check_ident_key_val TYPE_CHECKER["keyval"] = check_key_val TYPE_CHECKER["unit"] = check_unit + TYPE_CHECKER["bool"] = check_bool # optparse.py sets make_option, so we do it for our own option class, too cli_option = CliOption -_YESNO = ("yes", "no") _YORNO = "yes|no" -DEBUG_OPT = cli_option("-d", "--debug", default=False, - action="store_true", - help="Turn debugging on") +DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count", + help="Increase debugging level") NOHDR_OPT = cli_option("--no-headers", default=False, action="store_true", dest="no_headers", @@ -493,7 +606,7 @@ SEP_OPT = cli_option("--separator", default=None, USEUNITS_OPT = cli_option("--units", default=None, dest="units", choices=('h', 'm', 'g', 't'), - help="Specify units for output (one of hmgt)") + help="Specify units for output (one of h/m/g/t)") FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store", type="string", metavar="FIELDS", @@ -505,6 +618,11 @@ FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true", CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true", default=False, help="Do not require confirmation") +IGNORE_OFFLINE_OPT = cli_option("--ignore-offline", dest="ignore_offline", + action="store_true", default=False, + help=("Ignore offline nodes and do as much" + " as possible")) + TAG_SRC_OPT = cli_option("--from", dest="tags_source", default=None, help="File with tag names") @@ -518,11 +636,11 @@ SYNC_OPT = 
cli_option("--sync", dest="do_locking", help=("Grab locks while doing the queries" " in order to ensure more consistent results")) -_DRY_RUN_OPT = cli_option("--dry-run", default=False, - action="store_true", - help=("Do not execute the operation, just run the" - " check steps and verify it it could be" - " executed")) +DRY_RUN_OPT = cli_option("--dry-run", default=False, + action="store_true", + help=("Do not execute the operation, just run the" + " check steps and verify it it could be" + " executed")) VERBOSE_OPT = cli_option("-v", "--verbose", default=False, action="store_true", @@ -563,10 +681,29 @@ IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="", default=None, type="string", completion_suggest=OPT_COMPL_ONE_IALLOCATOR) +DEFAULT_IALLOCATOR_OPT = cli_option("-I", "--default-iallocator", + metavar="", + help="Set the default instance allocator plugin", + default=None, type="string", + completion_suggest=OPT_COMPL_ONE_IALLOCATOR) + OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run", metavar="", completion_suggest=OPT_COMPL_ONE_OS) +OSPARAMS_OPT = cli_option("-O", "--os-parameters", dest="osparams", + type="keyval", default={}, + help="OS parameters") + +FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant", + action="store_true", default=False, + help="Force an unknown variant") + +NO_INSTALL_OPT = cli_option("--no-install", dest="no_install", + action="store_true", default=False, + help="Do not install the OS (will" + " enable no-start)") + BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams", type="keyval", default={}, help="Backend parameters") @@ -590,6 +727,11 @@ NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True, help="Don't check that the instance's IP" " is alive") +NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check", + default=True, action="store_false", + help="Don't check that the instance's name" + " is resolvable") + NET_OPT = cli_option("--net", help="NIC parameters", default=[], dest="nics", action="append", type="identkeyval") @@ -619,6 +761,12 @@ NONLIVE_OPT = cli_option("--non-live", dest="live", " freeze the instance, save the state, transfer and" " only then resume running on the secondary node)") +MIGRATION_MODE_OPT = cli_option("--migration-mode", dest="migration_mode", + default=None, + choices=list(constants.HT_MIGRATION_MODES), + help="Override default migration mode (choose" + " either live or non-live") + NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node", help="Target node and optional secondary node", metavar="[:]", @@ -630,6 +778,13 @@ NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[], " times, if not given defaults to all nodes)", completion_suggest=OPT_COMPL_ONE_NODE) +NODEGROUP_OPT = cli_option("-g", "--node-group", + dest="nodegroup", + help="Node group (name or uuid)", + metavar="", + default=None, type="string", + completion_suggest=OPT_COMPL_ONE_NODEGROUP) + SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node", metavar="", completion_suggest=OPT_COMPL_ONE_NODE) @@ -671,6 +826,18 @@ IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures", " configuration even if there are failures" " during the removal process") +IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures", + dest="ignore_remove_failures", + action="store_true", default=False, + help="Remove the instance from the" + " cluster configuration even if there" + " are failures during the removal" + " process") 
+ +REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance", + action="store_true", default=False, + help="Remove the instance from the cluster") + NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node", help="Specifies the new secondary node", metavar="NODE", default=None, @@ -686,6 +853,11 @@ ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary", help="Replace the disk(s) on the secondary" " node (only for the drbd template)") +AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote", + default=False, action="store_true", + help="Lock all nodes and auto-promote as needed" + " to MC status") + AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto", default=False, action="store_true", help="Automatically replace faulty disks" @@ -716,21 +888,35 @@ NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check", default=True, action="store_false", help="Disable SSH key fingerprint checking") +NODE_FORCE_JOIN_OPT = cli_option("--force-join", dest="force_join", + default=False, action="store_true", + help="Force the joining of a node") MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate", - choices=_YESNO, default=None, metavar=_YORNO, + type="bool", default=None, metavar=_YORNO, help="Set the master_candidate flag on the node") OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO, - choices=_YESNO, default=None, - help="Set the offline flag on the node") + type="bool", default=None, + help=("Set the offline flag on the node" + " (cluster does not communicate with offline" + " nodes)")) DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO, - choices=_YESNO, default=None, - help="Set the drained flag on the node") + type="bool", default=None, + help=("Set the drained flag on the node" + " (excluded from allocation operations)")) + +CAPAB_MASTER_OPT = cli_option("--master-capable", dest="master_capable", + type="bool", default=None, metavar=_YORNO, + help="Set the master_capable flag on the node") + +CAPAB_VM_OPT = cli_option("--vm-capable", dest="vm_capable", + type="bool", default=None, metavar=_YORNO, + help="Set the vm_capable flag on the node") ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable", - choices=_YESNO, default=None, metavar=_YORNO, + type="bool", default=None, metavar=_YORNO, help="Set the allocatable flag on a volume") NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage", @@ -751,9 +937,10 @@ CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None, dest="candidate_pool_size", type="int", help="Set the candidate pool size") -VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name", - help="Enables LVM and specifies the volume group" - " name (cluster-wide) for disk allocation [xenvg]", +VG_NAME_OPT = cli_option("--vg-name", dest="vg_name", + help=("Enables LVM and specifies the volume group" + " name (cluster-wide) for disk allocation" + " [%s]" % constants.DEFAULT_VG), metavar="VG", default=None) YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it", @@ -771,11 +958,11 @@ MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix", MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev", help="Specify the node interface (cluster-wide)" - " on which the master IP address will be added " - " [%s]" % constants.DEFAULT_BRIDGE, + " on which the master IP address will be added" + " (cluster init default: %s)" % + constants.DEFAULT_BRIDGE, metavar="NETDEV", - 
default=constants.DEFAULT_BRIDGE) - + default=None) GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir", help="Specify the default directory (cluster- @@ -788,6 +975,10 @@ NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts", help="Don't modify /etc/hosts", action="store_false", default=True) +NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup", + help="Don't initialize SSH keys", + action="store_false", default=True) + ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes", help="Enable parseable error messages", action="store_true", default=False) @@ -807,10 +998,183 @@ IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries", default=False, action="store_true", help="Ignore errors from secondaries") -NOSHUTDOWN_OPT = cli_option("","--noshutdown", dest="shutdown", +NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown", action="store_false", default=True, help="Don't shutdown the instance (unsafe)") +TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int", + default=constants.DEFAULT_SHUTDOWN_TIMEOUT, + help="Maximum time to wait") + +SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout", + dest="shutdown_timeout", type="int", + default=constants.DEFAULT_SHUTDOWN_TIMEOUT, + help="Maximum time to wait for instance shutdown") + +INTERVAL_OPT = cli_option("--interval", dest="interval", type="int", + default=None, + help=("Number of seconds between repetitions of the" + " command")) + +EARLY_RELEASE_OPT = cli_option("--early-release", + dest="early_release", default=False, + action="store_true", + help="Release the locks on the secondary" + " node(s) early") + +NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate", + dest="new_cluster_cert", + default=False, action="store_true", + help="Generate a new cluster certificate") + +RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert", + default=None, + help="File containing new RAPI certificate") + +NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert", + default=None, action="store_true", + help=("Generate a new self-signed RAPI" + " certificate")) + +NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key", + dest="new_confd_hmac_key", + default=False, action="store_true", + help=("Create a new HMAC key for %s" % + constants.CONFD)) + +CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret", + dest="cluster_domain_secret", + default=None, + help=("Load new cluster domain" + " secret from file")) + +NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret", + dest="new_cluster_domain_secret", + default=False, action="store_true", + help=("Create a new cluster domain" + " secret")) + +USE_REPL_NET_OPT = cli_option("--use-replication-network", + dest="use_replication_network", + help="Whether to use the replication network" + " for talking to the nodes", + action="store_true", default=False) + +MAINTAIN_NODE_HEALTH_OPT = \ + cli_option("--maintain-node-health", dest="maintain_node_health", + metavar=_YORNO, default=None, type="bool", + help="Configure the cluster to automatically maintain node" + " health, by shutting down unknown instances, shutting down" + " unknown DRBD devices, etc.") + +IDENTIFY_DEFAULTS_OPT = \ + cli_option("--identify-defaults", dest="identify_defaults", + default=False, action="store_true", + help="Identify which saved instance parameters are equal to" + " the current cluster defaults and set them as such, instead" + " of marking them as overridden")
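For context, a minimal sketch of how these module-level option objects are consumed (the command name and handler below are hypothetical, not part of this patch): each gnt-* script builds a table that _ParseArgs below unpacks as (handler, argument spec, per-command options, usage, description), appending COMMON_OPTS to every command's option list.

    def _ExampleHandler(opts, args):
        # opts carries the dests declared above, e.g. opts.force, opts.dry_run
        ToStdout("force=%s dry-run=%s", opts.force, opts.dry_run)
        return constants.EXIT_SUCCESS

    commands = {
        "example": (_ExampleHandler, ARGS_NONE, [FORCE_OPT, DRY_RUN_OPT],
                    "", "Hypothetical command showing the table shape"),
    }

    # A gnt-* script would then finish with: sys.exit(GenericMain(commands))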
+ +UIDPOOL_OPT = cli_option("--uid-pool", default=None, + action="store", dest="uid_pool", + help=("A list of user-ids or user-id" + " ranges separated by commas")) + +ADD_UIDS_OPT = cli_option("--add-uids", default=None, + action="store", dest="add_uids", + help=("A list of user-ids or user-id" + " ranges separated by commas, to be" + " added to the user-id pool")) + +REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None, + action="store", dest="remove_uids", + help=("A list of user-ids or user-id" + " ranges separated by commas, to be" + " removed from the user-id pool")) + +RESERVED_LVS_OPT = cli_option("--reserved-lvs", default=None, + action="store", dest="reserved_lvs", + help=("A comma-separated list of reserved" + " logical volume names that will be" + " ignored by cluster verify")) + +ROMAN_OPT = cli_option("--roman", + dest="roman_integers", default=False, + action="store_true", + help="Use Roman numerals for positive integers") + +DRBD_HELPER_OPT = cli_option("--drbd-usermode-helper", dest="drbd_helper", + action="store", default=None, + help="Specifies usermode helper for DRBD") + +NODRBD_STORAGE_OPT = cli_option("--no-drbd-storage", dest="drbd_storage", + action="store_false", default=True, + help="Disable support for DRBD") + +PRIMARY_IP_VERSION_OPT = \ + cli_option("--primary-ip-version", default=constants.IP4_VERSION, + action="store", dest="primary_ip_version", + metavar="%d|%d" % (constants.IP4_VERSION, + constants.IP6_VERSION), + help="Cluster-wide IP version for primary IP") + +PRIORITY_OPT = cli_option("--priority", default=None, dest="priority", + metavar="|".join(name for name, _ in _PRIORITY_NAMES), + choices=_PRIONAME_TO_VALUE.keys(), + help="Priority for opcode processing") + +HID_OS_OPT = cli_option("--hidden", dest="hidden", + type="bool", default=None, metavar=_YORNO, + help="Sets the hidden flag on the OS") + +BLK_OS_OPT = cli_option("--blacklisted", dest="blacklisted", + type="bool", default=None, metavar=_YORNO, + help="Sets the blacklisted flag on the OS") + +PREALLOC_WIPE_DISKS_OPT = cli_option("--prealloc-wipe-disks", default=None, + type="bool", metavar=_YORNO, + dest="prealloc_wipe_disks", + help=("Wipe disks prior to instance" + " creation")) + +NODE_PARAMS_OPT = cli_option("--node-parameters", dest="ndparams", + type="keyval", default=None, + help="Node parameters") + +ALLOC_POLICY_OPT = cli_option("--alloc-policy", dest="alloc_policy", + action="store", metavar="POLICY", default=None, + help="Allocation policy for the node group") + +NODE_POWERED_OPT = cli_option("--node-powered", default=None, + type="bool", metavar=_YORNO, + dest="node_powered", + help="Specify if the SoR for node is powered") + + +#: Options provided by all commands +COMMON_OPTS = [DEBUG_OPT] + +# Common options for creating instances. The add and import commands then +# add their own specific ones. +COMMON_CREATE_OPTS = [ + BACKEND_OPT, + DISK_OPT, + DISK_TEMPLATE_OPT, + FILESTORE_DIR_OPT, + FILESTORE_DRIVER_OPT, + HYPERVISOR_OPT, + IALLOCATOR_OPT, + NET_OPT, + NODE_PLACEMENT_OPT, + NOIPCHECK_OPT, + NONAMECHECK_OPT, + NONICS_OPT, + NWSYNC_OPT, + OSPARAMS_OPT, + OS_SIZE_OPT, + SUBMIT_OPT, + DRY_RUN_OPT, + PRIORITY_OPT, + ] def _ParseArgs(argv, commands, aliases): @@ -831,7 +1195,8 @@ binary = argv[0].split("/")[-1] if len(argv) > 1 and argv[1] == "--version": - ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION) + ToStdout("%s (ganeti %s) %s", binary, constants.VCS_VERSION, + constants.RELEASE_VERSION) # Quit right away. 
That way we don't have to care about this special # argument. optparse.py does it the same. sys.exit(0) @@ -878,7 +1243,7 @@ def _ParseArgs(argv, commands, aliases): cmd = aliases[cmd] func, args_def, parser_opts, usage, description = commands[cmd] - parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT], + parser = OptionParser(option_list=parser_opts + COMMON_OPTS, description=description, formatter=TitledHelpFormatter(), usage="%%prog %s %s" % (cmd, usage)) @@ -989,14 +1354,25 @@ def CalculateOSNames(os_name, os_variants): return [os_name] -def UsesRPC(fn): - def wrapper(*args, **kwargs): - rpc.Init() - try: - return fn(*args, **kwargs) - finally: - rpc.Shutdown() - return wrapper +def ParseFields(selected, default): + """Parses the values of "--field"-like options. + + @type selected: string or None + @param selected: User-selected options + @type default: list + @param default: Default fields + + """ + if selected is None: + return default + + if selected.startswith("+"): + return default + selected[1:].split(",") + + return selected.split(",") + + +UsesRPC = rpc.RunWithRPC def AskUser(text, choices=None): @@ -1085,41 +1461,44 @@ def SendJob(ops, cl=None): return job_id -def PollJob(job_id, cl=None, feedback_fn=None): - """Function to poll for the result of a job. +def GenericPollJob(job_id, cbs, report_cbs): + """Generic job-polling function. - @type job_id: job identified - @param job_id: the job to poll for results - @type cl: luxi.Client - @param cl: the luxi client to use for communicating with the master; - if None, a new client will be created + @type job_id: number + @param job_id: Job ID + @type cbs: Instance of L{JobPollCbBase} + @param cbs: Data callbacks + @type report_cbs: Instance of L{JobPollReportCbBase} + @param report_cbs: Reporting callbacks """ - if cl is None: - cl = GetClient() - prev_job_info = None prev_logmsg_serial = None + status = None + while True: - result = cl.WaitForJobChange(job_id, ["status"], prev_job_info, - prev_logmsg_serial) + result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info, + prev_logmsg_serial) if not result: # job not found, go away! 
raise errors.JobLost("Job with id %s lost" % job_id) + if result == constants.JOB_NOTCHANGED: + report_cbs.ReportNotChanged(job_id, status) + + # Wait again + continue + # Split result, a tuple of (field values, log entries) (job_info, log_entries) = result (status, ) = job_info if log_entries: for log_entry in log_entries: - (serial, timestamp, _, message) = log_entry - if callable(feedback_fn): - feedback_fn(log_entry[1:]) - else: - encoded = utils.SafeEncode(message) - ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded) + (serial, timestamp, log_type, message) = log_entry + report_cbs.ReportLogMessage(job_id, serial, timestamp, + log_type, message) prev_logmsg_serial = max(prev_logmsg_serial, serial) # TODO: Handle canceled and archived jobs @@ -1131,33 +1510,205 @@ def PollJob(job_id, cl=None, feedback_fn=None): prev_job_info = job_info - jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"]) + jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"]) if not jobs: raise errors.JobLost("Job with id %s lost" % job_id) status, opstatus, result = jobs[0] + if status == constants.JOB_STATUS_SUCCESS: return result - elif status in (constants.JOB_STATUS_CANCELING, - constants.JOB_STATUS_CANCELED): + + if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED): raise errors.OpExecError("Job was canceled") - else: - has_ok = False - for idx, (status, msg) in enumerate(zip(opstatus, result)): - if status == constants.OP_STATUS_SUCCESS: - has_ok = True - elif status == constants.OP_STATUS_ERROR: - errors.MaybeRaise(msg) - if has_ok: - raise errors.OpExecError("partial failure (opcode %d): %s" % - (idx, msg)) - else: - raise errors.OpExecError(str(msg)) - # default failure mode - raise errors.OpExecError(result) + + has_ok = False + for idx, (status, msg) in enumerate(zip(opstatus, result)): + if status == constants.OP_STATUS_SUCCESS: + has_ok = True + elif status == constants.OP_STATUS_ERROR: + errors.MaybeRaise(msg) + + if has_ok: + raise errors.OpExecError("partial failure (opcode %d): %s" % + (idx, msg)) + + raise errors.OpExecError(str(msg)) + + # default failure mode + raise errors.OpExecError(result) + + +class JobPollCbBase: + """Base class for L{GenericPollJob} callbacks. + + """ + def __init__(self): + """Initializes this class. + + """ + + def WaitForJobChangeOnce(self, job_id, fields, + prev_job_info, prev_log_serial): + """Waits for changes on a job. + + """ + raise NotImplementedError() + + def QueryJobs(self, job_ids, fields): + """Returns the selected fields for the selected job IDs. + + @type job_ids: list of numbers + @param job_ids: Job IDs + @type fields: list of strings + @param fields: Fields + + """ + raise NotImplementedError() + + +class JobPollReportCbBase: + """Base class for L{GenericPollJob} reporting callbacks. + + """ + def __init__(self): + """Initializes this class. + + """ + + def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg): + """Handles a log message. + + """ + raise NotImplementedError() + + def ReportNotChanged(self, job_id, status): + """Called for if a job hasn't changed in a while. + + @type job_id: number + @param job_id: Job ID + @type status: string or None + @param status: Job status if available + + """ + raise NotImplementedError() + + +class _LuxiJobPollCb(JobPollCbBase): + def __init__(self, cl): + """Initializes this class. 
+ + """ + JobPollCbBase.__init__(self) + self.cl = cl + + def WaitForJobChangeOnce(self, job_id, fields, + prev_job_info, prev_log_serial): + """Waits for changes on a job. + + """ + return self.cl.WaitForJobChangeOnce(job_id, fields, + prev_job_info, prev_log_serial) + + def QueryJobs(self, job_ids, fields): + """Returns the selected fields for the selected job IDs. + + """ + return self.cl.QueryJobs(job_ids, fields) + + +class FeedbackFnJobPollReportCb(JobPollReportCbBase): + def __init__(self, feedback_fn): + """Initializes this class. + + """ + JobPollReportCbBase.__init__(self) + + self.feedback_fn = feedback_fn + + assert callable(feedback_fn) + + def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg): + """Handles a log message. + + """ + self.feedback_fn((timestamp, log_type, log_msg)) + + def ReportNotChanged(self, job_id, status): + """Called if a job hasn't changed in a while. + + """ + # Ignore + + +class StdioJobPollReportCb(JobPollReportCbBase): + def __init__(self): + """Initializes this class. + + """ + JobPollReportCbBase.__init__(self) + + self.notified_queued = False + self.notified_waitlock = False + + def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg): + """Handles a log message. + + """ + ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), + FormatLogMessage(log_type, log_msg)) + + def ReportNotChanged(self, job_id, status): + """Called if a job hasn't changed in a while. + + """ + if status is None: + return + + if status == constants.JOB_STATUS_QUEUED and not self.notified_queued: + ToStderr("Job %s is waiting in queue", job_id) + self.notified_queued = True + + elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock: + ToStderr("Job %s is trying to acquire all necessary locks", job_id) + self.notified_waitlock = True -def SubmitOpCode(op, cl=None, feedback_fn=None): +def FormatLogMessage(log_type, log_msg): + """Formats a job message according to its type. + + """ + if log_type != constants.ELOG_MESSAGE: + log_msg = str(log_msg) + + return utils.SafeEncode(log_msg) + + +def PollJob(job_id, cl=None, feedback_fn=None, reporter=None): + """Function to poll for the result of a job. + + @type job_id: job identified + @param job_id: the job to poll for results + @type cl: luxi.Client + @param cl: the luxi client to use for communicating with the master; + if None, a new client will be created + + """ + if cl is None: + cl = GetClient() + + if reporter is None: + if feedback_fn: + reporter = FeedbackFnJobPollReportCb(feedback_fn) + else: + reporter = StdioJobPollReportCb() + elif feedback_fn: + raise errors.ProgrammerError("Can't specify reporter and feedback function") + + return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter) + + +def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None, reporter=None): """Legacy function to submit an opcode. This is just a simple wrapper over the construction of the processor @@ -1168,9 +1719,12 @@ def SubmitOpCode(op, cl=None, feedback_fn=None): if cl is None: cl = GetClient() - job_id = SendJob([op], cl) + SetGenericOpcodeOpts([op], opts) - op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn) + job_id = SendJob([op], cl=cl) + + op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn, + reporter=reporter) return op_results[0] @@ -1183,16 +1737,38 @@ def SubmitOrSend(op, opts, cl=None, feedback_fn=None): whether to just send the job and print its identifier. It is used in order to simplify the implementation of the '--submit' option. 
- It will also add the dry-run parameter from the options passed, if true. + It will also process the opcodes if we're sending the via SendJob + (otherwise SubmitOpCode does it). """ - if opts and opts.dry_run: - op.dry_run = opts.dry_run if opts and opts.submit_only: - job_id = SendJob([op], cl=cl) + job = [op] + SetGenericOpcodeOpts(job, opts) + job_id = SendJob(job, cl=cl) raise JobSubmittedException(job_id) else: - return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn) + return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts) + + +def SetGenericOpcodeOpts(opcode_list, options): + """Processor for generic options. + + This function updates the given opcodes based on generic command + line options (like debug, dry-run, etc.). + + @param opcode_list: list of opcodes + @param options: command line options or None + @return: None (in-place modification) + + """ + if not options: + return + for op in opcode_list: + op.debug_level = options.debug + if hasattr(options, "dry_run"): + op.dry_run = options.dry_run + if getattr(options, "priority", None) is not None: + op.priority = _PRIONAME_TO_VALUE[options.priority] def GetClient(): @@ -1200,13 +1776,21 @@ def GetClient(): try: client = luxi.Client() except luxi.NoMasterError: - master, myself = ssconf.GetMasterAndMyself() + ss = ssconf.SimpleStore() + + # Try to read ssconf file + try: + ss.GetMasterNode() + except errors.ConfigurationError: + raise errors.OpPrereqError("Cluster not initialized or this machine is" + " not part of a cluster") + + master, myself = ssconf.GetMasterAndMyself(ss=ss) if master != myself: raise errors.OpPrereqError("This is not the master node, please connect" " to node '%s' and rerun the command" % master) - else: - raise + raise return client @@ -1240,15 +1824,20 @@ def FormatError(err): elif isinstance(err, errors.HooksFailure): obuf.write("Failure: hooks general failure: %s" % msg) elif isinstance(err, errors.ResolverError): - this_host = utils.HostInfo.SysName() + this_host = netutils.Hostname.GetSysName() if err.args[0] == this_host: msg = "Failure: can't resolve my own hostname ('%s')" else: msg = "Failure: can't resolve hostname '%s'" obuf.write(msg % err.args[0]) elif isinstance(err, errors.OpPrereqError): - obuf.write("Failure: prerequisites not met for this" - " operation:\n%s" % msg) + if len(err.args) == 2: + obuf.write("Failure: prerequisites not met for this" + " operation:\nerror type: %s, error details:\n%s" % + (err.args[1], err.args[0])) + else: + obuf.write("Failure: prerequisites not met for this" + " operation:\n%s" % msg) elif isinstance(err, errors.OpExecError): obuf.write("Failure: command execution error:\n%s" % msg) elif isinstance(err, errors.TagError): @@ -1263,17 +1852,25 @@ def FormatError(err): obuf.write("Parameter Error: %s" % msg) elif isinstance(err, errors.ParameterError): obuf.write("Failure: unknown/wrong parameter name '%s'" % msg) - elif isinstance(err, errors.GenericError): - obuf.write("Unhandled Ganeti error: %s" % msg) elif isinstance(err, luxi.NoMasterError): obuf.write("Cannot communicate with the master daemon.\nIs it running" " and listening for connections?") elif isinstance(err, luxi.TimeoutError): - obuf.write("Timeout while talking to the master daemon. Error:\n" - "%s" % msg) + obuf.write("Timeout while talking to the master daemon. Jobs might have" + " been submitted and will continue to run even if the call" + " timed out. Useful commands in this situation are \"gnt-job" + " list\", \"gnt-job cancel\" and \"gnt-job watch\". 
Error:\n") + obuf.write(msg) + elif isinstance(err, luxi.PermissionError): + obuf.write("It seems you don't have permissions to connect to the" + " master daemon.\nPlease retry as a different user.") elif isinstance(err, luxi.ProtocolError): obuf.write("Unhandled protocol error while talking to the master daemon:\n" "%s" % msg) + elif isinstance(err, errors.JobLost): + obuf.write("Error checking job status: %s" % msg) + elif isinstance(err, errors.GenericError): + obuf.write("Unhandled Ganeti error: %s" % msg) elif isinstance(err, JobSubmittedException): obuf.write("JobID: %s\n" % err.args[0]) retcode = 0 @@ -1323,8 +1920,8 @@ def GenericMain(commands, override=None, aliases=None): for key, val in override.iteritems(): setattr(options, key, val) - utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug, - stderr_logging=True, program=binary) + utils.SetupLogging(constants.LOG_COMMANDS, binary, debug=options.debug, + stderr_logging=True) if old_cmdline: logging.info("run with arguments '%s'", old_cmdline) @@ -1338,10 +1935,45 @@ def GenericMain(commands, override=None, aliases=None): result, err_msg = FormatError(err) logging.exception("Error during command processing") ToStderr(err_msg) + except KeyboardInterrupt: + result = constants.EXIT_FAILURE + ToStderr("Aborted. Note that if the operation created any jobs, they" + " might have been submitted and" + " will continue to run in the background.") + except IOError, err: + if err.errno == errno.EPIPE: + # our terminal went away, we'll exit + sys.exit(constants.EXIT_FAILURE) + else: + raise return result +def ParseNicOption(optvalue): + """Parses the value of the --net option(s). + + """ + try: + nic_max = max(int(nidx[0]) + 1 for nidx in optvalue) + except (TypeError, ValueError), err: + raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err)) + + nics = [{}] * nic_max + for nidx, ndict in optvalue: + nidx = int(nidx) + + if not isinstance(ndict, dict): + raise errors.OpPrereqError("Invalid nic/%d value: expected dict," + " got %s" % (nidx, ndict)) + + utils.ForceDictType(ndict, constants.INIC_PARAMS_TYPES) + + nics[nidx] = ndict + + return nics + + def GenericInstanceCreate(mode, opts, args): """Add an instance to the cluster via either creation or import. 
@@ -1363,23 +1995,16 @@ def GenericInstanceCreate(mode, opts, args): hypervisor, hvparams = opts.hypervisor if opts.nics: - try: - nic_max = max(int(nidx[0])+1 for nidx in opts.nics) - except ValueError, err: - raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err)) - nics = [{}] * nic_max - for nidx, ndict in opts.nics: - nidx = int(nidx) - if not isinstance(ndict, dict): - msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict) - raise errors.OpPrereqError(msg) - nics[nidx] = ndict + nics = ParseNicOption(opts.nics) elif opts.no_nics: # no nics nics = [] - else: + elif mode == constants.INSTANCE_CREATE: # default of one nic, all auto nics = [{}] + else: + # mode == import + nics = [] if opts.disk_template == constants.DT_DISKLESS: if opts.disks or opts.sd_size is not None: @@ -1387,30 +2012,45 @@ def GenericInstanceCreate(mode, opts, args): " information passed") disks = [] else: - if not opts.disks and not opts.sd_size: + if (not opts.disks and not opts.sd_size + and mode == constants.INSTANCE_CREATE): raise errors.OpPrereqError("No disk information specified") if opts.disks and opts.sd_size is not None: raise errors.OpPrereqError("Please use either the '--disk' or" " '-s' option") if opts.sd_size is not None: opts.disks = [(0, {"size": opts.sd_size})] - try: - disk_max = max(int(didx[0])+1 for didx in opts.disks) - except ValueError, err: - raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err)) - disks = [{}] * disk_max + + if opts.disks: + try: + disk_max = max(int(didx[0]) + 1 for didx in opts.disks) + except ValueError, err: + raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err)) + disks = [{}] * disk_max + else: + disks = [] for didx, ddict in opts.disks: didx = int(didx) if not isinstance(ddict, dict): msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict) raise errors.OpPrereqError(msg) - elif "size" not in ddict: - raise errors.OpPrereqError("Missing size for disk %d" % didx) - try: - ddict["size"] = utils.ParseUnit(ddict["size"]) - except ValueError, err: - raise errors.OpPrereqError("Invalid disk size for disk %d: %s" % - (didx, err)) + elif "size" in ddict: + if "adopt" in ddict: + raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed" + " (disk %d)" % didx) + try: + ddict["size"] = utils.ParseUnit(ddict["size"]) + except ValueError, err: + raise errors.OpPrereqError("Invalid disk size for disk %d: %s" % + (didx, err)) + elif "adopt" in ddict: + if mode == constants.INSTANCE_IMPORT: + raise errors.OpPrereqError("Disk adoption not allowed for instance" + " import") + ddict["size"] = 0 + else: + raise errors.OpPrereqError("Missing size or adoption source for" + " disk %d" % didx) disks[didx] = ddict utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES) @@ -1419,22 +2059,29 @@ def GenericInstanceCreate(mode, opts, args): if mode == constants.INSTANCE_CREATE: start = opts.start os_type = opts.os + force_variant = opts.force_variant src_node = None src_path = None + no_install = opts.no_install + identify_defaults = False elif mode == constants.INSTANCE_IMPORT: start = False os_type = None + force_variant = False src_node = opts.src_node src_path = opts.src_dir + no_install = None + identify_defaults = opts.identify_defaults else: raise errors.ProgrammerError("Invalid creation mode %s" % mode) - op = opcodes.OpCreateInstance(instance_name=instance, + op = opcodes.OpInstanceCreate(instance_name=instance, disks=disks, disk_template=opts.disk_template, nics=nics, pnode=pnode, 
snode=snode, ip_check=opts.ip_check, + name_check=opts.name_check, wait_for_sync=opts.wait_for_sync, file_storage_dir=opts.file_storage_dir, file_driver=opts.file_driver, @@ -1442,16 +2089,143 @@ hypervisor=hypervisor, hvparams=hvparams, beparams=opts.beparams, + osparams=opts.osparams, mode=mode, start=start, os_type=os_type, + force_variant=force_variant, src_node=src_node, - src_path=src_path) + src_path=src_path, + no_install=no_install, + identify_defaults=identify_defaults) SubmitOrSend(op, opts) return 0 +class _RunWhileClusterStoppedHelper: + """Helper class for L{RunWhileClusterStopped} to simplify state management + + """ + def __init__(self, feedback_fn, cluster_name, master_node, online_nodes): + """Initializes this class. + + @type feedback_fn: callable + @param feedback_fn: Feedback function + @type cluster_name: string + @param cluster_name: Cluster name + @type master_node: string + @param master_node: Master node name + @type online_nodes: list + @param online_nodes: List of names of online nodes + + """ + self.feedback_fn = feedback_fn + self.cluster_name = cluster_name + self.master_node = master_node + self.online_nodes = online_nodes + + self.ssh = ssh.SshRunner(self.cluster_name) + + self.nonmaster_nodes = [name for name in online_nodes + if name != master_node] + + assert self.master_node not in self.nonmaster_nodes + + def _RunCmd(self, node_name, cmd): + """Runs a command on the local or a remote machine. + + @type node_name: string + @param node_name: Machine name + @type cmd: list + @param cmd: Command + + """ + if node_name is None or node_name == self.master_node: + # No need to use SSH + result = utils.RunCmd(cmd) + else: + result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd)) + + if result.failed: + errmsg = ["Failed to run command %s" % result.cmd] + if node_name: + errmsg.append("on node %s" % node_name) + errmsg.append(": exitcode %s and error %s" % + (result.exit_code, result.output)) + raise errors.OpExecError(" ".join(errmsg)) + + def Call(self, fn, *args): + """Call function while all daemons are stopped. + + @type fn: callable + @param fn: Function to be called + + """ + # Pause watcher by acquiring an exclusive lock on watcher state file + self.feedback_fn("Blocking watcher") + watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE) + try: + # TODO: Currently, this just blocks. There's no timeout. + # TODO: Should it be a shared lock? + watcher_block.Exclusive(blocking=True) + + # Stop master daemons, so that no new jobs can come in and all running + # ones are finished + self.feedback_fn("Stopping master daemons") + self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"]) + try: + # Stop daemons on all nodes + for node_name in self.online_nodes: + self.feedback_fn("Stopping daemons on %s" % node_name) + self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"]) + + # All daemons are shut down now + try: + return fn(self, *args) + except Exception, err: + _, errmsg = FormatError(err) + logging.exception("Caught exception") + self.feedback_fn(errmsg) + raise + finally: + # Start cluster again, master node last + for node_name in self.nonmaster_nodes + [self.master_node]: + self.feedback_fn("Starting daemons on %s" % node_name) + self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"]) + finally: + # Resume watcher + watcher_block.Close() + + +def RunWhileClusterStopped(feedback_fn, fn, *args): + """Calls a function while all cluster daemons are stopped. 
+ + @type feedback_fn: callable + @param feedback_fn: Feedback function + @type fn: callable + @param fn: Function to be called when daemons are stopped + + """ + feedback_fn("Gathering cluster information") + + # This ensures we're running on the master daemon + cl = GetClient() + + (cluster_name, master_node) = \ + cl.QueryConfigValues(["cluster_name", "master_node"]) + + online_nodes = GetOnlineNodes([], cl=cl) + + # Don't keep a reference to the client. The master daemon will go away. + del cl + + assert master_node in online_nodes + + return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node, + online_nodes).Call(fn, *args) + + def GenerateTable(headers, fields, separator, data, numfields=None, unitfields=None, units=None): @@ -1492,8 +2266,8 @@ def GenerateTable(headers, fields, separator, data, if unitfields is None: unitfields = [] - numfields = utils.FieldSet(*numfields) - unitfields = utils.FieldSet(*unitfields) + numfields = utils.FieldSet(*numfields) # pylint: disable-msg=W0142 + unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142 format_fields = [] for field in fields: @@ -1511,9 +2285,9 @@ def GenerateTable(headers, fields, separator, data, if separator is None: mlens = [0 for name in fields] - format = ' '.join(format_fields) + format_str = ' '.join(format_fields) else: - format = separator.replace("%", "%%").join(format_fields) + format_str = separator.replace("%", "%%").join(format_fields) for row in data: if row is None: @@ -1522,7 +2296,7 @@ def GenerateTable(headers, fields, separator, data, if unitfields.Matches(fields[idx]): try: val = int(val) - except ValueError: + except (TypeError, ValueError): pass else: val = row[idx] = utils.FormatUnit(val, units) @@ -1539,7 +2313,13 @@ def GenerateTable(headers, fields, separator, data, mlens[idx] = max(mlens[idx], len(hdr)) args.append(mlens[idx]) args.append(hdr) - result.append(format % tuple(args)) + result.append(format_str % tuple(args)) + + if separator is None: + assert len(mlens) == len(fields) + + if fields and not numfields.Matches(fields[-1]): + mlens[-1] = 0 for line in data: args = [] @@ -1549,11 +2329,366 @@ def GenerateTable(headers, fields, separator, data, if separator is None: args.append(mlens[idx]) args.append(line[idx]) - result.append(format % tuple(args)) + result.append(format_str % tuple(args)) return result +def _FormatBool(value): + """Formats a boolean value as a string. + + """ + if value: + return "Y" + return "N" + + +#: Default formatting for query results; (callback, align right) +_DEFAULT_FORMAT_QUERY = { + constants.QFT_TEXT: (str, False), + constants.QFT_BOOL: (_FormatBool, False), + constants.QFT_NUMBER: (str, True), + constants.QFT_TIMESTAMP: (utils.FormatTime, False), + constants.QFT_OTHER: (str, False), + constants.QFT_UNKNOWN: (str, False), + } + + +def _GetColumnFormatter(fdef, override, unit): + """Returns formatting function for a field. 
+ + @type fdef: L{objects.QueryFieldDefinition} + @type override: dict + @param override: Dictionary for overriding field formatting functions, + indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY} + @type unit: string + @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT} + @rtype: tuple; (callable, bool) + @return: Returns the function to format a value (takes one parameter) and a + boolean for aligning the value on the right-hand side + + """ + fmt = override.get(fdef.name, None) + if fmt is not None: + return fmt + + assert constants.QFT_UNIT not in _DEFAULT_FORMAT_QUERY + + if fdef.kind == constants.QFT_UNIT: + # Can't keep this information in the static dictionary + return (lambda value: utils.FormatUnit(value, unit), True) + + fmt = _DEFAULT_FORMAT_QUERY.get(fdef.kind, None) + if fmt is not None: + return fmt + + raise NotImplementedError("Can't format column type '%s'" % fdef.kind) + + +class _QueryColumnFormatter: + """Callable class for formatting fields of a query. + + """ + def __init__(self, fn, status_fn, verbose): + """Initializes this class. + + @type fn: callable + @param fn: Formatting function + @type status_fn: callable + @param status_fn: Function to report fields' status + @type verbose: boolean + @param verbose: whether to use verbose field descriptions or not + + """ + self._fn = fn + self._status_fn = status_fn + if verbose: + self._desc_index = 0 + else: + self._desc_index = 1 + + def __call__(self, data): + """Returns a field's string representation. + + """ + (status, value) = data + + # Report status + self._status_fn(status) + + if status == constants.RS_NORMAL: + return self._fn(value) + + assert value is None, \ + "Found value %r for abnormal status %s" % (value, status) + + if status in constants.RSS_DESCRIPTION: + return constants.RSS_DESCRIPTION[status][self._desc_index] + + raise NotImplementedError("Unknown status %s" % status) + + +def FormatQueryResult(result, unit=None, format_override=None, separator=None, + header=False, verbose=False): + """Formats data in L{objects.QueryResponse}. + + @type result: L{objects.QueryResponse} + @param result: result of query operation + @type unit: string + @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT}, + see L{utils.text.FormatUnit} + @type format_override: dict + @param format_override: Dictionary for overriding field formatting functions, + indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY} + @type separator: string or None + @param separator: String used to separate fields + @type header: bool + @param header: Whether to output header row + @type verbose: boolean + @param verbose: whether to use verbose field descriptions or not + + """ + if unit is None: + if separator: + unit = "m" + else: + unit = "h" + + if format_override is None: + format_override = {} + + stats = dict.fromkeys(constants.RS_ALL, 0) + + def _RecordStatus(status): + if status in stats: + stats[status] += 1 + + columns = [] + for fdef in result.fields: + assert fdef.title and fdef.name + (fn, align_right) = _GetColumnFormatter(fdef, format_override, unit) + columns.append(TableColumn(fdef.title, + _QueryColumnFormatter(fn, _RecordStatus, + verbose), + align_right)) + + table = FormatTable(result.data, columns, header, separator) + + # Collect statistics + assert len(stats) == len(constants.RS_ALL) + assert compat.all(count >= 0 for count in stats.values()) + + # Determine overall status. If there was no data, unknown fields must be + # detected via the field definitions. 
+ if (stats[constants.RS_UNKNOWN] or + (not result.data and _GetUnknownFields(result.fields))): + status = QR_UNKNOWN + elif compat.any(count > 0 for key, count in stats.items() + if key != constants.RS_NORMAL): + status = QR_INCOMPLETE + else: + status = QR_NORMAL + + return (status, table) + + +def _GetUnknownFields(fdefs): + """Returns list of unknown fields included in C{fdefs}. + + @type fdefs: list of L{objects.QueryFieldDefinition} + + """ + return [fdef for fdef in fdefs + if fdef.kind == constants.QFT_UNKNOWN] + + +def _WarnUnknownFields(fdefs): + """Prints a warning to stderr if a query included unknown fields. + + @type fdefs: list of L{objects.QueryFieldDefinition} + + """ + unknown = _GetUnknownFields(fdefs) + if unknown: + ToStderr("Warning: Queried for unknown fields %s", + utils.CommaJoin(fdef.name for fdef in unknown)) + return True + + return False + + +def GenericList(resource, fields, names, unit, separator, header, cl=None, + format_override=None, verbose=False): + """Generic implementation for listing all items of a resource. + + @param resource: One of L{constants.QR_OP_LUXI} + @type fields: list of strings + @param fields: List of fields to query for + @type names: list of strings + @param names: Names of items to query for + @type unit: string or None + @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT} or + None for automatic choice (human-readable for non-separator usage, + otherwise megabytes); this is a one-letter string + @type separator: string or None + @param separator: String used to separate fields + @type header: bool + @param header: Whether to show header row + @type format_override: dict + @param format_override: Dictionary for overriding field formatting functions, + indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY} + @type verbose: boolean + @param verbose: whether to use verbose field descriptions or not + + """ + if cl is None: + cl = GetClient() + + if not names: + names = None + + response = cl.Query(resource, fields, qlang.MakeSimpleFilter("name", names)) + + found_unknown = _WarnUnknownFields(response.fields) + + (status, data) = FormatQueryResult(response, unit=unit, separator=separator, + header=header, + format_override=format_override, + verbose=verbose) + + for line in data: + ToStdout(line) + + assert ((found_unknown and status == QR_UNKNOWN) or + (not found_unknown and status != QR_UNKNOWN)) + + if status == QR_UNKNOWN: + return constants.EXIT_UNKNOWN_FIELD + + # TODO: Should the list command fail if not all data could be collected? + return constants.EXIT_SUCCESS + + +def GenericListFields(resource, fields, separator, header, cl=None): + """Generic implementation for listing fields for a resource. 
+ + @param resource: One of L{constants.QR_OP_LUXI} + @type fields: list of strings + @param fields: List of fields to query for + @type separator: string or None + @param separator: String used to separate fields + @type header: bool + @param header: Whether to show header row + + """ + if cl is None: + cl = GetClient() + + if not fields: + fields = None + + response = cl.QueryFields(resource, fields) + + found_unknown = _WarnUnknownFields(response.fields) + + columns = [ + TableColumn("Name", str, False), + TableColumn("Title", str, False), + # TODO: Add field description to master daemon + ] + + rows = [[fdef.name, fdef.title] for fdef in response.fields] + + for line in FormatTable(rows, columns, header, separator): + ToStdout(line) + + if found_unknown: + return constants.EXIT_UNKNOWN_FIELD + + return constants.EXIT_SUCCESS + + +class TableColumn: + """Describes a column for L{FormatTable}. + + """ + def __init__(self, title, fn, align_right): + """Initializes this class. + + @type title: string + @param title: Column title + @type fn: callable + @param fn: Formatting function + @type align_right: bool + @param align_right: Whether to align values on the right-hand side + + """ + self.title = title + self.format = fn + self.align_right = align_right + + +def _GetColFormatString(width, align_right): + """Returns the format string for a field. + + """ + if align_right: + sign = "" + else: + sign = "-" + + return "%%%s%ss" % (sign, width) + + +def FormatTable(rows, columns, header, separator): + """Formats data as a table. + + @type rows: list of lists + @param rows: Row data, one list per row + @type columns: list of L{TableColumn} + @param columns: Column descriptions + @type header: bool + @param header: Whether to show header row + @type separator: string or None + @param separator: String used to separate columns + + """ + if header: + data = [[col.title for col in columns]] + colwidth = [len(col.title) for col in columns] + else: + data = [] + colwidth = [0 for _ in columns] + + # Format row data + for row in rows: + assert len(row) == len(columns) + + formatted = [col.format(value) for value, col in zip(row, columns)] + + if separator is None: + # Update column widths + for idx, (oldwidth, value) in enumerate(zip(colwidth, formatted)): + # Modifying a list's items while iterating is fine + colwidth[idx] = max(oldwidth, len(value)) + + data.append(formatted) + + if separator is not None: + # Return early if a separator is used + return [separator.join(row) for row in data] + + if columns and not columns[-1].align_right: + # Avoid unnecessary spaces at end of line + colwidth[-1] = 0 + + # Build format string + fmt = " ".join([_GetColFormatString(width, col.align_right) + for col, width in zip(columns, colwidth)]) + + return [fmt % tuple(row) for row in data] + + def FormatTimestamp(ts): """Formats a given timestamp. 
@@ -1597,7 +2732,7 @@ def ParseTimespec(value): if value[-1] not in suffix_map: try: value = int(value) - except ValueError: + except (TypeError, ValueError): raise errors.OpPrereqError("Invalid time specification '%s'" % value) else: multiplier = suffix_map[value[-1]] @@ -1607,12 +2742,13 @@ def ParseTimespec(value): " suffix passed)") try: value = int(value) * multiplier - except ValueError: + except (TypeError, ValueError): raise errors.OpPrereqError("Invalid time specification '%s'" % value) return value -def GetOnlineNodes(nodes, cl=None, nowarn=False): +def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False, + filter_master=False): """Returns the names of online nodes. This function will also log a warning on stderr with the names of @@ -1625,17 +2761,36 @@ def GetOnlineNodes(nodes, cl=None, nowarn=False): @param nowarn: by default, this function will output a note with the offline nodes that are skipped; if this parameter is True the note is not displayed + @type secondary_ips: boolean + @param secondary_ips: if True, return the secondary IPs instead of the + names, useful for doing network traffic over the replication interface + (if any) + @type filter_master: boolean + @param filter_master: if True, do not return the master node in the list + (useful in coordination with secondary_ips where we cannot check our + node name against the list) """ if cl is None: cl = GetClient() - result = cl.QueryNodes(names=nodes, fields=["name", "offline"], + if secondary_ips: + name_idx = 2 + else: + name_idx = 0 + + if filter_master: + master_node = cl.QueryConfigValues(["master_node"])[0] + filter_fn = lambda x: x != master_node + else: + filter_fn = lambda _: True + + result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"], use_locking=False) offline = [row[0] for row in result if row[1]] if offline and not nowarn: - ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline)) - return [row[0] for row in result if not row[1]] + ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline)) + return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])] def _ToStream(stream, txt, *args): @@ -1647,13 +2802,20 @@ def _ToStream(stream, txt, *args): @param txt: the message """ - if args: - args = tuple(args) - stream.write(txt % args) - else: - stream.write(txt) - stream.write('\n') - stream.flush() + try: + if args: + args = tuple(args) + stream.write(txt % args) + else: + stream.write(txt) + stream.write('\n') + stream.flush() + except IOError, err: + if err.errno == errno.EPIPE: + # our terminal went away, we'll exit + sys.exit(constants.EXIT_FAILURE) + else: + raise def ToStdout(txt, *args): @@ -1687,13 +2849,15 @@ class JobExecutor(object): GetResults() calls. """ - def __init__(self, cl=None, verbose=True): + def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None): self.queue = [] if cl is None: cl = GetClient() self.cl = cl self.verbose = verbose self.jobs = [] + self.opts = opts + self.feedback_fn = feedback_fn def QueueJob(self, name, *ops): """Record a job for later submit. @@ -1701,15 +2865,47 @@ class JobExecutor(object): @type name: string @param name: a description of the job, will be used in WaitJobSet """ + SetGenericOpcodeOpts(ops, self.opts) self.queue.append((name, ops)) - def SubmitPending(self): + def SubmitPending(self, each=False): """Submit all pending jobs. 
""" - results = self.cl.SubmitManyJobs([row[1] for row in self.queue]) - for ((status, data), (name, _)) in zip(results, self.queue): - self.jobs.append((status, data, name)) + if each: + results = [] + for row in self.queue: + # SubmitJob will remove the success status, but raise an exception if + # the submission fails, so we'll notice that anyway. + results.append([True, self.cl.SubmitJob(row[1])]) + else: + results = self.cl.SubmitManyJobs([row[1] for row in self.queue]) + for (idx, ((status, data), (name, _))) in enumerate(zip(results, + self.queue)): + self.jobs.append((idx, status, data, name)) + + def _ChooseJob(self): + """Choose a non-waiting/queued job to poll next. + + """ + assert self.jobs, "_ChooseJob called with empty job list" + + result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"]) + assert result + + for job_data, status in zip(self.jobs, result): + if (isinstance(status, list) and status and + status[0] in (constants.JOB_STATUS_QUEUED, + constants.JOB_STATUS_WAITLOCK, + constants.JOB_STATUS_CANCELING)): + # job is still present and waiting + continue + # good candidate found (either running job or lost job) + self.jobs.remove(job_data) + return job_data + + # no job found + return self.jobs.pop(0) def GetResults(self): """Wait for and return the results of all jobs. @@ -1724,26 +2920,39 @@ class JobExecutor(object): self.SubmitPending() results = [] if self.verbose: - ok_jobs = [row[1] for row in self.jobs if row[0]] + ok_jobs = [row[2] for row in self.jobs if row[1]] if ok_jobs: - ToStdout("Submitted jobs %s", ", ".join(ok_jobs)) - for submit_status, jid, name in self.jobs: - if not submit_status: - ToStderr("Failed to submit job for %s: %s", name, jid) - results.append((False, jid)) - continue - if self.verbose: - ToStdout("Waiting for job %s for %s...", jid, name) + ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs)) + + # first, remove any non-submitted jobs + self.jobs, failures = compat.partition(self.jobs, lambda x: x[1]) + for idx, _, jid, name in failures: + ToStderr("Failed to submit job for %s: %s", name, jid) + results.append((idx, False, jid)) + + while self.jobs: + (idx, _, jid, name) = self._ChooseJob() + ToStdout("Waiting for job %s for %s...", jid, name) try: - job_result = PollJob(jid, cl=self.cl) + job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn) success = True + except errors.JobLost, err: + _, job_result = FormatError(err) + ToStderr("Job %s for %s has been archived, cannot check its result", + jid, name) + success = False except (errors.GenericError, luxi.ProtocolError), err: _, job_result = FormatError(err) success = False # the error message will always be shown, verbose or not ToStderr("Job %s for %s has failed: %s", jid, name, job_result) - results.append((success, job_result)) + results.append((idx, success, job_result)) + + # sort based on the index, then drop it + results.sort() + results = [i[1:] for i in results] + return results def WaitOrShow(self, wait): @@ -1758,8 +2967,27 @@ class JobExecutor(object): else: if not self.jobs: self.SubmitPending() - for status, result, name in self.jobs: + for _, status, result, name in self.jobs: if status: ToStdout("%s: %s", result, name) else: ToStderr("Failure for %s: %s", name, result) + return [row[1:3] for row in self.jobs] + + +def FormatParameterDict(buf, param_dict, actual, level=1): + """Formats a parameter dictionary. 
+
+  @type buf: L{StringIO}
+  @param buf: the buffer into which to write
+  @type param_dict: dict
+  @param param_dict: the parameters which were set explicitly, overriding the
+      defaults
+  @type actual: dict
+  @param actual: the current parameter set (including defaults)
+  @type level: int
+  @param level: Level of indent
+
+  """
+  indent = "  " * level
+  for key in sorted(actual):
+    val = param_dict.get(key, "default (%s)" % actual[key])
+    buf.write("%s- %s: %s\n" % (indent, key, val))
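
A minimal usage sketch for GenericList and GenericListFields as defined
above. It assumes a reachable master daemon, and the resource constant name
(constants.QR_NODE, expected to be a member of constants.QR_OP_LUXI) is
recalled from the constants module, which this diff does not show:

  from ganeti import cli
  from ganeti import constants

  # List the "name" and "offline" fields of all nodes: automatic units,
  # aligned columns (no separator), with a header row.
  retcode = cli.GenericList(constants.QR_NODE, ["name", "offline"], [],
                            None, None, True)

  # Show which fields the node resource supports.
  retcode = cli.GenericListFields(constants.QR_NODE, [], None, True)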
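
A standalone sketch of the TableColumn/FormatTable pair above, with made-up
row data; each column carries its own formatting callable and alignment flag:

  from ganeti import cli

  columns = [
    cli.TableColumn("Node", str, False),                 # left-aligned
    cli.TableColumn("Mem", lambda v: "%dM" % v, True),   # right-aligned
    ]
  rows = [
    ["node1.example.com", 4096],
    ["node2.example.com", 512],
    ]

  # With separator=None the column widths are computed from the longest
  # formatted value in each column; passing e.g. separator="|" instead takes
  # the early-return path and simply joins the formatted fields.
  for line in cli.FormatTable(rows, columns, True, None):
    print line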
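
Worked examples for ParseTimespec; the suffix map (s/m/h/d/w for seconds
through weeks) lives in the unchanged part of the function and is quoted
here from memory:

  from ganeti import cli

  cli.ParseTimespec("30")    # -> 30 (no suffix means seconds)
  cli.ParseTimespec("30m")   # -> 1800
  cli.ParseTimespec("2h")    # -> 7200
  cli.ParseTimespec("1w")    # -> 604800

With the hunk above, the TypeError that int() raises for non-string input
(e.g. a list) is now reported as OpPrereqError as well, instead of escaping
to the caller.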
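
A sketch of the JobExecutor flow with the new indexed tuples; the opcode
factory is hypothetical, real callers pass opcodes.OpCode instances:

  from ganeti import cli

  def run_per_instance(names, make_op):
    je = cli.JobExecutor(verbose=True)
    for name in names:
      je.QueueJob(name, make_op(name))  # one job per instance
    # GetResults submits anything still pending, polls jobs as they leave
    # the queued/waiting states (via _ChooseJob) and returns a list of
    # (success, result) tuples re-sorted into queueing order.
    return je.GetResults()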
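
A small example of FormatParameterDict output (hypothetical hypervisor
parameters; "root_path" is not set explicitly, so it falls back to the
annotated default):

  from cStringIO import StringIO
  from ganeti import cli

  own = {"kernel_path": "/boot/vmlinuz-custom"}
  actual = {"kernel_path": "/boot/vmlinuz-custom",
            "root_path": "/dev/sda1"}

  buf = StringIO()
  cli.FormatParameterDict(buf, own, actual)
  print buf.getvalue(),
  # Output:
  #   - kernel_path: /boot/vmlinuz-custom
  #   - root_path: default (/dev/sda1)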