PEP8 style fixes
[ganeti-local] / tools / cluster-merge
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to merge two or more clusters together.
22
23 The clusters have to run the same version of Ganeti!
24
25 """
26
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
29
30 import logging
31 import os
32 import optparse
33 import shutil
34 import sys
35 import tempfile
36
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
43
44
45 _GROUPS_MERGE = "merge"
46 _GROUPS_RENAME = "rename"
47 _CLUSTERMERGE_ECID = "clustermerge-ecid"
48 _RESTART_ALL = "all"
49 _RESTART_UP = "up"
50 _RESTART_NONE = "none"
51 _RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
52 _PARAMS_STRICT = "strict"
53 _PARAMS_WARN = "warn"
54 _PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
55
56
57 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
58                                   action="store", type="int",
59                                   dest="pause_period",
60                                   help=("Amount of time in seconds watcher"
61                                         " should be suspended from running"))
62 GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
63                             choices=(_GROUPS_MERGE, _GROUPS_RENAME),
64                             dest="groups",
65                             help=("How to handle groups that have the"
66                                   " same name (One of: %s/%s)" %
67                                   (_GROUPS_MERGE, _GROUPS_RENAME)))
68 PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
69                             metavar="STRATEGY",
70                             choices=_PARAMS_CHOICES,
71                             dest="params",
72                             help=("How to handle params that have"
73                                   " different values (One of: %s/%s)" %
74                                   _PARAMS_CHOICES))
75
76 RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
77                              metavar="STRATEGY",
78                              choices=_RESTART_CHOICES,
79                              dest="restart",
80                              help=("How to handle restarting instances"
81                                    " same name (One of: %s/%s/%s)" %
82                                    _RESTART_CHOICES))
83
84 SKIP_STOP_INSTANCES_OPT = \
85   cli.cli_option("--skip-stop-instances", default=True, action="store_false",
86                  dest="stop_instances",
87                  help=("Don't stop the instances on the clusters, just check "
88                        "that none is running"))
89
90
91 def Flatten(unflattened_list):
92   """Flattens a list.
93
94   @param unflattened_list: A list of unflattened list objects.
95   @return: A flattened list
96
97   """
98   flattened_list = []
99
100   for item in unflattened_list:
101     if isinstance(item, list):
102       flattened_list.extend(Flatten(item))
103     else:
104       flattened_list.append(item)
105   return flattened_list
106
107
108 class MergerData(object):
109   """Container class to hold data used for merger.
110
111   """
112   def __init__(self, cluster, key_path, nodes, instances, config_path=None):
113     """Initialize the container.
114
115     @param cluster: The name of the cluster
116     @param key_path: Path to the ssh private key used for authentication
117     @param nodes: List of online nodes in the merging cluster
118     @param instances: List of instances running on merging cluster
119     @param config_path: Path to the merging cluster config
120
121     """
122     self.cluster = cluster
123     self.key_path = key_path
124     self.nodes = nodes
125     self.instances = instances
126     self.config_path = config_path
127
128
129 class Merger(object):
130   """Handling the merge.
131
132   """
133   RUNNING_STATUSES = frozenset([
134     constants.INSTST_RUNNING,
135     constants.INSTST_ERRORUP,
136     ])
137
138   def __init__(self, clusters, pause_period, groups, restart, params,
139                stop_instances):
140     """Initialize object with sane defaults and infos required.
141
142     @param clusters: The list of clusters to merge in
143     @param pause_period: The time watcher shall be disabled for
144     @param groups: How to handle group conflicts
145     @param restart: How to handle instance restart
146     @param stop_instances: Indicates whether the instances must be stopped
147                            (True) or if the Merger must only check if no
148                            instances are running on the mergee clusters (False)
149
150     """
151     self.merger_data = []
152     self.clusters = clusters
153     self.pause_period = pause_period
154     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
155     (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
156     self.ssh_runner = ssh.SshRunner(self.cluster_name)
157     self.groups = groups
158     self.restart = restart
159     self.params = params
160     self.stop_instances = stop_instances
161     if self.restart == _RESTART_UP:
162       raise NotImplementedError
163
164   def Setup(self):
165     """Sets up our end so we can do the merger.
166
167     This method is setting us up as a preparation for the merger.
168     It makes the initial contact and gathers information needed.
169
170     @raise errors.RemoteError: for errors in communication/grabbing
171
172     """
173     (remote_path, _, _) = ssh.GetUserFiles("root")
174
175     if self.cluster_name in self.clusters:
176       raise errors.CommandError("Cannot merge cluster %s with itself" %
177                                 self.cluster_name)
178
179     # Fetch remotes private key
180     for cluster in self.clusters:
181       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
182                             ask_key=False)
183       if result.failed:
184         raise errors.RemoteError("There was an error while grabbing ssh private"
185                                  " key from %s. Fail reason: %s; output: %s" %
186                                  (cluster, result.fail_reason, result.output))
187
188       key_path = utils.PathJoin(self.work_dir, cluster)
189       utils.WriteFile(key_path, mode=0600, data=result.stdout)
190
191       result = self._RunCmd(cluster, "gnt-node list -o name,offline"
192                             " --no-header --separator=,", private_key=key_path)
193       if result.failed:
194         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
195                                  " Fail reason: %s; output: %s" %
196                                  (cluster, result.fail_reason, result.output))
197       nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
198       nodes = [node_status[0] for node_status in nodes_statuses
199                if node_status[1] == "N"]
200
201       result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
202                             private_key=key_path)
203       if result.failed:
204         raise errors.RemoteError("Unable to retrieve list of instances from"
205                                  " %s. Fail reason: %s; output: %s" %
206                                  (cluster, result.fail_reason, result.output))
207       instances = result.stdout.splitlines()
208
209       self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
210
211   def _PrepareAuthorizedKeys(self):
212     """Prepare the authorized_keys on every merging node.
213
214     This method add our public key to remotes authorized_key for further
215     communication.
216
217     """
218     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
219     pub_key = utils.ReadFile(pub_key_file)
220
221     for data in self.merger_data:
222       for node in data.nodes:
223         result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
224                                      (auth_keys, pub_key)),
225                               private_key=data.key_path, max_attempts=3)
226
227         if result.failed:
228           raise errors.RemoteError("Unable to add our public key to %s in %s."
229                                    " Fail reason: %s; output: %s" %
230                                    (node, data.cluster, result.fail_reason,
231                                     result.output))
232
233   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
234               strict_host_check=False, private_key=None, batch=True,
235               ask_key=False, max_attempts=1):
236     """Wrapping SshRunner.Run with default parameters.
237
238     For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
239
240     """
241     for _ in range(max_attempts):
242       result = self.ssh_runner.Run(hostname=hostname, command=command,
243                                  user=user, use_cluster_key=use_cluster_key,
244                                  strict_host_check=strict_host_check,
245                                  private_key=private_key, batch=batch,
246                                  ask_key=ask_key)
247       if not result.failed:
248         break
249
250     return result
251
252   def _CheckRunningInstances(self):
253     """Checks if on the clusters to be merged there are running instances
254
255     @rtype: boolean
256     @return: True if there are running instances, False otherwise
257
258     """
259     for cluster in self.clusters:
260       result = self._RunCmd(cluster, "gnt-instance list -o status")
261       if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
262         return True
263
264     return False
265
266   def _StopMergingInstances(self):
267     """Stop instances on merging clusters.
268
269     """
270     for cluster in self.clusters:
271       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
272                                      " --force-multiple")
273
274       if result.failed:
275         raise errors.RemoteError("Unable to stop instances on %s."
276                                  " Fail reason: %s; output: %s" %
277                                  (cluster, result.fail_reason, result.output))
278
279   def _DisableWatcher(self):
280     """Disable watch on all merging clusters, including ourself.
281
282     """
283     for cluster in ["localhost"] + self.clusters:
284       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
285                                      self.pause_period)
286
287       if result.failed:
288         raise errors.RemoteError("Unable to pause watcher on %s."
289                                  " Fail reason: %s; output: %s" %
290                                  (cluster, result.fail_reason, result.output))
291
292   def _StopDaemons(self):
293     """Stop all daemons on merging nodes.
294
295     """
296     cmd = "%s stop-all" % constants.DAEMON_UTIL
297     for data in self.merger_data:
298       for node in data.nodes:
299         result = self._RunCmd(node, cmd, max_attempts=3)
300
301         if result.failed:
302           raise errors.RemoteError("Unable to stop daemons on %s."
303                                    " Fail reason: %s; output: %s." %
304                                    (node, result.fail_reason, result.output))
305
306   def _FetchRemoteConfig(self):
307     """Fetches and stores remote cluster config from the master.
308
309     This step is needed before we can merge the config.
310
311     """
312     for data in self.merger_data:
313       result = self._RunCmd(data.cluster, "cat %s" %
314                                           constants.CLUSTER_CONF_FILE)
315
316       if result.failed:
317         raise errors.RemoteError("Unable to retrieve remote config on %s."
318                                  " Fail reason: %s; output %s" %
319                                  (data.cluster, result.fail_reason,
320                                   result.output))
321
322       data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
323                                         data.cluster)
324       utils.WriteFile(data.config_path, data=result.stdout)
325
326   # R0201: Method could be a function
327   def _KillMasterDaemon(self): # pylint: disable-msg=R0201
328     """Kills the local master daemon.
329
330     @raise errors.CommandError: If unable to kill
331
332     """
333     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
334     if result.failed:
335       raise errors.CommandError("Unable to stop master daemons."
336                                 " Fail reason: %s; output: %s" %
337                                 (result.fail_reason, result.output))
338
339   def _MergeConfig(self):
340     """Merges all foreign config into our own config.
341
342     """
343     my_config = config.ConfigWriter(offline=True)
344     fake_ec_id = 0 # Needs to be uniq over the whole config merge
345
346     for data in self.merger_data:
347       other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
348       self._MergeClusterConfigs(my_config, other_config)
349       self._MergeNodeGroups(my_config, other_config)
350
351       for node in other_config.GetNodeList():
352         node_info = other_config.GetNodeInfo(node)
353         # Offline the node, it will be reonlined later at node readd
354         node_info.master_candidate = False
355         node_info.drained = False
356         node_info.offline = True
357         my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
358         fake_ec_id += 1
359
360       for instance in other_config.GetInstanceList():
361         instance_info = other_config.GetInstanceInfo(instance)
362
363         # Update the DRBD port assignments
364         # This is a little bit hackish
365         for dsk in instance_info.disks:
366           if dsk.dev_type in constants.LDS_DRBD:
367             port = my_config.AllocatePort()
368
369             logical_id = list(dsk.logical_id)
370             logical_id[2] = port
371             dsk.logical_id = tuple(logical_id)
372
373             physical_id = list(dsk.physical_id)
374             physical_id[1] = physical_id[3] = port
375             dsk.physical_id = tuple(physical_id)
376
377         my_config.AddInstance(instance_info,
378                               _CLUSTERMERGE_ECID + str(fake_ec_id))
379         fake_ec_id += 1
380
381   def _MergeClusterConfigs(self, my_config, other_config):
382     """Checks that all relevant cluster parameters are compatible
383
384     """
385     my_cluster = my_config.GetClusterInfo()
386     other_cluster = other_config.GetClusterInfo()
387     err_count = 0
388
389     #
390     # Generic checks
391     #
392     check_params = [
393       "beparams",
394       "default_iallocator",
395       "drbd_usermode_helper",
396       "hidden_os",
397       "maintain_node_health",
398       "master_netdev",
399       "ndparams",
400       "nicparams",
401       "primary_ip_family",
402       "tags",
403       "uid_pool",
404       ]
405     check_params_strict = [
406       "volume_group_name",
407     ]
408     if constants.ENABLE_FILE_STORAGE:
409       check_params_strict.append("file_storage_dir")
410     if constants.ENABLE_SHARED_FILE_STORAGE:
411       check_params_strict.append("shared_file_storage_dir")
412     check_params.extend(check_params_strict)
413
414     if self.params == _PARAMS_STRICT:
415       params_strict = True
416     else:
417       params_strict = False
418
419     for param_name in check_params:
420       my_param = getattr(my_cluster, param_name)
421       other_param = getattr(other_cluster, param_name)
422       if my_param != other_param:
423         logging.error("The value (%s) of the cluster parameter %s on %s"
424                       " differs to this cluster's value (%s)",
425                       other_param, param_name, other_cluster.cluster_name,
426                       my_param)
427         if params_strict or param_name in check_params_strict:
428           err_count += 1
429
430     #
431     # Custom checks
432     #
433
434     # Check default hypervisor
435     my_defhyp = my_cluster.enabled_hypervisors[0]
436     other_defhyp = other_cluster.enabled_hypervisors[0]
437     if my_defhyp != other_defhyp:
438       logging.warning("The default hypervisor (%s) differs on %s, new"
439                       " instances will be created with this cluster's"
440                       " default hypervisor (%s)", other_defhyp,
441                       other_cluster.cluster_name, my_defhyp)
442
443     if (set(my_cluster.enabled_hypervisors) !=
444         set(other_cluster.enabled_hypervisors)):
445       logging.error("The set of enabled hypervisors (%s) on %s differs to"
446                     " this cluster's set (%s)",
447                     other_cluster.enabled_hypervisors,
448                     other_cluster.cluster_name, my_cluster.enabled_hypervisors)
449       err_count += 1
450
451     # Check hypervisor params for hypervisors we care about
452     for hyp in my_cluster.enabled_hypervisors:
453       for param in my_cluster.hvparams[hyp]:
454         my_value = my_cluster.hvparams[hyp][param]
455         other_value = other_cluster.hvparams[hyp][param]
456         if my_value != other_value:
457           logging.error("The value (%s) of the %s parameter of the %s"
458                         " hypervisor on %s differs to this cluster's parameter"
459                         " (%s)",
460                         other_value, param, hyp, other_cluster.cluster_name,
461                         my_value)
462           if params_strict:
463             err_count += 1
464
465     # Check os hypervisor params for hypervisors we care about
466     for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
467       for hyp in my_cluster.enabled_hypervisors:
468         my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
469         other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
470         if my_os_hvp != other_os_hvp:
471           logging.error("The OS parameters (%s) for the %s OS for the %s"
472                         " hypervisor on %s differs to this cluster's parameters"
473                         " (%s)",
474                         other_os_hvp, os_name, hyp, other_cluster.cluster_name,
475                         my_os_hvp)
476           if params_strict:
477             err_count += 1
478
479     #
480     # Warnings
481     #
482     if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
483       logging.warning("The modify_etc_hosts value (%s) differs on %s,"
484                       " this cluster's value (%s) will take precedence",
485                       other_cluster.modify_etc_hosts,
486                       other_cluster.cluster_name,
487                       my_cluster.modify_etc_hosts)
488
489     if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
490       logging.warning("The modify_ssh_setup value (%s) differs on %s,"
491                       " this cluster's value (%s) will take precedence",
492                       other_cluster.modify_ssh_setup,
493                       other_cluster.cluster_name,
494                       my_cluster.modify_ssh_setup)
495
496     #
497     # Actual merging
498     #
499     my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
500                                        other_cluster.reserved_lvs))
501
502     if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
503       logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
504                       " cluster's value (%s). The least permissive value (%s)"
505                       " will be used", other_cluster.prealloc_wipe_disks,
506                       other_cluster.cluster_name,
507                       my_cluster.prealloc_wipe_disks, True)
508       my_cluster.prealloc_wipe_disks = True
509
510     for os_, osparams in other_cluster.osparams.items():
511       if os_ not in my_cluster.osparams:
512         my_cluster.osparams[os_] = osparams
513       elif my_cluster.osparams[os_] != osparams:
514         logging.error("The OS parameters (%s) for the %s OS on %s differs to"
515                       " this cluster's parameters (%s)",
516                       osparams, os_, other_cluster.cluster_name,
517                       my_cluster.osparams[os_])
518         if params_strict:
519           err_count += 1
520
521     if err_count:
522       raise errors.ConfigurationError("Cluster config for %s has incompatible"
523                                       " values, please fix and re-run" %
524                                       other_cluster.cluster_name)
525
526   # R0201: Method could be a function
527   def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
528     if os_name in cluster.os_hvp:
529       return cluster.os_hvp[os_name].get(hyp, None)
530     else:
531       return None
532
533   # R0201: Method could be a function
534   def _MergeNodeGroups(self, my_config, other_config):
535     """Adds foreign node groups
536
537     ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
538     """
539     # pylint: disable-msg=R0201
540     logging.info("Node group conflict strategy: %s", self.groups)
541
542     my_grps = my_config.GetAllNodeGroupsInfo().values()
543     other_grps = other_config.GetAllNodeGroupsInfo().values()
544
545     # Check for node group naming conflicts:
546     conflicts = []
547     for other_grp in other_grps:
548       for my_grp in my_grps:
549         if other_grp.name == my_grp.name:
550           conflicts.append(other_grp)
551
552     if conflicts:
553       conflict_names = utils.CommaJoin([g.name for g in conflicts])
554       logging.info("Node groups in both local and remote cluster: %s",
555                    conflict_names)
556
557       # User hasn't specified how to handle conflicts
558       if not self.groups:
559         raise errors.CommandError("The following node group(s) are in both"
560                                   " clusters, and no merge strategy has been"
561                                   " supplied (see the --groups option): %s" %
562                                   conflict_names)
563
564       # User wants to rename conflicts
565       elif self.groups == _GROUPS_RENAME:
566         for grp in conflicts:
567           new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
568           logging.info("Renaming remote node group from %s to %s"
569                        " to resolve conflict", grp.name, new_name)
570           grp.name = new_name
571
572       # User wants to merge conflicting groups
573       elif self.groups == _GROUPS_MERGE:
574         for other_grp in conflicts:
575           logging.info("Merging local and remote '%s' groups", other_grp.name)
576           for node_name in other_grp.members[:]:
577             node = other_config.GetNodeInfo(node_name)
578             # Access to a protected member of a client class
579             # pylint: disable-msg=W0212
580             other_config._UnlockedRemoveNodeFromGroup(node)
581
582             # Access to a protected member of a client class
583             # pylint: disable-msg=W0212
584             my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
585
586             # Access to a protected member of a client class
587             # pylint: disable-msg=W0212
588             my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
589             node.group = my_grp_uuid
590           # Remove from list of groups to add
591           other_grps.remove(other_grp)
592
593     for grp in other_grps:
594       #TODO: handle node group conflicts
595       my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
596
597   # R0201: Method could be a function
598   def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
599     """Starts the local master daemon.
600
601     @param no_vote: Should the masterd started without voting? default: False
602     @raise errors.CommandError: If unable to start daemon.
603
604     """
605     env = {}
606     if no_vote:
607       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
608
609     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
610     if result.failed:
611       raise errors.CommandError("Couldn't start ganeti master."
612                                 " Fail reason: %s; output: %s" %
613                                 (result.fail_reason, result.output))
614
615   def _ReaddMergedNodesAndRedist(self):
616     """Readds all merging nodes and make sure their config is up-to-date.
617
618     @raise errors.CommandError: If anything fails.
619
620     """
621     for data in self.merger_data:
622       for node in data.nodes:
623         result = utils.RunCmd(["gnt-node", "add", "--readd",
624                                "--no-ssh-key-check", "--force-join", node])
625         if result.failed:
626           logging.error("%s failed to be readded. Reason: %s, output: %s",
627                          node, result.fail_reason, result.output)
628
629     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
630     if result.failed:
631       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
632                                 " output: %s" % (result.fail_reason,
633                                                 result.output))
634
635   # R0201: Method could be a function
636   def _StartupAllInstances(self): # pylint: disable-msg=R0201
637     """Starts up all instances (locally).
638
639     @raise errors.CommandError: If unable to start clusters
640
641     """
642     result = utils.RunCmd(["gnt-instance", "startup", "--all",
643                            "--force-multiple"])
644     if result.failed:
645       raise errors.CommandError("Unable to start all instances."
646                                 " Fail reason: %s; output: %s" %
647                                 (result.fail_reason, result.output))
648
649   # R0201: Method could be a function
650   # TODO: make this overridable, for some verify errors
651   def _VerifyCluster(self): # pylint: disable-msg=R0201
652     """Runs gnt-cluster verify to verify the health.
653
654     @raise errors.ProgrammError: If cluster fails on verification
655
656     """
657     result = utils.RunCmd(["gnt-cluster", "verify"])
658     if result.failed:
659       raise errors.CommandError("Verification of cluster failed."
660                                 " Fail reason: %s; output: %s" %
661                                 (result.fail_reason, result.output))
662
663   def Merge(self):
664     """Does the actual merge.
665
666     It runs all the steps in the right order and updates the user about steps
667     taken. Also it keeps track of rollback_steps to undo everything.
668
669     """
670     rbsteps = []
671     try:
672       logging.info("Pre cluster verification")
673       self._VerifyCluster()
674
675       logging.info("Prepare authorized_keys")
676       rbsteps.append("Remove our key from authorized_keys on nodes:"
677                      " %(nodes)s")
678       self._PrepareAuthorizedKeys()
679
680       rbsteps.append("Start all instances again on the merging"
681                      " clusters: %(clusters)s")
682       if self.stop_instances:
683         logging.info("Stopping merging instances (takes a while)")
684         self._StopMergingInstances()
685       logging.info("Checking that no instances are running on the mergees")
686       instances_running = self._CheckRunningInstances()
687       if instances_running:
688         raise errors.CommandError("Some instances are still running on the"
689                                   " mergees")
690       logging.info("Disable watcher")
691       self._DisableWatcher()
692       logging.info("Stop daemons on merging nodes")
693       self._StopDaemons()
694       logging.info("Merging config")
695       self._FetchRemoteConfig()
696
697       logging.info("Stopping master daemon")
698       self._KillMasterDaemon()
699
700       rbsteps.append("Restore %s from another master candidate"
701                      " and restart master daemon" %
702                      constants.CLUSTER_CONF_FILE)
703       self._MergeConfig()
704       self._StartMasterDaemon(no_vote=True)
705
706       # Point of no return, delete rbsteps
707       del rbsteps[:]
708
709       logging.warning("We are at the point of no return. Merge can not easily"
710                       " be undone after this point.")
711       logging.info("Readd nodes")
712       self._ReaddMergedNodesAndRedist()
713
714       logging.info("Merge done, restart master daemon normally")
715       self._KillMasterDaemon()
716       self._StartMasterDaemon()
717
718       if self.restart == _RESTART_ALL:
719         logging.info("Starting instances again")
720         self._StartupAllInstances()
721       else:
722         logging.info("Not starting instances again")
723       logging.info("Post cluster verification")
724       self._VerifyCluster()
725     except errors.GenericError, e:
726       logging.exception(e)
727
728       if rbsteps:
729         nodes = Flatten([data.nodes for data in self.merger_data])
730         info = {
731           "clusters": self.clusters,
732           "nodes": nodes,
733           }
734         logging.critical("In order to rollback do the following:")
735         for step in rbsteps:
736           logging.critical("  * %s", step % info)
737       else:
738         logging.critical("Nothing to rollback.")
739
740       # TODO: Keep track of steps done for a flawless resume?
741
742   def Cleanup(self):
743     """Clean up our environment.
744
745     This cleans up remote private keys and configs and after that
746     deletes the temporary directory.
747
748     """
749     shutil.rmtree(self.work_dir)
750
751
752 def SetupLogging(options):
753   """Setting up logging infrastructure.
754
755   @param options: Parsed command line options
756
757   """
758   formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
759
760   stderr_handler = logging.StreamHandler()
761   stderr_handler.setFormatter(formatter)
762   if options.debug:
763     stderr_handler.setLevel(logging.NOTSET)
764   elif options.verbose:
765     stderr_handler.setLevel(logging.INFO)
766   else:
767     stderr_handler.setLevel(logging.WARNING)
768
769   root_logger = logging.getLogger("")
770   root_logger.setLevel(logging.NOTSET)
771   root_logger.addHandler(stderr_handler)
772
773
774 def main():
775   """Main routine.
776
777   """
778   program = os.path.basename(sys.argv[0])
779
780   parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
781                                  prog=program)
782   parser.add_option(cli.DEBUG_OPT)
783   parser.add_option(cli.VERBOSE_OPT)
784   parser.add_option(PAUSE_PERIOD_OPT)
785   parser.add_option(GROUPS_OPT)
786   parser.add_option(RESTART_OPT)
787   parser.add_option(PARAMS_OPT)
788   parser.add_option(SKIP_STOP_INSTANCES_OPT)
789
790   (options, args) = parser.parse_args()
791
792   SetupLogging(options)
793
794   if not args:
795     parser.error("No clusters specified")
796
797   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
798                           options.groups, options.restart, options.params,
799                           options.stop_instances)
800   try:
801     try:
802       cluster_merger.Setup()
803       cluster_merger.Merge()
804     except errors.GenericError, e:
805       logging.exception(e)
806       return constants.EXIT_FAILURE
807   finally:
808     cluster_merger.Cleanup()
809
810   return constants.EXIT_SUCCESS
811
812
813 if __name__ == "__main__":
814   sys.exit(main())