X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/1fcd3b81456ba1bd1a10cd6e60f2f5e51d06bf53..4a78c361a6de3bcbf98f02abfe41ae3b11de2b00:/tools/cluster-merge diff --git a/tools/cluster-merge b/tools/cluster-merge index 9b62f6f..f94a32f 100755 --- a/tools/cluster-merge +++ b/tools/cluster-merge @@ -24,7 +24,7 @@ The clusters have to run the same version of Ganeti! """ -# pylint: disable-msg=C0103 +# pylint: disable=C0103 # C0103: Invalid name cluster-merge import logging @@ -81,6 +81,12 @@ RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL, " same name (One of: %s/%s/%s)" % _RESTART_CHOICES)) +SKIP_STOP_INSTANCES_OPT = \ + cli.cli_option("--skip-stop-instances", default=True, action="store_false", + dest="stop_instances", + help=("Don't stop the instances on the clusters, just check " + "that none is running")) + def Flatten(unflattened_list): """Flattens a list. @@ -103,13 +109,15 @@ class MergerData(object): """Container class to hold data used for merger. """ - def __init__(self, cluster, key_path, nodes, instances, config_path=None): + def __init__(self, cluster, key_path, nodes, instances, master_node, + config_path=None): """Initialize the container. @param cluster: The name of the cluster @param key_path: Path to the ssh private key used for authentication @param nodes: List of online nodes in the merging cluster @param instances: List of instances running on merging cluster + @param master_node: Name of the master node @param config_path: Path to the merging cluster config """ @@ -117,6 +125,7 @@ class MergerData(object): self.key_path = key_path self.nodes = nodes self.instances = instances + self.master_node = master_node self.config_path = config_path @@ -124,13 +133,22 @@ class Merger(object): """Handling the merge. """ - def __init__(self, clusters, pause_period, groups, restart, params): + RUNNING_STATUSES = frozenset([ + constants.INSTST_RUNNING, + constants.INSTST_ERRORUP, + ]) + + def __init__(self, clusters, pause_period, groups, restart, params, + stop_instances): """Initialize object with sane defaults and infos required. @param clusters: The list of clusters to merge in @param pause_period: The time watcher shall be disabled for @param groups: How to handle group conflicts @param restart: How to handle instance restart + @param stop_instances: Indicates whether the instances must be stopped + (True) or if the Merger must only check if no + instances are running on the mergee clusters (False) """ self.merger_data = [] @@ -142,10 +160,10 @@ class Merger(object): self.groups = groups self.restart = restart self.params = params + self.stop_instances = stop_instances if self.restart == _RESTART_UP: raise NotImplementedError - def Setup(self): """Sets up our end so we can do the merger. @@ -174,16 +192,16 @@ class Merger(object): utils.WriteFile(key_path, mode=0600, data=result.stdout) result = self._RunCmd(cluster, "gnt-node list -o name,offline" - " --no-header --separator=,", private_key=key_path) + " --no-headers --separator=,", private_key=key_path) if result.failed: raise errors.RemoteError("Unable to retrieve list of nodes from %s." " Fail reason: %s; output: %s" % (cluster, result.fail_reason, result.output)) - nodes_statuses = [line.split(',') for line in result.stdout.splitlines()] + nodes_statuses = [line.split(",") for line in result.stdout.splitlines()] nodes = [node_status[0] for node_status in nodes_statuses if node_status[1] == "N"] - result = self._RunCmd(cluster, "gnt-instance list -o name --no-header", + result = self._RunCmd(cluster, "gnt-instance list -o name --no-headers", private_key=key_path) if result.failed: raise errors.RemoteError("Unable to retrieve list of instances from" @@ -191,7 +209,17 @@ class Merger(object): (cluster, result.fail_reason, result.output)) instances = result.stdout.splitlines() - self.merger_data.append(MergerData(cluster, key_path, nodes, instances)) + path = utils.PathJoin(constants.DATA_DIR, "ssconf_%s" % + constants.SS_MASTER_NODE) + result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path) + if result.failed: + raise errors.RemoteError("Unable to retrieve the master node name from" + " %s. Fail reason: %s; output: %s" % + (cluster, result.fail_reason, result.output)) + master_node = result.stdout.strip() + + self.merger_data.append(MergerData(cluster, key_path, nodes, instances, + master_node)) def _PrepareAuthorizedKeys(self): """Prepare the authorized_keys on every merging node. @@ -207,7 +235,7 @@ class Merger(object): for node in data.nodes: result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" % (auth_keys, pub_key)), - private_key=data.key_path) + private_key=data.key_path, max_attempts=3) if result.failed: raise errors.RemoteError("Unable to add our public key to %s in %s." @@ -217,17 +245,36 @@ class Merger(object): def _RunCmd(self, hostname, command, user="root", use_cluster_key=False, strict_host_check=False, private_key=None, batch=True, - ask_key=False): + ask_key=False, max_attempts=1): """Wrapping SshRunner.Run with default parameters. For explanation of parameters see L{ganeti.ssh.SshRunner.Run}. """ - return self.ssh_runner.Run(hostname=hostname, command=command, user=user, - use_cluster_key=use_cluster_key, - strict_host_check=strict_host_check, - private_key=private_key, batch=batch, - ask_key=ask_key) + for _ in range(max_attempts): + result = self.ssh_runner.Run(hostname=hostname, command=command, + user=user, use_cluster_key=use_cluster_key, + strict_host_check=strict_host_check, + private_key=private_key, batch=batch, + ask_key=ask_key) + if not result.failed: + break + + return result + + def _CheckRunningInstances(self): + """Checks if on the clusters to be merged there are running instances + + @rtype: boolean + @return: True if there are running instances, False otherwise + + """ + for cluster in self.clusters: + result = self._RunCmd(cluster, "gnt-instance list -o status") + if self.RUNNING_STATUSES.intersection(result.output.splitlines()): + return True + + return False def _StopMergingInstances(self): """Stop instances on merging clusters. @@ -255,6 +302,21 @@ class Merger(object): " Fail reason: %s; output: %s" % (cluster, result.fail_reason, result.output)) + def _RemoveMasterIps(self): + """Removes the master IPs from the master nodes of each cluster. + + """ + for data in self.merger_data: + result = self._RunCmd(data.master_node, + "gnt-cluster deactivate-master-ip --yes") + + if result.failed: + raise errors.RemoteError("Unable to remove master IP on %s." + " Fail reason: %s; output: %s" % + (data.master_node, + result.fail_reason, + result.output)) + def _StopDaemons(self): """Stop all daemons on merging nodes. @@ -262,7 +324,7 @@ class Merger(object): cmd = "%s stop-all" % constants.DAEMON_UTIL for data in self.merger_data: for node in data.nodes: - result = self._RunCmd(node, cmd) + result = self._RunCmd(node, cmd, max_attempts=3) if result.failed: raise errors.RemoteError("Unable to stop daemons on %s." @@ -290,7 +352,7 @@ class Merger(object): utils.WriteFile(data.config_path, data=result.stdout) # R0201: Method could be a function - def _KillMasterDaemon(self): # pylint: disable-msg=R0201 + def _KillMasterDaemon(self): # pylint: disable=R0201 """Kills the local master daemon. @raise errors.CommandError: If unable to kill @@ -316,6 +378,10 @@ class Merger(object): for node in other_config.GetNodeList(): node_info = other_config.GetNodeInfo(node) + # Offline the node, it will be reonlined later at node readd + node_info.master_candidate = False + node_info.drained = False + node_info.offline = True my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id)) fake_ec_id += 1 @@ -486,7 +552,7 @@ class Merger(object): other_cluster.cluster_name) # R0201: Method could be a function - def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201 + def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201 if os_name in cluster.os_hvp: return cluster.os_hvp[os_name].get(hyp, None) else: @@ -498,7 +564,7 @@ class Merger(object): ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts. """ - # pylint: disable-msg=R0201 + # pylint: disable=R0201 logging.info("Node group conflict strategy: %s", self.groups) my_grps = my_config.GetAllNodeGroupsInfo().values() @@ -538,15 +604,15 @@ class Merger(object): for node_name in other_grp.members[:]: node = other_config.GetNodeInfo(node_name) # Access to a protected member of a client class - # pylint: disable-msg=W0212 + # pylint: disable=W0212 other_config._UnlockedRemoveNodeFromGroup(node) # Access to a protected member of a client class - # pylint: disable-msg=W0212 + # pylint: disable=W0212 my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name) # Access to a protected member of a client class - # pylint: disable-msg=W0212 + # pylint: disable=W0212 my_config._UnlockedAddNodeToGroup(node, my_grp_uuid) node.group = my_grp_uuid # Remove from list of groups to add @@ -557,7 +623,7 @@ class Merger(object): my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID) # R0201: Method could be a function - def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201 + def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201 """Starts the local master daemon. @param no_vote: Should the masterd started without voting? default: False @@ -582,12 +648,12 @@ class Merger(object): """ for data in self.merger_data: for node in data.nodes: + logging.info("Readding node %s", node) result = utils.RunCmd(["gnt-node", "add", "--readd", "--no-ssh-key-check", "--force-join", node]) if result.failed: - raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;" - " output: %s" % (node, result.fail_reason, - result.output)) + logging.error("%s failed to be readded. Reason: %s, output: %s", + node, result.fail_reason, result.output) result = utils.RunCmd(["gnt-cluster", "redist-conf"]) if result.failed: @@ -596,7 +662,7 @@ class Merger(object): result.output)) # R0201: Method could be a function - def _StartupAllInstances(self): # pylint: disable-msg=R0201 + def _StartupAllInstances(self): # pylint: disable=R0201 """Starts up all instances (locally). @raise errors.CommandError: If unable to start clusters @@ -610,7 +676,8 @@ class Merger(object): (result.fail_reason, result.output)) # R0201: Method could be a function - def _VerifyCluster(self): # pylint: disable-msg=R0201 + # TODO: make this overridable, for some verify errors + def _VerifyCluster(self): # pylint: disable=R0201 """Runs gnt-cluster verify to verify the health. @raise errors.ProgrammError: If cluster fails on verification @@ -641,15 +708,22 @@ class Merger(object): rbsteps.append("Start all instances again on the merging" " clusters: %(clusters)s") - logging.info("Stopping merging instances (takes a while)") - self._StopMergingInstances() - + if self.stop_instances: + logging.info("Stopping merging instances (takes a while)") + self._StopMergingInstances() + logging.info("Checking that no instances are running on the mergees") + instances_running = self._CheckRunningInstances() + if instances_running: + raise errors.CommandError("Some instances are still running on the" + " mergees") logging.info("Disable watcher") self._DisableWatcher() - logging.info("Stop daemons on merging nodes") - self._StopDaemons() logging.info("Merging config") self._FetchRemoteConfig() + logging.info("Removing master IPs on mergee master nodes") + self._RemoveMasterIps() + logging.info("Stop daemons on merging nodes") + self._StopDaemons() logging.info("Stopping master daemon") self._KillMasterDaemon() @@ -742,6 +816,7 @@ def main(): parser.add_option(GROUPS_OPT) parser.add_option(RESTART_OPT) parser.add_option(PARAMS_OPT) + parser.add_option(SKIP_STOP_INSTANCES_OPT) (options, args) = parser.parse_args() @@ -751,7 +826,8 @@ def main(): parser.error("No clusters specified") cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period, - options.groups, options.restart, options.params) + options.groups, options.restart, options.params, + options.stop_instances) try: try: cluster_merger.Setup()