X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/25be0c756d404fd0c5891e272962d12914dfceb9..fb44c6dbca84285813fd3278b14687ddc41de678:/lib/client/gnt_cluster.py

diff --git a/lib/client/gnt_cluster.py b/lib/client/gnt_cluster.py
index 91dafab..f3ada54 100644
--- a/lib/client/gnt_cluster.py
+++ b/lib/client/gnt_cluster.py
@@ -1,7 +1,7 @@
 #
 #

-# Copyright (C) 2006, 2007, 2010 Google Inc.
+# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -20,7 +20,7 @@

 """Cluster related commands"""

-# pylint: disable-msg=W0401,W0613,W0614,C0103
+# pylint: disable=W0401,W0613,W0614,C0103
 # W0401: Wildcard import ganeti.cli
 # W0613: Unused argument, since all functions follow the same API
 # W0614: Unused import %s from wildcard import (since we need cli)
@@ -29,6 +29,7 @@
 import os.path
 import time
 import OpenSSL
+import itertools

 from ganeti.cli import *
 from ganeti import opcodes
@@ -40,6 +41,20 @@ from ganeti import ssh
 from ganeti import objects
 from ganeti import uidpool
 from ganeti import compat
+from ganeti import netutils
+
+
+ON_OPT = cli_option("--on", default=False,
+                    action="store_true", dest="on",
+                    help="Recover from an EPO")
+
+GROUPS_OPT = cli_option("--groups", default=False,
+                        action="store_true", dest="groups",
+                        help="Arguments are node groups instead of nodes")
+
+_EPO_PING_INTERVAL = 30 # 30 seconds between pings
+_EPO_PING_TIMEOUT = 1 # 1 second
+_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes


 @UsesRPC
@@ -130,6 +145,7 @@ def InitCluster(opts, args):
                         mac_prefix=opts.mac_prefix,
                         master_netdev=master_netdev,
                         file_storage_dir=opts.file_storage_dir,
+                        shared_file_storage_dir=opts.shared_file_storage_dir,
                         enabled_hypervisors=hvlist,
                         hvparams=hvparams,
                         beparams=beparams,
@@ -145,7 +161,7 @@ def InitCluster(opts, args):
                         primary_ip_version=primary_ip_version,
                         prealloc_wipe_disks=opts.prealloc_wipe_disks,
                         )
-  op = opcodes.OpPostInitCluster()
+  op = opcodes.OpClusterPostInit()
   SubmitOpCode(op, opts=opts)
   return 0

@@ -166,7 +182,7 @@ def DestroyCluster(opts, args):
              " destroy this cluster, supply the --yes-do-it option.")
     return 1

-  op = opcodes.OpDestroyCluster()
+  op = opcodes.OpClusterDestroy()
   master = SubmitOpCode(op, opts=opts)
   # if we reached this, the opcode didn't fail; we can proceed to
   # shutdown all the daemons
@@ -198,7 +214,7 @@ def RenameCluster(opts, args):
   if not AskUser(usertext):
     return 1

-  op = opcodes.OpRenameCluster(name=new_name)
+  op = opcodes.OpClusterRename(name=new_name)
   result = SubmitOpCode(op, opts=opts, cl=cl)

   if result:
@@ -207,6 +223,32 @@ def RenameCluster(opts, args):
   return 0


+def ActivateMasterIp(opts, args):
+  """Activates the master IP.
+
+  """
+  op = opcodes.OpClusterActivateMasterIp()
+  SubmitOpCode(op)
+  return 0
+
+
+def DeactivateMasterIp(opts, args):
+  """Deactivates the master IP.
+
+  """
+  if not opts.confirm:
+    usertext = ("This will disable the master IP. All the open connections to"
+                " the master IP will be closed. To reach the master you will"
+                " need to use its node IP."
+                " Continue?")
+    if not AskUser(usertext):
+      return 1
+
+  op = opcodes.OpClusterDeactivateMasterIp()
+  SubmitOpCode(op)
+  return 0
+
+
 def RedistributeConfig(opts, args):
   """Forces push of the cluster configuration.
@@ -217,7 +259,7 @@ def RedistributeConfig(opts, args):
   @return: the desired exit code

   """
