4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Tool to merge two or more clusters together.
23 The clusters have to run the same version of Ganeti!
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
# Strategies for the --groups option: merge conflicting node groups or
# rename the remote ones.
45 _GROUPS_MERGE = "merge"
46 _GROUPS_RENAME = "rename"
# Prefix for the synthetic execution-context IDs passed to
# ConfigWriter.AddNode/AddInstance/AddNodeGroup during the merge.
47 _CLUSTERMERGE_ECID = "clustermerge-ecid"
# Strategies for the --restart option.
# NOTE(review): the definitions of _RESTART_ALL and _RESTART_UP fall on
# lines elided from this listing — confirm they are defined just above.
50 _RESTART_NONE = "none"
51 _RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
# Strategies for the --parameter-conflicts option.
# NOTE(review): _PARAMS_WARN is defined on an elided line — confirm.
52 _PARAMS_STRICT = "strict"
54 _PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
# How long (seconds) the watcher is paused on every involved cluster while
# the merge runs; default 30 minutes.
57 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
58 action="store", type="int",
60 help=("Amount of time in seconds watcher"
61 " should be suspended from running"))
# Node-group conflict strategy; no default, so Merger can detect that the
# user supplied nothing and abort on a conflict.
62 GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
63 choices=(_GROUPS_MERGE, _GROUPS_RENAME),
65 help=("How to handle groups that have the"
66 " same name (One of: %s/%s)" %
67 (_GROUPS_MERGE, _GROUPS_RENAME)))
# Cluster-parameter conflict handling; strict (abort) by default.
68 PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
70 choices=_PARAMS_CHOICES,
72 help=("How to handle params that have"
73 " different values (One of: %s/%s)" %
# Instance restart strategy after the merge; restart all by default.
# NOTE(review): the trailing argument lines of PARAMS_OPT and RESTART_OPT
# are elided in this listing.
76 RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
78 choices=_RESTART_CHOICES,
80 help=("How to handle restarting instances"
81 " same name (One of: %s/%s/%s)" %
# store_false on dest="stop_instances": passing the flag means "do NOT
# stop instances, only verify none are running".
84 SKIP_STOP_INSTANCES_OPT = \
85 cli.cli_option("--skip-stop-instances", default=True, action="store_false",
86 dest="stop_instances",
87 help=("Don't stop the instances on the clusters, just check "
88 "that none is running"))
# Recursively flattens arbitrarily nested lists into one flat list.
91 def Flatten(unflattened_list):
94 @param unflattened_list: A list of unflattened list objects.
95 @return: A flattened list
# NOTE(review): the initialisation of flattened_list and the else: branch
# guarding the append fall on lines elided from this listing.
100 for item in unflattened_list:
101 if isinstance(item, list):
# Recurse into sub-lists and splice their items into the result.
102 flattened_list.extend(Flatten(item))
104 flattened_list.append(item)
105 return flattened_list
# Dumb data holder: one instance per mergee cluster, built in Merger.Setup().
108 class MergerData(object):
109 """Container class to hold data used for merger.
112 def __init__(self, cluster, key_path, nodes, instances, config_path=None):
113 """Initialize the container.
115 @param cluster: The name of the cluster
116 @param key_path: Path to the ssh private key used for authentication
117 @param nodes: List of online nodes in the merging cluster
118 @param instances: List of instances running on merging cluster
119 @param config_path: Path to the merging cluster config
# Plain attribute assignments, no validation.
122 self.cluster = cluster
123 self.key_path = key_path
# NOTE(review): the "self.nodes = nodes" assignment is on an elided line.
125 self.instances = instances
# config_path starts as None and is filled in later by
# Merger._FetchRemoteConfig once the remote config has been downloaded.
126 self.config_path = config_path
# Orchestrates the whole merge: gathers remote state over SSH, stops the
# mergee clusters, merges their configs into the local one, then readds
# their nodes to this cluster.
# NOTE(review): this listing is subsampled — several lines inside the
# methods below are elided; hedged notes mark the visible gaps.
129 class Merger(object):
130 """Handling the merge.
# Instance statuses treated as "running" by _CheckRunningInstances.
133 RUNNING_STATUSES = frozenset([
134 constants.INSTST_RUNNING,
135 constants.INSTST_ERRORUP,
138 def __init__(self, clusters, pause_period, groups, restart, params,
140 """Initialize object with sane defaults and infos required.
142 @param clusters: The list of clusters to merge in
143 @param pause_period: The time watcher shall be disabled for
144 @param groups: How to handle group conflicts
145 @param restart: How to handle instance restart
146 @param stop_instances: Indicates whether the instances must be stopped
147 (True) or if the Merger must only check if no
148 instances are running on the mergee clusters (False)
151 self.merger_data = []
152 self.clusters = clusters
153 self.pause_period = pause_period
# Scratch directory for fetched private keys and configs; removed again
# by Cleanup().
154 self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
# Our own cluster name — used by Setup() to refuse merging with itself.
155 (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
156 self.ssh_runner = ssh.SshRunner(self.cluster_name)
# NOTE(review): assignments of self.groups and self.params are on lines
# elided from this listing (both attributes are read later).
158 self.restart = restart
160 self.stop_instances = stop_instances
# Restarting only previously-up instances is not implemented yet.
161 if self.restart == _RESTART_UP:
162 raise NotImplementedError
# NOTE(review): the "def Setup(self):" line is elided here; the docstring
# and body below belong to Setup.
165 """Sets up our end so we can do the merger.
167 This method is setting us up as a preparation for the merger.
168 It makes the initial contact and gathers information needed.
170 @raise errors.RemoteError: for errors in communication/grabbing
173 (remote_path, _, _) = ssh.GetUserFiles("root")
# Refuse a merge that lists ourselves as a mergee.
175 if self.cluster_name in self.clusters:
176 raise errors.CommandError("Cannot merge cluster %s with itself" %
179 # Fetch remotes private key
180 for cluster in self.clusters:
# batch=False so ssh may prompt (password auth) on first contact.
# NOTE(review): the trailing arguments and the "if result.failed:" check
# are on elided lines.
181 result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
184 raise errors.RemoteError("There was an error while grabbing ssh private"
185 " key from %s. Fail reason: %s; output: %s" %
186 (cluster, result.fail_reason, result.output))
# Store the fetched key with owner-only permissions; it is reused as
# private_key for all later commands against this cluster.
188 key_path = utils.PathJoin(self.work_dir, cluster)
189 utils.WriteFile(key_path, mode=0600, data=result.stdout)
191 result = self._RunCmd(cluster, "gnt-node list -o name,offline"
192 " --no-header --separator=,", private_key=key_path)
194 raise errors.RemoteError("Unable to retrieve list of nodes from %s."
195 " Fail reason: %s; output: %s" %
196 (cluster, result.fail_reason, result.output))
# Keep only online nodes ("N" in the offline column).
197 nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
198 nodes = [node_status[0] for node_status in nodes_statuses
199 if node_status[1] == "N"]
201 result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
202 private_key=key_path)
204 raise errors.RemoteError("Unable to retrieve list of instances from"
205 " %s. Fail reason: %s; output: %s" %
206 (cluster, result.fail_reason, result.output))
207 instances = result.stdout.splitlines()
209 self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
211 def _PrepareAuthorizedKeys(self):
212 """Prepare the authorized_keys on every merging node.
214 This method add our public key to remotes authorized_key for further
218 (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
219 pub_key = utils.ReadFile(pub_key_file)
221 for data in self.merger_data:
222 for node in data.nodes:
# Append our public key via a remote heredoc; retried up to 3 times.
223 result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
224 (auth_keys, pub_key)),
225 private_key=data.key_path, max_attempts=3)
# NOTE(review): the "if result.failed:" guard and the closing argument of
# this raise are on elided lines.
228 raise errors.RemoteError("Unable to add our public key to %s in %s."
229 " Fail reason: %s; output: %s" %
230 (node, data.cluster, result.fail_reason,
233 def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
234 strict_host_check=False, private_key=None, batch=True,
235 ask_key=False, max_attempts=1):
236 """Wrapping SshRunner.Run with default parameters.
238 For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
# Retry loop: re-run the command until it succeeds or attempts run out.
241 for _ in range(max_attempts):
242 result = self.ssh_runner.Run(hostname=hostname, command=command,
243 user=user, use_cluster_key=use_cluster_key,
244 strict_host_check=strict_host_check,
245 private_key=private_key, batch=batch,
# NOTE(review): the loop exit and the final "return result" are on elided
# lines; only the success check is visible here.
247 if not result.failed:
252 def _CheckRunningInstances(self):
253 """Checks if on the clusters to be merged there are running instances
256 @return: True if there are running instances, False otherwise
259 for cluster in self.clusters:
260 result = self._RunCmd(cluster, "gnt-instance list -o status")
# Any status line matching RUNNING_STATUSES means the mergee still has
# running instances.
# NOTE(review): the return statements are on elided lines.
261 if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
266 def _StopMergingInstances(self):
267 """Stop instances on merging clusters.
270 for cluster in self.clusters:
# NOTE(review): the command continuation and failure check are elided.
271 result = self._RunCmd(cluster, "gnt-instance shutdown --all"
275 raise errors.RemoteError("Unable to stop instances on %s."
276 " Fail reason: %s; output: %s" %
277 (cluster, result.fail_reason, result.output))
279 def _DisableWatcher(self):
280 """Disable watch on all merging clusters, including ourself.
# "localhost" pauses our own watcher alongside every mergee's.
283 for cluster in ["localhost"] + self.clusters:
284 result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
288 raise errors.RemoteError("Unable to pause watcher on %s."
289 " Fail reason: %s; output: %s" %
290 (cluster, result.fail_reason, result.output))
292 def _StopDaemons(self):
293 """Stop all daemons on merging nodes.
296 cmd = "%s stop-all" % constants.DAEMON_UTIL
297 for data in self.merger_data:
298 for node in data.nodes:
# Flaky nodes get three attempts before we give up.
299 result = self._RunCmd(node, cmd, max_attempts=3)
302 raise errors.RemoteError("Unable to stop daemons on %s."
303 " Fail reason: %s; output: %s." %
304 (node, result.fail_reason, result.output))
306 def _FetchRemoteConfig(self):
307 """Fetches and stores remote cluster config from the master.
309 This step is needed before we can merge the config.
312 for data in self.merger_data:
313 result = self._RunCmd(data.cluster, "cat %s" %
314 constants.CLUSTER_CONF_FILE)
317 raise errors.RemoteError("Unable to retrieve remote config on %s."
318 " Fail reason: %s; output %s" %
319 (data.cluster, result.fail_reason,
# Saved under work_dir; _MergeConfig later reads it back via
# config.ConfigWriter(data.config_path, accept_foreign=True).
322 data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
324 utils.WriteFile(data.config_path, data=result.stdout)
326 # R0201: Method could be a function
327 def _KillMasterDaemon(self): # pylint: disable-msg=R0201
328 """Kills the local master daemon.
330 @raise errors.CommandError: If unable to kill
333 result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
335 raise errors.CommandError("Unable to stop master daemons."
336 " Fail reason: %s; output: %s" %
337 (result.fail_reason, result.output))
339 def _MergeConfig(self):
340 """Merges all foreign config into our own config.
343 my_config = config.ConfigWriter(offline=True)
344 fake_ec_id = 0 # Needs to be uniq over the whole config merge
346 for data in self.merger_data:
347 other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
348 self._MergeClusterConfigs(my_config, other_config)
349 self._MergeNodeGroups(my_config, other_config)
351 for node in other_config.GetNodeList():
352 node_info = other_config.GetNodeInfo(node)
353 # Offline the node, it will be reonlined later at node readd
354 node_info.master_candidate = False
355 node_info.drained = False
356 node_info.offline = True
357 my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
# NOTE(review): the fake_ec_id increment falls on an elided line.
360 for instance in other_config.GetInstanceList():
361 instance_info = other_config.GetInstanceInfo(instance)
363 # Update the DRBD port assignments
364 # This is a little bit hackish
365 for dsk in instance_info.disks:
366 if dsk.dev_type in constants.LDS_DRBD:
# Allocate a fresh minor/port on this cluster and splice it into both
# the logical and physical disk IDs.
367 port = my_config.AllocatePort()
# NOTE(review): the logical_id index assignment is on an elided line.
369 logical_id = list(dsk.logical_id)
371 dsk.logical_id = tuple(logical_id)
373 physical_id = list(dsk.physical_id)
374 physical_id[1] = physical_id[3] = port
375 dsk.physical_id = tuple(physical_id)
377 my_config.AddInstance(instance_info,
378 _CLUSTERMERGE_ECID + str(fake_ec_id))
381 def _MergeClusterConfigs(self, my_config, other_config):
382 """Checks that all relevant cluster parameters are compatible
385 my_cluster = my_config.GetClusterInfo()
386 other_cluster = other_config.GetClusterInfo()
# NOTE(review): the opening of the check_params list (and several of its
# entries) fall on elided lines.
394 "default_iallocator",
395 "drbd_usermode_helper",
397 "maintain_node_health",
# Parameters that must match exactly regardless of --parameter-conflicts.
405 check_params_strict = [
408 if constants.ENABLE_FILE_STORAGE:
409 check_params_strict.append("file_storage_dir")
410 if constants.ENABLE_SHARED_FILE_STORAGE:
411 check_params_strict.append("shared_file_storage_dir")
412 check_params.extend(check_params_strict)
# NOTE(review): the "params_strict = True" branch is elided here.
414 if self.params == _PARAMS_STRICT:
417 params_strict = False
419 for param_name in check_params:
420 my_param = getattr(my_cluster, param_name)
421 other_param = getattr(other_cluster, param_name)
422 if my_param != other_param:
423 logging.error("The value (%s) of the cluster parameter %s on %s"
424 " differs to this cluster's value (%s)",
425 other_param, param_name, other_cluster.cluster_name,
# Strict params (or strict mode) turn a mismatch into a fatal error.
# NOTE(review): the error-flagging statement is on an elided line.
427 if params_strict or param_name in check_params_strict:
434 # Check default hypervisor
435 my_defhyp = my_cluster.enabled_hypervisors[0]
436 other_defhyp = other_cluster.enabled_hypervisors[0]
437 if my_defhyp != other_defhyp:
438 logging.warning("The default hypervisor (%s) differs on %s, new"
439 " instances will be created with this cluster's"
440 " default hypervisor (%s)", other_defhyp,
441 other_cluster.cluster_name, my_defhyp)
443 if (set(my_cluster.enabled_hypervisors) !=
444 set(other_cluster.enabled_hypervisors)):
445 logging.error("The set of enabled hypervisors (%s) on %s differs to"
446 " this cluster's set (%s)",
447 other_cluster.enabled_hypervisors,
448 other_cluster.cluster_name, my_cluster.enabled_hypervisors)
451 # Check hypervisor params for hypervisors we care about
452 for hyp in my_cluster.enabled_hypervisors:
453 for param in my_cluster.hvparams[hyp]:
454 my_value = my_cluster.hvparams[hyp][param]
455 other_value = other_cluster.hvparams[hyp][param]
456 if my_value != other_value:
457 logging.error("The value (%s) of the %s parameter of the %s"
458 " hypervisor on %s differs to this cluster's parameter"
460 other_value, param, hyp, other_cluster.cluster_name,
465 # Check os hypervisor params for hypervisors we care about
# Python 2 dict.keys() returns lists, hence the "+" concatenation.
466 for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
467 for hyp in my_cluster.enabled_hypervisors:
468 my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
469 other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
470 if my_os_hvp != other_os_hvp:
471 logging.error("The OS parameters (%s) for the %s OS for the %s"
472 " hypervisor on %s differs to this cluster's parameters"
474 other_os_hvp, os_name, hyp, other_cluster.cluster_name,
# Soft (warning-only) mismatches: local values take precedence.
482 if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
483 logging.warning("The modify_etc_hosts value (%s) differs on %s,"
484 " this cluster's value (%s) will take precedence",
485 other_cluster.modify_etc_hosts,
486 other_cluster.cluster_name,
487 my_cluster.modify_etc_hosts)
489 if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
490 logging.warning("The modify_ssh_setup value (%s) differs on %s,"
491 " this cluster's value (%s) will take precedence",
492 other_cluster.modify_ssh_setup,
493 other_cluster.cluster_name,
494 my_cluster.modify_ssh_setup)
# Reserved LVs are merged (set union) rather than compared.
499 my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
500 other_cluster.reserved_lvs))
# prealloc_wipe_disks: the least permissive value (True) wins.
502 if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
503 logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
504 " cluster's value (%s). The least permissive value (%s)"
505 " will be used", other_cluster.prealloc_wipe_disks,
506 other_cluster.cluster_name,
507 my_cluster.prealloc_wipe_disks, True)
508 my_cluster.prealloc_wipe_disks = True
# Adopt unknown OS parameter sets; mismatching known ones are errors.
510 for os_, osparams in other_cluster.osparams.items():
511 if os_ not in my_cluster.osparams:
512 my_cluster.osparams[os_] = osparams
513 elif my_cluster.osparams[os_] != osparams:
514 logging.error("The OS parameters (%s) for the %s OS on %s differs to"
515 " this cluster's parameters (%s)",
516 osparams, os_, other_cluster.cluster_name,
517 my_cluster.osparams[os_])
# NOTE(review): the condition guarding this final raise (accumulated
# fatal mismatches) is on an elided line.
522 raise errors.ConfigurationError("Cluster config for %s has incompatible"
523 " values, please fix and re-run" %
524 other_cluster.cluster_name)
526 # R0201: Method could be a function
# Returns the hvparams dict for (os_name, hyp), or None when unset.
527 def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
528 if os_name in cluster.os_hvp:
529 return cluster.os_hvp[os_name].get(hyp, None)
533 # R0201: Method could be a function
534 def _MergeNodeGroups(self, my_config, other_config):
535 """Adds foreign node groups
537 ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
539 # pylint: disable-msg=R0201
540 logging.info("Node group conflict strategy: %s", self.groups)
542 my_grps = my_config.GetAllNodeGroupsInfo().values()
543 other_grps = other_config.GetAllNodeGroupsInfo().values()
545 # Check for node group naming conflicts:
# NOTE(review): the "conflicts = []" initialisation is on an elided line.
547 for other_grp in other_grps:
548 for my_grp in my_grps:
549 if other_grp.name == my_grp.name:
550 conflicts.append(other_grp)
553 conflict_names = utils.CommaJoin([g.name for g in conflicts])
554 logging.info("Node groups in both local and remote cluster: %s",
557 # User hasn't specified how to handle conflicts
# NOTE(review): the "if not self.groups:" guard is on an elided line.
559 raise errors.CommandError("The following node group(s) are in both"
560 " clusters, and no merge strategy has been"
561 " supplied (see the --groups option): %s" %
564 # User wants to rename conflicts
565 elif self.groups == _GROUPS_RENAME:
566 for grp in conflicts:
# Disambiguate by suffixing the remote cluster's name.
567 new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
568 logging.info("Renaming remote node group from %s to %s"
569 " to resolve conflict", grp.name, new_name)
572 # User wants to merge conflicting groups
573 elif self.groups == _GROUPS_MERGE:
574 for other_grp in conflicts:
575 logging.info("Merging local and remote '%s' groups", other_grp.name)
# Iterate over a copy since membership changes while we move nodes.
576 for node_name in other_grp.members[:]:
577 node = other_config.GetNodeInfo(node_name)
578 # Access to a protected member of a client class
579 # pylint: disable-msg=W0212
580 other_config._UnlockedRemoveNodeFromGroup(node)
582 # Access to a protected member of a client class
583 # pylint: disable-msg=W0212
584 my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
586 # Access to a protected member of a client class
587 # pylint: disable-msg=W0212
588 my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
589 node.group = my_grp_uuid
590 # Remove from list of groups to add
591 other_grps.remove(other_grp)
# Remaining (non-conflicting) groups are added wholesale.
593 for grp in other_grps:
594 #TODO: handle node group conflicts
595 my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
597 # R0201: Method could be a function
598 def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
599 """Starts the local master daemon.
601 @param no_vote: Should the masterd started without voting? default: False
602 @raise errors.CommandError: If unable to start daemon.
# NOTE(review): the env initialisation and the "if no_vote:" guard are on
# elided lines; the extra args skip the master-voting safety check.
607 env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
609 result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
611 raise errors.CommandError("Couldn't start ganeti master."
612 " Fail reason: %s; output: %s" %
613 (result.fail_reason, result.output))
615 def _ReaddMergedNodesAndRedist(self):
616 """Readds all merging nodes and make sure their config is up-to-date.
618 @raise errors.CommandError: If anything fails.
621 for data in self.merger_data:
622 for node in data.nodes:
623 result = utils.RunCmd(["gnt-node", "add", "--readd",
624 "--no-ssh-key-check", "--force-join", node])
# A failed readd is logged but deliberately not fatal — remaining nodes
# are still processed.
626 logging.error("%s failed to be readded. Reason: %s, output: %s",
627 node, result.fail_reason, result.output)
# Push the merged configuration out to all nodes.
629 result = utils.RunCmd(["gnt-cluster", "redist-conf"])
631 raise errors.CommandError("Redistribution failed. Fail reason: %s;"
632 " output: %s" % (result.fail_reason,
635 # R0201: Method could be a function
636 def _StartupAllInstances(self): # pylint: disable-msg=R0201
637 """Starts up all instances (locally).
639 @raise errors.CommandError: If unable to start clusters
642 result = utils.RunCmd(["gnt-instance", "startup", "--all",
645 raise errors.CommandError("Unable to start all instances."
646 " Fail reason: %s; output: %s" %
647 (result.fail_reason, result.output))
649 # R0201: Method could be a function
650 # TODO: make this overridable, for some verify errors
651 def _VerifyCluster(self): # pylint: disable-msg=R0201
652 """Runs gnt-cluster verify to verify the health.
654 @raise errors.ProgrammError: If cluster fails on verification
657 result = utils.RunCmd(["gnt-cluster", "verify"])
659 raise errors.CommandError("Verification of cluster failed."
660 " Fail reason: %s; output: %s" %
661 (result.fail_reason, result.output))
# NOTE(review): the "def Merge(self):" line is elided here; the docstring
# and body below belong to Merge — the top-level driver of all steps.
664 """Does the actual merge.
666 It runs all the steps in the right order and updates the user about steps
667 taken. Also it keeps track of rollback_steps to undo everything.
672 logging.info("Pre cluster verification")
673 self._VerifyCluster()
675 logging.info("Prepare authorized_keys")
676 rbsteps.append("Remove our key from authorized_keys on nodes:"
678 self._PrepareAuthorizedKeys()
680 rbsteps.append("Start all instances again on the merging"
681 " clusters: %(clusters)s")
682 if self.stop_instances:
683 logging.info("Stopping merging instances (takes a while)")
684 self._StopMergingInstances()
685 logging.info("Checking that no instances are running on the mergees")
686 instances_running = self._CheckRunningInstances()
687 if instances_running:
688 raise errors.CommandError("Some instances are still running on the"
690 logging.info("Disable watcher")
691 self._DisableWatcher()
692 logging.info("Stop daemons on merging nodes")
# NOTE(review): the self._StopDaemons() call is on an elided line.
694 logging.info("Merging config")
695 self._FetchRemoteConfig()
697 logging.info("Stopping master daemon")
698 self._KillMasterDaemon()
700 rbsteps.append("Restore %s from another master candidate"
701 " and restart master daemon" %
702 constants.CLUSTER_CONF_FILE)
# NOTE(review): the self._MergeConfig() call is on an elided line.
704 self._StartMasterDaemon(no_vote=True)
706 # Point of no return, delete rbsteps
709 logging.warning("We are at the point of no return. Merge can not easily"
710 " be undone after this point.")
711 logging.info("Readd nodes")
712 self._ReaddMergedNodesAndRedist()
714 logging.info("Merge done, restart master daemon normally")
715 self._KillMasterDaemon()
716 self._StartMasterDaemon()
718 if self.restart == _RESTART_ALL:
719 logging.info("Starting instances again")
720 self._StartupAllInstances()
722 logging.info("Not starting instances again")
723 logging.info("Post cluster verification")
724 self._VerifyCluster()
# On failure, print the collected manual rollback instructions.
725 except errors.GenericError, e:
729 nodes = Flatten([data.nodes for data in self.merger_data])
# Substitution dict for the %(clusters)s / %(nodes)s placeholders
# embedded in rbsteps entries.
731 "clusters": self.clusters,
734 logging.critical("In order to rollback do the following:")
736 logging.critical(" * %s", step % info)
738 logging.critical("Nothing to rollback.")
740 # TODO: Keep track of steps done for a flawless resume?
# NOTE(review): the "def Cleanup(self):" line is elided here.
743 """Clean up our environment.
745 This cleans up remote private keys and configs and after that
746 deletes the temporary directory.
# Removes the scratch directory created in __init__ (keys + configs).
749 shutil.rmtree(self.work_dir)
752 def SetupLogging(options):
753 """Setting up logging infrastructure.
755 @param options: Parsed command line options
# Single timestamped format for every record sent to stderr.
758 formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
760 stderr_handler = logging.StreamHandler()
761 stderr_handler.setFormatter(formatter)
# Verbosity ladder: debug => everything, verbose => INFO+, default =>
# WARNING+.
# NOTE(review): the "if options.debug:" line is elided in this listing.
763 stderr_handler.setLevel(logging.NOTSET)
764 elif options.verbose:
765 stderr_handler.setLevel(logging.INFO)
767 stderr_handler.setLevel(logging.WARNING)
# The handler is attached to the root logger so all module loggers
# (e.g. logging.info calls throughout this tool) are captured.
769 root_logger = logging.getLogger("")
770 root_logger.setLevel(logging.NOTSET)
771 root_logger.addHandler(stderr_handler)
# NOTE(review): the enclosing "def main():" line is elided from this
# listing; the statements below (which use "return") are its body.
778 program = os.path.basename(sys.argv[0])
# NOTE(review): the "prog=" argument line of OptionParser is elided.
780 parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
# Register the options declared at module level.
782 parser.add_option(cli.DEBUG_OPT)
783 parser.add_option(cli.VERBOSE_OPT)
784 parser.add_option(PAUSE_PERIOD_OPT)
785 parser.add_option(GROUPS_OPT)
786 parser.add_option(RESTART_OPT)
787 parser.add_option(PARAMS_OPT)
788 parser.add_option(SKIP_STOP_INSTANCES_OPT)
790 (options, args) = parser.parse_args()
792 SetupLogging(options)
# NOTE(review): the "if not args:" guard is on an elided line.
795 parser.error("No clusters specified")
# De-duplicate the cluster arguments while preserving order.
797 cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
798 options.groups, options.restart, options.params,
799 options.stop_instances)
802 cluster_merger.Setup()
803 cluster_merger.Merge()
# Any Ganeti error aborts with a failure exit code; Cleanup always runs
# (the "finally:" line is elided in this listing).
804 except errors.GenericError, e:
806 return constants.EXIT_FAILURE
808 cluster_merger.Cleanup()
810 return constants.EXIT_SUCCESS
813 if __name__ == "__main__":