Add a TODO on the VerifyCluster option
[ganeti-local] / tools / cluster-merge
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to merge two or more clusters together.
22
23 The clusters have to run the same version of Ganeti!
24
25 """
26
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
29
30 import logging
31 import os
32 import optparse
33 import shutil
34 import sys
35 import tempfile
36
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
43
44
45 _GROUPS_MERGE = "merge"
46 _GROUPS_RENAME = "rename"
47 _CLUSTERMERGE_ECID = "clustermerge-ecid"
48 _RESTART_ALL = "all"
49 _RESTART_UP = "up"
50 _RESTART_NONE = "none"
51 _RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
52 _PARAMS_STRICT = "strict"
53 _PARAMS_WARN = "warn"
54 _PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
55
56
57 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
58                                   action="store", type="int",
59                                   dest="pause_period",
60                                   help=("Amount of time in seconds watcher"
61                                         " should be suspended from running"))
62 GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
63                             choices=(_GROUPS_MERGE, _GROUPS_RENAME),
64                             dest="groups",
65                             help=("How to handle groups that have the"
66                                   " same name (One of: %s/%s)" %
67                                   (_GROUPS_MERGE, _GROUPS_RENAME)))
68 PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
69                             metavar="STRATEGY",
70                             choices=_PARAMS_CHOICES,
71                             dest="params",
72                             help=("How to handle params that have"
73                                   " different values (One of: %s/%s)" %
74                                   _PARAMS_CHOICES))
75
76 RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
77                              metavar="STRATEGY",
78                              choices=_RESTART_CHOICES,
79                              dest="restart",
80                              help=("How to handle restarting instances"
81                                    " same name (One of: %s/%s/%s)" %
82                                    _RESTART_CHOICES))
83
84
85 def Flatten(unflattened_list):
86   """Flattens a list.
87
88   @param unflattened_list: A list of unflattened list objects.
89   @return: A flattened list
90
91   """
92   flattened_list = []
93
94   for item in unflattened_list:
95     if isinstance(item, list):
96       flattened_list.extend(Flatten(item))
97     else:
98       flattened_list.append(item)
99   return flattened_list
100
101
102 class MergerData(object):
103   """Container class to hold data used for merger.
104
105   """
106   def __init__(self, cluster, key_path, nodes, instances, config_path=None):
107     """Initialize the container.
108
109     @param cluster: The name of the cluster
110     @param key_path: Path to the ssh private key used for authentication
111     @param nodes: List of online nodes in the merging cluster
112     @param instances: List of instances running on merging cluster
113     @param config_path: Path to the merging cluster config
114
115     """
116     self.cluster = cluster
117     self.key_path = key_path
118     self.nodes = nodes
119     self.instances = instances
120     self.config_path = config_path
121
122
123 class Merger(object):
124   """Handling the merge.
125
126   """
127   def __init__(self, clusters, pause_period, groups, restart, params):
128     """Initialize object with sane defaults and infos required.
129
130     @param clusters: The list of clusters to merge in
131     @param pause_period: The time watcher shall be disabled for
132     @param groups: How to handle group conflicts
133     @param restart: How to handle instance restart
134
135     """
136     self.merger_data = []
137     self.clusters = clusters
138     self.pause_period = pause_period
139     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
140     (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
141     self.ssh_runner = ssh.SshRunner(self.cluster_name)
142     self.groups = groups
143     self.restart = restart
144     self.params = params
145     if self.restart == _RESTART_UP:
146       raise NotImplementedError
147
148
149   def Setup(self):
150     """Sets up our end so we can do the merger.
151
152     This method is setting us up as a preparation for the merger.
153     It makes the initial contact and gathers information needed.
154
155     @raise errors.RemoteError: for errors in communication/grabbing
156
157     """
158     (remote_path, _, _) = ssh.GetUserFiles("root")
159
160     if self.cluster_name in self.clusters:
161       raise errors.CommandError("Cannot merge cluster %s with itself" %
162                                 self.cluster_name)
163
164     # Fetch remotes private key
165     for cluster in self.clusters:
166       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
167                             ask_key=False)
168       if result.failed:
169         raise errors.RemoteError("There was an error while grabbing ssh private"
170                                  " key from %s. Fail reason: %s; output: %s" %
171                                  (cluster, result.fail_reason, result.output))
172
173       key_path = utils.PathJoin(self.work_dir, cluster)
174       utils.WriteFile(key_path, mode=0600, data=result.stdout)
175
176       result = self._RunCmd(cluster, "gnt-node list -o name,offline"
177                             " --no-header --separator=,", private_key=key_path)
178       if result.failed:
179         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
180                                  " Fail reason: %s; output: %s" %
181                                  (cluster, result.fail_reason, result.output))
182       nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
183       nodes = [node_status[0] for node_status in nodes_statuses
184                if node_status[1] == "N"]
185
186       result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
187                             private_key=key_path)
188       if result.failed:
189         raise errors.RemoteError("Unable to retrieve list of instances from"
190                                  " %s. Fail reason: %s; output: %s" %
191                                  (cluster, result.fail_reason, result.output))
192       instances = result.stdout.splitlines()
193
194       self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
195
196   def _PrepareAuthorizedKeys(self):
197     """Prepare the authorized_keys on every merging node.
198
199     This method add our public key to remotes authorized_key for further
200     communication.
201
202     """
203     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
204     pub_key = utils.ReadFile(pub_key_file)
205
206     for data in self.merger_data:
207       for node in data.nodes:
208         result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
209                                      (auth_keys, pub_key)),
210                               private_key=data.key_path)
211
212         if result.failed:
213           raise errors.RemoteError("Unable to add our public key to %s in %s."
214                                    " Fail reason: %s; output: %s" %
215                                    (node, data.cluster, result.fail_reason,
216                                     result.output))
217
218   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
219               strict_host_check=False, private_key=None, batch=True,
220               ask_key=False):
221     """Wrapping SshRunner.Run with default parameters.
222
223     For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
224
225     """
226     return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
227                                use_cluster_key=use_cluster_key,
228                                strict_host_check=strict_host_check,
229                                private_key=private_key, batch=batch,
230                                ask_key=ask_key)
231
232   def _StopMergingInstances(self):
233     """Stop instances on merging clusters.
234
235     """
236     for cluster in self.clusters:
237       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
238                                      " --force-multiple")
239
240       if result.failed:
241         raise errors.RemoteError("Unable to stop instances on %s."
242                                  " Fail reason: %s; output: %s" %
243                                  (cluster, result.fail_reason, result.output))
244
245   def _DisableWatcher(self):
246     """Disable watch on all merging clusters, including ourself.
247
248     """
249     for cluster in ["localhost"] + self.clusters:
250       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
251                                      self.pause_period)
252
253       if result.failed:
254         raise errors.RemoteError("Unable to pause watcher on %s."
255                                  " Fail reason: %s; output: %s" %
256                                  (cluster, result.fail_reason, result.output))
257
258   def _StopDaemons(self):
259     """Stop all daemons on merging nodes.
260
261     """
262     cmd = "%s stop-all" % constants.DAEMON_UTIL
263     for data in self.merger_data:
264       for node in data.nodes:
265         result = self._RunCmd(node, cmd)
266
267         if result.failed:
268           raise errors.RemoteError("Unable to stop daemons on %s."
269                                    " Fail reason: %s; output: %s." %
270                                    (node, result.fail_reason, result.output))
271
272   def _FetchRemoteConfig(self):
273     """Fetches and stores remote cluster config from the master.
274
275     This step is needed before we can merge the config.
276
277     """
278     for data in self.merger_data:
279       result = self._RunCmd(data.cluster, "cat %s" %
280                                           constants.CLUSTER_CONF_FILE)
281
282       if result.failed:
283         raise errors.RemoteError("Unable to retrieve remote config on %s."
284                                  " Fail reason: %s; output %s" %
285                                  (data.cluster, result.fail_reason,
286                                   result.output))
287
288       data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
289                                         data.cluster)
290       utils.WriteFile(data.config_path, data=result.stdout)
291
292   # R0201: Method could be a function
293   def _KillMasterDaemon(self): # pylint: disable-msg=R0201
294     """Kills the local master daemon.
295
296     @raise errors.CommandError: If unable to kill
297
298     """
299     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
300     if result.failed:
301       raise errors.CommandError("Unable to stop master daemons."
302                                 " Fail reason: %s; output: %s" %
303                                 (result.fail_reason, result.output))
304
305   def _MergeConfig(self):
306     """Merges all foreign config into our own config.
307
308     """
309     my_config = config.ConfigWriter(offline=True)
310     fake_ec_id = 0 # Needs to be uniq over the whole config merge
311
312     for data in self.merger_data:
313       other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
314       self._MergeClusterConfigs(my_config, other_config)
315       self._MergeNodeGroups(my_config, other_config)
316
317       for node in other_config.GetNodeList():
318         node_info = other_config.GetNodeInfo(node)
319         # Offline the node, it will be reonlined later at node readd
320         node_info.master_candidate = False
321         node_info.drained = False
322         node_info.offline = True
323         my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
324         fake_ec_id += 1
325
326       for instance in other_config.GetInstanceList():
327         instance_info = other_config.GetInstanceInfo(instance)
328
329         # Update the DRBD port assignments
330         # This is a little bit hackish
331         for dsk in instance_info.disks:
332           if dsk.dev_type in constants.LDS_DRBD:
333             port = my_config.AllocatePort()
334
335             logical_id = list(dsk.logical_id)
336             logical_id[2] = port
337             dsk.logical_id = tuple(logical_id)
338
339             physical_id = list(dsk.physical_id)
340             physical_id[1] = physical_id[3] = port
341             dsk.physical_id = tuple(physical_id)
342
343         my_config.AddInstance(instance_info,
344                               _CLUSTERMERGE_ECID + str(fake_ec_id))
345         fake_ec_id += 1
346
347   def _MergeClusterConfigs(self, my_config, other_config):
348     """Checks that all relevant cluster parameters are compatible
349
350     """
351     my_cluster = my_config.GetClusterInfo()
352     other_cluster = other_config.GetClusterInfo()
353     err_count = 0
354
355     #
356     # Generic checks
357     #
358     check_params = [
359       "beparams",
360       "default_iallocator",
361       "drbd_usermode_helper",
362       "hidden_os",
363       "maintain_node_health",
364       "master_netdev",
365       "ndparams",
366       "nicparams",
367       "primary_ip_family",
368       "tags",
369       "uid_pool",
370       ]
371     check_params_strict = [
372       "volume_group_name",
373     ]
374     if constants.ENABLE_FILE_STORAGE:
375       check_params_strict.append("file_storage_dir")
376     if constants.ENABLE_SHARED_FILE_STORAGE:
377       check_params_strict.append("shared_file_storage_dir")
378     check_params.extend(check_params_strict)
379
380     if self.params == _PARAMS_STRICT:
381       params_strict = True
382     else:
383       params_strict = False
384
385     for param_name in check_params:
386       my_param = getattr(my_cluster, param_name)
387       other_param = getattr(other_cluster, param_name)
388       if my_param != other_param:
389         logging.error("The value (%s) of the cluster parameter %s on %s"
390                       " differs to this cluster's value (%s)",
391                       other_param, param_name, other_cluster.cluster_name,
392                       my_param)
393         if params_strict or param_name in check_params_strict:
394           err_count += 1
395
396     #
397     # Custom checks
398     #
399
400     # Check default hypervisor
401     my_defhyp = my_cluster.enabled_hypervisors[0]
402     other_defhyp = other_cluster.enabled_hypervisors[0]
403     if my_defhyp != other_defhyp:
404       logging.warning("The default hypervisor (%s) differs on %s, new"
405                       " instances will be created with this cluster's"
406                       " default hypervisor (%s)", other_defhyp,
407                       other_cluster.cluster_name, my_defhyp)
408
409     if (set(my_cluster.enabled_hypervisors) !=
410         set(other_cluster.enabled_hypervisors)):
411       logging.error("The set of enabled hypervisors (%s) on %s differs to"
412                     " this cluster's set (%s)",
413                     other_cluster.enabled_hypervisors,
414                     other_cluster.cluster_name, my_cluster.enabled_hypervisors)
415       err_count += 1
416
417     # Check hypervisor params for hypervisors we care about
418     for hyp in my_cluster.enabled_hypervisors:
419       for param in my_cluster.hvparams[hyp]:
420         my_value = my_cluster.hvparams[hyp][param]
421         other_value = other_cluster.hvparams[hyp][param]
422         if my_value != other_value:
423           logging.error("The value (%s) of the %s parameter of the %s"
424                         " hypervisor on %s differs to this cluster's parameter"
425                         " (%s)",
426                         other_value, param, hyp, other_cluster.cluster_name,
427                         my_value)
428           if params_strict:
429             err_count += 1
430
431     # Check os hypervisor params for hypervisors we care about
432     for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
433       for hyp in my_cluster.enabled_hypervisors:
434         my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
435         other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
436         if my_os_hvp != other_os_hvp:
437           logging.error("The OS parameters (%s) for the %s OS for the %s"
438                         " hypervisor on %s differs to this cluster's parameters"
439                         " (%s)",
440                         other_os_hvp, os_name, hyp, other_cluster.cluster_name,
441                         my_os_hvp)
442           if params_strict:
443             err_count += 1
444
445     #
446     # Warnings
447     #
448     if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
449       logging.warning("The modify_etc_hosts value (%s) differs on %s,"
450                       " this cluster's value (%s) will take precedence",
451                       other_cluster.modify_etc_hosts,
452                       other_cluster.cluster_name,
453                       my_cluster.modify_etc_hosts)
454
455     if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
456       logging.warning("The modify_ssh_setup value (%s) differs on %s,"
457                       " this cluster's value (%s) will take precedence",
458                       other_cluster.modify_ssh_setup,
459                       other_cluster.cluster_name,
460                       my_cluster.modify_ssh_setup)
461
462     #
463     # Actual merging
464     #
465     my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
466                                        other_cluster.reserved_lvs))
467
468     if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
469       logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
470                       " cluster's value (%s). The least permissive value (%s)"
471                       " will be used", other_cluster.prealloc_wipe_disks,
472                       other_cluster.cluster_name,
473                       my_cluster.prealloc_wipe_disks, True)
474       my_cluster.prealloc_wipe_disks = True
475
476     for os_, osparams in other_cluster.osparams.items():
477       if os_ not in my_cluster.osparams:
478         my_cluster.osparams[os_] = osparams
479       elif my_cluster.osparams[os_] != osparams:
480         logging.error("The OS parameters (%s) for the %s OS on %s differs to"
481                       " this cluster's parameters (%s)",
482                       osparams, os_, other_cluster.cluster_name,
483                       my_cluster.osparams[os_])
484         if params_strict:
485           err_count += 1
486
487     if err_count:
488       raise errors.ConfigurationError("Cluster config for %s has incompatible"
489                                       " values, please fix and re-run" %
490                                       other_cluster.cluster_name)
491
492   # R0201: Method could be a function
493   def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
494     if os_name in cluster.os_hvp:
495       return cluster.os_hvp[os_name].get(hyp, None)
496     else:
497       return None
498
499   # R0201: Method could be a function
500   def _MergeNodeGroups(self, my_config, other_config):
501     """Adds foreign node groups
502
503     ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
504     """
505     # pylint: disable-msg=R0201
506     logging.info("Node group conflict strategy: %s", self.groups)
507
508     my_grps = my_config.GetAllNodeGroupsInfo().values()
509     other_grps = other_config.GetAllNodeGroupsInfo().values()
510
511     # Check for node group naming conflicts:
512     conflicts = []
513     for other_grp in other_grps:
514       for my_grp in my_grps:
515         if other_grp.name == my_grp.name:
516           conflicts.append(other_grp)
517
518     if conflicts:
519       conflict_names = utils.CommaJoin([g.name for g in conflicts])
520       logging.info("Node groups in both local and remote cluster: %s",
521                    conflict_names)
522
523       # User hasn't specified how to handle conflicts
524       if not self.groups:
525         raise errors.CommandError("The following node group(s) are in both"
526                                   " clusters, and no merge strategy has been"
527                                   " supplied (see the --groups option): %s" %
528                                   conflict_names)
529
530       # User wants to rename conflicts
531       elif self.groups == _GROUPS_RENAME:
532         for grp in conflicts:
533           new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
534           logging.info("Renaming remote node group from %s to %s"
535                        " to resolve conflict", grp.name, new_name)
536           grp.name = new_name
537
538       # User wants to merge conflicting groups
539       elif self.groups == _GROUPS_MERGE:
540         for other_grp in conflicts:
541           logging.info("Merging local and remote '%s' groups", other_grp.name)
542           for node_name in other_grp.members[:]:
543             node = other_config.GetNodeInfo(node_name)
544             # Access to a protected member of a client class
545             # pylint: disable-msg=W0212
546             other_config._UnlockedRemoveNodeFromGroup(node)
547
548             # Access to a protected member of a client class
549             # pylint: disable-msg=W0212
550             my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
551
552             # Access to a protected member of a client class
553             # pylint: disable-msg=W0212
554             my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
555             node.group = my_grp_uuid
556           # Remove from list of groups to add
557           other_grps.remove(other_grp)
558
559     for grp in other_grps:
560       #TODO: handle node group conflicts
561       my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
562
563   # R0201: Method could be a function
564   def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
565     """Starts the local master daemon.
566
567     @param no_vote: Should the masterd started without voting? default: False
568     @raise errors.CommandError: If unable to start daemon.
569
570     """
571     env = {}
572     if no_vote:
573       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
574
575     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
576     if result.failed:
577       raise errors.CommandError("Couldn't start ganeti master."
578                                 " Fail reason: %s; output: %s" %
579                                 (result.fail_reason, result.output))
580
581   def _ReaddMergedNodesAndRedist(self):
582     """Readds all merging nodes and make sure their config is up-to-date.
583
584     @raise errors.CommandError: If anything fails.
585
586     """
587     for data in self.merger_data:
588       for node in data.nodes:
589         result = utils.RunCmd(["gnt-node", "add", "--readd",
590                                "--no-ssh-key-check", "--force-join", node])
591         if result.failed:
592           logging.error("%s failed to be readded. Reason: %s, output: %s",
593                          node, result.fail_reason, result.output)
594
595     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
596     if result.failed:
597       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
598                                 " output: %s" % (result.fail_reason,
599                                                 result.output))
600
601   # R0201: Method could be a function
602   def _StartupAllInstances(self): # pylint: disable-msg=R0201
603     """Starts up all instances (locally).
604
605     @raise errors.CommandError: If unable to start clusters
606
607     """
608     result = utils.RunCmd(["gnt-instance", "startup", "--all",
609                            "--force-multiple"])
610     if result.failed:
611       raise errors.CommandError("Unable to start all instances."
612                                 " Fail reason: %s; output: %s" %
613                                 (result.fail_reason, result.output))
614
615   # R0201: Method could be a function
616   # TODO: make this overridable, for some verify errors
617   def _VerifyCluster(self): # pylint: disable-msg=R0201
618     """Runs gnt-cluster verify to verify the health.
619
620     @raise errors.ProgrammError: If cluster fails on verification
621
622     """
623     result = utils.RunCmd(["gnt-cluster", "verify"])
624     if result.failed:
625       raise errors.CommandError("Verification of cluster failed."
626                                 " Fail reason: %s; output: %s" %
627                                 (result.fail_reason, result.output))
628
629   def Merge(self):
630     """Does the actual merge.
631
632     It runs all the steps in the right order and updates the user about steps
633     taken. Also it keeps track of rollback_steps to undo everything.
634
635     """
636     rbsteps = []
637     try:
638       logging.info("Pre cluster verification")
639       self._VerifyCluster()
640
641       logging.info("Prepare authorized_keys")
642       rbsteps.append("Remove our key from authorized_keys on nodes:"
643                      " %(nodes)s")
644       self._PrepareAuthorizedKeys()
645
646       rbsteps.append("Start all instances again on the merging"
647                      " clusters: %(clusters)s")
648       logging.info("Stopping merging instances (takes a while)")
649       self._StopMergingInstances()
650
651       logging.info("Disable watcher")
652       self._DisableWatcher()
653       logging.info("Stop daemons on merging nodes")
654       self._StopDaemons()
655       logging.info("Merging config")
656       self._FetchRemoteConfig()
657
658       logging.info("Stopping master daemon")
659       self._KillMasterDaemon()
660
661       rbsteps.append("Restore %s from another master candidate"
662                      " and restart master daemon" %
663                      constants.CLUSTER_CONF_FILE)
664       self._MergeConfig()
665       self._StartMasterDaemon(no_vote=True)
666
667       # Point of no return, delete rbsteps
668       del rbsteps[:]
669
670       logging.warning("We are at the point of no return. Merge can not easily"
671                       " be undone after this point.")
672       logging.info("Readd nodes")
673       self._ReaddMergedNodesAndRedist()
674
675       logging.info("Merge done, restart master daemon normally")
676       self._KillMasterDaemon()
677       self._StartMasterDaemon()
678
679       if self.restart == _RESTART_ALL:
680         logging.info("Starting instances again")
681         self._StartupAllInstances()
682       else:
683         logging.info("Not starting instances again")
684       logging.info("Post cluster verification")
685       self._VerifyCluster()
686     except errors.GenericError, e:
687       logging.exception(e)
688
689       if rbsteps:
690         nodes = Flatten([data.nodes for data in self.merger_data])
691         info = {
692           "clusters": self.clusters,
693           "nodes": nodes,
694           }
695         logging.critical("In order to rollback do the following:")
696         for step in rbsteps:
697           logging.critical("  * %s", step % info)
698       else:
699         logging.critical("Nothing to rollback.")
700
701       # TODO: Keep track of steps done for a flawless resume?
702
703   def Cleanup(self):
704     """Clean up our environment.
705
706     This cleans up remote private keys and configs and after that
707     deletes the temporary directory.
708
709     """
710     shutil.rmtree(self.work_dir)
711
712
713 def SetupLogging(options):
714   """Setting up logging infrastructure.
715
716   @param options: Parsed command line options
717
718   """
719   formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
720
721   stderr_handler = logging.StreamHandler()
722   stderr_handler.setFormatter(formatter)
723   if options.debug:
724     stderr_handler.setLevel(logging.NOTSET)
725   elif options.verbose:
726     stderr_handler.setLevel(logging.INFO)
727   else:
728     stderr_handler.setLevel(logging.WARNING)
729
730   root_logger = logging.getLogger("")
731   root_logger.setLevel(logging.NOTSET)
732   root_logger.addHandler(stderr_handler)
733
734
735 def main():
736   """Main routine.
737
738   """
739   program = os.path.basename(sys.argv[0])
740
741   parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
742                                  prog=program)
743   parser.add_option(cli.DEBUG_OPT)
744   parser.add_option(cli.VERBOSE_OPT)
745   parser.add_option(PAUSE_PERIOD_OPT)
746   parser.add_option(GROUPS_OPT)
747   parser.add_option(RESTART_OPT)
748   parser.add_option(PARAMS_OPT)
749
750   (options, args) = parser.parse_args()
751
752   SetupLogging(options)
753
754   if not args:
755     parser.error("No clusters specified")
756
757   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
758                           options.groups, options.restart, options.params)
759   try:
760     try:
761       cluster_merger.Setup()
762       cluster_merger.Merge()
763     except errors.GenericError, e:
764       logging.exception(e)
765       return constants.EXIT_FAILURE
766   finally:
767     cluster_merger.Cleanup()
768
769   return constants.EXIT_SUCCESS
770
771
772 if __name__ == "__main__":
773   sys.exit(main())