4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Tool to merge two or more clusters together.

The clusters have to run the same version of Ganeti!

"""

# pylint: disable-msg=C0103
# C0103: Invalid name cluster-merge
import logging
import optparse
import os
import shutil
import sys
import tempfile

from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils
45 _GROUPS_MERGE = "merge"
46 _GROUPS_RENAME = "rename"
47 _CLUSTERMERGE_ECID = "clustermerge-ecid"
50 _RESTART_NONE = "none"
51 _RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
52 _PARAMS_STRICT = "strict"
54 _PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
# Command line options for the merger tool.
# NOTE(review): the dest=/metavar= keyword arguments were truncated in this
# copy and have been restored to match the option names used by main().
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
                            metavar="STRATEGY",
                            choices=_PARAMS_CHOICES,
                            dest="params",
                            help=("How to handle params that have"
                                  " different values (One of: %s/%s)" %
                                  _PARAMS_CHOICES))

RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             metavar="STRATEGY",
                             choices=_RESTART_CHOICES,
                             dest="restart",
                             help=("How to handle restarting instances"
                                   " same name (One of: %s/%s/%s)" %
                                   _RESTART_CHOICES))

SKIP_STOP_INSTANCES_OPT = cli.cli_option("--skip-stop-instances", default=True,
                                         action="store_false", type="boolean",
                                         dest="stop_instances",
                                         help=("Don't stop the instances on the"
                                               " clusters, just check that none"
                                               " is running"))
def Flatten(unflattened_list):
  """Flattens a list of arbitrarily nested lists into a single flat list.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flattened_list = []

  for item in unflattened_list:
    if isinstance(item, list):
      # Recurse into nested lists; non-list items are kept as-is
      flattened_list.extend(Flatten(item))
    else:
      flattened_list.append(item)
  return flattened_list
class MergerData(object):
  """Container class to hold data used for merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param config_path: Path to the merging cluster config

    """
    self.cluster = cluster
    self.key_path = key_path
    # Store the node list as well; later steps iterate over data.nodes
    self.nodes = nodes
    self.instances = instances
    self.config_path = config_path
class Merger(object):
  """Handling the merge.

  """
  # Instance states that count as "running" when checking the mergees
  RUNNING_STATUSES = frozenset([
    constants.INSTST_RUNNING,
    constants.INSTST_ERRORUP,
    ])
