"""
-# pylint: disable-msg=C0103
+# pylint: disable=C0103
# C0103: Invalid name cluster-merge
import logging
_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
_CLUSTERMERGE_ECID = "clustermerge-ecid"
+_RESTART_ALL = "all"
+_RESTART_UP = "up"
+_RESTART_NONE = "none"
+_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
+_PARAMS_STRICT = "strict"
+_PARAMS_WARN = "warn"
+_PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
+
-PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
-                                  action="store", type="int",
-                                  help=("How to handle groups that have the"
-                                        " same name (One of: %s/%s)" %
-                                        (_GROUPS_MERGE, _GROUPS_RENAME)))
+# NOTE(review): help text was copy-pasted from the groups option and
+# described group-name conflicts; it now describes the pause period.
+PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
+                                  action="store", type="int",
+                                  help=("Amount of time in seconds watcher"
+                                        " shall be disabled during the"
+                                        " merge"))
+# Strategy for conflicting cluster parameter values between the clusters:
+# "strict" makes any mismatch count as a fatal merge error, while "warn"
+# only logs the difference (see _MergeClusterConfigs).
+PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
+                            metavar="STRATEGY",
+                            choices=_PARAMS_CHOICES,
+                            dest="params",
+                            help=("How to handle params that have"
+                                  " different values (One of: %s/%s)" %
+                                  _PARAMS_CHOICES))
+
+# Strategy for restarting instances after the merge; _RESTART_UP is accepted
+# here but rejected with NotImplementedError by Merger.__init__.
+RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
+                             metavar="STRATEGY",
+                             choices=_RESTART_CHOICES,
+                             dest="restart",
+                             help=("How to handle restarting instances"
+                                   " after the merge (One of: %s/%s/%s)" %
+                                   _RESTART_CHOICES))
+
+# Note the inverted logic: passing --skip-stop-instances stores False into
+# options.stop_instances (default True), so the merger only verifies that
+# nothing is running instead of stopping the instances itself.
+SKIP_STOP_INSTANCES_OPT = \
+  cli.cli_option("--skip-stop-instances", default=True, action="store_false",
+                 dest="stop_instances",
+                 help=("Don't stop the instances on the clusters, just check "
+                       "that none is running"))
def Flatten(unflattened_list):
"""Container class to hold data used for merger.
"""
- def __init__(self, cluster, key_path, nodes, instances, config_path=None):
+ def __init__(self, cluster, key_path, nodes, instances, master_node,
+ config_path=None):
"""Initialize the container.
@param cluster: The name of the cluster
@param key_path: Path to the ssh private key used for authentication
@param nodes: List of online nodes in the merging cluster
@param instances: List of instances running on merging cluster
+ @param master_node: Name of the master node
@param config_path: Path to the merging cluster config
"""
self.key_path = key_path
self.nodes = nodes
self.instances = instances
+ self.master_node = master_node
self.config_path = config_path
"""Handling the merge.
"""
-  def __init__(self, clusters, pause_period, groups):
+  # Instance statuses that count as "running" when checking the mergees
+  # (used by _CheckRunningInstances).
+  RUNNING_STATUSES = frozenset([
+    constants.INSTST_RUNNING,
+    constants.INSTST_ERRORUP,
+    ])
+
+  def __init__(self, clusters, pause_period, groups, restart, params,
+               stop_instances):
    """Initialize object with sane defaults and infos required.
    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts
