4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Tool to merge two or more clusters together.

The clusters have to run the same version of Ganeti!

"""
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
import logging
import optparse
import os
import shutil
import sys
import tempfile

from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils
# Strategies for handling node groups whose name exists in both clusters
_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
# Entity-change id prefix used when adding foreign objects to our config
_CLUSTERMERGE_ECID = "clustermerge-ecid"

# Strategies for restarting instances after the merge (--restart)
_RESTART_ALL = "all"
_RESTART_UP = "up"
_RESTART_NONE = "none"
_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
# Command line options; dest= is given explicitly where optparse's derived
# name would not match what main() reads (e.g. options.pause_period).
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             metavar="STRATEGY",
                             choices=_RESTART_CHOICES,
                             dest="restart",
                             help=("How to handle restarting instances"
                                   " same name (One of: %s/%s/%s)" %
                                   _RESTART_CHOICES))
def Flatten(unflattened_list):
  """Flattens a list (of lists) into a single flat list.

  Nested lists are flattened recursively; non-list items are kept as-is.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flattened_list = []

  for item in unflattened_list:
    if isinstance(item, list):
      flattened_list.extend(Flatten(item))
    else:
      flattened_list.append(item)

  return flattened_list
class MergerData(object):
  """Container class to hold data used for merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param config_path: Path to the merging cluster config

    """
    self.cluster = cluster
    self.key_path = key_path
    self.nodes = nodes
    self.instances = instances
    self.config_path = config_path
