#!/usr/bin/python
#
# Copyright (C) 2010, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Tool to merge two or more clusters together.

The clusters have to run the same version of Ganeti!

"""

# pylint: disable=C0103
# C0103: Invalid name cluster-merge

import logging
import os
import optparse
import shutil
import sys
import tempfile

from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils
from ganeti import pathutils
from ganeti import compat


_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
_CLUSTERMERGE_ECID = "clustermerge-ecid"
_RESTART_ALL = "all"
_RESTART_UP = "up"
_RESTART_NONE = "none"
_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
_PARAMS_STRICT = "strict"
_PARAMS_WARN = "warn"
_PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)


PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds the watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
                            metavar="STRATEGY",
                            choices=_PARAMS_CHOICES,
                            dest="params",
                            help=("How to handle params that have"
                                  " different values (One of: %s/%s)" %
                                  _PARAMS_CHOICES))

RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             metavar="STRATEGY",
                             choices=_RESTART_CHOICES,
                             dest="restart",
                             help=("How to handle restarting instances"
                                   " (One of: %s/%s/%s)" %
                                   _RESTART_CHOICES))

SKIP_STOP_INSTANCES_OPT = \
  cli.cli_option("--skip-stop-instances", default=True, action="store_false",
                 dest="stop_instances",
                 help=("Don't stop the instances on the clusters, just check"
                       " that none are running"))


def Flatten(unflattened_list):
  """Flattens a list.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flattened_list = []

  for item in unflattened_list:
    if isinstance(item, list):
      flattened_list.extend(Flatten(item))
    else:
      flattened_list.append(item)
  return flattened_list
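

# Illustrative behaviour of Flatten above (example values only, not part of
# the original tool):
#   Flatten([1, [2, [3, 4]], 5]) -> [1, 2, 3, 4, 5]
#   Flatten([["a"], []])         -> ["a"]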


class MergerData(object):
  """Container class to hold data used for merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, master_node,
               config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on the merging cluster
    @param master_node: Name of the master node
    @param config_path: Path to the merging cluster config

    """
    self.cluster = cluster
    self.key_path = key_path
    self.nodes = nodes
    self.instances = instances
    self.master_node = master_node
    self.config_path = config_path


class Merger(object):
  """Handling the merge.

  """
  RUNNING_STATUSES = compat.UniqueFrozenset([
    constants.INSTST_RUNNING,
    constants.INSTST_ERRORUP,
    ])

  def __init__(self, clusters, pause_period, groups, restart, params,
               stop_instances):
    """Initialize object with sane defaults and the information required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time the watcher shall be disabled for
    @param groups: How to handle group conflicts
    @param restart: How to handle instance restart
    @param params: How to handle parameter conflicts
    @param stop_instances: Indicates whether the instances must be stopped
                           (True) or if the Merger must only check if no
                           instances are running on the mergee clusters (False)

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    self.groups = groups
    self.restart = restart
    self.params = params
    self.stop_instances = stop_instances
    if self.restart == _RESTART_UP:
      raise NotImplementedError

  def Setup(self):
    """Sets up our end so we can do the merger.

    This method sets us up in preparation for the merger. It makes the
    initial contact and gathers the information needed.

    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch the remote's private key
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh"
                                 " private key from %s. Fail reason: %s;"
                                 " output: %s" %
                                 (cluster, result.fail_reason, result.output))

      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      result = self._RunCmd(cluster, "gnt-node list -o name,offline"
                            " --no-headers --separator=,",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes_statuses = [line.split(",") for line in result.stdout.splitlines()]
      nodes = [node_status[0] for node_status in nodes_statuses
               if node_status[1] == "N"]

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-headers",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      path = utils.PathJoin(pathutils.DATA_DIR, "ssconf_%s" %
                            constants.SS_MASTER_NODE)
      result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve the master node name from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      master_node = result.stdout.strip()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
                                         master_node))

  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys file on every merging node.

    This method adds our public key to the remote nodes' authorized_keys
    files for further communication.

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path, max_attempts=3)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))

  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False, max_attempts=1):
    """Wrapper around SshRunner.Run, using default parameters.

    For an explanation of the parameters see L{ganeti.ssh.SshRunner.Run}.

    """
    for _ in range(max_attempts):
      result = self.ssh_runner.Run(hostname=hostname, command=command,
                                   user=user, use_cluster_key=use_cluster_key,
                                   strict_host_check=strict_host_check,
                                   private_key=private_key, batch=batch,
                                   ask_key=ask_key)
      if not result.failed:
        break

    return result

  def _CheckRunningInstances(self):
    """Checks whether there are running instances on the clusters to merge.

    @rtype: boolean
    @return: True if there are running instances, False otherwise

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance list -o status")
      if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
        return True

    return False

  def _StopMergingInstances(self):
    """Stop instances on merging clusters.

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance shutdown --all"
                            " --force-multiple")

      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _DisableWatcher(self):
    """Disable the watcher on all merging clusters, including ourselves.

    """
    for cluster in ["localhost"] + self.clusters:
      result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
                            self.pause_period)

      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _RemoveMasterIps(self):
    """Removes the master IPs from the master nodes of each cluster.

    """
    for data in self.merger_data:
      result = self._RunCmd(data.master_node,
                            "gnt-cluster deactivate-master-ip --yes")

      if result.failed:
        raise errors.RemoteError("Unable to remove master IP on %s."
                                 " Fail reason: %s; output: %s" %
                                 (data.master_node, result.fail_reason,
                                  result.output))

  def _StopDaemons(self):
    """Stop all daemons on merging nodes.

    """
    cmd = "%s stop-all" % pathutils.DAEMON_UTIL
    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, cmd, max_attempts=3)

        if result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))

  def _FetchRemoteConfig(self):
    """Fetches and stores the remote cluster config from the master.

    This step is needed before we can merge the config.

    """
    for data in self.merger_data:
      result = self._RunCmd(data.cluster, "cat %s" %
                            pathutils.CLUSTER_CONF_FILE)

      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
" Fail reason: %s; output %s" % (data.cluster, result.fail_reason, result.output)) data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" % data.cluster) utils.WriteFile(data.config_path, data=result.stdout) # R0201: Method could be a function def _KillMasterDaemon(self): # pylint: disable=R0201 """Kills the local master daemon. @raise errors.CommandError: If unable to kill """ result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"]) if result.failed: raise errors.CommandError("Unable to stop master daemons." " Fail reason: %s; output: %s" % (result.fail_reason, result.output)) def _MergeConfig(self): """Merges all foreign config into our own config. """ my_config = config.ConfigWriter(offline=True) fake_ec_id = 0 # Needs to be uniq over the whole config merge for data in self.merger_data: other_config = config.ConfigWriter(data.config_path, accept_foreign=True) self._MergeClusterConfigs(my_config, other_config) self._MergeNodeGroups(my_config, other_config) for node in other_config.GetNodeList(): node_info = other_config.GetNodeInfo(node) # Offline the node, it will be reonlined later at node readd node_info.master_candidate = False node_info.drained = False node_info.offline = True my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id)) fake_ec_id += 1 for instance in other_config.GetInstanceList(): instance_info = other_config.GetInstanceInfo(instance) # Update the DRBD port assignments # This is a little bit hackish for dsk in instance_info.disks: if dsk.dev_type in constants.LDS_DRBD: port = my_config.AllocatePort() logical_id = list(dsk.logical_id) logical_id[2] = port dsk.logical_id = tuple(logical_id) physical_id = list(dsk.physical_id) physical_id[1] = physical_id[3] = port dsk.physical_id = tuple(physical_id) my_config.AddInstance(instance_info, _CLUSTERMERGE_ECID + str(fake_ec_id)) fake_ec_id += 1 def _MergeClusterConfigs(self, my_config, other_config): """Checks that all relevant cluster parameters are compatible """ my_cluster = my_config.GetClusterInfo() other_cluster = other_config.GetClusterInfo() err_count = 0 # # Generic checks # check_params = [ "beparams", "default_iallocator", "drbd_usermode_helper", "hidden_os", "maintain_node_health", "master_netdev", "ndparams", "nicparams", "primary_ip_family", "tags", "uid_pool", ] check_params_strict = [ "volume_group_name", ] if my_cluster.IsFileStorageEnabled() or \ other_cluster.IsFileStorageEnabled(): check_params_strict.append("file_storage_dir") if my_cluster.IsSharedFileStorageEnabled() or \ other_cluster.IsSharedFileStorageEnabled(): check_params_strict.append("shared_file_storage_dir") check_params.extend(check_params_strict) if self.params == _PARAMS_STRICT: params_strict = True else: params_strict = False for param_name in check_params: my_param = getattr(my_cluster, param_name) other_param = getattr(other_cluster, param_name) if my_param != other_param: logging.error("The value (%s) of the cluster parameter %s on %s" " differs to this cluster's value (%s)", other_param, param_name, other_cluster.cluster_name, my_param) if params_strict or param_name in check_params_strict: err_count += 1 # # Custom checks # # Check default hypervisor my_defhyp = my_cluster.enabled_hypervisors[0] other_defhyp = other_cluster.enabled_hypervisors[0] if my_defhyp != other_defhyp: logging.warning("The default hypervisor (%s) differs on %s, new" " instances will be created with this cluster's" " default hypervisor (%s)", other_defhyp, other_cluster.cluster_name, my_defhyp) if (set(my_cluster.enabled_hypervisors) 
        != set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs from"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name,
                    my_cluster.enabled_hypervisors)
      err_count += 1

    # Check hypervisor params for hypervisors we care about
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs from this cluster's"
                        " parameter (%s)",
                        other_value, param, hyp, other_cluster.cluster_name,
                        my_value)
          if params_strict:
            err_count += 1

    # Check OS hypervisor params for hypervisors we care about
    for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differ from this cluster's"
                        " parameters (%s)",
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        my_os_hvp)
          if params_strict:
            err_count += 1

    #
    # Warnings
    #
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    #
    # Actual merging
    #
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs from"
                      " this cluster's value (%s). The least permissive value"
                      " (%s) will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differ from"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])
        if params_strict:
          err_count += 1

    if err_count:
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)

  # R0201: Method could be a function
  def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201
    """Returns the OS-specific hypervisor parameters, if any.

    """
    if os_name in cluster.os_hvp:
      return cluster.os_hvp[os_name].get(hyp, None)
    else:
      return None
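
  # Illustrative sketch only (values are made up): cluster.os_hvp maps an OS
  # name to per-hypervisor parameter overrides, e.g.
  #   {"debian-image": {"kvm": {"kernel_path": "/boot/vmlinuz-kvm"}}}
  # so _GetOsHypervisor(cluster, "debian-image", "kvm") above would return the
  # inner parameter dict, and None when either the OS or the hypervisor is
  # missing from the mapping.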
""" # pylint: disable=R0201 logging.info("Node group conflict strategy: %s", self.groups) my_grps = my_config.GetAllNodeGroupsInfo().values() other_grps = other_config.GetAllNodeGroupsInfo().values() # Check for node group naming conflicts: conflicts = [] for other_grp in other_grps: for my_grp in my_grps: if other_grp.name == my_grp.name: conflicts.append(other_grp) if conflicts: conflict_names = utils.CommaJoin([g.name for g in conflicts]) logging.info("Node groups in both local and remote cluster: %s", conflict_names) # User hasn't specified how to handle conflicts if not self.groups: raise errors.CommandError("The following node group(s) are in both" " clusters, and no merge strategy has been" " supplied (see the --groups option): %s" % conflict_names) # User wants to rename conflicts elif self.groups == _GROUPS_RENAME: for grp in conflicts: new_name = "%s-%s" % (grp.name, other_config.GetClusterName()) logging.info("Renaming remote node group from %s to %s" " to resolve conflict", grp.name, new_name) grp.name = new_name # User wants to merge conflicting groups elif self.groups == _GROUPS_MERGE: for other_grp in conflicts: logging.info("Merging local and remote '%s' groups", other_grp.name) for node_name in other_grp.members[:]: node = other_config.GetNodeInfo(node_name) # Access to a protected member of a client class # pylint: disable=W0212 other_config._UnlockedRemoveNodeFromGroup(node) # Access to a protected member of a client class # pylint: disable=W0212 my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name) # Access to a protected member of a client class # pylint: disable=W0212 my_config._UnlockedAddNodeToGroup(node, my_grp_uuid) node.group = my_grp_uuid # Remove from list of groups to add other_grps.remove(other_grp) for grp in other_grps: #TODO: handle node group conflicts my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID) # R0201: Method could be a function def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201 """Starts the local master daemon. @param no_vote: Should the masterd started without voting? default: False @raise errors.CommandError: If unable to start daemon. """ env = {} if no_vote: env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it" result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env) if result.failed: raise errors.CommandError("Couldn't start ganeti master." " Fail reason: %s; output: %s" % (result.fail_reason, result.output)) def _ReaddMergedNodesAndRedist(self): """Readds all merging nodes and make sure their config is up-to-date. @raise errors.CommandError: If anything fails. """ for data in self.merger_data: for node in data.nodes: logging.info("Readding node %s", node) result = utils.RunCmd(["gnt-node", "add", "--readd", "--no-ssh-key-check", node]) if result.failed: logging.error("%s failed to be readded. Reason: %s, output: %s", node, result.fail_reason, result.output) result = utils.RunCmd(["gnt-cluster", "redist-conf"]) if result.failed: raise errors.CommandError("Redistribution failed. Fail reason: %s;" " output: %s" % (result.fail_reason, result.output)) # R0201: Method could be a function def _StartupAllInstances(self): # pylint: disable=R0201 """Starts up all instances (locally). @raise errors.CommandError: If unable to start clusters """ result = utils.RunCmd(["gnt-instance", "startup", "--all", "--force-multiple"]) if result.failed: raise errors.CommandError("Unable to start all instances." 
" Fail reason: %s; output: %s" % (result.fail_reason, result.output)) # R0201: Method could be a function # TODO: make this overridable, for some verify errors def _VerifyCluster(self): # pylint: disable=R0201 """Runs gnt-cluster verify to verify the health. @raise errors.ProgrammError: If cluster fails on verification """ result = utils.RunCmd(["gnt-cluster", "verify"]) if result.failed: raise errors.CommandError("Verification of cluster failed." " Fail reason: %s; output: %s" % (result.fail_reason, result.output)) def Merge(self): """Does the actual merge. It runs all the steps in the right order and updates the user about steps taken. Also it keeps track of rollback_steps to undo everything. """ rbsteps = [] try: logging.info("Pre cluster verification") self._VerifyCluster() logging.info("Prepare authorized_keys") rbsteps.append("Remove our key from authorized_keys on nodes:" " %(nodes)s") self._PrepareAuthorizedKeys() rbsteps.append("Start all instances again on the merging" " clusters: %(clusters)s") if self.stop_instances: logging.info("Stopping merging instances (takes a while)") self._StopMergingInstances() logging.info("Checking that no instances are running on the mergees") instances_running = self._CheckRunningInstances() if instances_running: raise errors.CommandError("Some instances are still running on the" " mergees") logging.info("Disable watcher") self._DisableWatcher() logging.info("Merging config") self._FetchRemoteConfig() logging.info("Removing master IPs on mergee master nodes") self._RemoveMasterIps() logging.info("Stop daemons on merging nodes") self._StopDaemons() logging.info("Stopping master daemon") self._KillMasterDaemon() rbsteps.append("Restore %s from another master candidate" " and restart master daemon" % pathutils.CLUSTER_CONF_FILE) self._MergeConfig() self._StartMasterDaemon(no_vote=True) # Point of no return, delete rbsteps del rbsteps[:] logging.warning("We are at the point of no return. Merge can not easily" " be undone after this point.") logging.info("Readd nodes") self._ReaddMergedNodesAndRedist() logging.info("Merge done, restart master daemon normally") self._KillMasterDaemon() self._StartMasterDaemon() if self.restart == _RESTART_ALL: logging.info("Starting instances again") self._StartupAllInstances() else: logging.info("Not starting instances again") logging.info("Post cluster verification") self._VerifyCluster() except errors.GenericError, e: logging.exception(e) if rbsteps: nodes = Flatten([data.nodes for data in self.merger_data]) info = { "clusters": self.clusters, "nodes": nodes, } logging.critical("In order to rollback do the following:") for step in rbsteps: logging.critical(" * %s", step % info) else: logging.critical("Nothing to rollback.") # TODO: Keep track of steps done for a flawless resume? def Cleanup(self): """Clean up our environment. This cleans up remote private keys and configs and after that deletes the temporary directory. """ shutil.rmtree(self.work_dir) def main(): """Main routine. """ program = os.path.basename(sys.argv[0]) parser = optparse.OptionParser(usage="%%prog [options...] 
", prog=program) parser.add_option(cli.DEBUG_OPT) parser.add_option(cli.VERBOSE_OPT) parser.add_option(PAUSE_PERIOD_OPT) parser.add_option(GROUPS_OPT) parser.add_option(RESTART_OPT) parser.add_option(PARAMS_OPT) parser.add_option(SKIP_STOP_INSTANCES_OPT) (options, args) = parser.parse_args() utils.SetupToolLogging(options.debug, options.verbose) if not args: parser.error("No clusters specified") cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period, options.groups, options.restart, options.params, options.stop_instances) try: try: cluster_merger.Setup() cluster_merger.Merge() except errors.GenericError, e: logging.exception(e) return constants.EXIT_FAILURE finally: cluster_merger.Cleanup() return constants.EXIT_SUCCESS if __name__ == "__main__": sys.exit(main())