4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Tool to merge two or more clusters together.
23 The clusters have to run the same version of Ganeti!
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
import logging
import optparse
import os
import shutil
import sys
import tempfile

from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils
from ganeti import netutils
46 _GROUPS_MERGE = "merge"
47 _GROUPS_RENAME = "rename"
48 _CLUSTERMERGE_ECID = "clustermerge-ecid"
51 _RESTART_NONE = "none"
52 _RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
53 _PARAMS_STRICT = "strict"
55 _PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
# Command line options used by the tool.  The visible source had dangling
# "%" operators where the format arguments were lost; they are restored here.
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
                            metavar="STRATEGY",
                            choices=_PARAMS_CHOICES,
                            dest="params",
                            help=("How to handle params that have"
                                  " different values (One of: %s/%s)" %
                                  _PARAMS_CHOICES))

RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             metavar="STRATEGY",
                             choices=_RESTART_CHOICES,
                             dest="restart",
                             help=("How to handle restarting instances"
                                   " same name (One of: %s/%s/%s)" %
                                   _RESTART_CHOICES))

SKIP_STOP_INSTANCES_OPT = \
  cli.cli_option("--skip-stop-instances", default=True, action="store_false",
                 dest="stop_instances",
                 help=("Don't stop the instances on the clusters, just check "
                       "that none is running"))