-  op = opcodes.OpRedistributeConfig()
+  op = opcodes.OpClusterRedistConf()
   SubmitOrSend(op, opts)
   return 0

@@ -321,6 +363,9 @@ def ShowClusterConfig(opts, args):
   ToStdout("OS parameters:")
   _PrintGroupedParams(result["osparams"])

+  ToStdout("Hidden OSes: %s", utils.CommaJoin(result["hidden_os"]))
+  ToStdout("Blacklisted OSes: %s", utils.CommaJoin(result["blacklisted_os"]))
+
   ToStdout("Cluster parameters:")
   ToStdout("  - candidate pool size: %s",
            compat.TryToRoman(result["candidate_pool_size"],
@@ -334,6 +379,8 @@ def ShowClusterConfig(opts, args):
   ToStdout("  - lvm reserved volumes: %s", reserved_lvs)
   ToStdout("  - drbd usermode helper: %s", result["drbd_usermode_helper"])
   ToStdout("  - file storage path: %s", result["file_storage_dir"])
+  ToStdout("  - shared file storage path: %s",
+           result["shared_file_storage_dir"])
   ToStdout("  - maintenance of node health: %s",
            result["maintain_node_health"])
   ToStdout("  - uid pool: %s",
@@ -342,6 +389,10 @@ def ShowClusterConfig(opts, args):
   ToStdout("  - default instance allocator: %s", result["default_iallocator"])
   ToStdout("  - primary ip version: %d", result["primary_ip_version"])
   ToStdout("  - preallocation wipe disks: %s", result["prealloc_wipe_disks"])
+  ToStdout("  - OS search path: %s", utils.CommaJoin(constants.OS_SEARCH_PATH))
+
+  ToStdout("Default node parameters:")
+  _PrintGroupedParams(result["ndparams"], roman=opts.roman_integers)

   ToStdout("Default instance parameters:")
   _PrintGroupedParams(result["beparams"], roman=opts.roman_integers)
@@ -373,7 +424,8 @@ def ClusterCopyFile(opts, args):
   cluster_name = cl.QueryConfigValues(["cluster_name"])[0]

   results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
-                           secondary_ips=opts.use_replication_network)
+                           secondary_ips=opts.use_replication_network,
+                           nodegroup=opts.nodegroup)

   srun = ssh.SshRunner(cluster_name=cluster_name)
   for node in results:
@@ -397,7 +449,7 @@ def RunClusterCommand(opts, args):

   command = " ".join(args)

-  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl)
+  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)

   cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
                                                     "master_node"])
@@ -430,16 +482,45 @@ def VerifyCluster(opts, args):

   """
   skip_checks = []
+
   if opts.skip_nplusone_mem:
     skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
-  op = opcodes.OpVerifyCluster(skip_checks=skip_checks,
-                               verbose=opts.verbose,
+
+  cl = GetClient()
+
+  op = opcodes.OpClusterVerify(verbose=opts.verbose,
                                error_codes=opts.error_codes,
-                               debug_simulate_errors=opts.simulate_errors)
-  if SubmitOpCode(op, opts=opts):
-    return 0
+                               debug_simulate_errors=opts.simulate_errors,
+                               skip_checks=skip_checks,
+                               group_name=opts.nodegroup)
+  result = SubmitOpCode(op, cl=cl, opts=opts)
+
+  # Keep track of submitted jobs
+  jex = JobExecutor(cl=cl, opts=opts)
+
+  for (status, job_id) in result[constants.JOB_IDS_KEY]:
+    jex.AddJobId(None, status, job_id)
+
+  results = jex.GetResults()
+
+  (bad_jobs, bad_results) = \
+    map(len,
+        # Convert iterators to lists
+        map(list,
+            # Count errors
+            map(compat.partial(itertools.ifilterfalse, bool),
+                # Convert result to booleans in a tuple
+                zip(*((job_success, len(op_results) == 1 and op_results[0])
+                      for (job_success, op_results) in results)))))
+
+  if bad_jobs == 0 and bad_results == 0:
+    rcode = constants.EXIT_SUCCESS
   else:
-    return 1
+    rcode = constants.EXIT_FAILURE
+    if bad_jobs > 0:
+      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
+
+  return rcode


 def VerifyDisks(opts, args):
@@ -454,27 +535,35 @@ def VerifyDisks(opts, args):

   """
   cl = GetClient()

