4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 """Tool to merge two or more clusters together.
23 The clusters have to run the same version of Ganeti!
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
# Strategies for handling node groups with the same name on merged clusters.
_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
# Execution-context ID prefix used when adding merged objects to our config.
_CLUSTERMERGE_ECID = "clustermerge-ecid"

# Instance restart strategies.
# NOTE(review): _RESTART_ALL and _RESTART_UP are referenced below but their
# definitions are not visible in this view -- confirm they are defined
# alongside _RESTART_NONE.
_RESTART_NONE = "none"
_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)

# Seconds the watcher is paused on all involved clusters during the merge.
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
# Node-group conflict resolution strategy (see _GROUPS_* above).
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
# Instance restart strategy (see _RESTART_* above).
# NOTE(review): this expression is truncated in this view -- the help string's
# format arguments and closing parentheses are not visible.
RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             choices=_RESTART_CHOICES,
                             help=("How to handle restarting instances"
                                   " same name (One of: %s/%s/%s)" %
def Flatten(unflattened_list):
  """Flattens a list of (possibly nested) lists into a single flat list.

  Nested lists are flattened recursively; non-list items are kept as-is.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  # Fix: the accumulator must be initialized and returned; as written the
  # function referenced an undefined name and returned None.
  flattened_list = []

  for item in unflattened_list:
    if isinstance(item, list):
      flattened_list.extend(Flatten(item))
    else:
      flattened_list.append(item)

  return flattened_list
class MergerData(object):
  """Container class to hold data used for merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param config_path: Path to the merging cluster config

    """
    self.cluster = cluster
    self.key_path = key_path
    # Fix: store the nodes list -- later steps iterate data.nodes (e.g.
    # _PrepareAuthorizedKeys and _StopDaemons), but the attribute was never
    # assigned from the parameter.
    self.nodes = nodes
    self.instances = instances
    self.config_path = config_path