+    @param restart: How to handle instance restart
+    @param params: Strategy for handling conflicting cluster parameter
+                   values (one of _PARAMS_CHOICES)
+    @param stop_instances: Indicates whether the instances must be stopped
+                           (True) or if the Merger must only check if no
+                           instances are running on the mergee clusters (False)
    """
    self.merger_data = []
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    self.groups = groups
+    self.restart = restart
+    self.params = params
+    self.stop_instances = stop_instances
+    # The "up" restart strategy is offered by the CLI but not implemented
+    # yet; fail early instead of misbehaving at restart time.
+    if self.restart == _RESTART_UP:
+      raise NotImplementedError
def Setup(self):
"""Sets up our end so we can do the merger.
utils.WriteFile(key_path, mode=0600, data=result.stdout)
result = self._RunCmd(cluster, "gnt-node list -o name,offline"
- " --no-header --separator=,", private_key=key_path)
+ " --no-headers --separator=,", private_key=key_path)
if result.failed:
raise errors.RemoteError("Unable to retrieve list of nodes from %s."
" Fail reason: %s; output: %s" %
(cluster, result.fail_reason, result.output))
- nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
+ nodes_statuses = [line.split(",") for line in result.stdout.splitlines()]
nodes = [node_status[0] for node_status in nodes_statuses
if node_status[1] == "N"]
- result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
+ result = self._RunCmd(cluster, "gnt-instance list -o name --no-headers",
private_key=key_path)
if result.failed:
raise errors.RemoteError("Unable to retrieve list of instances from"
(cluster, result.fail_reason, result.output))
instances = result.stdout.splitlines()
- self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
+ path = utils.PathJoin(constants.DATA_DIR, "ssconf_%s" %
+ constants.SS_MASTER_NODE)
+ result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
+ if result.failed:
+ raise errors.RemoteError("Unable to retrieve the master node name from"
+ " %s. Fail reason: %s; output: %s" %
+ (cluster, result.fail_reason, result.output))
+ master_node = result.stdout.strip()
+
+ self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
+ master_node))
def _PrepareAuthorizedKeys(self):
"""Prepare the authorized_keys on every merging node.
for node in data.nodes:
result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
(auth_keys, pub_key)),
- private_key=data.key_path)
+ private_key=data.key_path, max_attempts=3)
if result.failed:
raise errors.RemoteError("Unable to add our public key to %s in %s."
def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
strict_host_check=False, private_key=None, batch=True,
- ask_key=False):
+ ask_key=False, max_attempts=1):
"""Wrapping SshRunner.Run with default parameters.
For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
"""
- return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
- use_cluster_key=use_cluster_key,
- strict_host_check=strict_host_check,
- private_key=private_key, batch=batch,
- ask_key=ask_key)
+ for _ in range(max_attempts):
+ result = self.ssh_runner.Run(hostname=hostname, command=command,
+ user=user, use_cluster_key=use_cluster_key,
+ strict_host_check=strict_host_check,
+ private_key=private_key, batch=batch,
+ ask_key=ask_key)
+ if not result.failed:
+ break
+
+ return result
+
+  def _CheckRunningInstances(self):
+    """Checks if on the clusters to be merged there are running instances
+
+    @rtype: boolean
+    @return: True if there are running instances, False otherwise
+    @raise errors.RemoteError: If the instance list of a mergee cluster
+                               cannot be retrieved
+
+    """
+    for cluster in self.clusters:
+      # --no-headers keeps the column header out of the lines matched
+      # against RUNNING_STATUSES, consistent with the other gnt-* list
+      # invocations in this tool
+      result = self._RunCmd(cluster, "gnt-instance list -o status"
+                            " --no-headers")
+      if result.failed:
+        # A failed command must not silently read as "nothing is running"
+        raise errors.RemoteError("Unable to retrieve list of instances from"
+                                 " %s. Fail reason: %s; output: %s" %
+                                 (cluster, result.fail_reason, result.output))
+      if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
+        return True
+
+    return False
def _StopMergingInstances(self):
"""Stop instances on merging clusters.
" Fail reason: %s; output: %s" %
(cluster, result.fail_reason, result.output))
+  def _RemoveMasterIps(self):
+    """Removes the master IPs from the master nodes of each cluster.
+
+    """
+    deactivate_cmd = "gnt-cluster deactivate-master-ip --yes"
+    for mergee in self.merger_data:
+      result = self._RunCmd(mergee.master_node, deactivate_cmd)
+      if result.failed:
+        raise errors.RemoteError("Unable to remove master IP on %s."
+                                 " Fail reason: %s; output: %s" %
+                                 (mergee.master_node, result.fail_reason,
+                                  result.output))
+
def _StopDaemons(self):
"""Stop all daemons on merging nodes.
cmd = "%s stop-all" % constants.DAEMON_UTIL
for data in self.merger_data:
for node in data.nodes:
- result = self._RunCmd(node, cmd)
+ result = self._RunCmd(node, cmd, max_attempts=3)
if result.failed:
raise errors.RemoteError("Unable to stop daemons on %s."
utils.WriteFile(data.config_path, data=result.stdout)
# R0201: Method could be a function
- def _KillMasterDaemon(self): # pylint: disable-msg=R0201
+ def _KillMasterDaemon(self): # pylint: disable=R0201
"""Kills the local master daemon.
@raise errors.CommandError: If unable to kill
for node in other_config.GetNodeList():
node_info = other_config.GetNodeInfo(node)
+ # Offline the node, it will be reonlined later at node readd
+ node_info.master_candidate = False
+ node_info.drained = False
+ node_info.offline = True
my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
fake_ec_id += 1
_CLUSTERMERGE_ECID + str(fake_ec_id))
fake_ec_id += 1
- # R0201: Method could be a function
def _MergeClusterConfigs(self, my_config, other_config):
"""Checks that all relevant cluster parameters are compatible
"""
- # pylint: disable-msg=R0201
my_cluster = my_config.GetClusterInfo()
other_cluster = other_config.GetClusterInfo()
err_count = 0
#
# Generic checks
#
- check_params = (
+ check_params = [
"beparams",
"default_iallocator",
"drbd_usermode_helper",
- "file_storage_dir",
"hidden_os",
"maintain_node_health",
"master_netdev",
"primary_ip_family",
"tags",
"uid_pool",
+ ]
+ check_params_strict = [
"volume_group_name",
- )
+ ]
+ if constants.ENABLE_FILE_STORAGE:
+ check_params_strict.append("file_storage_dir")
+ if constants.ENABLE_SHARED_FILE_STORAGE:
+ check_params_strict.append("shared_file_storage_dir")
+ check_params.extend(check_params_strict)
+
+ if self.params == _PARAMS_STRICT:
+ params_strict = True
+ else:
+ params_strict = False
+
for param_name in check_params:
my_param = getattr(my_cluster, param_name)
other_param = getattr(other_cluster, param_name)
" differs to this cluster's value (%s)",
other_param, param_name, other_cluster.cluster_name,
my_param)
- err_count += 1
+ if params_strict or param_name in check_params_strict:
+ err_count += 1
#
# Custom checks
err_count += 1
# Check hypervisor params for hypervisors we care about
- # TODO: we probably don't care about all params for a given hypervisor
for hyp in my_cluster.enabled_hypervisors:
for param in my_cluster.hvparams[hyp]:
my_value = my_cluster.hvparams[hyp][param]
" (%s)",
other_value, param, hyp, other_cluster.cluster_name,
my_value)
- err_count += 1
+ if params_strict:
+ err_count += 1
# Check os hypervisor params for hypervisors we care about
for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
" (%s)",
other_os_hvp, os_name, hyp, other_cluster.cluster_name,
my_os_hvp)
- err_count += 1
+ if params_strict:
+ err_count += 1
#
# Warnings
" this cluster's parameters (%s)",
osparams, os_, other_cluster.cluster_name,
my_cluster.osparams[os_])
- err_count += 1
+ if params_strict:
+ err_count += 1
if err_count:
raise errors.ConfigurationError("Cluster config for %s has incompatible"
other_cluster.cluster_name)
# R0201: Method could be a function
- def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
+ def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201
if os_name in cluster.os_hvp:
return cluster.os_hvp[os_name].get(hyp, None)
else:
ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
"""
- # pylint: disable-msg=R0201
+ # pylint: disable=R0201
logging.info("Node group conflict strategy: %s", self.groups)
my_grps = my_config.GetAllNodeGroupsInfo().values()
grp.name = new_name
# User wants to merge conflicting groups
- elif self.groups == 'merge':
+ elif self.groups == _GROUPS_MERGE:
for other_grp in conflicts:
logging.info("Merging local and remote '%s' groups", other_grp.name)
for node_name in other_grp.members[:]:
node = other_config.GetNodeInfo(node_name)
# Access to a protected member of a client class
- # pylint: disable-msg=W0212
+ # pylint: disable=W0212
other_config._UnlockedRemoveNodeFromGroup(node)
# Access to a protected member of a client class
- # pylint: disable-msg=W0212
+ # pylint: disable=W0212
my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
# Access to a protected member of a client class
- # pylint: disable-msg=W0212
+ # pylint: disable=W0212
my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
node.group = my_grp_uuid
# Remove from list of groups to add
my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
# R0201: Method could be a function
- def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
+ def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201
"""Starts the local master daemon.
@param no_vote: Should the masterd started without voting? default: False
"""
for data in self.merger_data:
for node in data.nodes:
+ logging.info("Readding node %s", node)
result = utils.RunCmd(["gnt-node", "add", "--readd",
"--no-ssh-key-check", "--force-join", node])
if result.failed:
- raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
- " output: %s" % (node, result.fail_reason,
- result.output))
+ logging.error("%s failed to be readded. Reason: %s, output: %s",
+ node, result.fail_reason, result.output)
result = utils.RunCmd(["gnt-cluster", "redist-conf"])
if result.failed:
result.output))
# R0201: Method could be a function
- def _StartupAllInstances(self): # pylint: disable-msg=R0201
+ def _StartupAllInstances(self): # pylint: disable=R0201
"""Starts up all instances (locally).
@raise errors.CommandError: If unable to start clusters
(result.fail_reason, result.output))
# R0201: Method could be a function
- def _VerifyCluster(self): # pylint: disable-msg=R0201
+ # TODO: make this overridable, for some verify errors
+ def _VerifyCluster(self): # pylint: disable=R0201
"""Runs gnt-cluster verify to verify the health.
@raise errors.ProgrammError: If cluster fails on verification
rbsteps.append("Start all instances again on the merging"
" clusters: %(clusters)s")
- logging.info("Stopping merging instances (takes a while)")
- self._StopMergingInstances()
-
+ if self.stop_instances:
+ logging.info("Stopping merging instances (takes a while)")
+ self._StopMergingInstances()
+ logging.info("Checking that no instances are running on the mergees")
+ instances_running = self._CheckRunningInstances()
+ if instances_running:
+ raise errors.CommandError("Some instances are still running on the"
+ " mergees")
logging.info("Disable watcher")
self._DisableWatcher()
- logging.info("Stop daemons on merging nodes")
- self._StopDaemons()
logging.info("Merging config")
self._FetchRemoteConfig()
+ logging.info("Removing master IPs on mergee master nodes")
+ self._RemoveMasterIps()
+ logging.info("Stop daemons on merging nodes")
+ self._StopDaemons()
logging.info("Stopping master daemon")
self._KillMasterDaemon()
self._KillMasterDaemon()
self._StartMasterDaemon()
- logging.info("Starting instances again")
- self._StartupAllInstances()
+ if self.restart == _RESTART_ALL:
+ logging.info("Starting instances again")
+ self._StartupAllInstances()
+ else:
+ logging.info("Not starting instances again")
logging.info("Post cluster verification")
self._VerifyCluster()
except errors.GenericError, e:
"""
program = os.path.basename(sys.argv[0])
- parser = optparse.OptionParser(usage=("%%prog [--debug|--verbose]"
- " [--watcher-pause-period SECONDS]"
- " [--groups [%s|%s]]"
- " <cluster> [<cluster...>]" %
- (_GROUPS_MERGE, _GROUPS_RENAME)),
- prog=program)
+ parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
+ prog=program)
parser.add_option(cli.DEBUG_OPT)
parser.add_option(cli.VERBOSE_OPT)
parser.add_option(PAUSE_PERIOD_OPT)
parser.add_option(GROUPS_OPT)
+ parser.add_option(RESTART_OPT)
+ parser.add_option(PARAMS_OPT)
+ parser.add_option(SKIP_STOP_INSTANCES_OPT)
(options, args) = parser.parse_args()
parser.error("No clusters specified")
cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
- options.groups)
+ options.groups, options.restart, options.params,
+ options.stop_instances)
try:
try:
cluster_merger.Setup()