-  op = opcodes.OpVerifyDisks()
-  result = SubmitOpCode(op, opts=opts, cl=cl)
-  if not isinstance(result, (list, tuple)) or len(result) != 3:
-    raise errors.ProgrammerError("Unknown result type for OpVerifyDisks")
+  op = opcodes.OpClusterVerifyDisks()
+
+  result = SubmitOpCode(op, cl=cl, opts=opts)

-  bad_nodes, instances, missing = result
+  # Keep track of submitted jobs
+  jex = JobExecutor(cl=cl, opts=opts)
+
+  for (status, job_id) in result[constants.JOB_IDS_KEY]:
+    jex.AddJobId(None, status, job_id)

   retcode = constants.EXIT_SUCCESS

-  if bad_nodes:
+  for (status, result) in jex.GetResults():
+    if not status:
+      ToStdout("Job failed: %s", result)
+      continue
+
+    ((bad_nodes, instances, missing), ) = result
+
     for node, text in bad_nodes.items():
       ToStdout("Error gathering data on node %s: %s",
                node, utils.SafeEncode(text[-400:]))
-      retcode |= 1
+      retcode = constants.EXIT_FAILURE
       ToStdout("You need to fix these nodes first before fixing instances")

-  if instances:
     for iname in instances:
       if iname in missing:
         continue
-      op = opcodes.OpActivateInstanceDisks(instance_name=iname)
+      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
       try:
         ToStdout("Activating disks for instance '%s'", iname)
         SubmitOpCode(op, opts=opts, cl=cl)
@@ -483,26 +572,24 @@ def VerifyDisks(opts, args):
         retcode |= nret
         ToStderr("Error activating disks for instance %s: %s", iname, msg)

-  if missing:
-    (vg_name, ) = cl.QueryConfigValues(["volume_group_name"])
-
-    for iname, ival in missing.iteritems():
-      all_missing = compat.all(x[0] in bad_nodes for x in ival)
-      if all_missing:
-        ToStdout("Instance %s cannot be verified as it lives on"
-                 " broken nodes", iname)
-      else:
-        ToStdout("Instance %s has missing logical volumes:", iname)
-        ival.sort()
-        for node, vol in ival:
-          if node in bad_nodes:
-            ToStdout("\tbroken node %s /dev/%s/%s", node, vg_name, vol)
-          else:
-            ToStdout("\t%s /dev/%s/%s", node, vg_name, vol)
-
-    ToStdout("You need to run replace_disks for all the above"
-             " instances, if this message persist after fixing nodes.")
-    retcode |= 1
+    if missing:
+      for iname, ival in missing.iteritems():
+        all_missing = compat.all(x[0] in bad_nodes for x in ival)
+        if all_missing:
+          ToStdout("Instance %s cannot be verified as it lives on"
+                   " broken nodes", iname)
+        else:
+          ToStdout("Instance %s has missing logical volumes:", iname)
+          ival.sort()
+          for node, vol in ival:
+            if node in bad_nodes:
+              ToStdout("\tbroken node %s /dev/%s", node, vol)
+            else:
+              ToStdout("\t%s /dev/%s", node, vol)
+
+      ToStdout("You need to replace or recreate disks for all the above"
+               " instances if this message persists after fixing broken nodes.")
+      retcode = constants.EXIT_FAILURE

   return retcode

@@ -517,7 +604,7 @@ def RepairDiskSizes(opts, args):
   @return: the desired exit code

   """
-  op = opcodes.OpRepairDiskSizes(instances=args)
+  op = opcodes.OpClusterRepairDiskSizes(instances=args)
   SubmitOpCode(op, opts=opts)

@@ -561,7 +648,7 @@ def MasterPing(opts, args):
     cl = GetClient()
     cl.QueryClusterInfo()
     return 0
-  except Exception: # pylint: disable-msg=W0703
+  except Exception: # pylint: disable=W0703
     return 1

@@ -575,7 +662,7 @@ def SearchTags(opts, args):
   @return: the desired exit code

   """
