X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/76ae1d65e222cf0c5c2f991e7b1715f1bb41e2c9..4a78c361a6de3bcbf98f02abfe41ae3b11de2b00:/tools/cluster-merge diff --git a/tools/cluster-merge b/tools/cluster-merge index 2de09bc..f94a32f 100755 --- a/tools/cluster-merge +++ b/tools/cluster-merge @@ -24,7 +24,7 @@ The clusters have to run the same version of Ganeti! """ -# pylint: disable-msg=C0103 +# pylint: disable=C0103 # C0103: Invalid name cluster-merge import logging @@ -45,6 +45,14 @@ from ganeti import utils _GROUPS_MERGE = "merge" _GROUPS_RENAME = "rename" _CLUSTERMERGE_ECID = "clustermerge-ecid" +_RESTART_ALL = "all" +_RESTART_UP = "up" +_RESTART_NONE = "none" +_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE) +_PARAMS_STRICT = "strict" +_PARAMS_WARN = "warn" +_PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN) + PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800, action="store", type="int", @@ -57,6 +65,27 @@ GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY", help=("How to handle groups that have the" " same name (One of: %s/%s)" % (_GROUPS_MERGE, _GROUPS_RENAME))) +PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT, + metavar="STRATEGY", + choices=_PARAMS_CHOICES, + dest="params", + help=("How to handle params that have" + " different values (One of: %s/%s)" % + _PARAMS_CHOICES)) + +RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL, + metavar="STRATEGY", + choices=_RESTART_CHOICES, + dest="restart", + help=("How to handle restarting instances" + " same name (One of: %s/%s/%s)" % + _RESTART_CHOICES)) + +SKIP_STOP_INSTANCES_OPT = \ + cli.cli_option("--skip-stop-instances", default=True, action="store_false", + dest="stop_instances", + help=("Don't stop the instances on the clusters, just check " + "that none is running")) def Flatten(unflattened_list): @@ -80,13 +109,15 @@ class MergerData(object): """Container class to hold data used for merger. """ - def __init__(self, cluster, key_path, nodes, instances, config_path=None): + def __init__(self, cluster, key_path, nodes, instances, master_node, + config_path=None): """Initialize the container. @param cluster: The name of the cluster @param key_path: Path to the ssh private key used for authentication @param nodes: List of online nodes in the merging cluster @param instances: List of instances running on merging cluster + @param master_node: Name of the master node @param config_path: Path to the merging cluster config """ @@ -94,6 +125,7 @@ class MergerData(object): self.key_path = key_path self.nodes = nodes self.instances = instances + self.master_node = master_node self.config_path = config_path @@ -101,12 +133,22 @@ class Merger(object): """Handling the merge. """ - def __init__(self, clusters, pause_period, groups): + RUNNING_STATUSES = frozenset([ + constants.INSTST_RUNNING, + constants.INSTST_ERRORUP, + ]) + + def __init__(self, clusters, pause_period, groups, restart, params, + stop_instances): """Initialize object with sane defaults and infos required. @param clusters: The list of clusters to merge in @param pause_period: The time watcher shall be disabled for @param groups: How to handle group conflicts + @param restart: How to handle instance restart + @param stop_instances: Indicates whether the instances must be stopped + (True) or if the Merger must only check if no + instances are running on the mergee clusters (False) """ self.merger_data = [] @@ -116,6 +158,11 @@ class Merger(object): (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"]) self.ssh_runner = ssh.SshRunner(self.cluster_name) self.groups = groups + self.restart = restart + self.params = params + self.stop_instances = stop_instances + if self.restart == _RESTART_UP: + raise NotImplementedError def Setup(self): """Sets up our end so we can do the merger. @@ -145,16 +192,16 @@ class Merger(object): utils.WriteFile(key_path, mode=0600, data=result.stdout) result = self._RunCmd(cluster, "gnt-node list -o name,offline" - " --no-header --separator=,", private_key=key_path) + " --no-headers --separator=,", private_key=key_path) if result.failed: raise errors.RemoteError("Unable to retrieve list of nodes from %s." " Fail reason: %s; output: %s" % (cluster, result.fail_reason, result.output)) - nodes_statuses = [line.split(',') for line in result.stdout.splitlines()] + nodes_statuses = [line.split(",") for line in result.stdout.splitlines()] nodes = [node_status[0] for node_status in nodes_statuses if node_status[1] == "N"] - result = self._RunCmd(cluster, "gnt-instance list -o name --no-header", + result = self._RunCmd(cluster, "gnt-instance list -o name --no-headers", private_key=key_path) if result.failed: raise errors.RemoteError("Unable to retrieve list of instances from" @@ -162,7 +209,17 @@ class Merger(object): (cluster, result.fail_reason, result.output)) instances = result.stdout.splitlines() - self.merger_data.append(MergerData(cluster, key_path, nodes, instances)) + path = utils.PathJoin(constants.DATA_DIR, "ssconf_%s" % + constants.SS_MASTER_NODE) + result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path) + if result.failed: + raise errors.RemoteError("Unable to retrieve the master node name from" + " %s. Fail reason: %s; output: %s" % + (cluster, result.fail_reason, result.output)) + master_node = result.stdout.strip() + + self.merger_data.append(MergerData(cluster, key_path, nodes, instances, + master_node)) def _PrepareAuthorizedKeys(self): """Prepare the authorized_keys on every merging node. @@ -178,7 +235,7 @@ class Merger(object): for node in data.nodes: result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" % (auth_keys, pub_key)), - private_key=data.key_path) + private_key=data.key_path, max_attempts=3) if result.failed: raise errors.RemoteError("Unable to add our public key to %s in %s." @@ -188,17 +245,36 @@ class Merger(object): def _RunCmd(self, hostname, command, user="root", use_cluster_key=False, strict_host_check=False, private_key=None, batch=True, - ask_key=False): + ask_key=False, max_attempts=1): """Wrapping SshRunner.Run with default parameters. For explanation of parameters see L{ganeti.ssh.SshRunner.Run}. """ - return self.ssh_runner.Run(hostname=hostname, command=command, user=user, - use_cluster_key=use_cluster_key, - strict_host_check=strict_host_check, - private_key=private_key, batch=batch, - ask_key=ask_key) + for _ in range(max_attempts): + result = self.ssh_runner.Run(hostname=hostname, command=command, + user=user, use_cluster_key=use_cluster_key, + strict_host_check=strict_host_check, + private_key=private_key, batch=batch, + ask_key=ask_key) + if not result.failed: + break + + return result + + def _CheckRunningInstances(self): + """Checks if on the clusters to be merged there are running instances + + @rtype: boolean + @return: True if there are running instances, False otherwise + + """ + for cluster in self.clusters: + result = self._RunCmd(cluster, "gnt-instance list -o status") + if self.RUNNING_STATUSES.intersection(result.output.splitlines()): + return True + + return False def _StopMergingInstances(self): """Stop instances on merging clusters. @@ -226,6 +302,21 @@ class Merger(object): " Fail reason: %s; output: %s" % (cluster, result.fail_reason, result.output)) + def _RemoveMasterIps(self): + """Removes the master IPs from the master nodes of each cluster. + + """ + for data in self.merger_data: + result = self._RunCmd(data.master_node, + "gnt-cluster deactivate-master-ip --yes") + + if result.failed: + raise errors.RemoteError("Unable to remove master IP on %s." + " Fail reason: %s; output: %s" % + (data.master_node, + result.fail_reason, + result.output)) + def _StopDaemons(self): """Stop all daemons on merging nodes. @@ -233,7 +324,7 @@ class Merger(object): cmd = "%s stop-all" % constants.DAEMON_UTIL for data in self.merger_data: for node in data.nodes: - result = self._RunCmd(node, cmd) + result = self._RunCmd(node, cmd, max_attempts=3) if result.failed: raise errors.RemoteError("Unable to stop daemons on %s." @@ -261,7 +352,7 @@ class Merger(object): utils.WriteFile(data.config_path, data=result.stdout) # R0201: Method could be a function - def _KillMasterDaemon(self): # pylint: disable-msg=R0201 + def _KillMasterDaemon(self): # pylint: disable=R0201 """Kills the local master daemon. @raise errors.CommandError: If unable to kill @@ -287,6 +378,10 @@ class Merger(object): for node in other_config.GetNodeList(): node_info = other_config.GetNodeInfo(node) + # Offline the node, it will be reonlined later at node readd + node_info.master_candidate = False + node_info.drained = False + node_info.offline = True my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id)) fake_ec_id += 1 @@ -311,12 +406,10 @@ class Merger(object): _CLUSTERMERGE_ECID + str(fake_ec_id)) fake_ec_id += 1 - # R0201: Method could be a function def _MergeClusterConfigs(self, my_config, other_config): """Checks that all relevant cluster parameters are compatible """ - # pylint: disable-msg=R0201 my_cluster = my_config.GetClusterInfo() other_cluster = other_config.GetClusterInfo() err_count = 0 @@ -324,11 +417,10 @@ class Merger(object): # # Generic checks # - check_params = ( + check_params = [ "beparams", "default_iallocator", "drbd_usermode_helper", - "file_storage_dir", "hidden_os", "maintain_node_health", "master_netdev", @@ -337,8 +429,21 @@ class Merger(object): "primary_ip_family", "tags", "uid_pool", + ] + check_params_strict = [ "volume_group_name", - ) + ] + if constants.ENABLE_FILE_STORAGE: + check_params_strict.append("file_storage_dir") + if constants.ENABLE_SHARED_FILE_STORAGE: + check_params_strict.append("shared_file_storage_dir") + check_params.extend(check_params_strict) + + if self.params == _PARAMS_STRICT: + params_strict = True + else: + params_strict = False + for param_name in check_params: my_param = getattr(my_cluster, param_name) other_param = getattr(other_cluster, param_name) @@ -347,7 +452,8 @@ class Merger(object): " differs to this cluster's value (%s)", other_param, param_name, other_cluster.cluster_name, my_param) - err_count += 1 + if params_strict or param_name in check_params_strict: + err_count += 1 # # Custom checks @@ -371,7 +477,6 @@ class Merger(object): err_count += 1 # Check hypervisor params for hypervisors we care about - # TODO: we probably don't care about all params for a given hypervisor for hyp in my_cluster.enabled_hypervisors: for param in my_cluster.hvparams[hyp]: my_value = my_cluster.hvparams[hyp][param] @@ -382,7 +487,8 @@ class Merger(object): " (%s)", other_value, param, hyp, other_cluster.cluster_name, my_value) - err_count += 1 + if params_strict: + err_count += 1 # Check os hypervisor params for hypervisors we care about for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()): @@ -395,7 +501,8 @@ class Merger(object): " (%s)", other_os_hvp, os_name, hyp, other_cluster.cluster_name, my_os_hvp) - err_count += 1 + if params_strict: + err_count += 1 # # Warnings @@ -436,7 +543,8 @@ class Merger(object): " this cluster's parameters (%s)", osparams, os_, other_cluster.cluster_name, my_cluster.osparams[os_]) - err_count += 1 + if params_strict: + err_count += 1 if err_count: raise errors.ConfigurationError("Cluster config for %s has incompatible" @@ -444,7 +552,7 @@ class Merger(object): other_cluster.cluster_name) # R0201: Method could be a function - def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201 + def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201 if os_name in cluster.os_hvp: return cluster.os_hvp[os_name].get(hyp, None) else: @@ -456,7 +564,7 @@ class Merger(object): ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts. """ - # pylint: disable-msg=R0201 + # pylint: disable=R0201 logging.info("Node group conflict strategy: %s", self.groups) my_grps = my_config.GetAllNodeGroupsInfo().values() @@ -490,21 +598,21 @@ class Merger(object): grp.name = new_name # User wants to merge conflicting groups - elif self.groups == 'merge': + elif self.groups == _GROUPS_MERGE: for other_grp in conflicts: logging.info("Merging local and remote '%s' groups", other_grp.name) for node_name in other_grp.members[:]: node = other_config.GetNodeInfo(node_name) # Access to a protected member of a client class - # pylint: disable-msg=W0212 + # pylint: disable=W0212 other_config._UnlockedRemoveNodeFromGroup(node) # Access to a protected member of a client class - # pylint: disable-msg=W0212 + # pylint: disable=W0212 my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name) # Access to a protected member of a client class - # pylint: disable-msg=W0212 + # pylint: disable=W0212 my_config._UnlockedAddNodeToGroup(node, my_grp_uuid) node.group = my_grp_uuid # Remove from list of groups to add @@ -515,7 +623,7 @@ class Merger(object): my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID) # R0201: Method could be a function - def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201 + def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201 """Starts the local master daemon. @param no_vote: Should the masterd started without voting? default: False @@ -540,12 +648,12 @@ class Merger(object): """ for data in self.merger_data: for node in data.nodes: + logging.info("Readding node %s", node) result = utils.RunCmd(["gnt-node", "add", "--readd", "--no-ssh-key-check", "--force-join", node]) if result.failed: - raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;" - " output: %s" % (node, result.fail_reason, - result.output)) + logging.error("%s failed to be readded. Reason: %s, output: %s", + node, result.fail_reason, result.output) result = utils.RunCmd(["gnt-cluster", "redist-conf"]) if result.failed: @@ -554,7 +662,7 @@ class Merger(object): result.output)) # R0201: Method could be a function - def _StartupAllInstances(self): # pylint: disable-msg=R0201 + def _StartupAllInstances(self): # pylint: disable=R0201 """Starts up all instances (locally). @raise errors.CommandError: If unable to start clusters @@ -568,7 +676,8 @@ class Merger(object): (result.fail_reason, result.output)) # R0201: Method could be a function - def _VerifyCluster(self): # pylint: disable-msg=R0201 + # TODO: make this overridable, for some verify errors + def _VerifyCluster(self): # pylint: disable=R0201 """Runs gnt-cluster verify to verify the health. @raise errors.ProgrammError: If cluster fails on verification @@ -599,15 +708,22 @@ class Merger(object): rbsteps.append("Start all instances again on the merging" " clusters: %(clusters)s") - logging.info("Stopping merging instances (takes a while)") - self._StopMergingInstances() - + if self.stop_instances: + logging.info("Stopping merging instances (takes a while)") + self._StopMergingInstances() + logging.info("Checking that no instances are running on the mergees") + instances_running = self._CheckRunningInstances() + if instances_running: + raise errors.CommandError("Some instances are still running on the" + " mergees") logging.info("Disable watcher") self._DisableWatcher() - logging.info("Stop daemons on merging nodes") - self._StopDaemons() logging.info("Merging config") self._FetchRemoteConfig() + logging.info("Removing master IPs on mergee master nodes") + self._RemoveMasterIps() + logging.info("Stop daemons on merging nodes") + self._StopDaemons() logging.info("Stopping master daemon") self._KillMasterDaemon() @@ -630,8 +746,11 @@ class Merger(object): self._KillMasterDaemon() self._StartMasterDaemon() - logging.info("Starting instances again") - self._StartupAllInstances() + if self.restart == _RESTART_ALL: + logging.info("Starting instances again") + self._StartupAllInstances() + else: + logging.info("Not starting instances again") logging.info("Post cluster verification") self._VerifyCluster() except errors.GenericError, e: @@ -689,16 +808,15 @@ def main(): """ program = os.path.basename(sys.argv[0]) - parser = optparse.OptionParser(usage=("%%prog [--debug|--verbose]" - " [--watcher-pause-period SECONDS]" - " [--groups [%s|%s]]" - " []" % - (_GROUPS_MERGE, _GROUPS_RENAME)), - prog=program) + parser = optparse.OptionParser(usage="%%prog [options...] ", + prog=program) parser.add_option(cli.DEBUG_OPT) parser.add_option(cli.VERBOSE_OPT) parser.add_option(PAUSE_PERIOD_OPT) parser.add_option(GROUPS_OPT) + parser.add_option(RESTART_OPT) + parser.add_option(PARAMS_OPT) + parser.add_option(SKIP_STOP_INSTANCES_OPT) (options, args) = parser.parse_args() @@ -708,7 +826,8 @@ def main(): parser.error("No clusters specified") cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period, - options.groups) + options.groups, options.restart, options.params, + options.stop_instances) try: try: cluster_merger.Setup()