4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Tool to merge two or more clusters together.
23 The clusters have to run the same version of Ganeti!
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
# Node-group conflict strategies accepted by the --groups option.
45 _GROUPS_MERGE = "merge"
46 _GROUPS_RENAME = "rename"
# Prefix for the "external change id" passed to ConfigWriter when adding
# merged nodes/instances/groups to the local configuration.
47 _CLUSTERMERGE_ECID = "clustermerge-ecid"
# Instance-restart strategies accepted by the --restart option.
# NOTE(review): this listing has gaps -- the _RESTART_ALL and _RESTART_UP
# definitions referenced just below are elided here.
50 _RESTART_NONE = "none"
51 _RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
# Parameter-conflict strategies accepted by --parameter-conflicts.
# NOTE(review): the _PARAMS_WARN definition referenced below is elided.
52 _PARAMS_STRICT = "strict"
54 _PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
# Command-line option objects; they are registered on the parser in main().
# NOTE(review): the listing has gaps -- several continuation lines of these
# cli.cli_option(...) calls (e.g. the metavar arguments and the
# %-interpolation tuples closing PARAMS_OPT and RESTART_OPT) are elided.
57 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
58                                   action="store", type="int",
60                                   help=("Amount of time in seconds watcher"
61                                         " should be suspended from running"))
62 GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
63                             choices=(_GROUPS_MERGE, _GROUPS_RENAME),
65                             help=("How to handle groups that have the"
66                                   " same name (One of: %s/%s)" %
67                                   (_GROUPS_MERGE, _GROUPS_RENAME)))
68 PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
70                             choices=_PARAMS_CHOICES,
72                             help=("How to handle params that have"
73                                   " different values (One of: %s/%s)" %
76 RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
78                              choices=_RESTART_CHOICES,
80                              help=("How to handle restarting instances"
81                                    " same name (One of: %s/%s/%s)" %
# Recursively flattens arbitrarily nested lists into a single flat list.
# NOTE(review): this listing elides several lines of the function -- the
# docstring delimiters, the `flattened_list = []` initializer it clearly
# relies on, the `else:` branch that must precede the append (otherwise
# sub-lists would be appended as well as extended), and the final return.
85 def Flatten(unflattened_list):
88   @param unflattened_list: A list of unflattened list objects.
89   @return: A flattened list
94   for item in unflattened_list:
95     if isinstance(item, list):
96       flattened_list.extend(Flatten(item))
98       flattened_list.append(item)
# Plain container for per-remote-cluster state gathered during Setup():
# ssh key path, online node names, instance names and (later) the path to
# the fetched remote config.
102 class MergerData(object):
103   """Container class to hold data used for merger.
106   def __init__(self, cluster, key_path, nodes, instances, config_path=None):
107     """Initialize the container.
109     @param cluster: The name of the cluster
110     @param key_path: Path to the ssh private key used for authentication
111     @param nodes: List of online nodes in the merging cluster
112     @param instances: List of instances running on merging cluster
113     @param config_path: Path to the merging cluster config
116     self.cluster = cluster
117     self.key_path = key_path
# NOTE(review): the `self.nodes = nodes` assignment (original line 118) is
# elided in this listing; callers such as _PrepareAuthorizedKeys iterate
# data.nodes, so it must exist in the full source.
119     self.instances = instances
120     self.config_path = config_path
# Orchestrates the whole merge: gathers remote state, stops remote
# clusters, merges their configs into the local one and readds their nodes.
123 class Merger(object):
124   """Handling the merge.
127   def __init__(self, clusters, pause_period, groups, restart, params):
128     """Initialize object with sane defaults and infos required.
130     @param clusters: The list of clusters to merge in
131     @param pause_period: The time watcher shall be disabled for
132     @param groups: How to handle group conflicts
133     @param restart: How to handle instance restart
136     self.merger_data = []
137     self.clusters = clusters
138     self.pause_period = pause_period
# Temporary working directory for fetched keys/configs; removed in Cleanup().
139     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
# Single-element tuple unpack: the local cluster name from the master daemon.
140     (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
141     self.ssh_runner = ssh.SshRunner(self.cluster_name)
# NOTE(review): assignments of self.groups and self.params (read later by
# _MergeNodeGroups and _MergeClusterConfigs) are elided in this listing.
143     self.restart = restart
# Restarting only previously-up instances is not implemented yet.
145     if self.restart == _RESTART_UP:
146       raise NotImplementedError
# NOTE(review): the `def Setup(self):` line and the `if result.failed:`
# guards before each RemoteError raise are elided in this listing.
150     """Sets up our end so we can do the merger.
152     This method is setting us up as a preparation for the merger.
153     It makes the initial contact and gathers information needed.
155     @raise errors.RemoteError: for errors in communication/grabbing
# Path of root's ssh private key on the remote side (same layout locally).
158     (remote_path, _, _) = ssh.GetUserFiles("root")
# Refuse a self-merge: the local cluster must not be in the merge list.
160     if self.cluster_name in self.clusters:
161       raise errors.CommandError("Cannot merge cluster %s with itself" %
164     # Fetch remotes private key
165     for cluster in self.clusters:
166       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
169         raise errors.RemoteError("There was an error while grabbing ssh private"
170                                  " key from %s. Fail reason: %s; output: %s" %
171                                  (cluster, result.fail_reason, result.output))
# Store the remote key with owner-only permissions for later ssh calls.
173       key_path = utils.PathJoin(self.work_dir, cluster)
174       utils.WriteFile(key_path, mode=0600, data=result.stdout)
# List remote nodes with their offline flag, comma-separated.
176       result = self._RunCmd(cluster, "gnt-node list -o name,offline"
177                             " --no-header --separator=,", private_key=key_path)
179         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
180                                  " Fail reason: %s; output: %s" %
181                                  (cluster, result.fail_reason, result.output))
# Keep only online nodes (offline flag == "N").
182       nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
183       nodes = [node_status[0] for node_status in nodes_statuses
184                if node_status[1] == "N"]
186       result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
187                             private_key=key_path)
189         raise errors.RemoteError("Unable to retrieve list of instances from"
190                                  " %s. Fail reason: %s; output: %s" %
191                                  (cluster, result.fail_reason, result.output))
192       instances = result.stdout.splitlines()
194       self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
196   def _PrepareAuthorizedKeys(self):
197     """Prepare the authorized_keys on every merging node.
199     This method add our public key to remotes authorized_key for further
# Local public key and the remote authorized_keys path (same layout).
203     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
204     pub_key = utils.ReadFile(pub_key_file)
206     for data in self.merger_data:
207       for node in data.nodes:
# Append our public key via a quoted heredoc so the key text is not
# interpreted by the remote shell.
208         result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
209                                      (auth_keys, pub_key)),
210                               private_key=data.key_path)
# NOTE(review): the `if result.failed:` guard and the closing
# `result.output))` of this raise are elided in this listing.
213           raise errors.RemoteError("Unable to add our public key to %s in %s."
214                                    " Fail reason: %s; output: %s" %
215                                    (node, data.cluster, result.fail_reason,
# Thin convenience wrapper around SshRunner.Run with merge-friendly
# defaults (no cluster key, no strict host checking, batch mode).
# NOTE(review): the signature continuation and the trailing keyword
# argument(s) of the Run() call (original lines 220 and 230-231) are
# elided in this listing.
218   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
219               strict_host_check=False, private_key=None, batch=True,
221     """Wrapping SshRunner.Run with default parameters.
223     For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
226     return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
227                                use_cluster_key=use_cluster_key,
228                                strict_host_check=strict_host_check,
229                                private_key=private_key, batch=batch,
# Shuts down every instance on each remote cluster before the merge.
# NOTE(review): the continuation of the shutdown command string and the
# `if result.failed:` guard are elided in this listing.
232   def _StopMergingInstances(self):
233     """Stop instances on merging clusters.
236     for cluster in self.clusters:
237       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
241         raise errors.RemoteError("Unable to stop instances on %s."
242                                  " Fail reason: %s; output: %s" %
243                                  (cluster, result.fail_reason, result.output))
# Pauses ganeti-watcher on the local cluster ("localhost") and on every
# remote cluster so it does not restart what we are shutting down.
# NOTE(review): the %-argument of the pause command (presumably
# self.pause_period) and the `if result.failed:` guard are elided.
245   def _DisableWatcher(self):
246     """Disable watch on all merging clusters, including ourself.
249     for cluster in ["localhost"] + self.clusters:
250       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
254         raise errors.RemoteError("Unable to pause watcher on %s."
255                                  " Fail reason: %s; output: %s" %
256                                  (cluster, result.fail_reason, result.output))
# Stops all Ganeti daemons on every node of every merging cluster using
# the daemon-util helper.
# NOTE(review): the `if result.failed:` guard is elided in this listing.
258   def _StopDaemons(self):
259     """Stop all daemons on merging nodes.
262     cmd = "%s stop-all" % constants.DAEMON_UTIL
263     for data in self.merger_data:
264       for node in data.nodes:
265         result = self._RunCmd(node, cmd)
268           raise errors.RemoteError("Unable to stop daemons on %s."
269                                    " Fail reason: %s; output: %s." %
270                                    (node, result.fail_reason, result.output))
272   def _FetchRemoteConfig(self):
273     """Fetches and stores remote cluster config from the master.
275     This step is needed before we can merge the config.
278     for data in self.merger_data:
# cat the remote config.data over ssh; daemons are already stopped so the
# file is stable.
279       result = self._RunCmd(data.cluster, "cat %s" %
280                             constants.CLUSTER_CONF_FILE)
# NOTE(review): the `if result.failed:` guard and the closing
# `result.output))` of this raise are elided in this listing.
283         raise errors.RemoteError("Unable to retrieve remote config on %s."
284                                  " Fail reason: %s; output %s" %
285                                  (data.cluster, result.fail_reason,
# Save the fetched config under the work dir, keyed by cluster name
# (NOTE(review): the %-argument, presumably data.cluster, is elided).
288       data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
290       utils.WriteFile(data.config_path, data=result.stdout)
292   # R0201: Method could be a function
293   def _KillMasterDaemon(self): # pylint: disable-msg=R0201
294     """Kills the local master daemon.
296     @raise errors.CommandError: If unable to kill
# Run locally (utils.RunCmd, not ssh) -- this stops OUR master daemon so
# the config can be swapped underneath it.
# NOTE(review): the `if result.failed:` guard is elided in this listing.
299     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
301       raise errors.CommandError("Unable to stop master daemons."
302                                 " Fail reason: %s; output: %s" %
303                                 (result.fail_reason, result.output))
305   def _MergeConfig(self):
306     """Merges all foreign config into our own config.
# Offline ConfigWriter: the master daemon is stopped at this point.
309     my_config = config.ConfigWriter(offline=True)
310     fake_ec_id = 0 # Needs to be uniq over the whole config merge
312     for data in self.merger_data:
313       other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
314       self._MergeClusterConfigs(my_config, other_config)
315       self._MergeNodeGroups(my_config, other_config)
317       for node in other_config.GetNodeList():
318         node_info = other_config.GetNodeInfo(node)
319         # Offline the node, it will be reonlined later at node readd
320         node_info.master_candidate = False
321         node_info.drained = False
322         node_info.offline = True
# NOTE(review): the increments of fake_ec_id between AddNode/AddInstance
# calls (needed for the "uniq" comment above to hold) are elided here.
323         my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
326       for instance in other_config.GetInstanceList():
327         instance_info = other_config.GetInstanceInfo(instance)
329         # Update the DRBD port assignments
330         # This is a little bit hackish
331         for dsk in instance_info.disks:
332           if dsk.dev_type in constants.LDS_DRBD:
# Allocate a fresh DRBD port in the local cluster's port pool to avoid
# collisions with existing local disks.
333             port = my_config.AllocatePort()
# NOTE(review): the assignment writing `port` into logical_id (between
# original lines 335 and 337) is elided in this listing.
335             logical_id = list(dsk.logical_id)
337             dsk.logical_id = tuple(logical_id)
339             physical_id = list(dsk.physical_id)
340             physical_id[1] = physical_id[3] = port
341             dsk.physical_id = tuple(physical_id)
343         my_config.AddInstance(instance_info,
344                               _CLUSTERMERGE_ECID + str(fake_ec_id))
# Compares cluster-wide parameters between the local and a foreign config,
# logging differences; strict mismatches abort the merge at the end.
# NOTE(review): this listing elides many lines of this method, including
# most entries of the check_params/check_params_strict lists, the
# params_strict assignment for the strict case, and the error-counting /
# final error test feeding the ConfigurationError at the bottom.
347   def _MergeClusterConfigs(self, my_config, other_config):
348     """Checks that all relevant cluster parameters are compatible
351     my_cluster = my_config.GetClusterInfo()
352     other_cluster = other_config.GetClusterInfo()
360       "default_iallocator",
361       "drbd_usermode_helper",
363       "maintain_node_health",
371     check_params_strict = [
# File-storage dirs only matter when the respective feature is compiled in.
374     if constants.ENABLE_FILE_STORAGE:
375       check_params_strict.append("file_storage_dir")
376     if constants.ENABLE_SHARED_FILE_STORAGE:
377       check_params_strict.append("shared_file_storage_dir")
378     check_params.extend(check_params_strict)
380     if self.params == _PARAMS_STRICT:
383       params_strict = False
385     for param_name in check_params:
386       my_param = getattr(my_cluster, param_name)
387       other_param = getattr(other_cluster, param_name)
388       if my_param != other_param:
389         logging.error("The value (%s) of the cluster parameter %s on %s"
390                       " differs to this cluster's value (%s)",
391                       other_param, param_name, other_cluster.cluster_name,
# Strict-list parameters are fatal even in warn mode.
393         if params_strict or param_name in check_params_strict:
400     # Check default hypervisor
401     my_defhyp = my_cluster.enabled_hypervisors[0]
402     other_defhyp = other_cluster.enabled_hypervisors[0]
403     if my_defhyp != other_defhyp:
404       logging.warning("The default hypervisor (%s) differs on %s, new"
405                       " instances will be created with this cluster's"
406                       " default hypervisor (%s)", other_defhyp,
407                       other_cluster.cluster_name, my_defhyp)
409     if (set(my_cluster.enabled_hypervisors) !=
410         set(other_cluster.enabled_hypervisors)):
411       logging.error("The set of enabled hypervisors (%s) on %s differs to"
412                     " this cluster's set (%s)",
413                     other_cluster.enabled_hypervisors,
414                     other_cluster.cluster_name, my_cluster.enabled_hypervisors)
417     # Check hypervisor params for hypervisors we care about
418     for hyp in my_cluster.enabled_hypervisors:
419       for param in my_cluster.hvparams[hyp]:
420         my_value = my_cluster.hvparams[hyp][param]
421         other_value = other_cluster.hvparams[hyp][param]
422         if my_value != other_value:
423           logging.error("The value (%s) of the %s parameter of the %s"
424                         " hypervisor on %s differs to this cluster's parameter"
426                         other_value, param, hyp, other_cluster.cluster_name,
431     # Check os hypervisor params for hypervisors we care about
# Python 2 idiom: dict.keys() returns lists, so `+` concatenates them.
432     for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
433       for hyp in my_cluster.enabled_hypervisors:
434         my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
435         other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
436         if my_os_hvp != other_os_hvp:
437           logging.error("The OS parameters (%s) for the %s OS for the %s"
438                         " hypervisor on %s differs to this cluster's parameters"
440                         other_os_hvp, os_name, hyp, other_cluster.cluster_name,
# The following flags are only warned about; the local value wins.
448     if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
449       logging.warning("The modify_etc_hosts value (%s) differs on %s,"
450                       " this cluster's value (%s) will take precedence",
451                       other_cluster.modify_etc_hosts,
452                       other_cluster.cluster_name,
453                       my_cluster.modify_etc_hosts)
455     if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
456       logging.warning("The modify_ssh_setup value (%s) differs on %s,"
457                       " this cluster's value (%s) will take precedence",
458                       other_cluster.modify_ssh_setup,
459                       other_cluster.cluster_name,
460                       my_cluster.modify_ssh_setup)
# Reserved LVs are merged (set union) rather than compared.
465     my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
466                                        other_cluster.reserved_lvs))
# prealloc_wipe_disks: the least permissive value (True) always wins.
468     if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
469       logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
470                       " cluster's value (%s). The least permissive value (%s)"
471                       " will be used", other_cluster.prealloc_wipe_disks,
472                       other_cluster.cluster_name,
473                       my_cluster.prealloc_wipe_disks, True)
474       my_cluster.prealloc_wipe_disks = True
# Unknown OSes are adopted; conflicting OS params are an error.
476     for os_, osparams in other_cluster.osparams.items():
477       if os_ not in my_cluster.osparams:
478         my_cluster.osparams[os_] = osparams
479       elif my_cluster.osparams[os_] != osparams:
480         logging.error("The OS parameters (%s) for the %s OS on %s differs to"
481                       " this cluster's parameters (%s)",
482                       osparams, os_, other_cluster.cluster_name,
483                       my_cluster.osparams[os_])
# NOTE(review): the condition guarding this raise (presumably an
# accumulated-error test) is elided in this listing.
488       raise errors.ConfigurationError("Cluster config for %s has incompatible"
489                                       " values, please fix and re-run" %
490                                       other_cluster.cluster_name)
# Looks up the per-OS hypervisor parameters for (os_name, hyp); the .get()
# yields None when the hypervisor has no entry for that OS.
# NOTE(review): the fall-through for `os_name not in cluster.os_hvp`
# (presumably `return None`) is elided in this listing.
492   # R0201: Method could be a function
493   def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
494     if os_name in cluster.os_hvp:
495       return cluster.os_hvp[os_name].get(hyp, None)
499   # R0201: Method could be a function
500   def _MergeNodeGroups(self, my_config, other_config):
501     """Adds foreign node groups
503     ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
505     # pylint: disable-msg=R0201
506     logging.info("Node group conflict strategy: %s", self.groups)
508     my_grps = my_config.GetAllNodeGroupsInfo().values()
509     other_grps = other_config.GetAllNodeGroupsInfo().values()
511     # Check for node group naming conflicts:
# NOTE(review): the `conflicts = []` initializer is elided in this listing.
513     for other_grp in other_grps:
514       for my_grp in my_grps:
515         if other_grp.name == my_grp.name:
516           conflicts.append(other_grp)
# NOTE(review): the `if conflicts:` guard and the self.groups == None test
# before the CommandError are elided here.
519       conflict_names = utils.CommaJoin([g.name for g in conflicts])
520       logging.info("Node groups in both local and remote cluster: %s",
523       # User hasn't specified how to handle conflicts
525         raise errors.CommandError("The following node group(s) are in both"
526                                   " clusters, and no merge strategy has been"
527                                   " supplied (see the --groups option): %s" %
530       # User wants to rename conflicts
531       elif self.groups == _GROUPS_RENAME:
532         for grp in conflicts:
# Resolve by renaming the remote group with its cluster name as suffix.
533           new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
534           logging.info("Renaming remote node group from %s to %s"
535                        " to resolve conflict", grp.name, new_name)
# NOTE(review): the assignment applying new_name to the group is elided.
538       # User wants to merge conflicting groups
539       elif self.groups == _GROUPS_MERGE:
540         for other_grp in conflicts:
541           logging.info("Merging local and remote '%s' groups", other_grp.name)
# Iterate over a copy ([:]) because membership is mutated below.
542           for node_name in other_grp.members[:]:
543             node = other_config.GetNodeInfo(node_name)
544             # Access to a protected member of a client class
545             # pylint: disable-msg=W0212
546             other_config._UnlockedRemoveNodeFromGroup(node)
548             # Access to a protected member of a client class
549             # pylint: disable-msg=W0212
550             my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
552             # Access to a protected member of a client class
553             # pylint: disable-msg=W0212
554             my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
555             node.group = my_grp_uuid
556           # Remove from list of groups to add
557           other_grps.remove(other_grp)
# Remaining (non-conflicting or renamed) remote groups are added locally.
559     for grp in other_grps:
560       #TODO: handle node group conflicts
561       my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
563   # R0201: Method could be a function
564   def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
565     """Starts the local master daemon.
567     @param no_vote: Should the masterd started without voting? default: False
568     @raise errors.CommandError: If unable to start daemon.
# NOTE(review): the env dict initialization and the `if no_vote:` guard
# around this assignment are elided in this listing.
573       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
575     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
# NOTE(review): the `if result.failed:` guard is elided here.
577       raise errors.CommandError("Couldn't start ganeti master."
578                                 " Fail reason: %s; output: %s" %
579                                 (result.fail_reason, result.output))
581   def _ReaddMergedNodesAndRedist(self):
582     """Readds all merging nodes and make sure their config is up-to-date.
584     @raise errors.CommandError: If anything fails.
587     for data in self.merger_data:
588       for node in data.nodes:
# --readd re-onlines the nodes that _MergeConfig marked offline;
# --force-join skips the usual join confirmation.
589         result = utils.RunCmd(["gnt-node", "add", "--readd",
590                                "--no-ssh-key-check", "--force-join", node])
# NOTE(review): the `if result.failed:` guards and the closing parentheses
# of both raises below are elided in this listing.
592           raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
593                                     " output: %s" % (node, result.fail_reason,
# Push the merged configuration out to all (now readded) nodes.
596     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
598       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
599                                 " output: %s" % (result.fail_reason,
602   # R0201: Method could be a function
603   def _StartupAllInstances(self): # pylint: disable-msg=R0201
604     """Starts up all instances (locally).
606     @raise errors.CommandError: If unable to start clusters
# NOTE(review): the continuation of this RunCmd argument list and the
# `if result.failed:` guard are elided in this listing.
609     result = utils.RunCmd(["gnt-instance", "startup", "--all",
612       raise errors.CommandError("Unable to start all instances."
613                                 " Fail reason: %s; output: %s" %
614                                 (result.fail_reason, result.output))
616   # R0201: Method could be a function
617   def _VerifyCluster(self): # pylint: disable-msg=R0201
618     """Runs gnt-cluster verify to verify the health.
620     @raise errors.ProgrammError: If cluster fails on verification
623     result = utils.RunCmd(["gnt-cluster", "verify"])
# NOTE(review): the `if result.failed:` guard is elided in this listing.
# (The docstring says ProgrammError but the code raises CommandError.)
625       raise errors.CommandError("Verification of cluster failed."
626                                 " Fail reason: %s; output: %s" %
627                                 (result.fail_reason, result.output))
# Top-level driver: runs every merge step in order inside a try block and,
# on errors.GenericError, prints the accumulated manual rollback steps.
# NOTE(review): this listing elides the `def Merge(self):` line, the
# `rbsteps = []` initializer, the `try:` opener, the _StopDaemons() call
# after its log line, and parts of the except/rollback bookkeeping.
630     """Does the actual merge.
632     It runs all the steps in the right order and updates the user about steps
633     taken. Also it keeps track of rollback_steps to undo everything.
638     logging.info("Pre cluster verification")
639     self._VerifyCluster()
641     logging.info("Prepare authorized_keys")
642     rbsteps.append("Remove our key from authorized_keys on nodes:"
644     self._PrepareAuthorizedKeys()
646     rbsteps.append("Start all instances again on the merging"
647                    " clusters: %(clusters)s")
648     logging.info("Stopping merging instances (takes a while)")
649     self._StopMergingInstances()
651     logging.info("Disable watcher")
652     self._DisableWatcher()
653     logging.info("Stop daemons on merging nodes")
655     logging.info("Merging config")
656     self._FetchRemoteConfig()
658     logging.info("Stopping master daemon")
659     self._KillMasterDaemon()
661     rbsteps.append("Restore %s from another master candidate"
662                    " and restart master daemon" %
663                    constants.CLUSTER_CONF_FILE)
# Restart masterd without voting: merged nodes are still offline, so a
# normal majority vote could not succeed yet.
665     self._StartMasterDaemon(no_vote=True)
667     # Point of no return, delete rbsteps
670     logging.warning("We are at the point of no return. Merge can not easily"
671                     " be undone after this point.")
672     logging.info("Readd nodes")
673     self._ReaddMergedNodesAndRedist()
675     logging.info("Merge done, restart master daemon normally")
676     self._KillMasterDaemon()
677     self._StartMasterDaemon()
679     if self.restart == _RESTART_ALL:
680       logging.info("Starting instances again")
681       self._StartupAllInstances()
683       logging.info("Not starting instances again")
684     logging.info("Post cluster verification")
685     self._VerifyCluster()
686   except errors.GenericError, e:
# On failure, tell the operator how to roll back by hand, interpolating
# the gathered node/cluster info into each recorded step.
690     nodes = Flatten([data.nodes for data in self.merger_data])
692       "clusters": self.clusters,
695       logging.critical("In order to rollback do the following:")
697         logging.critical(" * %s", step % info)
699       logging.critical("Nothing to rollback.")
701     # TODO: Keep track of steps done for a flawless resume?
# NOTE(review): the `def Cleanup(self):` line is elided in this listing.
# Removes the temporary work dir holding fetched keys and configs.
704     """Clean up our environment.
706     This cleans up remote private keys and configs and after that
707     deletes the temporary directory.
710     shutil.rmtree(self.work_dir)
713 def SetupLogging(options):
714   """Setting up logging infrastructure.
716   @param options: Parsed command line options
# Single stderr handler; verbosity is controlled via the handler level
# while the root logger passes everything through (NOTSET).
719   formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
721   stderr_handler = logging.StreamHandler()
722   stderr_handler.setFormatter(formatter)
# NOTE(review): the `if options.debug:` guard before this branch is
# elided in this listing.
724     stderr_handler.setLevel(logging.NOTSET)
725   elif options.verbose:
726     stderr_handler.setLevel(logging.INFO)
728     stderr_handler.setLevel(logging.WARNING)
730   root_logger = logging.getLogger("")
731   root_logger.setLevel(logging.NOTSET)
732   root_logger.addHandler(stderr_handler)
# Entry point: parses options, builds a Merger over the unique cluster
# arguments, runs Setup/Merge and always cleans up the work dir.
# NOTE(review): this listing elides the `def main():` line, parts of the
# OptionParser construction, the `if not args:` guard before parser.error,
# the try/finally structure around Setup/Merge/Cleanup, and the error
# logging in the except branch.
739   program = os.path.basename(sys.argv[0])
741   parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
743   parser.add_option(cli.DEBUG_OPT)
744   parser.add_option(cli.VERBOSE_OPT)
745   parser.add_option(PAUSE_PERIOD_OPT)
746   parser.add_option(GROUPS_OPT)
747   parser.add_option(RESTART_OPT)
748   parser.add_option(PARAMS_OPT)
750   (options, args) = parser.parse_args()
752   SetupLogging(options)
755     parser.error("No clusters specified")
# Deduplicate cluster names while preserving order.
757   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
758                           options.groups, options.restart, options.params)
761     cluster_merger.Setup()
762     cluster_merger.Merge()
763   except errors.GenericError, e:
765     return constants.EXIT_FAILURE
767     cluster_merger.Cleanup()
769   return constants.EXIT_SUCCESS
# NOTE(review): the body of this guard (presumably sys.exit(main())) is
# elided in this listing.
772 if __name__ == "__main__":