112 class Merger(object):
113 """Handling the merge.
116 def __init__(self, clusters, pause_period, groups, restart):
117 """Initialize object with sane defaults and infos required.
119 @param clusters: The list of clusters to merge in
120 @param pause_period: The time watcher shall be disabled for
121 @param groups: How to handle group conflicts
122 @param restart: How to handle instance restart
125 self.merger_data = []
126 self.clusters = clusters
127 self.pause_period = pause_period
128 self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
129 (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
130 self.ssh_runner = ssh.SshRunner(self.cluster_name)
132 self.restart = restart
133 if self.restart == _RESTART_UP:
134 raise NotImplementedError
138 """Sets up our end so we can do the merger.
140 This method is setting us up as a preparation for the merger.
141 It makes the initial contact and gathers information needed.
143 @raise errors.RemoteError: for errors in communication/grabbing
146 (remote_path, _, _) = ssh.GetUserFiles("root")
148 if self.cluster_name in self.clusters:
149 raise errors.CommandError("Cannot merge cluster %s with itself" %
152 # Fetch remotes private key
153 for cluster in self.clusters:
154 result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
157 raise errors.RemoteError("There was an error while grabbing ssh private"
158 " key from %s. Fail reason: %s; output: %s" %
159 (cluster, result.fail_reason, result.output))
161 key_path = utils.PathJoin(self.work_dir, cluster)
162 utils.WriteFile(key_path, mode=0600, data=result.stdout)
164 result = self._RunCmd(cluster, "gnt-node list -o name,offline"
165 " --no-header --separator=,", private_key=key_path)
167 raise errors.RemoteError("Unable to retrieve list of nodes from %s."
168 " Fail reason: %s; output: %s" %
169 (cluster, result.fail_reason, result.output))
170 nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
171 nodes = [node_status[0] for node_status in nodes_statuses
172 if node_status[1] == "N"]
174 result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
175 private_key=key_path)
177 raise errors.RemoteError("Unable to retrieve list of instances from"
178 " %s. Fail reason: %s; output: %s" %
179 (cluster, result.fail_reason, result.output))
180 instances = result.stdout.splitlines()
182 self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
184 def _PrepareAuthorizedKeys(self):
185 """Prepare the authorized_keys on every merging node.
187 This method add our public key to remotes authorized_key for further
191 (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
192 pub_key = utils.ReadFile(pub_key_file)
194 for data in self.merger_data:
195 for node in data.nodes:
196 result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
197 (auth_keys, pub_key)),
198 private_key=data.key_path)
201 raise errors.RemoteError("Unable to add our public key to %s in %s."
202 " Fail reason: %s; output: %s" %
203 (node, data.cluster, result.fail_reason,
206 def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
207 strict_host_check=False, private_key=None, batch=True,
209 """Wrapping SshRunner.Run with default parameters.
211 For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
214 return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
215 use_cluster_key=use_cluster_key,
216 strict_host_check=strict_host_check,
217 private_key=private_key, batch=batch,
220 def _StopMergingInstances(self):
221 """Stop instances on merging clusters.
224 for cluster in self.clusters:
225 result = self._RunCmd(cluster, "gnt-instance shutdown --all"
229 raise errors.RemoteError("Unable to stop instances on %s."
230 " Fail reason: %s; output: %s" %
231 (cluster, result.fail_reason, result.output))
233 def _DisableWatcher(self):
234 """Disable watch on all merging clusters, including ourself.
237 for cluster in ["localhost"] + self.clusters:
238 result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
242 raise errors.RemoteError("Unable to pause watcher on %s."
243 " Fail reason: %s; output: %s" %
244 (cluster, result.fail_reason, result.output))
246 def _StopDaemons(self):
247 """Stop all daemons on merging nodes.
250 cmd = "%s stop-all" % constants.DAEMON_UTIL
251 for data in self.merger_data:
252 for node in data.nodes:
253 result = self._RunCmd(node, cmd)
256 raise errors.RemoteError("Unable to stop daemons on %s."
257 " Fail reason: %s; output: %s." %
258 (node, result.fail_reason, result.output))
260 def _FetchRemoteConfig(self):
261 """Fetches and stores remote cluster config from the master.
263 This step is needed before we can merge the config.
266 for data in self.merger_data:
267 result = self._RunCmd(data.cluster, "cat %s" %
268 constants.CLUSTER_CONF_FILE)
271 raise errors.RemoteError("Unable to retrieve remote config on %s."
272 " Fail reason: %s; output %s" %
273 (data.cluster, result.fail_reason,
276 data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
278 utils.WriteFile(data.config_path, data=result.stdout)
280 # R0201: Method could be a function
281 def _KillMasterDaemon(self): # pylint: disable-msg=R0201
282 """Kills the local master daemon.
284 @raise errors.CommandError: If unable to kill
287 result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
289 raise errors.CommandError("Unable to stop master daemons."
290 " Fail reason: %s; output: %s" %
291 (result.fail_reason, result.output))
293 def _MergeConfig(self):
294 """Merges all foreign config into our own config.
297 my_config = config.ConfigWriter(offline=True)
298 fake_ec_id = 0 # Needs to be uniq over the whole config merge
300 for data in self.merger_data:
301 other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
302 self._MergeClusterConfigs(my_config, other_config)
303 self._MergeNodeGroups(my_config, other_config)
305 for node in other_config.GetNodeList():
306 node_info = other_config.GetNodeInfo(node)
307 my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
310 for instance in other_config.GetInstanceList():
311 instance_info = other_config.GetInstanceInfo(instance)
313 # Update the DRBD port assignments
314 # This is a little bit hackish
315 for dsk in instance_info.disks:
316 if dsk.dev_type in constants.LDS_DRBD:
317 port = my_config.AllocatePort()
319 logical_id = list(dsk.logical_id)
321 dsk.logical_id = tuple(logical_id)
323 physical_id = list(dsk.physical_id)
324 physical_id[1] = physical_id[3] = port
325 dsk.physical_id = tuple(physical_id)
327 my_config.AddInstance(instance_info,
328 _CLUSTERMERGE_ECID + str(fake_ec_id))
331 # R0201: Method could be a function
332 def _MergeClusterConfigs(self, my_config, other_config):
333 """Checks that all relevant cluster parameters are compatible
336 # pylint: disable-msg=R0201
337 my_cluster = my_config.GetClusterInfo()
338 other_cluster = other_config.GetClusterInfo()
346 "default_iallocator",
347 "drbd_usermode_helper",
350 "maintain_node_health",
359 for param_name in check_params:
360 my_param = getattr(my_cluster, param_name)
361 other_param = getattr(other_cluster, param_name)
362 if my_param != other_param:
363 logging.error("The value (%s) of the cluster parameter %s on %s"
364 " differs to this cluster's value (%s)",
365 other_param, param_name, other_cluster.cluster_name,
373 # Check default hypervisor
374 my_defhyp = my_cluster.enabled_hypervisors[0]
375 other_defhyp = other_cluster.enabled_hypervisors[0]
376 if my_defhyp != other_defhyp:
377 logging.warning("The default hypervisor (%s) differs on %s, new"
378 " instances will be created with this cluster's"
379 " default hypervisor (%s)", other_defhyp,
380 other_cluster.cluster_name, my_defhyp)
382 if (set(my_cluster.enabled_hypervisors) !=
383 set(other_cluster.enabled_hypervisors)):
384 logging.error("The set of enabled hypervisors (%s) on %s differs to"
385 " this cluster's set (%s)",
386 other_cluster.enabled_hypervisors,
387 other_cluster.cluster_name, my_cluster.enabled_hypervisors)
390 # Check hypervisor params for hypervisors we care about
391 # TODO: we probably don't care about all params for a given hypervisor
392 for hyp in my_cluster.enabled_hypervisors:
393 for param in my_cluster.hvparams[hyp]:
394 my_value = my_cluster.hvparams[hyp][param]
395 other_value = other_cluster.hvparams[hyp][param]
396 if my_value != other_value:
397 logging.error("The value (%s) of the %s parameter of the %s"
398 " hypervisor on %s differs to this cluster's parameter"
400 other_value, param, hyp, other_cluster.cluster_name,
404 # Check os hypervisor params for hypervisors we care about
405 for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
406 for hyp in my_cluster.enabled_hypervisors:
407 my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
408 other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
409 if my_os_hvp != other_os_hvp:
410 logging.error("The OS parameters (%s) for the %s OS for the %s"
411 " hypervisor on %s differs to this cluster's parameters"
413 other_os_hvp, os_name, hyp, other_cluster.cluster_name,
420 if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
421 logging.warning("The modify_etc_hosts value (%s) differs on %s,"
422 " this cluster's value (%s) will take precedence",
423 other_cluster.modify_etc_hosts,
424 other_cluster.cluster_name,
425 my_cluster.modify_etc_hosts)
427 if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
428 logging.warning("The modify_ssh_setup value (%s) differs on %s,"
429 " this cluster's value (%s) will take precedence",
430 other_cluster.modify_ssh_setup,
431 other_cluster.cluster_name,
432 my_cluster.modify_ssh_setup)
437 my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
438 other_cluster.reserved_lvs))
440 if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
441 logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
442 " cluster's value (%s). The least permissive value (%s)"
443 " will be used", other_cluster.prealloc_wipe_disks,
444 other_cluster.cluster_name,
445 my_cluster.prealloc_wipe_disks, True)
446 my_cluster.prealloc_wipe_disks = True
448 for os_, osparams in other_cluster.osparams.items():
449 if os_ not in my_cluster.osparams:
450 my_cluster.osparams[os_] = osparams
451 elif my_cluster.osparams[os_] != osparams:
452 logging.error("The OS parameters (%s) for the %s OS on %s differs to"
453 " this cluster's parameters (%s)",
454 osparams, os_, other_cluster.cluster_name,
455 my_cluster.osparams[os_])
459 raise errors.ConfigurationError("Cluster config for %s has incompatible"
460 " values, please fix and re-run" %
461 other_cluster.cluster_name)
463 # R0201: Method could be a function
464 def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
465 if os_name in cluster.os_hvp:
466 return cluster.os_hvp[os_name].get(hyp, None)
470 # R0201: Method could be a function
471 def _MergeNodeGroups(self, my_config, other_config):
472 """Adds foreign node groups
474 ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
476 # pylint: disable-msg=R0201
477 logging.info("Node group conflict strategy: %s", self.groups)
479 my_grps = my_config.GetAllNodeGroupsInfo().values()
480 other_grps = other_config.GetAllNodeGroupsInfo().values()
482 # Check for node group naming conflicts:
484 for other_grp in other_grps:
485 for my_grp in my_grps:
486 if other_grp.name == my_grp.name:
487 conflicts.append(other_grp)
490 conflict_names = utils.CommaJoin([g.name for g in conflicts])
491 logging.info("Node groups in both local and remote cluster: %s",
494 # User hasn't specified how to handle conflicts
496 raise errors.CommandError("The following node group(s) are in both"
497 " clusters, and no merge strategy has been"
498 " supplied (see the --groups option): %s" %
501 # User wants to rename conflicts
502 elif self.groups == _GROUPS_RENAME:
503 for grp in conflicts:
504 new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
505 logging.info("Renaming remote node group from %s to %s"
506 " to resolve conflict", grp.name, new_name)
509 # User wants to merge conflicting groups
510 elif self.groups == _GROUPS_MERGE:
511 for other_grp in conflicts:
512 logging.info("Merging local and remote '%s' groups", other_grp.name)
513 for node_name in other_grp.members[:]:
514 node = other_config.GetNodeInfo(node_name)
515 # Access to a protected member of a client class
516 # pylint: disable-msg=W0212
517 other_config._UnlockedRemoveNodeFromGroup(node)
519 # Access to a protected member of a client class
520 # pylint: disable-msg=W0212
521 my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
523 # Access to a protected member of a client class
524 # pylint: disable-msg=W0212
525 my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
526 node.group = my_grp_uuid
527 # Remove from list of groups to add
528 other_grps.remove(other_grp)
530 for grp in other_grps:
531 #TODO: handle node group conflicts
532 my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
534 # R0201: Method could be a function
535 def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
536 """Starts the local master daemon.
538 @param no_vote: Should the masterd started without voting? default: False
539 @raise errors.CommandError: If unable to start daemon.
544 env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
546 result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
548 raise errors.CommandError("Couldn't start ganeti master."
549 " Fail reason: %s; output: %s" %
550 (result.fail_reason, result.output))
552 def _ReaddMergedNodesAndRedist(self):
553 """Readds all merging nodes and make sure their config is up-to-date.
555 @raise errors.CommandError: If anything fails.
558 for data in self.merger_data:
559 for node in data.nodes:
560 result = utils.RunCmd(["gnt-node", "add", "--readd",
561 "--no-ssh-key-check", "--force-join", node])
563 raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
564 " output: %s" % (node, result.fail_reason,
567 result = utils.RunCmd(["gnt-cluster", "redist-conf"])
569 raise errors.CommandError("Redistribution failed. Fail reason: %s;"
570 " output: %s" % (result.fail_reason,
573 # R0201: Method could be a function
574 def _StartupAllInstances(self): # pylint: disable-msg=R0201
575 """Starts up all instances (locally).
577 @raise errors.CommandError: If unable to start clusters
580 result = utils.RunCmd(["gnt-instance", "startup", "--all",
583 raise errors.CommandError("Unable to start all instances."
584 " Fail reason: %s; output: %s" %
585 (result.fail_reason, result.output))
587 # R0201: Method could be a function
588 def _VerifyCluster(self): # pylint: disable-msg=R0201
589 """Runs gnt-cluster verify to verify the health.
591 @raise errors.ProgrammError: If cluster fails on verification
594 result = utils.RunCmd(["gnt-cluster", "verify"])
596 raise errors.CommandError("Verification of cluster failed."
597 " Fail reason: %s; output: %s" %
598 (result.fail_reason, result.output))
601 """Does the actual merge.
603 It runs all the steps in the right order and updates the user about steps
604 taken. Also it keeps track of rollback_steps to undo everything.
609 logging.info("Pre cluster verification")
610 self._VerifyCluster()
612 logging.info("Prepare authorized_keys")
613 rbsteps.append("Remove our key from authorized_keys on nodes:"
615 self._PrepareAuthorizedKeys()
617 rbsteps.append("Start all instances again on the merging"
618 " clusters: %(clusters)s")
619 logging.info("Stopping merging instances (takes a while)")
620 self._StopMergingInstances()
622 logging.info("Disable watcher")
623 self._DisableWatcher()
624 logging.info("Stop daemons on merging nodes")
626 logging.info("Merging config")
627 self._FetchRemoteConfig()
629 logging.info("Stopping master daemon")
630 self._KillMasterDaemon()
632 rbsteps.append("Restore %s from another master candidate"
633 " and restart master daemon" %
634 constants.CLUSTER_CONF_FILE)
636 self._StartMasterDaemon(no_vote=True)
638 # Point of no return, delete rbsteps
641 logging.warning("We are at the point of no return. Merge can not easily"
642 " be undone after this point.")
643 logging.info("Readd nodes")
644 self._ReaddMergedNodesAndRedist()
646 logging.info("Merge done, restart master daemon normally")
647 self._KillMasterDaemon()
648 self._StartMasterDaemon()
650 if self.restart == _RESTART_ALL:
651 logging.info("Starting instances again")
652 self._StartupAllInstances()
654 logging.info("Not starting instances again")
655 logging.info("Post cluster verification")
656 self._VerifyCluster()
657 except errors.GenericError, e:
661 nodes = Flatten([data.nodes for data in self.merger_data])
663 "clusters": self.clusters,
666 logging.critical("In order to rollback do the following:")
668 logging.critical(" * %s", step % info)
670 logging.critical("Nothing to rollback.")
672 # TODO: Keep track of steps done for a flawless resume?
675 """Clean up our environment.
677 This cleans up remote private keys and configs and after that
678 deletes the temporary directory.
681 shutil.rmtree(self.work_dir)
def SetupLogging(options):
  """Setting up logging infrastructure.

  Installs a stderr handler on the root logger whose verbosity follows
  the --debug/--verbose command line options.

  @param options: Parsed command line options

  """
  formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  if options.debug:
    stderr_handler.setLevel(logging.NOTSET)
  elif options.verbose:
    stderr_handler.setLevel(logging.INFO)
  else:
    stderr_handler.setLevel(logging.WARNING)

  # Root logger accepts everything; the handler does the filtering
  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(stderr_handler)
710 program = os.path.basename(sys.argv[0])
712 parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
714 parser.add_option(cli.DEBUG_OPT)
715 parser.add_option(cli.VERBOSE_OPT)
716 parser.add_option(PAUSE_PERIOD_OPT)
717 parser.add_option(GROUPS_OPT)
718 parser.add_option(RESTART_OPT)
720 (options, args) = parser.parse_args()
722 SetupLogging(options)
725 parser.error("No clusters specified")
727 cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
728 options.groups, options.restart)
731 cluster_merger.Setup()
732 cluster_merger.Merge()
733 except errors.GenericError, e:
735 return constants.EXIT_FAILURE
737 cluster_merger.Cleanup()
739 return constants.EXIT_SUCCESS
if __name__ == "__main__":
  sys.exit(main())