X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/3d8f154f1863291dfd14793f45515e8996e25563..4a78c361a6de3bcbf98f02abfe41ae3b11de2b00:/tools/cluster-merge diff --git a/tools/cluster-merge b/tools/cluster-merge index 593693d..f94a32f 100755 --- a/tools/cluster-merge +++ b/tools/cluster-merge @@ -24,7 +24,7 @@ The clusters have to run the same version of Ganeti! """ -# pylint: disable-msg=C0103 +# pylint: disable=C0103 # C0103: Invalid name cluster-merge import logging @@ -42,12 +42,50 @@ from ganeti import ssh from ganeti import utils +_GROUPS_MERGE = "merge" +_GROUPS_RENAME = "rename" +_CLUSTERMERGE_ECID = "clustermerge-ecid" +_RESTART_ALL = "all" +_RESTART_UP = "up" +_RESTART_NONE = "none" +_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE) +_PARAMS_STRICT = "strict" +_PARAMS_WARN = "warn" +_PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN) + + PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800, action="store", type="int", dest="pause_period", help=("Amount of time in seconds watcher" " should be suspended from running")) -_CLUSTERMERGE_ECID = "clustermerge-ecid" +GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY", + choices=(_GROUPS_MERGE, _GROUPS_RENAME), + dest="groups", + help=("How to handle groups that have the" + " same name (One of: %s/%s)" % + (_GROUPS_MERGE, _GROUPS_RENAME))) +PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT, + metavar="STRATEGY", + choices=_PARAMS_CHOICES, + dest="params", + help=("How to handle params that have" + " different values (One of: %s/%s)" % + _PARAMS_CHOICES)) + +RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL, + metavar="STRATEGY", + choices=_RESTART_CHOICES, + dest="restart", + help=("How to handle restarting instances" + " same name (One of: %s/%s/%s)" % + _RESTART_CHOICES)) + +SKIP_STOP_INSTANCES_OPT = \ + cli.cli_option("--skip-stop-instances", default=True, action="store_false", + dest="stop_instances", + help=("Don't stop the instances on the clusters, just check " + "that none is running")) def Flatten(unflattened_list): @@ -71,13 +109,15 @@ class MergerData(object): """Container class to hold data used for merger. """ - def __init__(self, cluster, key_path, nodes, instances, config_path=None): + def __init__(self, cluster, key_path, nodes, instances, master_node, + config_path=None): """Initialize the container. @param cluster: The name of the cluster @param key_path: Path to the ssh private key used for authentication - @param nodes: List of nodes in the merging cluster + @param nodes: List of online nodes in the merging cluster @param instances: List of instances running on merging cluster + @param master_node: Name of the master node @param config_path: Path to the merging cluster config """ @@ -85,6 +125,7 @@ class MergerData(object): self.key_path = key_path self.nodes = nodes self.instances = instances + self.master_node = master_node self.config_path = config_path @@ -92,11 +133,22 @@ class Merger(object): """Handling the merge. """ - def __init__(self, clusters, pause_period): + RUNNING_STATUSES = frozenset([ + constants.INSTST_RUNNING, + constants.INSTST_ERRORUP, + ]) + + def __init__(self, clusters, pause_period, groups, restart, params, + stop_instances): """Initialize object with sane defaults and infos required. @param clusters: The list of clusters to merge in @param pause_period: The time watcher shall be disabled for + @param groups: How to handle group conflicts + @param restart: How to handle instance restart + @param stop_instances: Indicates whether the instances must be stopped + (True) or if the Merger must only check if no + instances are running on the mergee clusters (False) """ self.merger_data = [] @@ -105,6 +157,12 @@ class Merger(object): self.work_dir = tempfile.mkdtemp(suffix="cluster-merger") (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"]) self.ssh_runner = ssh.SshRunner(self.cluster_name) + self.groups = groups + self.restart = restart + self.params = params + self.stop_instances = stop_instances + if self.restart == _RESTART_UP: + raise NotImplementedError def Setup(self): """Sets up our end so we can do the merger. @@ -133,15 +191,17 @@ class Merger(object): key_path = utils.PathJoin(self.work_dir, cluster) utils.WriteFile(key_path, mode=0600, data=result.stdout) - result = self._RunCmd(cluster, "gnt-node list -o name --no-header", - private_key=key_path) + result = self._RunCmd(cluster, "gnt-node list -o name,offline" + " --no-headers --separator=,", private_key=key_path) if result.failed: raise errors.RemoteError("Unable to retrieve list of nodes from %s." " Fail reason: %s; output: %s" % (cluster, result.fail_reason, result.output)) - nodes = result.stdout.splitlines() + nodes_statuses = [line.split(",") for line in result.stdout.splitlines()] + nodes = [node_status[0] for node_status in nodes_statuses + if node_status[1] == "N"] - result = self._RunCmd(cluster, "gnt-instance list -o name --no-header", + result = self._RunCmd(cluster, "gnt-instance list -o name --no-headers", private_key=key_path) if result.failed: raise errors.RemoteError("Unable to retrieve list of instances from" @@ -149,7 +209,17 @@ class Merger(object): (cluster, result.fail_reason, result.output)) instances = result.stdout.splitlines() - self.merger_data.append(MergerData(cluster, key_path, nodes, instances)) + path = utils.PathJoin(constants.DATA_DIR, "ssconf_%s" % + constants.SS_MASTER_NODE) + result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path) + if result.failed: + raise errors.RemoteError("Unable to retrieve the master node name from" + " %s. Fail reason: %s; output: %s" % + (cluster, result.fail_reason, result.output)) + master_node = result.stdout.strip() + + self.merger_data.append(MergerData(cluster, key_path, nodes, instances, + master_node)) def _PrepareAuthorizedKeys(self): """Prepare the authorized_keys on every merging node. @@ -165,7 +235,7 @@ class Merger(object): for node in data.nodes: result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" % (auth_keys, pub_key)), - private_key=data.key_path) + private_key=data.key_path, max_attempts=3) if result.failed: raise errors.RemoteError("Unable to add our public key to %s in %s." @@ -175,17 +245,36 @@ class Merger(object): def _RunCmd(self, hostname, command, user="root", use_cluster_key=False, strict_host_check=False, private_key=None, batch=True, - ask_key=False): + ask_key=False, max_attempts=1): """Wrapping SshRunner.Run with default parameters. For explanation of parameters see L{ganeti.ssh.SshRunner.Run}. """ - return self.ssh_runner.Run(hostname=hostname, command=command, user=user, - use_cluster_key=use_cluster_key, - strict_host_check=strict_host_check, - private_key=private_key, batch=batch, - ask_key=ask_key) + for _ in range(max_attempts): + result = self.ssh_runner.Run(hostname=hostname, command=command, + user=user, use_cluster_key=use_cluster_key, + strict_host_check=strict_host_check, + private_key=private_key, batch=batch, + ask_key=ask_key) + if not result.failed: + break + + return result + + def _CheckRunningInstances(self): + """Checks if on the clusters to be merged there are running instances + + @rtype: boolean + @return: True if there are running instances, False otherwise + + """ + for cluster in self.clusters: + result = self._RunCmd(cluster, "gnt-instance list -o status") + if self.RUNNING_STATUSES.intersection(result.output.splitlines()): + return True + + return False def _StopMergingInstances(self): """Stop instances on merging clusters. @@ -213,6 +302,21 @@ class Merger(object): " Fail reason: %s; output: %s" % (cluster, result.fail_reason, result.output)) + def _RemoveMasterIps(self): + """Removes the master IPs from the master nodes of each cluster. + + """ + for data in self.merger_data: + result = self._RunCmd(data.master_node, + "gnt-cluster deactivate-master-ip --yes") + + if result.failed: + raise errors.RemoteError("Unable to remove master IP on %s." + " Fail reason: %s; output: %s" % + (data.master_node, + result.fail_reason, + result.output)) + def _StopDaemons(self): """Stop all daemons on merging nodes. @@ -220,7 +324,7 @@ class Merger(object): cmd = "%s stop-all" % constants.DAEMON_UTIL for data in self.merger_data: for node in data.nodes: - result = self._RunCmd(node, cmd) + result = self._RunCmd(node, cmd, max_attempts=3) if result.failed: raise errors.RemoteError("Unable to stop daemons on %s." @@ -248,7 +352,7 @@ class Merger(object): utils.WriteFile(data.config_path, data=result.stdout) # R0201: Method could be a function - def _KillMasterDaemon(self): # pylint: disable-msg=R0201 + def _KillMasterDaemon(self): # pylint: disable=R0201 """Kills the local master daemon. @raise errors.CommandError: If unable to kill @@ -269,12 +373,16 @@ class Merger(object): for data in self.merger_data: other_config = config.ConfigWriter(data.config_path, accept_foreign=True) + self._MergeClusterConfigs(my_config, other_config) self._MergeNodeGroups(my_config, other_config) for node in other_config.GetNodeList(): node_info = other_config.GetNodeInfo(node) + # Offline the node, it will be reonlined later at node readd node_info.master_candidate = False - my_config.AddNode(node_info, str(fake_ec_id)) + node_info.drained = False + node_info.offline = True + my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id)) fake_ec_id += 1 for instance in other_config.GetInstanceList(): @@ -294,22 +402,228 @@ class Merger(object): physical_id[1] = physical_id[3] = port dsk.physical_id = tuple(physical_id) - my_config.AddInstance(instance_info, str(fake_ec_id)) + my_config.AddInstance(instance_info, + _CLUSTERMERGE_ECID + str(fake_ec_id)) fake_ec_id += 1 + def _MergeClusterConfigs(self, my_config, other_config): + """Checks that all relevant cluster parameters are compatible + + """ + my_cluster = my_config.GetClusterInfo() + other_cluster = other_config.GetClusterInfo() + err_count = 0 + + # + # Generic checks + # + check_params = [ + "beparams", + "default_iallocator", + "drbd_usermode_helper", + "hidden_os", + "maintain_node_health", + "master_netdev", + "ndparams", + "nicparams", + "primary_ip_family", + "tags", + "uid_pool", + ] + check_params_strict = [ + "volume_group_name", + ] + if constants.ENABLE_FILE_STORAGE: + check_params_strict.append("file_storage_dir") + if constants.ENABLE_SHARED_FILE_STORAGE: + check_params_strict.append("shared_file_storage_dir") + check_params.extend(check_params_strict) + + if self.params == _PARAMS_STRICT: + params_strict = True + else: + params_strict = False + + for param_name in check_params: + my_param = getattr(my_cluster, param_name) + other_param = getattr(other_cluster, param_name) + if my_param != other_param: + logging.error("The value (%s) of the cluster parameter %s on %s" + " differs to this cluster's value (%s)", + other_param, param_name, other_cluster.cluster_name, + my_param) + if params_strict or param_name in check_params_strict: + err_count += 1 + + # + # Custom checks + # + + # Check default hypervisor + my_defhyp = my_cluster.enabled_hypervisors[0] + other_defhyp = other_cluster.enabled_hypervisors[0] + if my_defhyp != other_defhyp: + logging.warning("The default hypervisor (%s) differs on %s, new" + " instances will be created with this cluster's" + " default hypervisor (%s)", other_defhyp, + other_cluster.cluster_name, my_defhyp) + + if (set(my_cluster.enabled_hypervisors) != + set(other_cluster.enabled_hypervisors)): + logging.error("The set of enabled hypervisors (%s) on %s differs to" + " this cluster's set (%s)", + other_cluster.enabled_hypervisors, + other_cluster.cluster_name, my_cluster.enabled_hypervisors) + err_count += 1 + + # Check hypervisor params for hypervisors we care about + for hyp in my_cluster.enabled_hypervisors: + for param in my_cluster.hvparams[hyp]: + my_value = my_cluster.hvparams[hyp][param] + other_value = other_cluster.hvparams[hyp][param] + if my_value != other_value: + logging.error("The value (%s) of the %s parameter of the %s" + " hypervisor on %s differs to this cluster's parameter" + " (%s)", + other_value, param, hyp, other_cluster.cluster_name, + my_value) + if params_strict: + err_count += 1 + + # Check os hypervisor params for hypervisors we care about + for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()): + for hyp in my_cluster.enabled_hypervisors: + my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp) + other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp) + if my_os_hvp != other_os_hvp: + logging.error("The OS parameters (%s) for the %s OS for the %s" + " hypervisor on %s differs to this cluster's parameters" + " (%s)", + other_os_hvp, os_name, hyp, other_cluster.cluster_name, + my_os_hvp) + if params_strict: + err_count += 1 + + # + # Warnings + # + if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts: + logging.warning("The modify_etc_hosts value (%s) differs on %s," + " this cluster's value (%s) will take precedence", + other_cluster.modify_etc_hosts, + other_cluster.cluster_name, + my_cluster.modify_etc_hosts) + + if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup: + logging.warning("The modify_ssh_setup value (%s) differs on %s," + " this cluster's value (%s) will take precedence", + other_cluster.modify_ssh_setup, + other_cluster.cluster_name, + my_cluster.modify_ssh_setup) + + # + # Actual merging + # + my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs + + other_cluster.reserved_lvs)) + + if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks: + logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this" + " cluster's value (%s). The least permissive value (%s)" + " will be used", other_cluster.prealloc_wipe_disks, + other_cluster.cluster_name, + my_cluster.prealloc_wipe_disks, True) + my_cluster.prealloc_wipe_disks = True + + for os_, osparams in other_cluster.osparams.items(): + if os_ not in my_cluster.osparams: + my_cluster.osparams[os_] = osparams + elif my_cluster.osparams[os_] != osparams: + logging.error("The OS parameters (%s) for the %s OS on %s differs to" + " this cluster's parameters (%s)", + osparams, os_, other_cluster.cluster_name, + my_cluster.osparams[os_]) + if params_strict: + err_count += 1 + + if err_count: + raise errors.ConfigurationError("Cluster config for %s has incompatible" + " values, please fix and re-run" % + other_cluster.cluster_name) + + # R0201: Method could be a function + def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201 + if os_name in cluster.os_hvp: + return cluster.os_hvp[os_name].get(hyp, None) + else: + return None + # R0201: Method could be a function def _MergeNodeGroups(self, my_config, other_config): """Adds foreign node groups ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts. """ - # pylint: disable-msg=R0201 - for grp in other_config.GetAllNodeGroupsInfo().values(): + # pylint: disable=R0201 + logging.info("Node group conflict strategy: %s", self.groups) + + my_grps = my_config.GetAllNodeGroupsInfo().values() + other_grps = other_config.GetAllNodeGroupsInfo().values() + + # Check for node group naming conflicts: + conflicts = [] + for other_grp in other_grps: + for my_grp in my_grps: + if other_grp.name == my_grp.name: + conflicts.append(other_grp) + + if conflicts: + conflict_names = utils.CommaJoin([g.name for g in conflicts]) + logging.info("Node groups in both local and remote cluster: %s", + conflict_names) + + # User hasn't specified how to handle conflicts + if not self.groups: + raise errors.CommandError("The following node group(s) are in both" + " clusters, and no merge strategy has been" + " supplied (see the --groups option): %s" % + conflict_names) + + # User wants to rename conflicts + elif self.groups == _GROUPS_RENAME: + for grp in conflicts: + new_name = "%s-%s" % (grp.name, other_config.GetClusterName()) + logging.info("Renaming remote node group from %s to %s" + " to resolve conflict", grp.name, new_name) + grp.name = new_name + + # User wants to merge conflicting groups + elif self.groups == _GROUPS_MERGE: + for other_grp in conflicts: + logging.info("Merging local and remote '%s' groups", other_grp.name) + for node_name in other_grp.members[:]: + node = other_config.GetNodeInfo(node_name) + # Access to a protected member of a client class + # pylint: disable=W0212 + other_config._UnlockedRemoveNodeFromGroup(node) + + # Access to a protected member of a client class + # pylint: disable=W0212 + my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name) + + # Access to a protected member of a client class + # pylint: disable=W0212 + my_config._UnlockedAddNodeToGroup(node, my_grp_uuid) + node.group = my_grp_uuid + # Remove from list of groups to add + other_grps.remove(other_grp) + + for grp in other_grps: #TODO: handle node group conflicts my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID) # R0201: Method could be a function - def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201 + def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201 """Starts the local master daemon. @param no_vote: Should the masterd started without voting? default: False @@ -334,12 +648,12 @@ class Merger(object): """ for data in self.merger_data: for node in data.nodes: + logging.info("Readding node %s", node) result = utils.RunCmd(["gnt-node", "add", "--readd", "--no-ssh-key-check", "--force-join", node]) if result.failed: - raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;" - " output: %s" % (node, result.fail_reason, - result.output)) + logging.error("%s failed to be readded. Reason: %s, output: %s", + node, result.fail_reason, result.output) result = utils.RunCmd(["gnt-cluster", "redist-conf"]) if result.failed: @@ -348,7 +662,7 @@ class Merger(object): result.output)) # R0201: Method could be a function - def _StartupAllInstances(self): # pylint: disable-msg=R0201 + def _StartupAllInstances(self): # pylint: disable=R0201 """Starts up all instances (locally). @raise errors.CommandError: If unable to start clusters @@ -362,7 +676,8 @@ class Merger(object): (result.fail_reason, result.output)) # R0201: Method could be a function - def _VerifyCluster(self): # pylint: disable-msg=R0201 + # TODO: make this overridable, for some verify errors + def _VerifyCluster(self): # pylint: disable=R0201 """Runs gnt-cluster verify to verify the health. @raise errors.ProgrammError: If cluster fails on verification @@ -393,15 +708,22 @@ class Merger(object): rbsteps.append("Start all instances again on the merging" " clusters: %(clusters)s") - logging.info("Stopping merging instances (takes a while)") - self._StopMergingInstances() - + if self.stop_instances: + logging.info("Stopping merging instances (takes a while)") + self._StopMergingInstances() + logging.info("Checking that no instances are running on the mergees") + instances_running = self._CheckRunningInstances() + if instances_running: + raise errors.CommandError("Some instances are still running on the" + " mergees") logging.info("Disable watcher") self._DisableWatcher() - logging.info("Stop daemons on merging nodes") - self._StopDaemons() logging.info("Merging config") self._FetchRemoteConfig() + logging.info("Removing master IPs on mergee master nodes") + self._RemoveMasterIps() + logging.info("Stop daemons on merging nodes") + self._StopDaemons() logging.info("Stopping master daemon") self._KillMasterDaemon() @@ -424,8 +746,11 @@ class Merger(object): self._KillMasterDaemon() self._StartMasterDaemon() - logging.info("Starting instances again") - self._StartupAllInstances() + if self.restart == _RESTART_ALL: + logging.info("Starting instances again") + self._StartupAllInstances() + else: + logging.info("Not starting instances again") logging.info("Post cluster verification") self._VerifyCluster() except errors.GenericError, e: @@ -470,7 +795,7 @@ def SetupLogging(options): elif options.verbose: stderr_handler.setLevel(logging.INFO) else: - stderr_handler.setLevel(logging.ERROR) + stderr_handler.setLevel(logging.WARNING) root_logger = logging.getLogger("") root_logger.setLevel(logging.NOTSET) @@ -483,13 +808,15 @@ def main(): """ program = os.path.basename(sys.argv[0]) - parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]" - " [--watcher-pause-period SECONDS]" - " "), - prog=program) + parser = optparse.OptionParser(usage="%%prog [options...] ", + prog=program) parser.add_option(cli.DEBUG_OPT) parser.add_option(cli.VERBOSE_OPT) parser.add_option(PAUSE_PERIOD_OPT) + parser.add_option(GROUPS_OPT) + parser.add_option(RESTART_OPT) + parser.add_option(PARAMS_OPT) + parser.add_option(SKIP_STOP_INSTANCES_OPT) (options, args) = parser.parse_args() @@ -498,7 +825,9 @@ def main(): if not args: parser.error("No clusters specified") - cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period) + cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period, + options.groups, options.restart, options.params, + options.stop_instances) try: try: cluster_merger.Setup()