def Flatten(unflattened_list):
  """Flattens a list.

  Nested lists are expanded recursively; non-list items are kept as-is.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flattened_list = []

  for item in unflattened_list:
    if isinstance(item, list):
      flattened_list.extend(Flatten(item))
    else:
      flattened_list.append(item)
  return flattened_list
class MergerData(object):
  """Container class to hold data used for merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, master_node,
               master_ip, config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param master_node: Name of the master node
    @param master_ip: Cluster IP
    @param config_path: Path to the merging cluster config

    """
    self.cluster = cluster
    self.key_path = key_path
    # nodes was accepted but never stored in the visible code; restored here
    self.nodes = nodes
    self.instances = instances
    self.master_node = master_node
    self.master_ip = master_ip
    self.config_path = config_path
class Merger(object):
  """Handling the merge.

  """
  # Instance states that count as "running" when checking the mergees
  RUNNING_STATUSES = frozenset([
    constants.INSTST_RUNNING,
    constants.INSTST_ERRORUP,
    ])
144 def __init__(self, clusters, pause_period, groups, restart, params,
146 """Initialize object with sane defaults and infos required.
148 @param clusters: The list of clusters to merge in
149 @param pause_period: The time watcher shall be disabled for
150 @param groups: How to handle group conflicts
151 @param restart: How to handle instance restart
152 @param stop_instances: Indicates whether the instances must be stopped
153 (True) or if the Merger must only check if no
154 instances are running on the mergee clusters (False)
157 self.merger_data = []
158 self.clusters = clusters
159 self.pause_period = pause_period
160 self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
161 (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
162 self.ssh_runner = ssh.SshRunner(self.cluster_name)
164 self.restart = restart
166 self.stop_instances = stop_instances
167 if self.restart == _RESTART_UP:
168 raise NotImplementedError
171 """Sets up our end so we can do the merger.
173 This method is setting us up as a preparation for the merger.
174 It makes the initial contact and gathers information needed.
176 @raise errors.RemoteError: for errors in communication/grabbing
179 (remote_path, _, _) = ssh.GetUserFiles("root")
181 if self.cluster_name in self.clusters:
182 raise errors.CommandError("Cannot merge cluster %s with itself" %
185 # Fetch remotes private key
186 for cluster in self.clusters:
187 result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
190 raise errors.RemoteError("There was an error while grabbing ssh private"
191 " key from %s. Fail reason: %s; output: %s" %
192 (cluster, result.fail_reason, result.output))
194 key_path = utils.PathJoin(self.work_dir, cluster)
195 utils.WriteFile(key_path, mode=0600, data=result.stdout)
197 result = self._RunCmd(cluster, "gnt-node list -o name,offline"
198 " --no-header --separator=,", private_key=key_path)
200 raise errors.RemoteError("Unable to retrieve list of nodes from %s."
201 " Fail reason: %s; output: %s" %
202 (cluster, result.fail_reason, result.output))
203 nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
204 nodes = [node_status[0] for node_status in nodes_statuses
205 if node_status[1] == "N"]
207 result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
208 private_key=key_path)
210 raise errors.RemoteError("Unable to retrieve list of instances from"
211 " %s. Fail reason: %s; output: %s" %
212 (cluster, result.fail_reason, result.output))
213 instances = result.stdout.splitlines()
215 path = utils.PathJoin(constants.DATA_DIR, "ssconf_%s" %
216 constants.SS_MASTER_NODE)
217 result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
219 raise errors.RemoteError("Unable to retrieve the master node name from"
220 " %s. Fail reason: %s; output: %s" %
221 (cluster, result.fail_reason, result.output))
222 master_node = result.stdout.strip()
224 path = utils.PathJoin(constants.DATA_DIR, "ssconf_%s" %
225 constants.SS_MASTER_IP)
226 result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
228 raise errors.RemoteError("Unable to retrieve the master IP from"
229 " %s. Fail reason: %s; output: %s" %
230 (cluster, result.fail_reason, result.output))
231 master_ip = result.stdout.strip()
233 self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
234 master_node, master_ip))
236 def _PrepareAuthorizedKeys(self):
237 """Prepare the authorized_keys on every merging node.
239 This method add our public key to remotes authorized_key for further
243 (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
244 pub_key = utils.ReadFile(pub_key_file)
246 for data in self.merger_data:
247 for node in data.nodes:
248 result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
249 (auth_keys, pub_key)),
250 private_key=data.key_path, max_attempts=3)
253 raise errors.RemoteError("Unable to add our public key to %s in %s."
254 " Fail reason: %s; output: %s" %
255 (node, data.cluster, result.fail_reason,
258 def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
259 strict_host_check=False, private_key=None, batch=True,
260 ask_key=False, max_attempts=1):
261 """Wrapping SshRunner.Run with default parameters.
263 For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
266 for _ in range(max_attempts):
267 result = self.ssh_runner.Run(hostname=hostname, command=command,
268 user=user, use_cluster_key=use_cluster_key,
269 strict_host_check=strict_host_check,
270 private_key=private_key, batch=batch,
272 if not result.failed:
277 def _CheckRunningInstances(self):
278 """Checks if on the clusters to be merged there are running instances
281 @return: True if there are running instances, False otherwise
284 for cluster in self.clusters:
285 result = self._RunCmd(cluster, "gnt-instance list -o status")
286 if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
291 def _StopMergingInstances(self):
292 """Stop instances on merging clusters.
295 for cluster in self.clusters:
296 result = self._RunCmd(cluster, "gnt-instance shutdown --all"
300 raise errors.RemoteError("Unable to stop instances on %s."
301 " Fail reason: %s; output: %s" %
302 (cluster, result.fail_reason, result.output))
304 def _DisableWatcher(self):
305 """Disable watch on all merging clusters, including ourself.
308 for cluster in ["localhost"] + self.clusters:
309 result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
313 raise errors.RemoteError("Unable to pause watcher on %s."
314 " Fail reason: %s; output: %s" %
315 (cluster, result.fail_reason, result.output))
317 def _RemoveMasterIps(self):
318 """Removes the master IPs from the master nodes of each cluster.
321 for data in self.merger_data:
322 master_ip_family = netutils.IPAddress.GetAddressFamily(data.master_ip)
323 master_ip_len = netutils.IP4Address.iplen
324 if master_ip_family == netutils.IP6Address.family:
325 master_ip_len = netutils.IP6Address.iplen
326 # Not using constants.IP_COMMAND_PATH because the command might run on a
327 # machine in which the ip path is different, so it's better to rely on
329 cmd = "ip address del %s/%s dev $(cat %s)" % (
332 utils.PathJoin(constants.DATA_DIR, "ssconf_%s" %
333 constants.SS_MASTER_NETDEV))
334 result = self._RunCmd(data.master_node, cmd, max_attempts=3)
336 raise errors.RemoteError("Unable to remove master IP on %s."
337 " Fail reason: %s; output: %s" %
342 def _StopDaemons(self):
343 """Stop all daemons on merging nodes.
346 cmd = "%s stop-all" % constants.DAEMON_UTIL
347 for data in self.merger_data:
348 for node in data.nodes:
349 result = self._RunCmd(node, cmd, max_attempts=3)
352 raise errors.RemoteError("Unable to stop daemons on %s."
353 " Fail reason: %s; output: %s." %
354 (node, result.fail_reason, result.output))
356 def _FetchRemoteConfig(self):
357 """Fetches and stores remote cluster config from the master.
359 This step is needed before we can merge the config.
362 for data in self.merger_data:
363 result = self._RunCmd(data.cluster, "cat %s" %
364 constants.CLUSTER_CONF_FILE)
367 raise errors.RemoteError("Unable to retrieve remote config on %s."
368 " Fail reason: %s; output %s" %
369 (data.cluster, result.fail_reason,
372 data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
374 utils.WriteFile(data.config_path, data=result.stdout)
376 # R0201: Method could be a function
377 def _KillMasterDaemon(self): # pylint: disable-msg=R0201
378 """Kills the local master daemon.
380 @raise errors.CommandError: If unable to kill
383 result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
385 raise errors.CommandError("Unable to stop master daemons."
386 " Fail reason: %s; output: %s" %
387 (result.fail_reason, result.output))
389 def _MergeConfig(self):
390 """Merges all foreign config into our own config.
393 my_config = config.ConfigWriter(offline=True)
394 fake_ec_id = 0 # Needs to be uniq over the whole config merge
396 for data in self.merger_data:
397 other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
398 self._MergeClusterConfigs(my_config, other_config)
399 self._MergeNodeGroups(my_config, other_config)
401 for node in other_config.GetNodeList():
402 node_info = other_config.GetNodeInfo(node)
403 # Offline the node, it will be reonlined later at node readd
404 node_info.master_candidate = False
405 node_info.drained = False
406 node_info.offline = True
407 my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
410 for instance in other_config.GetInstanceList():
411 instance_info = other_config.GetInstanceInfo(instance)
413 # Update the DRBD port assignments
414 # This is a little bit hackish
415 for dsk in instance_info.disks:
416 if dsk.dev_type in constants.LDS_DRBD:
417 port = my_config.AllocatePort()
419 logical_id = list(dsk.logical_id)
421 dsk.logical_id = tuple(logical_id)
423 physical_id = list(dsk.physical_id)
424 physical_id[1] = physical_id[3] = port
425 dsk.physical_id = tuple(physical_id)
427 my_config.AddInstance(instance_info,
428 _CLUSTERMERGE_ECID + str(fake_ec_id))
431 def _MergeClusterConfigs(self, my_config, other_config):
432 """Checks that all relevant cluster parameters are compatible
435 my_cluster = my_config.GetClusterInfo()
436 other_cluster = other_config.GetClusterInfo()
444 "default_iallocator",
445 "drbd_usermode_helper",
447 "maintain_node_health",
455 check_params_strict = [
458 if constants.ENABLE_FILE_STORAGE:
459 check_params_strict.append("file_storage_dir")
460 if constants.ENABLE_SHARED_FILE_STORAGE:
461 check_params_strict.append("shared_file_storage_dir")
462 check_params.extend(check_params_strict)
464 if self.params == _PARAMS_STRICT:
467 params_strict = False
469 for param_name in check_params:
470 my_param = getattr(my_cluster, param_name)
471 other_param = getattr(other_cluster, param_name)
472 if my_param != other_param:
473 logging.error("The value (%s) of the cluster parameter %s on %s"
474 " differs to this cluster's value (%s)",
475 other_param, param_name, other_cluster.cluster_name,
477 if params_strict or param_name in check_params_strict:
484 # Check default hypervisor
485 my_defhyp = my_cluster.enabled_hypervisors[0]
486 other_defhyp = other_cluster.enabled_hypervisors[0]
487 if my_defhyp != other_defhyp:
488 logging.warning("The default hypervisor (%s) differs on %s, new"
489 " instances will be created with this cluster's"
490 " default hypervisor (%s)", other_defhyp,
491 other_cluster.cluster_name, my_defhyp)
493 if (set(my_cluster.enabled_hypervisors) !=
494 set(other_cluster.enabled_hypervisors)):
495 logging.error("The set of enabled hypervisors (%s) on %s differs to"
496 " this cluster's set (%s)",
497 other_cluster.enabled_hypervisors,
498 other_cluster.cluster_name, my_cluster.enabled_hypervisors)
501 # Check hypervisor params for hypervisors we care about
502 for hyp in my_cluster.enabled_hypervisors:
503 for param in my_cluster.hvparams[hyp]:
504 my_value = my_cluster.hvparams[hyp][param]
505 other_value = other_cluster.hvparams[hyp][param]
506 if my_value != other_value:
507 logging.error("The value (%s) of the %s parameter of the %s"
508 " hypervisor on %s differs to this cluster's parameter"
510 other_value, param, hyp, other_cluster.cluster_name,
515 # Check os hypervisor params for hypervisors we care about
516 for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
517 for hyp in my_cluster.enabled_hypervisors:
518 my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
519 other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
520 if my_os_hvp != other_os_hvp:
521 logging.error("The OS parameters (%s) for the %s OS for the %s"
522 " hypervisor on %s differs to this cluster's parameters"
524 other_os_hvp, os_name, hyp, other_cluster.cluster_name,
532 if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
533 logging.warning("The modify_etc_hosts value (%s) differs on %s,"
534 " this cluster's value (%s) will take precedence",
535 other_cluster.modify_etc_hosts,
536 other_cluster.cluster_name,
537 my_cluster.modify_etc_hosts)
539 if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
540 logging.warning("The modify_ssh_setup value (%s) differs on %s,"
541 " this cluster's value (%s) will take precedence",
542 other_cluster.modify_ssh_setup,
543 other_cluster.cluster_name,
544 my_cluster.modify_ssh_setup)
549 my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
550 other_cluster.reserved_lvs))
552 if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
553 logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
554 " cluster's value (%s). The least permissive value (%s)"
555 " will be used", other_cluster.prealloc_wipe_disks,
556 other_cluster.cluster_name,
557 my_cluster.prealloc_wipe_disks, True)
558 my_cluster.prealloc_wipe_disks = True
560 for os_, osparams in other_cluster.osparams.items():
561 if os_ not in my_cluster.osparams:
562 my_cluster.osparams[os_] = osparams
563 elif my_cluster.osparams[os_] != osparams:
564 logging.error("The OS parameters (%s) for the %s OS on %s differs to"
565 " this cluster's parameters (%s)",
566 osparams, os_, other_cluster.cluster_name,
567 my_cluster.osparams[os_])
572 raise errors.ConfigurationError("Cluster config for %s has incompatible"
573 " values, please fix and re-run" %
574 other_cluster.cluster_name)
576 # R0201: Method could be a function
577 def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
578 if os_name in cluster.os_hvp:
579 return cluster.os_hvp[os_name].get(hyp, None)
583 # R0201: Method could be a function
584 def _MergeNodeGroups(self, my_config, other_config):
585 """Adds foreign node groups
587 ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
589 # pylint: disable-msg=R0201
590 logging.info("Node group conflict strategy: %s", self.groups)
592 my_grps = my_config.GetAllNodeGroupsInfo().values()
593 other_grps = other_config.GetAllNodeGroupsInfo().values()
595 # Check for node group naming conflicts:
597 for other_grp in other_grps:
598 for my_grp in my_grps:
599 if other_grp.name == my_grp.name:
600 conflicts.append(other_grp)
603 conflict_names = utils.CommaJoin([g.name for g in conflicts])
604 logging.info("Node groups in both local and remote cluster: %s",
607 # User hasn't specified how to handle conflicts
609 raise errors.CommandError("The following node group(s) are in both"
610 " clusters, and no merge strategy has been"
611 " supplied (see the --groups option): %s" %
614 # User wants to rename conflicts
615 elif self.groups == _GROUPS_RENAME:
616 for grp in conflicts:
617 new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
618 logging.info("Renaming remote node group from %s to %s"
619 " to resolve conflict", grp.name, new_name)
622 # User wants to merge conflicting groups
623 elif self.groups == _GROUPS_MERGE:
624 for other_grp in conflicts:
625 logging.info("Merging local and remote '%s' groups", other_grp.name)
626 for node_name in other_grp.members[:]:
627 node = other_config.GetNodeInfo(node_name)
628 # Access to a protected member of a client class
629 # pylint: disable-msg=W0212
630 other_config._UnlockedRemoveNodeFromGroup(node)
632 # Access to a protected member of a client class
633 # pylint: disable-msg=W0212
634 my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
636 # Access to a protected member of a client class
637 # pylint: disable-msg=W0212
638 my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
639 node.group = my_grp_uuid
640 # Remove from list of groups to add
641 other_grps.remove(other_grp)
643 for grp in other_grps:
644 #TODO: handle node group conflicts
645 my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
647 # R0201: Method could be a function
648 def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
649 """Starts the local master daemon.
651 @param no_vote: Should the masterd started without voting? default: False
652 @raise errors.CommandError: If unable to start daemon.
657 env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
659 result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
661 raise errors.CommandError("Couldn't start ganeti master."
662 " Fail reason: %s; output: %s" %
663 (result.fail_reason, result.output))
665 def _ReaddMergedNodesAndRedist(self):
666 """Readds all merging nodes and make sure their config is up-to-date.
668 @raise errors.CommandError: If anything fails.
671 for data in self.merger_data:
672 for node in data.nodes:
673 result = utils.RunCmd(["gnt-node", "add", "--readd",
674 "--no-ssh-key-check", "--force-join", node])
676 logging.error("%s failed to be readded. Reason: %s, output: %s",
677 node, result.fail_reason, result.output)
679 result = utils.RunCmd(["gnt-cluster", "redist-conf"])
681 raise errors.CommandError("Redistribution failed. Fail reason: %s;"
682 " output: %s" % (result.fail_reason,
685 # R0201: Method could be a function
686 def _StartupAllInstances(self): # pylint: disable-msg=R0201
687 """Starts up all instances (locally).
689 @raise errors.CommandError: If unable to start clusters
692 result = utils.RunCmd(["gnt-instance", "startup", "--all",
695 raise errors.CommandError("Unable to start all instances."
696 " Fail reason: %s; output: %s" %
697 (result.fail_reason, result.output))
699 # R0201: Method could be a function
700 # TODO: make this overridable, for some verify errors
701 def _VerifyCluster(self): # pylint: disable-msg=R0201
702 """Runs gnt-cluster verify to verify the health.
704 @raise errors.ProgrammError: If cluster fails on verification
707 result = utils.RunCmd(["gnt-cluster", "verify"])
709 raise errors.CommandError("Verification of cluster failed."
710 " Fail reason: %s; output: %s" %
711 (result.fail_reason, result.output))
714 """Does the actual merge.
716 It runs all the steps in the right order and updates the user about steps
717 taken. Also it keeps track of rollback_steps to undo everything.
722 logging.info("Pre cluster verification")
723 self._VerifyCluster()
725 logging.info("Prepare authorized_keys")
726 rbsteps.append("Remove our key from authorized_keys on nodes:"
728 self._PrepareAuthorizedKeys()
730 rbsteps.append("Start all instances again on the merging"
731 " clusters: %(clusters)s")
732 if self.stop_instances:
733 logging.info("Stopping merging instances (takes a while)")
734 self._StopMergingInstances()
735 logging.info("Checking that no instances are running on the mergees")
736 instances_running = self._CheckRunningInstances()
737 if instances_running:
738 raise errors.CommandError("Some instances are still running on the"
740 logging.info("Disable watcher")
741 self._DisableWatcher()
742 logging.info("Stop daemons on merging nodes")
744 logging.info("Merging config")
745 self._FetchRemoteConfig()
746 logging.info("Removing master IPs on mergee master nodes")
747 self._RemoveMasterIps()
749 logging.info("Stopping master daemon")
750 self._KillMasterDaemon()
752 rbsteps.append("Restore %s from another master candidate"
753 " and restart master daemon" %
754 constants.CLUSTER_CONF_FILE)
756 self._StartMasterDaemon(no_vote=True)
758 # Point of no return, delete rbsteps
761 logging.warning("We are at the point of no return. Merge can not easily"
762 " be undone after this point.")
763 logging.info("Readd nodes")
764 self._ReaddMergedNodesAndRedist()
766 logging.info("Merge done, restart master daemon normally")
767 self._KillMasterDaemon()
768 self._StartMasterDaemon()
770 if self.restart == _RESTART_ALL:
771 logging.info("Starting instances again")
772 self._StartupAllInstances()
774 logging.info("Not starting instances again")
775 logging.info("Post cluster verification")
776 self._VerifyCluster()
777 except errors.GenericError, e:
781 nodes = Flatten([data.nodes for data in self.merger_data])
783 "clusters": self.clusters,
786 logging.critical("In order to rollback do the following:")
788 logging.critical(" * %s", step % info)
790 logging.critical("Nothing to rollback.")
792 # TODO: Keep track of steps done for a flawless resume?
795 """Clean up our environment.
797 This cleans up remote private keys and configs and after that
798 deletes the temporary directory.
801 shutil.rmtree(self.work_dir)
def SetupLogging(options):
  """Setting up logging infrastructure.

  Logs everything to a stderr handler whose threshold depends on the
  --debug/--verbose command line flags.

  @param options: Parsed command line options

  """
  formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  if options.debug:
    stderr_handler.setLevel(logging.NOTSET)
  elif options.verbose:
    stderr_handler.setLevel(logging.INFO)
  else:
    stderr_handler.setLevel(logging.WARNING)

  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(stderr_handler)
830 program = os.path.basename(sys.argv[0])
832 parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
834 parser.add_option(cli.DEBUG_OPT)
835 parser.add_option(cli.VERBOSE_OPT)
836 parser.add_option(PAUSE_PERIOD_OPT)
837 parser.add_option(GROUPS_OPT)
838 parser.add_option(RESTART_OPT)
839 parser.add_option(PARAMS_OPT)
840 parser.add_option(SKIP_STOP_INSTANCES_OPT)
842 (options, args) = parser.parse_args()
844 SetupLogging(options)
847 parser.error("No clusters specified")
849 cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
850 options.groups, options.restart, options.params,
851 options.stop_instances)
854 cluster_merger.Setup()
855 cluster_merger.Merge()
856 except errors.GenericError, e:
858 return constants.EXIT_FAILURE
860 cluster_merger.Cleanup()
862 return constants.EXIT_SUCCESS
865 if __name__ == "__main__":