-  op = opcodes.OpSearchTags(pattern=args[0])
+  op = opcodes.OpTagsSearch(pattern=args[0])
   result = SubmitOpCode(op, opts=opts)
   if not result:
     return 1
@@ -585,9 +672,45 @@ def SearchTags(opts, args):
     ToStdout("%s %s", path, tag)


-def _RenewCrypto(new_cluster_cert, new_rapi_cert, rapi_cert_filename,
-                 new_confd_hmac_key, new_cds, cds_filename,
-                 force):
+def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
+  """Reads and verifies an X509 certificate.
+
+  @type cert_filename: string
+  @param cert_filename: the path of the file containing the certificate to
+                        verify encoded in PEM format
+  @type verify_private_key: bool
+  @param verify_private_key: whether to verify the private key in addition to
+                             the public certificate
+  @rtype: string
+  @return: a string containing the PEM-encoded certificate.
+
+  """
+  try:
+    pem = utils.ReadFile(cert_filename)
+  except IOError, err:
+    raise errors.X509CertError(cert_filename,
+                               "Unable to read certificate: %s" % str(err))
+
+  try:
+    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
+  except Exception, err:
+    raise errors.X509CertError(cert_filename,
+                               "Unable to load certificate: %s" % str(err))
+
+  if verify_private_key:
+    try:
+      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
+    except Exception, err:
+      raise errors.X509CertError(cert_filename,
+                                 "Unable to load private key: %s" % str(err))
+
+  return pem
+
+
+def _RenewCrypto(new_cluster_cert, new_rapi_cert, #pylint: disable=R0911
+                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
+                 spice_cacert_filename, new_confd_hmac_key, new_cds,
+                 cds_filename, force):
   """Renews cluster certificates, keys and secrets.

   @type new_cluster_cert: bool
@@ -596,6 +719,13 @@ def _RenewCrypto(new_cluster_cert, new_rapi_cert, rapi_cert_filename,
   @param new_rapi_cert: Whether to generate a new RAPI certificate
   @type rapi_cert_filename: string
   @param rapi_cert_filename: Path to file containing new RAPI certificate
+  @type new_spice_cert: bool
+  @param new_spice_cert: Whether to generate a new SPICE certificate
+  @type spice_cert_filename: string
+  @param spice_cert_filename: Path to file containing new SPICE certificate
+  @type spice_cacert_filename: string
+  @param spice_cacert_filename: Path to file containing the certificate of the
+                                CA that signed the SPICE certificate
   @type new_confd_hmac_key: bool
   @param new_confd_hmac_key: Whether to generate a new HMAC key
   @type new_cds: bool
@@ -607,7 +737,7 @@ def _RenewCrypto(new_cluster_cert, new_rapi_cert, rapi_cert_filename,

   """
   if new_rapi_cert and rapi_cert_filename:
-    ToStderr("Only one of the --new-rapi-certficate and --rapi-certificate"
+    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
             " options can be specified at the same time.")
     return 1

@@ -617,32 +747,31 @@ def _RenewCrypto(new_cluster_cert, new_rapi_cert, rapi_cert_filename,
              " the same time.")
     return 1

-  if rapi_cert_filename:
-    # Read and verify new certificate
-    try:
-      rapi_cert_pem = utils.ReadFile(rapi_cert_filename)
-
-      OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
-                                      rapi_cert_pem)
-    except Exception, err: # pylint: disable-msg=W0703
-      ToStderr("Can't load new RAPI certificate from %s: %s" %
-               (rapi_cert_filename, str(err)))
-      return 1
+  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
+    ToStderr("When using --new-spice-certificate, the --spice-certificate"
+             " and --spice-ca-certificate must not be used.")
+    return 1

