X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/d47319e2fc3c1f6065b72ca5113cc6a2f1f706ba..916c0e6fbad2874c09bc18a3e29b2b0ba25821f8:/tools/cluster-merge diff --git a/tools/cluster-merge b/tools/cluster-merge index cff79dd..066440e 100755 --- a/tools/cluster-merge +++ b/tools/cluster-merge @@ -1,7 +1,7 @@ #!/usr/bin/python # -# Copyright (C) 2010 Google Inc. +# Copyright (C) 2010, 2012 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -24,7 +24,7 @@ The clusters have to run the same version of Ganeti! """ -# pylint: disable-msg=C0103 +# pylint: disable=C0103 # C0103: Invalid name cluster-merge import logging @@ -40,6 +40,8 @@ from ganeti import constants from ganeti import errors from ganeti import ssh from ganeti import utils +from ganeti import pathutils +from ganeti import compat _GROUPS_MERGE = "merge" @@ -81,6 +83,12 @@ RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL, " same name (One of: %s/%s/%s)" % _RESTART_CHOICES)) +SKIP_STOP_INSTANCES_OPT = \ + cli.cli_option("--skip-stop-instances", default=True, action="store_false", + dest="stop_instances", + help=("Don't stop the instances on the clusters, just check " + "that none is running")) + def Flatten(unflattened_list): """Flattens a list. @@ -103,13 +111,15 @@ class MergerData(object): """Container class to hold data used for merger. """ - def __init__(self, cluster, key_path, nodes, instances, config_path=None): + def __init__(self, cluster, key_path, nodes, instances, master_node, + config_path=None): """Initialize the container. @param cluster: The name of the cluster @param key_path: Path to the ssh private key used for authentication @param nodes: List of online nodes in the merging cluster @param instances: List of instances running on merging cluster + @param master_node: Name of the master node @param config_path: Path to the merging cluster config """ @@ -117,6 +127,7 @@ class MergerData(object): self.key_path = key_path self.nodes = nodes self.instances = instances + self.master_node = master_node self.config_path = config_path @@ -124,13 +135,22 @@ class Merger(object): """Handling the merge. """ - def __init__(self, clusters, pause_period, groups, restart, params): + RUNNING_STATUSES = compat.UniqueFrozenset([ + constants.INSTST_RUNNING, + constants.INSTST_ERRORUP, + ]) + + def __init__(self, clusters, pause_period, groups, restart, params, + stop_instances): """Initialize object with sane defaults and infos required. @param clusters: The list of clusters to merge in @param pause_period: The time watcher shall be disabled for @param groups: How to handle group conflicts @param restart: How to handle instance restart + @param stop_instances: Indicates whether the instances must be stopped + (True) or if the Merger must only check if no + instances are running on the mergee clusters (False) """ self.merger_data = [] @@ -142,10 +162,10 @@ class Merger(object): self.groups = groups self.restart = restart self.params = params + self.stop_instances = stop_instances if self.restart == _RESTART_UP: raise NotImplementedError - def Setup(self): """Sets up our end so we can do the merger. @@ -174,16 +194,16 @@ class Merger(object): utils.WriteFile(key_path, mode=0600, data=result.stdout) result = self._RunCmd(cluster, "gnt-node list -o name,offline" - " --no-header --separator=,", private_key=key_path) + " --no-headers --separator=,", private_key=key_path) if result.failed: raise errors.RemoteError("Unable to retrieve list of nodes from %s." " Fail reason: %s; output: %s" % (cluster, result.fail_reason, result.output)) - nodes_statuses = [line.split(',') for line in result.stdout.splitlines()] + nodes_statuses = [line.split(",") for line in result.stdout.splitlines()] nodes = [node_status[0] for node_status in nodes_statuses if node_status[1] == "N"] - result = self._RunCmd(cluster, "gnt-instance list -o name --no-header", + result = self._RunCmd(cluster, "gnt-instance list -o name --no-headers", private_key=key_path) if result.failed: raise errors.RemoteError("Unable to retrieve list of instances from" @@ -191,7 +211,17 @@ class Merger(object): (cluster, result.fail_reason, result.output)) instances = result.stdout.splitlines() - self.merger_data.append(MergerData(cluster, key_path, nodes, instances)) + path = utils.PathJoin(pathutils.DATA_DIR, "ssconf_%s" % + constants.SS_MASTER_NODE) + result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path) + if result.failed: + raise errors.RemoteError("Unable to retrieve the master node name from" + " %s. Fail reason: %s; output: %s" % + (cluster, result.fail_reason, result.output)) + master_node = result.stdout.strip() + + self.merger_data.append(MergerData(cluster, key_path, nodes, instances, + master_node)) def _PrepareAuthorizedKeys(self): """Prepare the authorized_keys on every merging node. @@ -207,7 +237,7 @@ class Merger(object): for node in data.nodes: result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" % (auth_keys, pub_key)), - private_key=data.key_path) + private_key=data.key_path, max_attempts=3) if result.failed: raise errors.RemoteError("Unable to add our public key to %s in %s." @@ -217,17 +247,36 @@ class Merger(object): def _RunCmd(self, hostname, command, user="root", use_cluster_key=False, strict_host_check=False, private_key=None, batch=True, - ask_key=False): + ask_key=False, max_attempts=1): """Wrapping SshRunner.Run with default parameters. For explanation of parameters see L{ganeti.ssh.SshRunner.Run}. """ - return self.ssh_runner.Run(hostname=hostname, command=command, user=user, - use_cluster_key=use_cluster_key, - strict_host_check=strict_host_check, - private_key=private_key, batch=batch, - ask_key=ask_key) + for _ in range(max_attempts): + result = self.ssh_runner.Run(hostname=hostname, command=command, + user=user, use_cluster_key=use_cluster_key, + strict_host_check=strict_host_check, + private_key=private_key, batch=batch, + ask_key=ask_key) + if not result.failed: + break + + return result + + def _CheckRunningInstances(self): + """Checks if on the clusters to be merged there are running instances + + @rtype: boolean + @return: True if there are running instances, False otherwise + + """ + for cluster in self.clusters: + result = self._RunCmd(cluster, "gnt-instance list -o status") + if self.RUNNING_STATUSES.intersection(result.output.splitlines()): + return True + + return False def _StopMergingInstances(self): """Stop instances on merging clusters. @@ -255,14 +304,29 @@ class Merger(object): " Fail reason: %s; output: %s" % (cluster, result.fail_reason, result.output)) + def _RemoveMasterIps(self): + """Removes the master IPs from the master nodes of each cluster. + + """ + for data in self.merger_data: + result = self._RunCmd(data.master_node, + "gnt-cluster deactivate-master-ip --yes") + + if result.failed: + raise errors.RemoteError("Unable to remove master IP on %s." + " Fail reason: %s; output: %s" % + (data.master_node, + result.fail_reason, + result.output)) + def _StopDaemons(self): """Stop all daemons on merging nodes. """ - cmd = "%s stop-all" % constants.DAEMON_UTIL + cmd = "%s stop-all" % pathutils.DAEMON_UTIL for data in self.merger_data: for node in data.nodes: - result = self._RunCmd(node, cmd) + result = self._RunCmd(node, cmd, max_attempts=3) if result.failed: raise errors.RemoteError("Unable to stop daemons on %s." @@ -277,7 +341,7 @@ class Merger(object): """ for data in self.merger_data: result = self._RunCmd(data.cluster, "cat %s" % - constants.CLUSTER_CONF_FILE) + pathutils.CLUSTER_CONF_FILE) if result.failed: raise errors.RemoteError("Unable to retrieve remote config on %s." @@ -290,13 +354,13 @@ class Merger(object): utils.WriteFile(data.config_path, data=result.stdout) # R0201: Method could be a function - def _KillMasterDaemon(self): # pylint: disable-msg=R0201 + def _KillMasterDaemon(self): # pylint: disable=R0201 """Kills the local master daemon. @raise errors.CommandError: If unable to kill """ - result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"]) + result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"]) if result.failed: raise errors.CommandError("Unable to stop master daemons." " Fail reason: %s; output: %s" % @@ -329,17 +393,13 @@ class Merger(object): # Update the DRBD port assignments # This is a little bit hackish for dsk in instance_info.disks: - if dsk.dev_type in constants.LDS_DRBD: + if dsk.dev_type in constants.DTS_DRBD: port = my_config.AllocatePort() logical_id = list(dsk.logical_id) logical_id[2] = port dsk.logical_id = tuple(logical_id) - physical_id = list(dsk.physical_id) - physical_id[1] = physical_id[3] = port - dsk.physical_id = tuple(physical_id) - my_config.AddInstance(instance_info, _CLUSTERMERGE_ECID + str(fake_ec_id)) fake_ec_id += 1 @@ -371,9 +431,11 @@ class Merger(object): check_params_strict = [ "volume_group_name", ] - if constants.ENABLE_FILE_STORAGE: + if my_cluster.IsFileStorageEnabled() or \ + other_cluster.IsFileStorageEnabled(): check_params_strict.append("file_storage_dir") - if constants.ENABLE_SHARED_FILE_STORAGE: + if my_cluster.IsSharedFileStorageEnabled() or \ + other_cluster.IsSharedFileStorageEnabled(): check_params_strict.append("shared_file_storage_dir") check_params.extend(check_params_strict) @@ -490,7 +552,7 @@ class Merger(object): other_cluster.cluster_name) # R0201: Method could be a function - def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201 + def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201 if os_name in cluster.os_hvp: return cluster.os_hvp[os_name].get(hyp, None) else: @@ -502,7 +564,7 @@ class Merger(object): ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts. """ - # pylint: disable-msg=R0201 + # pylint: disable=R0201 logging.info("Node group conflict strategy: %s", self.groups) my_grps = my_config.GetAllNodeGroupsInfo().values() @@ -542,15 +604,15 @@ class Merger(object): for node_name in other_grp.members[:]: node = other_config.GetNodeInfo(node_name) # Access to a protected member of a client class - # pylint: disable-msg=W0212 + # pylint: disable=W0212 other_config._UnlockedRemoveNodeFromGroup(node) # Access to a protected member of a client class - # pylint: disable-msg=W0212 + # pylint: disable=W0212 my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name) # Access to a protected member of a client class - # pylint: disable-msg=W0212 + # pylint: disable=W0212 my_config._UnlockedAddNodeToGroup(node, my_grp_uuid) node.group = my_grp_uuid # Remove from list of groups to add @@ -561,7 +623,7 @@ class Merger(object): my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID) # R0201: Method could be a function - def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201 + def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201 """Starts the local master daemon. @param no_vote: Should the masterd started without voting? default: False @@ -572,7 +634,7 @@ class Merger(object): if no_vote: env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it" - result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env) + result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env) if result.failed: raise errors.CommandError("Couldn't start ganeti master." " Fail reason: %s; output: %s" % @@ -586,8 +648,9 @@ class Merger(object): """ for data in self.merger_data: for node in data.nodes: + logging.info("Readding node %s", node) result = utils.RunCmd(["gnt-node", "add", "--readd", - "--no-ssh-key-check", "--force-join", node]) + "--no-ssh-key-check", node]) if result.failed: logging.error("%s failed to be readded. Reason: %s, output: %s", node, result.fail_reason, result.output) @@ -596,10 +659,10 @@ class Merger(object): if result.failed: raise errors.CommandError("Redistribution failed. Fail reason: %s;" " output: %s" % (result.fail_reason, - result.output)) + result.output)) # R0201: Method could be a function - def _StartupAllInstances(self): # pylint: disable-msg=R0201 + def _StartupAllInstances(self): # pylint: disable=R0201 """Starts up all instances (locally). @raise errors.CommandError: If unable to start clusters @@ -613,7 +676,8 @@ class Merger(object): (result.fail_reason, result.output)) # R0201: Method could be a function - def _VerifyCluster(self): # pylint: disable-msg=R0201 + # TODO: make this overridable, for some verify errors + def _VerifyCluster(self): # pylint: disable=R0201 """Runs gnt-cluster verify to verify the health. @raise errors.ProgrammError: If cluster fails on verification @@ -644,22 +708,29 @@ class Merger(object): rbsteps.append("Start all instances again on the merging" " clusters: %(clusters)s") - logging.info("Stopping merging instances (takes a while)") - self._StopMergingInstances() - + if self.stop_instances: + logging.info("Stopping merging instances (takes a while)") + self._StopMergingInstances() + logging.info("Checking that no instances are running on the mergees") + instances_running = self._CheckRunningInstances() + if instances_running: + raise errors.CommandError("Some instances are still running on the" + " mergees") logging.info("Disable watcher") self._DisableWatcher() - logging.info("Stop daemons on merging nodes") - self._StopDaemons() logging.info("Merging config") self._FetchRemoteConfig() + logging.info("Removing master IPs on mergee master nodes") + self._RemoveMasterIps() + logging.info("Stop daemons on merging nodes") + self._StopDaemons() logging.info("Stopping master daemon") self._KillMasterDaemon() rbsteps.append("Restore %s from another master candidate" " and restart master daemon" % - constants.CLUSTER_CONF_FILE) + pathutils.CLUSTER_CONF_FILE) self._MergeConfig() self._StartMasterDaemon(no_vote=True) @@ -709,28 +780,6 @@ class Merger(object): shutil.rmtree(self.work_dir) -def SetupLogging(options): - """Setting up logging infrastructure. - - @param options: Parsed command line options - - """ - formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s") - - stderr_handler = logging.StreamHandler() - stderr_handler.setFormatter(formatter) - if options.debug: - stderr_handler.setLevel(logging.NOTSET) - elif options.verbose: - stderr_handler.setLevel(logging.INFO) - else: - stderr_handler.setLevel(logging.WARNING) - - root_logger = logging.getLogger("") - root_logger.setLevel(logging.NOTSET) - root_logger.addHandler(stderr_handler) - - def main(): """Main routine. @@ -745,16 +794,18 @@ def main(): parser.add_option(GROUPS_OPT) parser.add_option(RESTART_OPT) parser.add_option(PARAMS_OPT) + parser.add_option(SKIP_STOP_INSTANCES_OPT) (options, args) = parser.parse_args() - SetupLogging(options) + utils.SetupToolLogging(options.debug, options.verbose) if not args: parser.error("No clusters specified") cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period, - options.groups, options.restart, options.params) + options.groups, options.restart, options.params, + options.stop_instances) try: try: cluster_merger.Setup()