4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Tool to merge two or more clusters together.

The clusters have to run the same version of Ganeti!

"""
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
import logging
import optparse
import os
import shutil
import sys
import tempfile

from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils
45 _GROUPS_MERGE = "merge"
46 _GROUPS_RENAME = "rename"
47 _CLUSTERMERGE_ECID = "clustermerge-ecid"
# Command line options, registered with main()'s OptionParser below.
# dest= is given explicitly: optparse would otherwise derive
# "watcher_pause_period" from the long option, while main() reads
# options.pause_period.
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
def Flatten(unflattened_list):
  """Flattens a list.

  Nested lists of any depth are flattened into a single list; non-list
  items keep their original relative order.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flattened_list = []

  for item in unflattened_list:
    if isinstance(item, list):
      # Recurse so arbitrarily deep nesting is flattened too
      flattened_list.extend(Flatten(item))
    else:
      flattened_list.append(item)

  return flattened_list
class MergerData(object):
  """Container class to hold data used for merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param config_path: Path to the merging cluster config

    """
    self.cluster = cluster
    self.key_path = key_path
    # Restored: nodes was accepted but never stored, although the merge
    # steps iterate over data.nodes
    self.nodes = nodes
    self.instances = instances
    self.config_path = config_path
100 class Merger(object):
101 """Handling the merge.
104 def __init__(self, clusters, pause_period, groups):
105 """Initialize object with sane defaults and infos required.
107 @param clusters: The list of clusters to merge in
108 @param pause_period: The time watcher shall be disabled for
109 @param groups: How to handle group conflicts
112 self.merger_data = []
113 self.clusters = clusters
114 self.pause_period = pause_period
115 self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
116 (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
117 self.ssh_runner = ssh.SshRunner(self.cluster_name)
121 """Sets up our end so we can do the merger.
123 This method is setting us up as a preparation for the merger.
124 It makes the initial contact and gathers information needed.
126 @raise errors.RemoteError: for errors in communication/grabbing
129 (remote_path, _, _) = ssh.GetUserFiles("root")
131 if self.cluster_name in self.clusters:
132 raise errors.CommandError("Cannot merge cluster %s with itself" %
135 # Fetch remotes private key
136 for cluster in self.clusters:
137 result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
140 raise errors.RemoteError("There was an error while grabbing ssh private"
141 " key from %s. Fail reason: %s; output: %s" %
142 (cluster, result.fail_reason, result.output))
144 key_path = utils.PathJoin(self.work_dir, cluster)
145 utils.WriteFile(key_path, mode=0600, data=result.stdout)
147 result = self._RunCmd(cluster, "gnt-node list -o name,offline"
148 " --no-header --separator=,", private_key=key_path)
150 raise errors.RemoteError("Unable to retrieve list of nodes from %s."
151 " Fail reason: %s; output: %s" %
152 (cluster, result.fail_reason, result.output))
153 nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
154 nodes = [node_status[0] for node_status in nodes_statuses
155 if node_status[1] == "N"]
157 result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
158 private_key=key_path)
160 raise errors.RemoteError("Unable to retrieve list of instances from"
161 " %s. Fail reason: %s; output: %s" %
162 (cluster, result.fail_reason, result.output))
163 instances = result.stdout.splitlines()
165 self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
167 def _PrepareAuthorizedKeys(self):
168 """Prepare the authorized_keys on every merging node.
170 This method add our public key to remotes authorized_key for further
174 (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
175 pub_key = utils.ReadFile(pub_key_file)
177 for data in self.merger_data:
178 for node in data.nodes:
179 result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
180 (auth_keys, pub_key)),
181 private_key=data.key_path)
184 raise errors.RemoteError("Unable to add our public key to %s in %s."
185 " Fail reason: %s; output: %s" %
186 (node, data.cluster, result.fail_reason,
189 def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
190 strict_host_check=False, private_key=None, batch=True,
192 """Wrapping SshRunner.Run with default parameters.
194 For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
197 return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
198 use_cluster_key=use_cluster_key,
199 strict_host_check=strict_host_check,
200 private_key=private_key, batch=batch,
203 def _StopMergingInstances(self):
204 """Stop instances on merging clusters.
207 for cluster in self.clusters:
208 result = self._RunCmd(cluster, "gnt-instance shutdown --all"
212 raise errors.RemoteError("Unable to stop instances on %s."
213 " Fail reason: %s; output: %s" %
214 (cluster, result.fail_reason, result.output))
216 def _DisableWatcher(self):
217 """Disable watch on all merging clusters, including ourself.
220 for cluster in ["localhost"] + self.clusters:
221 result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
225 raise errors.RemoteError("Unable to pause watcher on %s."
226 " Fail reason: %s; output: %s" %
227 (cluster, result.fail_reason, result.output))
229 def _StopDaemons(self):
230 """Stop all daemons on merging nodes.
233 cmd = "%s stop-all" % constants.DAEMON_UTIL
234 for data in self.merger_data:
235 for node in data.nodes:
236 result = self._RunCmd(node, cmd)
239 raise errors.RemoteError("Unable to stop daemons on %s."
240 " Fail reason: %s; output: %s." %
241 (node, result.fail_reason, result.output))
243 def _FetchRemoteConfig(self):
244 """Fetches and stores remote cluster config from the master.
246 This step is needed before we can merge the config.
249 for data in self.merger_data:
250 result = self._RunCmd(data.cluster, "cat %s" %
251 constants.CLUSTER_CONF_FILE)
254 raise errors.RemoteError("Unable to retrieve remote config on %s."
255 " Fail reason: %s; output %s" %
256 (data.cluster, result.fail_reason,
259 data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
261 utils.WriteFile(data.config_path, data=result.stdout)
263 # R0201: Method could be a function
264 def _KillMasterDaemon(self): # pylint: disable-msg=R0201
265 """Kills the local master daemon.
267 @raise errors.CommandError: If unable to kill
270 result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
272 raise errors.CommandError("Unable to stop master daemons."
273 " Fail reason: %s; output: %s" %
274 (result.fail_reason, result.output))
276 def _MergeConfig(self):
277 """Merges all foreign config into our own config.
280 my_config = config.ConfigWriter(offline=True)
281 fake_ec_id = 0 # Needs to be uniq over the whole config merge
283 for data in self.merger_data:
284 other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
285 self._MergeClusterConfigs(my_config, other_config)
286 self._MergeNodeGroups(my_config, other_config)
288 for node in other_config.GetNodeList():
289 node_info = other_config.GetNodeInfo(node)
290 my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
293 for instance in other_config.GetInstanceList():
294 instance_info = other_config.GetInstanceInfo(instance)
296 # Update the DRBD port assignments
297 # This is a little bit hackish
298 for dsk in instance_info.disks:
299 if dsk.dev_type in constants.LDS_DRBD:
300 port = my_config.AllocatePort()
302 logical_id = list(dsk.logical_id)
304 dsk.logical_id = tuple(logical_id)
306 physical_id = list(dsk.physical_id)
307 physical_id[1] = physical_id[3] = port
308 dsk.physical_id = tuple(physical_id)
310 my_config.AddInstance(instance_info,
311 _CLUSTERMERGE_ECID + str(fake_ec_id))
  # R0201: Method could be a function
  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible

    Compares the merging-in cluster's parameters against our own and logs
    every incompatibility; the final raise aborts the merge if any
    incompatible value was found.

    @param my_config: ConfigWriter of our own cluster
    @param other_config: ConfigWriter of the cluster being merged in

    """
    # pylint: disable-msg=R0201
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()
    # NOTE(review): lines are missing from this extract here — the strings
    # below are surviving entries of the check_params list of simple
    # cluster attributes compared one by one; restore the full list and its
    # opening "check_params = [" from upstream before use.
      "default_iallocator",
      "drbd_usermode_helper",
      "maintain_node_health",

    # Simple one-to-one attribute comparison; any mismatch is an error
    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs to this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,
        # NOTE(review): closing argument(s) of this call and the error
        # counter update are missing from this extract

    # Check default hypervisor
    # Only a warning: new instances simply get our default hypervisor
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    # Differing enabled-hypervisor sets are fatal (logged as error)
    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs to"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)

    # Check hypervisor params for hypervisors we care about
    # TODO: we probably don't care about all params for a given hypervisor
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs to this cluster's parameter"
                        # NOTE(review): format-string continuation missing
                        # from this extract
                        other_value, param, hyp, other_cluster.cluster_name,
                        # NOTE(review): closing argument(s) missing from
                        # this extract

    # Check os hypervisor params for hypervisors we care about
    # (py2 dict.keys() returns lists, hence the + concatenation)
    for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differs to this cluster's parameters"
                        # NOTE(review): format-string continuation missing
                        # from this extract
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        # NOTE(review): closing argument(s) missing from
                        # this extract

    # Local-cluster settings win for these; only warn about the difference
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    # Reserved LVs are merged: keep the union of both clusters' lists
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    # Least permissive value wins for disk wiping
    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
                      " cluster's value (%s). The least permissive value (%s)"
                      " will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    # Foreign OS parameters are adopted when unknown locally; a clash of
    # values for a known OS is an error
    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differs to"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])

      # NOTE(review): the guard counting logged errors ("if err_count:" or
      # similar) is missing from this extract; as written this raise would
      # fire unconditionally inside the loop
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)
446 # R0201: Method could be a function
447 def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
448 if os_name in cluster.os_hvp:
449 return cluster.os_hvp[os_name].get(hyp, None)
453 # R0201: Method could be a function
454 def _MergeNodeGroups(self, my_config, other_config):
455 """Adds foreign node groups
457 ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
459 # pylint: disable-msg=R0201
460 logging.info("Node group conflict strategy: %s", self.groups)
462 my_grps = my_config.GetAllNodeGroupsInfo().values()
463 other_grps = other_config.GetAllNodeGroupsInfo().values()
465 # Check for node group naming conflicts:
467 for other_grp in other_grps:
468 for my_grp in my_grps:
469 if other_grp.name == my_grp.name:
470 conflicts.append(other_grp)
473 conflict_names = utils.CommaJoin([g.name for g in conflicts])
474 logging.info("Node groups in both local and remote cluster: %s",
477 # User hasn't specified how to handle conflicts
479 raise errors.CommandError("The following node group(s) are in both"
480 " clusters, and no merge strategy has been"
481 " supplied (see the --groups option): %s" %
484 # User wants to rename conflicts
485 elif self.groups == _GROUPS_RENAME:
486 for grp in conflicts:
487 new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
488 logging.info("Renaming remote node group from %s to %s"
489 " to resolve conflict", grp.name, new_name)
492 # User wants to merge conflicting groups
493 elif self.groups == 'merge':
494 for other_grp in conflicts:
495 logging.info("Merging local and remote '%s' groups", other_grp.name)
496 for node_name in other_grp.members[:]:
497 node = other_config.GetNodeInfo(node_name)
498 # Access to a protected member of a client class
499 # pylint: disable-msg=W0212
500 other_config._UnlockedRemoveNodeFromGroup(node)
502 # Access to a protected member of a client class
503 # pylint: disable-msg=W0212
504 my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
506 # Access to a protected member of a client class
507 # pylint: disable-msg=W0212
508 my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
509 node.group = my_grp_uuid
510 # Remove from list of groups to add
511 other_grps.remove(other_grp)
513 for grp in other_grps:
514 #TODO: handle node group conflicts
515 my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
517 # R0201: Method could be a function
518 def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
519 """Starts the local master daemon.
521 @param no_vote: Should the masterd started without voting? default: False
522 @raise errors.CommandError: If unable to start daemon.
527 env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
529 result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
531 raise errors.CommandError("Couldn't start ganeti master."
532 " Fail reason: %s; output: %s" %
533 (result.fail_reason, result.output))
535 def _ReaddMergedNodesAndRedist(self):
536 """Readds all merging nodes and make sure their config is up-to-date.
538 @raise errors.CommandError: If anything fails.
541 for data in self.merger_data:
542 for node in data.nodes:
543 result = utils.RunCmd(["gnt-node", "add", "--readd",
544 "--no-ssh-key-check", "--force-join", node])
546 raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
547 " output: %s" % (node, result.fail_reason,
550 result = utils.RunCmd(["gnt-cluster", "redist-conf"])
552 raise errors.CommandError("Redistribution failed. Fail reason: %s;"
553 " output: %s" % (result.fail_reason,
556 # R0201: Method could be a function
557 def _StartupAllInstances(self): # pylint: disable-msg=R0201
558 """Starts up all instances (locally).
560 @raise errors.CommandError: If unable to start clusters
563 result = utils.RunCmd(["gnt-instance", "startup", "--all",
566 raise errors.CommandError("Unable to start all instances."
567 " Fail reason: %s; output: %s" %
568 (result.fail_reason, result.output))
570 # R0201: Method could be a function
571 def _VerifyCluster(self): # pylint: disable-msg=R0201
572 """Runs gnt-cluster verify to verify the health.
574 @raise errors.ProgrammError: If cluster fails on verification
577 result = utils.RunCmd(["gnt-cluster", "verify"])
579 raise errors.CommandError("Verification of cluster failed."
580 " Fail reason: %s; output: %s" %
581 (result.fail_reason, result.output))
584 """Does the actual merge.
586 It runs all the steps in the right order and updates the user about steps
587 taken. Also it keeps track of rollback_steps to undo everything.
592 logging.info("Pre cluster verification")
593 self._VerifyCluster()
595 logging.info("Prepare authorized_keys")
596 rbsteps.append("Remove our key from authorized_keys on nodes:"
598 self._PrepareAuthorizedKeys()
600 rbsteps.append("Start all instances again on the merging"
601 " clusters: %(clusters)s")
602 logging.info("Stopping merging instances (takes a while)")
603 self._StopMergingInstances()
605 logging.info("Disable watcher")
606 self._DisableWatcher()
607 logging.info("Stop daemons on merging nodes")
609 logging.info("Merging config")
610 self._FetchRemoteConfig()
612 logging.info("Stopping master daemon")
613 self._KillMasterDaemon()
615 rbsteps.append("Restore %s from another master candidate"
616 " and restart master daemon" %
617 constants.CLUSTER_CONF_FILE)
619 self._StartMasterDaemon(no_vote=True)
621 # Point of no return, delete rbsteps
624 logging.warning("We are at the point of no return. Merge can not easily"
625 " be undone after this point.")
626 logging.info("Readd nodes")
627 self._ReaddMergedNodesAndRedist()
629 logging.info("Merge done, restart master daemon normally")
630 self._KillMasterDaemon()
631 self._StartMasterDaemon()
633 logging.info("Starting instances again")
634 self._StartupAllInstances()
635 logging.info("Post cluster verification")
636 self._VerifyCluster()
637 except errors.GenericError, e:
641 nodes = Flatten([data.nodes for data in self.merger_data])
643 "clusters": self.clusters,
646 logging.critical("In order to rollback do the following:")
648 logging.critical(" * %s", step % info)
650 logging.critical("Nothing to rollback.")
652 # TODO: Keep track of steps done for a flawless resume?
655 """Clean up our environment.
657 This cleans up remote private keys and configs and after that
658 deletes the temporary directory.
661 shutil.rmtree(self.work_dir)
def SetupLogging(options):
  """Setting up logging infrastructure.

  Installs a stderr handler on the root logger whose level is derived
  from the --debug/--verbose command line options.

  @param options: Parsed command line options

  """
  formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  if options.debug:
    stderr_handler.setLevel(logging.NOTSET)
  elif options.verbose:
    stderr_handler.setLevel(logging.INFO)
  else:
    stderr_handler.setLevel(logging.WARNING)

  # Root logger passes everything; filtering happens at the handler
  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(stderr_handler)
690 program = os.path.basename(sys.argv[0])
692 parser = optparse.OptionParser(usage=("%%prog [--debug|--verbose]"
693 " [--watcher-pause-period SECONDS]"
694 " [--groups [%s|%s]]"
695 " <cluster> [<cluster...>]" %
696 (_GROUPS_MERGE, _GROUPS_RENAME)),
698 parser.add_option(cli.DEBUG_OPT)
699 parser.add_option(cli.VERBOSE_OPT)
700 parser.add_option(PAUSE_PERIOD_OPT)
701 parser.add_option(GROUPS_OPT)
703 (options, args) = parser.parse_args()
705 SetupLogging(options)
708 parser.error("No clusters specified")
710 cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
714 cluster_merger.Setup()
715 cluster_merger.Merge()
716 except errors.GenericError, e:
718 return constants.EXIT_FAILURE
720 cluster_merger.Cleanup()
722 return constants.EXIT_SUCCESS
725 if __name__ == "__main__":