-    try:
-      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, rapi_cert_pem)
-    except Exception, err: # pylint: disable-msg=W0703
-      ToStderr("Can't load new RAPI private key from %s: %s" %
-               (rapi_cert_filename, str(err)))
-      return 1
+  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
ToStderr("Both --spice-certificate and --spice-ca-certificate must be" + " specified.") + return 1 - else: - rapi_cert_pem = None + rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None) + try: + if rapi_cert_filename: + rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True) + if spice_cert_filename: + spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True) + spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename) + except errors.X509CertError, err: + ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1]) + return 1 if cds_filename: try: cds = utils.ReadFile(cds_filename) - except Exception, err: # pylint: disable-msg=W0703 + except Exception, err: # pylint: disable=W0703 ToStderr("Can't load new cluster domain secret from %s: %s" % (cds_filename, str(err))) return 1 @@ -657,10 +786,14 @@ def _RenewCrypto(new_cluster_cert, new_rapi_cert, rapi_cert_filename, def _RenewCryptoInner(ctx): ctx.feedback_fn("Updating certificates and keys") - bootstrap.GenerateClusterCrypto(new_cluster_cert, new_rapi_cert, + bootstrap.GenerateClusterCrypto(new_cluster_cert, + new_rapi_cert, + new_spice_cert, new_confd_hmac_key, new_cds, rapi_cert_pem=rapi_cert_pem, + spice_cert_pem=spice_cert_pem, + spice_cacert_pem=spice_cacert_pem, cds=cds) files_to_copy = [] @@ -671,6 +804,10 @@ def _RenewCrypto(new_cluster_cert, new_rapi_cert, rapi_cert_filename, if new_rapi_cert or rapi_cert_pem: files_to_copy.append(constants.RAPI_CERT_FILE) + if new_spice_cert or spice_cert_pem: + files_to_copy.append(constants.SPICE_CERT_FILE) + files_to_copy.append(constants.SPICE_CACERT_FILE) + if new_confd_hmac_key: files_to_copy.append(constants.CONFD_HMAC_KEY) @@ -699,6 +836,9 @@ def RenewCrypto(opts, args): return _RenewCrypto(opts.new_cluster_cert, opts.new_rapi_cert, opts.rapi_cert, + opts.new_spice_cert, + opts.spice_cert, + opts.spice_cacert, opts.new_confd_hmac_key, opts.new_cluster_domain_secret, opts.cluster_domain_secret, @@ -786,7 +926,7 @@ def SetClusterParams(opts, args): else: opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",") - op = opcodes.OpSetClusterParams(vg_name=vg_name, + op = opcodes.OpClusterSetParams(vg_name=vg_name, drbd_helper=drbd_helper, enabled_hypervisors=hvlist, hvparams=hvparams, @@ -878,8 +1018,325 @@ def WatcherOps(opts, args): return 0 +def _OobPower(opts, node_list, power): + """Puts the node in the list to desired power state. + + @param opts: The command line options selected by the user + @param node_list: The list of nodes to operate on + @param power: True if they should be powered on, False otherwise + @return: The success of the operation (none failed) + + """ + if power: + command = constants.OOB_POWER_ON + else: + command = constants.OOB_POWER_OFF + + op = opcodes.OpOobCommand(node_names=node_list, + command=command, + ignore_status=True, + timeout=opts.oob_timeout, + power_delay=opts.power_delay) + result = SubmitOpCode(op, opts=opts) + errs = 0 + for node_result in result: + (node_tuple, data_tuple) = node_result + (_, node_name) = node_tuple + (data_status, _) = data_tuple + if data_status != constants.RS_NORMAL: + assert data_status != constants.RS_UNAVAIL + errs += 1 + ToStderr("There was a problem changing power for %s, please investigate", + node_name) + + if errs > 0: + return False + + return True + + +def _InstanceStart(opts, inst_list, start): + """Puts the instances in the list to desired state. 
+
+  @param opts: The command line options selected by the user
+  @param inst_list: The list of instances to operate on
+  @param start: True if they should be started, False for shutdown
+  @return: The success of the operation (none failed)
+
+  """
+  if start:
+    opcls = opcodes.OpInstanceStartup
+    text_submit, text_success, text_failed = ("startup", "started", "starting")
+  else:
+    opcls = compat.partial(opcodes.OpInstanceShutdown,
+                           timeout=opts.shutdown_timeout)
+    text_submit, text_success, text_failed = ("shutdown", "stopped",
+                                              "stopping")
+
+  jex = JobExecutor(opts=opts)
+
+  for inst in inst_list:
+    ToStdout("Submit %s of instance %s", text_submit, inst)
+    op = opcls(instance_name=inst)
+    jex.QueueJob(inst, op)
+
+  results = jex.GetResults()
+  bad_cnt = len([1 for (success, _) in results if not success])
+
+  if bad_cnt == 0:
+    ToStdout("All instances have been %s successfully", text_success)
+  else:
+    ToStderr("There were errors while %s instances:\n"
+             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
+             len(results))
+    return False
+
+  return True
+
+
+class _RunWhenNodesReachableHelper:
+  """Helper class to make sharing internal state easier.
+
+  @ivar success: Indicates if all action_cb calls were successful
+
+  """
+  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
+               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
+    """Init the object.
+
+    @param node_list: The list of nodes to be reachable
+    @param action_cb: Callback called when a new host is reachable
+    @type node2ip: dict
+    @param node2ip: Node to ip mapping
+    @param port: The port to use for the TCP ping
+    @param feedback_fn: The function used for feedback
+    @param _ping_fn: Function to check reachability (for unittest use only)
+    @param _sleep_fn: Function to sleep (for unittest use only)
+
+    """
+    self.down = set(node_list)
+    self.up = set()
+    self.node2ip = node2ip
+    self.success = True
+    self.action_cb = action_cb
+    self.port = port
+    self.feedback_fn = feedback_fn
+    self._ping_fn = _ping_fn
+    self._sleep_fn = _sleep_fn
+
+  def __call__(self):
+    """When called we run action_cb.
+
+    @raises utils.RetryAgain: When there are still down nodes
+
+    """
+    if not self.action_cb(self.up):
+      self.success = False
+
+    if self.down:
+      raise utils.RetryAgain()
+    else:
+      return self.success
+
+  def Wait(self, secs):
+    """Checks if a host is up or waits the remaining seconds.
+
+    @param secs: The secs remaining
+
+    """
+    start = time.time()
+    for node in self.down:
+      if self._ping_fn(self.node2ip[node], self.port,
+                       timeout=_EPO_PING_TIMEOUT, live_port_needed=True):
+        self.feedback_fn("Node %s became available" % node)
+        self.up.add(node)
+        self.down -= self.up
+        # If we have a node available there is the possibility to run the
+        # action callback successfully, therefore we don't wait and return
+        return
+
+    self._sleep_fn(max(0.0, start + secs - time.time()))
+
+
+def _RunWhenNodesReachable(node_list, action_cb, interval):
+  """Run action_cb when nodes become reachable.
+
+  @param node_list: The list of nodes to be reachable
+  @param action_cb: Callback called when a new host is reachable
+  @param interval: The earliest time to retry
+
+  """
+  client = GetClient()
+  cluster_info = client.QueryClusterInfo()
+  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
+    family = netutils.IPAddress.family
+  else:
+    family = netutils.IP6Address.family
+
+  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
+                 for node in node_list)
+
+  port = netutils.GetDaemonPort(constants.NODED)
+  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
+                                        ToStdout)
+
+  try:
+    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
+                       wait_fn=helper.Wait)
+  except utils.RetryTimeout:
+    ToStderr("Time exceeded while waiting for nodes to become reachable"
+             " again:\n  - %s", " - ".join(helper.down))
+    return False
+
+
+def _MaybeInstanceStartup(opts, inst_map, nodes_online,
+                          _instance_start_fn=_InstanceStart):
+  """Starts the instances conditionally, based on the online node states.
+
+  @param opts: The command line options selected by the user
+  @param inst_map: A dict of inst -> nodes mapping
+  @param nodes_online: A list of nodes online
+  @param _instance_start_fn: Callback to start instances (unittest use only)
+  @return: Success of the operation on all instances
+
+  """
+  start_inst_list = []
+  for (inst, nodes) in inst_map.items():
+    if not (nodes - nodes_online):
+      # All nodes the instance lives on are back online
+      start_inst_list.append(inst)
+
+  for inst in start_inst_list:
+    del inst_map[inst]
+
+  if start_inst_list:
+    return _instance_start_fn(opts, start_inst_list, True)
+
+  return True
+
+
+def _EpoOn(opts, full_node_list, node_list, inst_map):
+  """Does the actual power on.
+
+  @param opts: The command line options selected by the user
+  @param full_node_list: All nodes to operate on (includes nodes not supporting
+                         OOB)
+  @param node_list: The list of nodes to operate on (all need to support OOB)
+  @param inst_map: A dict of inst -> nodes mapping
+  @return: The desired exit status
+
+  """
+  if node_list and not _OobPower(opts, node_list, True):
+    ToStderr("Not all nodes seem to get back up, investigate and start"
+             " manually if needed")
+
+  # Wait for the nodes to be back up
+  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
+
+  ToStdout("Waiting until all nodes are available again")
+  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
+    ToStderr("Please investigate and start stopped instances manually")
+    return constants.EXIT_FAILURE
+
+  return constants.EXIT_SUCCESS
+
+
+def _EpoOff(opts, node_list, inst_map):
+  """Does the actual power off.
+
+  @param opts: The command line options selected by the user
+  @param node_list: The list of nodes to operate on (all need to support OOB)
+  @param inst_map: A dict of inst -> nodes mapping
+  @return: The desired exit status
+
+  """
+  if not _InstanceStart(opts, inst_map.keys(), False):
+    ToStderr("Please investigate and stop instances manually before continuing")
+    return constants.EXIT_FAILURE
+
+  if not node_list:
+    return constants.EXIT_SUCCESS
+
+  if _OobPower(opts, node_list, False):
+    return constants.EXIT_SUCCESS
+  else:
+    return constants.EXIT_FAILURE
+
+
+def Epo(opts, args):
+  """EPO operations.
+
+  @param opts: the command line options selected by the user
+  @type args: list
+  @param args: the names of the nodes (or node groups, with --groups) on
+               which to operate
+  @rtype: int
+  @return: the desired exit code
+
+  """
+  if opts.groups and opts.show_all:
+    ToStderr("Only one of --groups or --all is allowed")
+    return constants.EXIT_FAILURE
+  elif args and opts.show_all:
+    ToStderr("Arguments in combination with --all are not allowed")
+    return constants.EXIT_FAILURE
+
+  client = GetClient()
+
+  if opts.groups:
+    node_query_list = \
+      list(itertools.chain(*client.QueryGroups(names=args,
+                                               fields=["node_list"],
+                                               use_locking=False)))
+  else:
+    node_query_list = args
+
+  result = client.QueryNodes(names=node_query_list,
+                             fields=["name", "master", "pinst_list",
+                                     "sinst_list", "powered", "offline"],
+                             use_locking=False)
+  node_list = []
+  inst_map = {}
+  for (idx, (node, master, pinsts, sinsts, powered,
+             offline)) in enumerate(result):
+    # Normalize the node_query_list as well
+    if not opts.show_all:
+      node_query_list[idx] = node
+    if not offline:
+      for inst in (pinsts + sinsts):
+        if inst in inst_map:
+          if not master:
+            inst_map[inst].add(node)
+        elif master:
+          inst_map[inst] = set()
+        else:
+          inst_map[inst] = set([node])
+
+    if master and opts.on:
+      # We ignore the master for turning on the machines, in fact we are
+      # already operating on the master at this point :)
+      continue
+    elif master and not opts.show_all:
+      ToStderr("%s is the master node, please do a master-failover to another"
+               " node not affected by the EPO or use --all if you intend to"
+               " shutdown the whole cluster", node)
+      return constants.EXIT_FAILURE
+    elif powered is None:
+      ToStdout("Node %s does not support out-of-band handling, it cannot be"
+               " handled in a fully automated manner", node)
+    elif powered == opts.on:
+      ToStdout("Node %s is already in the desired power state, skipping", node)
+    elif not offline or (offline and powered):
+      node_list.append(node)
+
+  if not opts.force and not ConfirmOperation(node_query_list, "nodes", "epo"):
+    return constants.EXIT_FAILURE
+
+  if opts.on:
+    return _EpoOn(opts, node_query_list, node_list, inst_map)
+  else:
+    return _EpoOff(opts, node_list, inst_map)
+
+
 commands = {
-  'init': (
+  "init": (
     InitCluster, [ArgHost(min=1, max=1)],
     [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
      HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, NIC_PARAMS_OPT,
@@ -887,77 +1344,77 @@ commands = {
     SECONDARY_IP_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT,
     UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
-    NODE_PARAMS_OPT],
+    NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT],
     "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
", "Initialises a new cluster configuration"), - 'destroy': ( + "destroy": ( DestroyCluster, ARGS_NONE, [YES_DOIT_OPT], "", "Destroy cluster"), - 'rename': ( + "rename": ( RenameCluster, [ArgHost(min=1, max=1)], [FORCE_OPT, DRY_RUN_OPT], "", "Renames the cluster"), - 'redist-conf': ( + "redist-conf": ( RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT], "", "Forces a push of the configuration file and ssconf files" " to the nodes in the cluster"), - 'verify': ( + "verify": ( VerifyCluster, ARGS_NONE, [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT, - DRY_RUN_OPT, PRIORITY_OPT], + DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT], "", "Does a check on the cluster configuration"), - 'verify-disks': ( + "verify-disks": ( VerifyDisks, ARGS_NONE, [PRIORITY_OPT], "", "Does a check on the cluster disk status"), - 'repair-disk-sizes': ( + "repair-disk-sizes": ( RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT], "", "Updates mismatches in recorded disk sizes"), - 'master-failover': ( + "master-failover": ( MasterFailover, ARGS_NONE, [NOVOTING_OPT], "", "Makes the current node the master"), - 'master-ping': ( + "master-ping": ( MasterPing, ARGS_NONE, [], "", "Checks if the master is alive"), - 'version': ( + "version": ( ShowClusterVersion, ARGS_NONE, [], "", "Shows the cluster version"), - 'getmaster': ( + "getmaster": ( ShowClusterMaster, ARGS_NONE, [], "", "Shows the cluster master"), - 'copyfile': ( + "copyfile": ( ClusterCopyFile, [ArgFile(min=1, max=1)], - [NODE_LIST_OPT, USE_REPL_NET_OPT], + [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT], "[-n node...] ", "Copies a file to all (or only some) nodes"), - 'command': ( + "command": ( RunClusterCommand, [ArgCommand(min=1)], - [NODE_LIST_OPT], + [NODE_LIST_OPT, NODEGROUP_OPT], "[-n node...] 
", "Runs a command on all (or only some) nodes"), - 'info': ( + "info": ( ShowClusterConfig, ARGS_NONE, [ROMAN_OPT], "[--roman]", "Show cluster configuration"), - 'list-tags': ( + "list-tags": ( ListTags, ARGS_NONE, [], "", "List the tags of the cluster"), - 'add-tags': ( + "add-tags": ( AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT], "tag...", "Add tags to the cluster"), - 'remove-tags': ( + "remove-tags": ( RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT], "tag...", "Remove tags from the cluster"), - 'search-tags': ( + "search-tags": ( SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "", "Searches the tags on all objects on" " the cluster for a given pattern (regex)"), - 'queue': ( + "queue": ( QueueOps, [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])], [], "drain|undrain|info", "Change queue properties"), - 'watcher': ( + "watcher": ( WatcherOps, [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]), ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])], [], "{pause |continue|info}", "Change watcher properties"), - 'modify': ( + "modify": ( SetClusterParams, ARGS_NONE, [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, @@ -970,15 +1427,27 @@ commands = { RenewCrypto, ARGS_NONE, [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT, NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT, - NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT], + NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT, + NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT], "[opts...]", "Renews cluster certificates, keys and secrets"), + "epo": ( + Epo, [ArgUnknown()], + [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT, + SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT], + "[opts...] [args]", + "Performs an emergency power-off on given args"), + "activate-master-ip": ( + ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"), + "deactivate-master-ip": ( + DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "", + "Deactivates the master IP"), } #: dictionary with aliases for commands aliases = { - 'masterfailover': 'master-failover', + "masterfailover": "master-failover", }