class Merger(object):
  """Handling the merge.

  """
  def __init__(self, clusters, pause_period, groups, restart):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts
    @param restart: How to handle instance restart

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    # Scratch directory for fetched keys/configs; removed again in Cleanup().
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    # Name of the local (target) cluster the others are merged into.
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    # NOTE(review): "self.groups = groups" is not visible in this view even
    # though the parameter is accepted and self.groups is read later (see
    # _MergeNodeGroups) -- verify the assignment exists in the full source.
    self.restart = restart
    # Restarting only previously-running instances is not implemented yet.
    if self.restart == _RESTART_UP:
      raise NotImplementedError
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact and gathers information needed.

    @raise errors.RemoteError: for errors in communication/grabbing

    """
    # NOTE(review): several continuation lines and the "if result.failed:"
    # guards appear to be missing from this view (raises directly follow the
    # _RunCmd calls) -- verify against the full source.
    (remote_path, _, _) = ssh.GetUserFiles("root")

    # A cluster cannot be merged into itself.
    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
    # Fetch remotes private key
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      # Store the fetched private key locally for the subsequent ssh calls.
      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      # Collect the online nodes ("N" == not offline) of the remote cluster.
      result = self._RunCmd(cluster, "gnt-node list -o name,offline"
                            " --no-header --separator=,", private_key=key_path)
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
      nodes = [node_status[0] for node_status in nodes_statuses
               if node_status[1] == "N"]

      # Collect the remote cluster's instance names.
      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
                            private_key=key_path)
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method add our public key to remotes authorized_key for further
    communication.

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

    for data in self.merger_data:
      for node in data.nodes:
        # Append our public key via a quoted here-document on the remote node.
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path)
        # NOTE(review): the failure check guarding this raise and the closing
        # of its argument tuple are not visible in this view.
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
    """Wrapping SshRunner.Run with default parameters.

    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.

    """
    # NOTE(review): the tail of the parameter list and of the Run() call (at
    # least one more keyword argument and the closing parenthesis) is not
    # visible in this view.
    return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
                               use_cluster_key=use_cluster_key,
                               strict_host_check=strict_host_check,
                               private_key=private_key, batch=batch,
  def _StopMergingInstances(self):
    """Stop instances on merging clusters.

    @raise errors.RemoteError: if shutting instances down fails

    """
    for cluster in self.clusters:
      # Shut down every instance on the merging cluster.
      # NOTE(review): the command's continuation line and the failure check
      # guarding the raise are not visible in this view.
      result = self._RunCmd(cluster, "gnt-instance shutdown --all"
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
  def _DisableWatcher(self):
    """Disable watch on all merging clusters, including ourself.

    @raise errors.RemoteError: if pausing the watcher fails

    """
    # The local cluster ("localhost") is paused too, so the watcher does not
    # restart daemons or instances behind our back during the merge.
    for cluster in ["localhost"] + self.clusters:
      # NOTE(review): the continuation carrying the pause-period argument and
      # the failure check guarding the raise are not visible in this view.
      result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
  def _StopDaemons(self):
    """Stop all daemons on merging nodes.

    @raise errors.RemoteError: if stopping daemons on a node fails

    """
    # daemon-util stop-all halts every Ganeti daemon on the node.
    cmd = "%s stop-all" % constants.DAEMON_UTIL
    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, cmd)
        # NOTE(review): the failure check guarding this raise is not visible
        # in this view.
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))
  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    This step is needed before we can merge the config.

    """
    for data in self.merger_data:
      # Read the remote cluster's config file over ssh.
      result = self._RunCmd(data.cluster, "cat %s" %
                            constants.CLUSTER_CONF_FILE)
      # NOTE(review): the failure check guarding this raise and the tails of
      # this raise and of the PathJoin call are not visible in this view.
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output %s" %
                                 (data.cluster, result.fail_reason,

      # Store the fetched config locally; _MergeConfig reads it back later.
      data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
      utils.WriteFile(data.config_path, data=result.stdout)
  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable-msg=R0201
    """Kills the local master daemon.

    @raise errors.CommandError: If unable to kill

    """
    # Stop the local masterd via daemon-util.
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    # NOTE(review): the failure check guarding this raise is not visible in
    # this view.
      raise errors.CommandError("Unable to stop master daemons."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))
  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    """
    # Open our own config in offline mode (daemons are stopped at this point).
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be uniq over the whole config merge
    # NOTE(review): no increment of fake_ec_id is visible in this view --
    # confirm it is bumped between AddNode/AddInstance calls in the full
    # source, otherwise the ECID would not actually be unique.

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      # Cluster-level parameters and node groups are reconciled first.
      self._MergeClusterConfigs(my_config, other_config)
      self._MergeNodeGroups(my_config, other_config)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            # Allocate a fresh DRBD port from our cluster's port pool so the
            # merged instance does not collide with existing DRBD minors.
            port = my_config.AllocatePort()

            logical_id = list(dsk.logical_id)
            # NOTE(review): the assignment of the new port into logical_id is
            # not visible in this view -- as shown, the tuple is rebuilt
            # unchanged.
            dsk.logical_id = tuple(logical_id)

            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
  # R0201: Method could be a function
  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible

    @raise errors.ConfigurationError: if the configs are found incompatible

    """
    # pylint: disable-msg=R0201
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()

    # NOTE(review): the opening of the check_params list and the error-flag
    # bookkeeping consulted before the final raise are not visible in this
    # view -- logging.error calls below presumably also record a failure.
      "default_iallocator",
      "drbd_usermode_helper",
      "maintain_node_health",

    # file_storage_dir is only meaningful when file storage is compiled in.
    if constants.ENABLE_FILE_STORAGE:
      check_params.append("file_storage_dir")

    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs to this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,

    # Check default hypervisor
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      # Only a warning: the local default simply wins for new instances.
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs to"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)

    # Check hypervisor params for hypervisors we care about
    # TODO: we probably don't care about all params for a given hypervisor
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs to this cluster's parameter"
                        other_value, param, hyp, other_cluster.cluster_name,

    # Check os hypervisor params for hypervisors we care about
    for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differs to this cluster's parameters"
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,

    # For these boolean flags the local setting simply takes precedence.
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    # Reserved LVs from both clusters are simply unioned.
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
                      " cluster's value (%s). The least permissive value (%s)"
                      " will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      # Wiping is the safer behaviour, so it wins on disagreement.
      my_cluster.prealloc_wipe_disks = True

    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differs to"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])

    # NOTE(review): the condition guarding this raise (presumably testing the
    # accumulated error state from the logging.error calls above) is not
    # visible in this view.
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)
  # R0201: Method could be a function
  def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
    """Looks up the OS-specific hypervisor parameters of a cluster.

    @param cluster: cluster object to look the parameters up in
    @param os_name: name of the OS
    @param hyp: name of the hypervisor
    @return: the os_hvp entry for (os_name, hyp), or None when the OS has no
      entry for that hypervisor; when os_name is unknown the function falls
      through (implicitly returning None in the code visible here)

    """
    if os_name in cluster.os_hvp:
      return cluster.os_hvp[os_name].get(hyp, None)
  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.

    """
    # pylint: disable-msg=R0201
    logging.info("Node group conflict strategy: %s", self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    other_grps = other_config.GetAllNodeGroupsInfo().values()

    # Check for node group naming conflicts:
    # NOTE(review): the initialization of the conflicts list is not visible
    # in this view.
    for other_grp in other_grps:
      for my_grp in my_grps:
        if other_grp.name == my_grp.name:
          conflicts.append(other_grp)

      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s",
      # User hasn't specified how to handle conflicts
      # NOTE(review): the "if self.groups is None:"-style condition that
      # should precede this raise is not visible in this view.
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict", grp.name, new_name)
          # NOTE(review): the actual rename assignment of new_name onto the
          # group is not visible in this view.
      # User wants to merge conflicting groups
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups", other_grp.name)
          # Iterate a copy since members are removed while iterating.
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            other_config._UnlockedRemoveNodeFromGroup(node)

            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)

            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)

    # Whatever is left in other_grps is added as a new (non-conflicting) group.
    for grp in other_grps:
      #TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
    """Starts the local master daemon.

    @param no_vote: Should the masterd started without voting? default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    # NOTE(review): the initialization of env and the "if no_vote:" guard for
    # the assignment below are not visible in this view.
      env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    # NOTE(review): the failure check guarding this raise is not visible in
    # this view.
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))
  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and make sure their config is up-to-date.

    @raise errors.CommandError: If anything fails.

    """
    for data in self.merger_data:
      for node in data.nodes:
        # --readd keeps the existing node entry; --force-join skips the usual
        # join confirmation since the node is already part of our config.
        result = utils.RunCmd(["gnt-node", "add", "--readd",
                               "--no-ssh-key-check", "--force-join", node])
        # NOTE(review): the failure check guarding this raise and the tail of
        # its message arguments are not visible in this view.
          raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
                                    " output: %s" % (node, result.fail_reason,

    # Push the merged configuration out to every node.
    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable-msg=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start clusters

    """
    # NOTE(review): the tail of this command list and the failure check
    # guarding the raise are not visible in this view.
    result = utils.RunCmd(["gnt-instance", "startup", "--all",
      raise errors.CommandError("Unable to start all instances."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))
  # R0201: Method could be a function
  def _VerifyCluster(self): # pylint: disable-msg=R0201
    """Runs gnt-cluster verify to verify the health.

    @raise errors.CommandError: If cluster fails on verification

    """
    result = utils.RunCmd(["gnt-cluster", "verify"])
    # NOTE(review): the failure check guarding this raise is not visible in
    # this view.
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_steps to undo everything.

    """
    # NOTE(review): the initialization of rbsteps and the opening "try:" of
    # the try/except spanning this method are not visible in this view.
    logging.info("Pre cluster verification")
    self._VerifyCluster()

    logging.info("Prepare authorized_keys")
    rbsteps.append("Remove our key from authorized_keys on nodes:"
    self._PrepareAuthorizedKeys()

    rbsteps.append("Start all instances again on the merging"
                   " clusters: %(clusters)s")
    logging.info("Stopping merging instances (takes a while)")
    self._StopMergingInstances()

    logging.info("Disable watcher")
    self._DisableWatcher()
    logging.info("Stop daemons on merging nodes")
    # NOTE(review): the _StopDaemons() call announced above and the
    # _MergeConfig() call are not visible in this view.
    logging.info("Merging config")
    self._FetchRemoteConfig()

    logging.info("Stopping master daemon")
    self._KillMasterDaemon()

    rbsteps.append("Restore %s from another master candidate"
                   " and restart master daemon" %
                   constants.CLUSTER_CONF_FILE)
    # Restart without voting: quorum cannot be reached while the merging
    # clusters' daemons are stopped.
    self._StartMasterDaemon(no_vote=True)

    # Point of no return, delete rbsteps
    logging.warning("We are at the point of no return. Merge can not easily"
                    " be undone after this point.")
    logging.info("Readd nodes")
    self._ReaddMergedNodesAndRedist()

    logging.info("Merge done, restart master daemon normally")
    self._KillMasterDaemon()
    self._StartMasterDaemon()

    if self.restart == _RESTART_ALL:
      logging.info("Starting instances again")
      self._StartupAllInstances()
      # NOTE(review): an "else:" branch appears to be missing before this
      # line in this view.
      logging.info("Not starting instances again")
    logging.info("Post cluster verification")
    self._VerifyCluster()
  except errors.GenericError, e:
    # On failure, tell the user how to roll back manually; the format dict
    # for the recorded rollback steps is partially elided in this view.
    nodes = Flatten([data.nodes for data in self.merger_data])
      "clusters": self.clusters,
    logging.critical("In order to rollback do the following:")
      logging.critical(" * %s", step % info)
      logging.critical("Nothing to rollback.")

    # TODO: Keep track of steps done for a flawless resume?
    """Clean up our environment.

    This cleans up remote private keys and configs and after that
    deletes the temporary directory.

    """
    # Remove the scratch directory created in __init__ (it holds the fetched
    # ssh keys and config copies).
    # NOTE(review): the remote-side cleanup described in the docstring is not
    # visible in this view; only the local directory removal is.
    shutil.rmtree(self.work_dir)
def SetupLogging(options):
  """Setting up logging infrastructure.

  Attaches a stderr handler to the root logger whose level depends on the
  command line flags: everything with --debug, INFO and up with --verbose,
  otherwise warnings and errors only.

  @param options: Parsed command line options

  """
  formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  # Fix: the "if options.debug:" branch was missing, leaving a dangling
  # "elif" and never honouring the --debug flag.
  if options.debug:
    stderr_handler.setLevel(logging.NOTSET)
  elif options.verbose:
    stderr_handler.setLevel(logging.INFO)
  else:
    stderr_handler.setLevel(logging.WARNING)

  # The root logger passes everything through; filtering happens per-handler.
  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(stderr_handler)
  # NOTE(review): the enclosing "def main():" line, the closing of the
  # OptionParser call, the "if not args:" guard before parser.error, and the
  # try/except/finally structure around Setup/Merge/Cleanup are not visible
  # in this view.
  program = os.path.basename(sys.argv[0])

  # NOTE(review): "%%prog" in the usage string is expanded by optparse as a
  # literal percent sign followed by the program name -- confirm "%prog" was
  # not intended.
  parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(PAUSE_PERIOD_OPT)
  parser.add_option(GROUPS_OPT)
  parser.add_option(RESTART_OPT)

  (options, args) = parser.parse_args()

  SetupLogging(options)

    parser.error("No clusters specified")

  # Duplicate cluster names are filtered out before constructing the merger.
  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
                          options.groups, options.restart)
    cluster_merger.Setup()
    cluster_merger.Merge()
  except errors.GenericError, e:
    return constants.EXIT_FAILURE
    # Cleanup runs regardless of success or failure (presumably in a
    # "finally:" block elided from this view).
    cluster_merger.Cleanup()

  return constants.EXIT_SUCCESS
# Fix: the guard had no suite (a syntax error as written); exit with main()'s
# return code, which is constants.EXIT_SUCCESS/EXIT_FAILURE.
if __name__ == "__main__":
  sys.exit(main())