138 def __init__(self, clusters, pause_period, groups, restart, params,
140 """Initialize object with sane defaults and infos required.
142 @param clusters: The list of clusters to merge in
143 @param pause_period: The time watcher shall be disabled for
144 @param groups: How to handle group conflicts
145 @param restart: How to handle instance restart
146 @param stop_instances: Indicates whether the instances must be stopped
147 (True) or if the Merger must only check if no
148 instances are running on the mergee clusters (False)
151 self.merger_data = []
152 self.clusters = clusters
153 self.pause_period = pause_period
154 self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
155 (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
156 self.ssh_runner = ssh.SshRunner(self.cluster_name)
158 self.restart = restart
160 self.stop_instances = stop_instances
161 if self.restart == _RESTART_UP:
162 raise NotImplementedError
166 """Sets up our end so we can do the merger.
168 This method is setting us up as a preparation for the merger.
169 It makes the initial contact and gathers information needed.
171 @raise errors.RemoteError: for errors in communication/grabbing
174 (remote_path, _, _) = ssh.GetUserFiles("root")
176 if self.cluster_name in self.clusters:
177 raise errors.CommandError("Cannot merge cluster %s with itself" %
180 # Fetch remotes private key
181 for cluster in self.clusters:
182 result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
185 raise errors.RemoteError("There was an error while grabbing ssh private"
186 " key from %s. Fail reason: %s; output: %s" %
187 (cluster, result.fail_reason, result.output))
189 key_path = utils.PathJoin(self.work_dir, cluster)
190 utils.WriteFile(key_path, mode=0600, data=result.stdout)
192 result = self._RunCmd(cluster, "gnt-node list -o name,offline"
193 " --no-header --separator=,", private_key=key_path)
195 raise errors.RemoteError("Unable to retrieve list of nodes from %s."
196 " Fail reason: %s; output: %s" %
197 (cluster, result.fail_reason, result.output))
198 nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
199 nodes = [node_status[0] for node_status in nodes_statuses
200 if node_status[1] == "N"]
202 result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
203 private_key=key_path)
205 raise errors.RemoteError("Unable to retrieve list of instances from"
206 " %s. Fail reason: %s; output: %s" %
207 (cluster, result.fail_reason, result.output))
208 instances = result.stdout.splitlines()
210 self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
212 def _PrepareAuthorizedKeys(self):
213 """Prepare the authorized_keys on every merging node.
215 This method add our public key to remotes authorized_key for further
219 (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
220 pub_key = utils.ReadFile(pub_key_file)
222 for data in self.merger_data:
223 for node in data.nodes:
224 result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
225 (auth_keys, pub_key)),
226 private_key=data.key_path, max_attempts=3)
229 raise errors.RemoteError("Unable to add our public key to %s in %s."
230 " Fail reason: %s; output: %s" %
231 (node, data.cluster, result.fail_reason,
234 def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
235 strict_host_check=False, private_key=None, batch=True,
236 ask_key=False, max_attempts=1):
237 """Wrapping SshRunner.Run with default parameters.
239 For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
242 for _ in range(max_attempts):
243 result = self.ssh_runner.Run(hostname=hostname, command=command,
244 user=user, use_cluster_key=use_cluster_key,
245 strict_host_check=strict_host_check,
246 private_key=private_key, batch=batch,
248 if not result.failed:
253 def _CheckRunningInstances(self):
254 """Checks if on the clusters to be merged there are running instances
257 @return: True if there are running instances, False otherwise
260 for cluster in self.clusters:
261 result = self._RunCmd(cluster, "gnt-instance list -o status")
262 if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
267 def _StopMergingInstances(self):
268 """Stop instances on merging clusters.
271 for cluster in self.clusters:
272 result = self._RunCmd(cluster, "gnt-instance shutdown --all"
276 raise errors.RemoteError("Unable to stop instances on %s."
277 " Fail reason: %s; output: %s" %
278 (cluster, result.fail_reason, result.output))
280 def _DisableWatcher(self):
281 """Disable watch on all merging clusters, including ourself.
284 for cluster in ["localhost"] + self.clusters:
285 result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
289 raise errors.RemoteError("Unable to pause watcher on %s."
290 " Fail reason: %s; output: %s" %
291 (cluster, result.fail_reason, result.output))
293 def _StopDaemons(self):
294 """Stop all daemons on merging nodes.
297 cmd = "%s stop-all" % constants.DAEMON_UTIL
298 for data in self.merger_data:
299 for node in data.nodes:
300 result = self._RunCmd(node, cmd, max_attempts=3)
303 raise errors.RemoteError("Unable to stop daemons on %s."
304 " Fail reason: %s; output: %s." %
305 (node, result.fail_reason, result.output))
307 def _FetchRemoteConfig(self):
308 """Fetches and stores remote cluster config from the master.
310 This step is needed before we can merge the config.
313 for data in self.merger_data:
314 result = self._RunCmd(data.cluster, "cat %s" %
315 constants.CLUSTER_CONF_FILE)
318 raise errors.RemoteError("Unable to retrieve remote config on %s."
319 " Fail reason: %s; output %s" %
320 (data.cluster, result.fail_reason,
323 data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
325 utils.WriteFile(data.config_path, data=result.stdout)
327 # R0201: Method could be a function
328 def _KillMasterDaemon(self): # pylint: disable-msg=R0201
329 """Kills the local master daemon.
331 @raise errors.CommandError: If unable to kill
334 result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
336 raise errors.CommandError("Unable to stop master daemons."
337 " Fail reason: %s; output: %s" %
338 (result.fail_reason, result.output))
340 def _MergeConfig(self):
341 """Merges all foreign config into our own config.
344 my_config = config.ConfigWriter(offline=True)
345 fake_ec_id = 0 # Needs to be uniq over the whole config merge
347 for data in self.merger_data:
348 other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
349 self._MergeClusterConfigs(my_config, other_config)
350 self._MergeNodeGroups(my_config, other_config)
352 for node in other_config.GetNodeList():
353 node_info = other_config.GetNodeInfo(node)
354 # Offline the node, it will be reonlined later at node readd
355 node_info.master_candidate = False
356 node_info.drained = False
357 node_info.offline = True
358 my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
361 for instance in other_config.GetInstanceList():
362 instance_info = other_config.GetInstanceInfo(instance)
364 # Update the DRBD port assignments
365 # This is a little bit hackish
366 for dsk in instance_info.disks:
367 if dsk.dev_type in constants.LDS_DRBD:
368 port = my_config.AllocatePort()
370 logical_id = list(dsk.logical_id)
372 dsk.logical_id = tuple(logical_id)
374 physical_id = list(dsk.physical_id)
375 physical_id[1] = physical_id[3] = port
376 dsk.physical_id = tuple(physical_id)
378 my_config.AddInstance(instance_info,
379 _CLUSTERMERGE_ECID + str(fake_ec_id))
382 def _MergeClusterConfigs(self, my_config, other_config):
383 """Checks that all relevant cluster parameters are compatible
386 my_cluster = my_config.GetClusterInfo()
387 other_cluster = other_config.GetClusterInfo()
395 "default_iallocator",
396 "drbd_usermode_helper",
398 "maintain_node_health",
406 check_params_strict = [
409 if constants.ENABLE_FILE_STORAGE:
410 check_params_strict.append("file_storage_dir")
411 if constants.ENABLE_SHARED_FILE_STORAGE:
412 check_params_strict.append("shared_file_storage_dir")
413 check_params.extend(check_params_strict)
415 if self.params == _PARAMS_STRICT:
418 params_strict = False
420 for param_name in check_params:
421 my_param = getattr(my_cluster, param_name)
422 other_param = getattr(other_cluster, param_name)
423 if my_param != other_param:
424 logging.error("The value (%s) of the cluster parameter %s on %s"
425 " differs to this cluster's value (%s)",
426 other_param, param_name, other_cluster.cluster_name,
428 if params_strict or param_name in check_params_strict:
435 # Check default hypervisor
436 my_defhyp = my_cluster.enabled_hypervisors[0]
437 other_defhyp = other_cluster.enabled_hypervisors[0]
438 if my_defhyp != other_defhyp:
439 logging.warning("The default hypervisor (%s) differs on %s, new"
440 " instances will be created with this cluster's"
441 " default hypervisor (%s)", other_defhyp,
442 other_cluster.cluster_name, my_defhyp)
444 if (set(my_cluster.enabled_hypervisors) !=
445 set(other_cluster.enabled_hypervisors)):
446 logging.error("The set of enabled hypervisors (%s) on %s differs to"
447 " this cluster's set (%s)",
448 other_cluster.enabled_hypervisors,
449 other_cluster.cluster_name, my_cluster.enabled_hypervisors)
452 # Check hypervisor params for hypervisors we care about
453 for hyp in my_cluster.enabled_hypervisors:
454 for param in my_cluster.hvparams[hyp]:
455 my_value = my_cluster.hvparams[hyp][param]
456 other_value = other_cluster.hvparams[hyp][param]
457 if my_value != other_value:
458 logging.error("The value (%s) of the %s parameter of the %s"
459 " hypervisor on %s differs to this cluster's parameter"
461 other_value, param, hyp, other_cluster.cluster_name,
466 # Check os hypervisor params for hypervisors we care about
467 for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
468 for hyp in my_cluster.enabled_hypervisors:
469 my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
470 other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
471 if my_os_hvp != other_os_hvp:
472 logging.error("The OS parameters (%s) for the %s OS for the %s"
473 " hypervisor on %s differs to this cluster's parameters"
475 other_os_hvp, os_name, hyp, other_cluster.cluster_name,
483 if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
484 logging.warning("The modify_etc_hosts value (%s) differs on %s,"
485 " this cluster's value (%s) will take precedence",
486 other_cluster.modify_etc_hosts,
487 other_cluster.cluster_name,
488 my_cluster.modify_etc_hosts)
490 if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
491 logging.warning("The modify_ssh_setup value (%s) differs on %s,"
492 " this cluster's value (%s) will take precedence",
493 other_cluster.modify_ssh_setup,
494 other_cluster.cluster_name,
495 my_cluster.modify_ssh_setup)
500 my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
501 other_cluster.reserved_lvs))
503 if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
504 logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
505 " cluster's value (%s). The least permissive value (%s)"
506 " will be used", other_cluster.prealloc_wipe_disks,
507 other_cluster.cluster_name,
508 my_cluster.prealloc_wipe_disks, True)
509 my_cluster.prealloc_wipe_disks = True
511 for os_, osparams in other_cluster.osparams.items():
512 if os_ not in my_cluster.osparams:
513 my_cluster.osparams[os_] = osparams
514 elif my_cluster.osparams[os_] != osparams:
515 logging.error("The OS parameters (%s) for the %s OS on %s differs to"
516 " this cluster's parameters (%s)",
517 osparams, os_, other_cluster.cluster_name,
518 my_cluster.osparams[os_])
523 raise errors.ConfigurationError("Cluster config for %s has incompatible"
524 " values, please fix and re-run" %
525 other_cluster.cluster_name)
527 # R0201: Method could be a function
528 def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
529 if os_name in cluster.os_hvp:
530 return cluster.os_hvp[os_name].get(hyp, None)
534 # R0201: Method could be a function
535 def _MergeNodeGroups(self, my_config, other_config):
536 """Adds foreign node groups
538 ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
540 # pylint: disable-msg=R0201
541 logging.info("Node group conflict strategy: %s", self.groups)
543 my_grps = my_config.GetAllNodeGroupsInfo().values()
544 other_grps = other_config.GetAllNodeGroupsInfo().values()
546 # Check for node group naming conflicts:
548 for other_grp in other_grps:
549 for my_grp in my_grps:
550 if other_grp.name == my_grp.name:
551 conflicts.append(other_grp)
554 conflict_names = utils.CommaJoin([g.name for g in conflicts])
555 logging.info("Node groups in both local and remote cluster: %s",
558 # User hasn't specified how to handle conflicts
560 raise errors.CommandError("The following node group(s) are in both"
561 " clusters, and no merge strategy has been"
562 " supplied (see the --groups option): %s" %
565 # User wants to rename conflicts
566 elif self.groups == _GROUPS_RENAME:
567 for grp in conflicts:
568 new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
569 logging.info("Renaming remote node group from %s to %s"
570 " to resolve conflict", grp.name, new_name)
573 # User wants to merge conflicting groups
574 elif self.groups == _GROUPS_MERGE:
575 for other_grp in conflicts:
576 logging.info("Merging local and remote '%s' groups", other_grp.name)
577 for node_name in other_grp.members[:]:
578 node = other_config.GetNodeInfo(node_name)
579 # Access to a protected member of a client class
580 # pylint: disable-msg=W0212
581 other_config._UnlockedRemoveNodeFromGroup(node)
583 # Access to a protected member of a client class
584 # pylint: disable-msg=W0212
585 my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
587 # Access to a protected member of a client class
588 # pylint: disable-msg=W0212
589 my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
590 node.group = my_grp_uuid
591 # Remove from list of groups to add
592 other_grps.remove(other_grp)
594 for grp in other_grps:
595 #TODO: handle node group conflicts
596 my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
598 # R0201: Method could be a function
599 def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
600 """Starts the local master daemon.
602 @param no_vote: Should the masterd started without voting? default: False
603 @raise errors.CommandError: If unable to start daemon.
608 env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
610 result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
612 raise errors.CommandError("Couldn't start ganeti master."
613 " Fail reason: %s; output: %s" %
614 (result.fail_reason, result.output))
616 def _ReaddMergedNodesAndRedist(self):
617 """Readds all merging nodes and make sure their config is up-to-date.
619 @raise errors.CommandError: If anything fails.
622 for data in self.merger_data:
623 for node in data.nodes:
624 result = utils.RunCmd(["gnt-node", "add", "--readd",
625 "--no-ssh-key-check", "--force-join", node])
627 logging.error("%s failed to be readded. Reason: %s, output: %s",
628 node, result.fail_reason, result.output)
630 result = utils.RunCmd(["gnt-cluster", "redist-conf"])
632 raise errors.CommandError("Redistribution failed. Fail reason: %s;"
633 " output: %s" % (result.fail_reason,
636 # R0201: Method could be a function
637 def _StartupAllInstances(self): # pylint: disable-msg=R0201
638 """Starts up all instances (locally).
640 @raise errors.CommandError: If unable to start clusters
643 result = utils.RunCmd(["gnt-instance", "startup", "--all",
646 raise errors.CommandError("Unable to start all instances."
647 " Fail reason: %s; output: %s" %
648 (result.fail_reason, result.output))
650 # R0201: Method could be a function
651 # TODO: make this overridable, for some verify errors
652 def _VerifyCluster(self): # pylint: disable-msg=R0201
653 """Runs gnt-cluster verify to verify the health.
655 @raise errors.ProgrammError: If cluster fails on verification
658 result = utils.RunCmd(["gnt-cluster", "verify"])
660 raise errors.CommandError("Verification of cluster failed."
661 " Fail reason: %s; output: %s" %
662 (result.fail_reason, result.output))
665 """Does the actual merge.
667 It runs all the steps in the right order and updates the user about steps
668 taken. Also it keeps track of rollback_steps to undo everything.
673 logging.info("Pre cluster verification")
674 self._VerifyCluster()
676 logging.info("Prepare authorized_keys")
677 rbsteps.append("Remove our key from authorized_keys on nodes:"
679 self._PrepareAuthorizedKeys()
681 rbsteps.append("Start all instances again on the merging"
682 " clusters: %(clusters)s")
683 if self.stop_instances:
684 logging.info("Stopping merging instances (takes a while)")
685 self._StopMergingInstances()
686 logging.info("Checking that no instances are running on the mergees")
687 instances_running = self._CheckRunningInstances()
688 if instances_running:
689 raise errors.CommandError("Some instances are still running on the"
691 logging.info("Disable watcher")
692 self._DisableWatcher()
693 logging.info("Stop daemons on merging nodes")
695 logging.info("Merging config")
696 self._FetchRemoteConfig()
698 logging.info("Stopping master daemon")
699 self._KillMasterDaemon()
701 rbsteps.append("Restore %s from another master candidate"
702 " and restart master daemon" %
703 constants.CLUSTER_CONF_FILE)
705 self._StartMasterDaemon(no_vote=True)
707 # Point of no return, delete rbsteps
710 logging.warning("We are at the point of no return. Merge can not easily"
711 " be undone after this point.")
712 logging.info("Readd nodes")
713 self._ReaddMergedNodesAndRedist()
715 logging.info("Merge done, restart master daemon normally")
716 self._KillMasterDaemon()
717 self._StartMasterDaemon()
719 if self.restart == _RESTART_ALL:
720 logging.info("Starting instances again")
721 self._StartupAllInstances()
723 logging.info("Not starting instances again")
724 logging.info("Post cluster verification")
725 self._VerifyCluster()
726 except errors.GenericError, e:
730 nodes = Flatten([data.nodes for data in self.merger_data])
732 "clusters": self.clusters,
735 logging.critical("In order to rollback do the following:")
737 logging.critical(" * %s", step % info)
739 logging.critical("Nothing to rollback.")
741 # TODO: Keep track of steps done for a flawless resume?
744 """Clean up our environment.
746 This cleans up remote private keys and configs and after that
747 deletes the temporary directory.
750 shutil.rmtree(self.work_dir)
def SetupLogging(options):
  """Setting up logging infrastructure.

  @param options: Parsed command line options

  """
  formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  # Verbosity cascade: --debug shows everything, --verbose shows info+,
  # default shows warnings and above
  if options.debug:
    stderr_handler.setLevel(logging.NOTSET)
  elif options.verbose:
    stderr_handler.setLevel(logging.INFO)
  else:
    stderr_handler.setLevel(logging.WARNING)

  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(stderr_handler)
779 program = os.path.basename(sys.argv[0])
781 parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
783 parser.add_option(cli.DEBUG_OPT)
784 parser.add_option(cli.VERBOSE_OPT)
785 parser.add_option(PAUSE_PERIOD_OPT)
786 parser.add_option(GROUPS_OPT)
787 parser.add_option(RESTART_OPT)
788 parser.add_option(PARAMS_OPT)
789 parser.add_option(SKIP_STOP_INSTANCES_OPT)
791 (options, args) = parser.parse_args()
793 SetupLogging(options)
796 parser.error("No clusters specified")
798 cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
799 options.groups, options.restart, options.params,
800 options.stop_instances)
803 cluster_merger.Setup()
804 cluster_merger.Merge()
805 except errors.GenericError, e:
807 return constants.EXIT_FAILURE
809 cluster_merger.Cleanup()
811 return constants.EXIT_SUCCESS
814 if __name__ == "__main__":