cluster-merge: remove usage of 'ENABLE_FILE_STORAGE'
[ganeti-local] / tools / cluster-merge
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to merge two or more clusters together.
22
23 The clusters have to run the same version of Ganeti!
24
25 """
26
27 # pylint: disable=C0103
28 # C0103: Invalid name cluster-merge
29
30 import logging
31 import os
32 import optparse
33 import shutil
34 import sys
35 import tempfile
36
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
43 from ganeti import pathutils
44 from ganeti import compat
45
46
47 _GROUPS_MERGE = "merge"
48 _GROUPS_RENAME = "rename"
49 _CLUSTERMERGE_ECID = "clustermerge-ecid"
50 _RESTART_ALL = "all"
51 _RESTART_UP = "up"
52 _RESTART_NONE = "none"
53 _RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
54 _PARAMS_STRICT = "strict"
55 _PARAMS_WARN = "warn"
56 _PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
57
58
59 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
60                                   action="store", type="int",
61                                   dest="pause_period",
62                                   help=("Amount of time in seconds watcher"
63                                         " should be suspended from running"))
64 GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
65                             choices=(_GROUPS_MERGE, _GROUPS_RENAME),
66                             dest="groups",
67                             help=("How to handle groups that have the"
68                                   " same name (One of: %s/%s)" %
69                                   (_GROUPS_MERGE, _GROUPS_RENAME)))
70 PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
71                             metavar="STRATEGY",
72                             choices=_PARAMS_CHOICES,
73                             dest="params",
74                             help=("How to handle params that have"
75                                   " different values (One of: %s/%s)" %
76                                   _PARAMS_CHOICES))
77
78 RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
79                              metavar="STRATEGY",
80                              choices=_RESTART_CHOICES,
81                              dest="restart",
82                              help=("How to handle restarting instances"
83                                    " same name (One of: %s/%s/%s)" %
84                                    _RESTART_CHOICES))
85
86 SKIP_STOP_INSTANCES_OPT = \
87   cli.cli_option("--skip-stop-instances", default=True, action="store_false",
88                  dest="stop_instances",
89                  help=("Don't stop the instances on the clusters, just check "
90                        "that none is running"))
91
92
93 def Flatten(unflattened_list):
94   """Flattens a list.
95
96   @param unflattened_list: A list of unflattened list objects.
97   @return: A flattened list
98
99   """
100   flattened_list = []
101
102   for item in unflattened_list:
103     if isinstance(item, list):
104       flattened_list.extend(Flatten(item))
105     else:
106       flattened_list.append(item)
107   return flattened_list
108
109
110 class MergerData(object):
111   """Container class to hold data used for merger.
112
113   """
114   def __init__(self, cluster, key_path, nodes, instances, master_node,
115                config_path=None):
116     """Initialize the container.
117
118     @param cluster: The name of the cluster
119     @param key_path: Path to the ssh private key used for authentication
120     @param nodes: List of online nodes in the merging cluster
121     @param instances: List of instances running on merging cluster
122     @param master_node: Name of the master node
123     @param config_path: Path to the merging cluster config
124
125     """
126     self.cluster = cluster
127     self.key_path = key_path
128     self.nodes = nodes
129     self.instances = instances
130     self.master_node = master_node
131     self.config_path = config_path
132
133
134 class Merger(object):
135   """Handling the merge.
136
137   """
138   RUNNING_STATUSES = compat.UniqueFrozenset([
139     constants.INSTST_RUNNING,
140     constants.INSTST_ERRORUP,
141     ])
142
143   def __init__(self, clusters, pause_period, groups, restart, params,
144                stop_instances):
145     """Initialize object with sane defaults and infos required.
146
147     @param clusters: The list of clusters to merge in
148     @param pause_period: The time watcher shall be disabled for
149     @param groups: How to handle group conflicts
150     @param restart: How to handle instance restart
151     @param stop_instances: Indicates whether the instances must be stopped
152                            (True) or if the Merger must only check if no
153                            instances are running on the mergee clusters (False)
154
155     """
156     self.merger_data = []
157     self.clusters = clusters
158     self.pause_period = pause_period
159     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
160     (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
161     self.ssh_runner = ssh.SshRunner(self.cluster_name)
162     self.groups = groups
163     self.restart = restart
164     self.params = params
165     self.stop_instances = stop_instances
166     if self.restart == _RESTART_UP:
167       raise NotImplementedError
168
169   def Setup(self):
170     """Sets up our end so we can do the merger.
171
172     This method is setting us up as a preparation for the merger.
173     It makes the initial contact and gathers information needed.
174
175     @raise errors.RemoteError: for errors in communication/grabbing
176
177     """
178     (remote_path, _, _) = ssh.GetUserFiles("root")
179
180     if self.cluster_name in self.clusters:
181       raise errors.CommandError("Cannot merge cluster %s with itself" %
182                                 self.cluster_name)
183
184     # Fetch remotes private key
185     for cluster in self.clusters:
186       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
187                             ask_key=False)
188       if result.failed:
189         raise errors.RemoteError("There was an error while grabbing ssh private"
190                                  " key from %s. Fail reason: %s; output: %s" %
191                                  (cluster, result.fail_reason, result.output))
192
193       key_path = utils.PathJoin(self.work_dir, cluster)
194       utils.WriteFile(key_path, mode=0600, data=result.stdout)
195
196       result = self._RunCmd(cluster, "gnt-node list -o name,offline"
197                             " --no-headers --separator=,", private_key=key_path)
198       if result.failed:
199         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
200                                  " Fail reason: %s; output: %s" %
201                                  (cluster, result.fail_reason, result.output))
202       nodes_statuses = [line.split(",") for line in result.stdout.splitlines()]
203       nodes = [node_status[0] for node_status in nodes_statuses
204                if node_status[1] == "N"]
205
206       result = self._RunCmd(cluster, "gnt-instance list -o name --no-headers",
207                             private_key=key_path)
208       if result.failed:
209         raise errors.RemoteError("Unable to retrieve list of instances from"
210                                  " %s. Fail reason: %s; output: %s" %
211                                  (cluster, result.fail_reason, result.output))
212       instances = result.stdout.splitlines()
213
214       path = utils.PathJoin(pathutils.DATA_DIR, "ssconf_%s" %
215                             constants.SS_MASTER_NODE)
216       result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
217       if result.failed:
218         raise errors.RemoteError("Unable to retrieve the master node name from"
219                                  " %s. Fail reason: %s; output: %s" %
220                                  (cluster, result.fail_reason, result.output))
221       master_node = result.stdout.strip()
222
223       self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
224                                          master_node))
225
226   def _PrepareAuthorizedKeys(self):
227     """Prepare the authorized_keys on every merging node.
228
229     This method add our public key to remotes authorized_key for further
230     communication.
231
232     """
233     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
234     pub_key = utils.ReadFile(pub_key_file)
235
236     for data in self.merger_data:
237       for node in data.nodes:
238         result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
239                                      (auth_keys, pub_key)),
240                               private_key=data.key_path, max_attempts=3)
241
242         if result.failed:
243           raise errors.RemoteError("Unable to add our public key to %s in %s."
244                                    " Fail reason: %s; output: %s" %
245                                    (node, data.cluster, result.fail_reason,
246                                     result.output))
247
248   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
249               strict_host_check=False, private_key=None, batch=True,
250               ask_key=False, max_attempts=1):
251     """Wrapping SshRunner.Run with default parameters.
252
253     For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
254
255     """
256     for _ in range(max_attempts):
257       result = self.ssh_runner.Run(hostname=hostname, command=command,
258                                    user=user, use_cluster_key=use_cluster_key,
259                                    strict_host_check=strict_host_check,
260                                    private_key=private_key, batch=batch,
261                                    ask_key=ask_key)
262       if not result.failed:
263         break
264
265     return result
266
267   def _CheckRunningInstances(self):
268     """Checks if on the clusters to be merged there are running instances
269
270     @rtype: boolean
271     @return: True if there are running instances, False otherwise
272
273     """
274     for cluster in self.clusters:
275       result = self._RunCmd(cluster, "gnt-instance list -o status")
276       if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
277         return True
278
279     return False
280
281   def _StopMergingInstances(self):
282     """Stop instances on merging clusters.
283
284     """
285     for cluster in self.clusters:
286       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
287                                      " --force-multiple")
288
289       if result.failed:
290         raise errors.RemoteError("Unable to stop instances on %s."
291                                  " Fail reason: %s; output: %s" %
292                                  (cluster, result.fail_reason, result.output))
293
294   def _DisableWatcher(self):
295     """Disable watch on all merging clusters, including ourself.
296
297     """
298     for cluster in ["localhost"] + self.clusters:
299       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
300                                      self.pause_period)
301
302       if result.failed:
303         raise errors.RemoteError("Unable to pause watcher on %s."
304                                  " Fail reason: %s; output: %s" %
305                                  (cluster, result.fail_reason, result.output))
306
307   def _RemoveMasterIps(self):
308     """Removes the master IPs from the master nodes of each cluster.
309
310     """
311     for data in self.merger_data:
312       result = self._RunCmd(data.master_node,
313                             "gnt-cluster deactivate-master-ip --yes")
314
315       if result.failed:
316         raise errors.RemoteError("Unable to remove master IP on %s."
317                                  " Fail reason: %s; output: %s" %
318                                  (data.master_node,
319                                   result.fail_reason,
320                                   result.output))
321
322   def _StopDaemons(self):
323     """Stop all daemons on merging nodes.
324
325     """
326     cmd = "%s stop-all" % pathutils.DAEMON_UTIL
327     for data in self.merger_data:
328       for node in data.nodes:
329         result = self._RunCmd(node, cmd, max_attempts=3)
330
331         if result.failed:
332           raise errors.RemoteError("Unable to stop daemons on %s."
333                                    " Fail reason: %s; output: %s." %
334                                    (node, result.fail_reason, result.output))
335
336   def _FetchRemoteConfig(self):
337     """Fetches and stores remote cluster config from the master.
338
339     This step is needed before we can merge the config.
340
341     """
342     for data in self.merger_data:
343       result = self._RunCmd(data.cluster, "cat %s" %
344                                           pathutils.CLUSTER_CONF_FILE)
345
346       if result.failed:
347         raise errors.RemoteError("Unable to retrieve remote config on %s."
348                                  " Fail reason: %s; output %s" %
349                                  (data.cluster, result.fail_reason,
350                                   result.output))
351
352       data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
353                                         data.cluster)
354       utils.WriteFile(data.config_path, data=result.stdout)
355
356   # R0201: Method could be a function
357   def _KillMasterDaemon(self): # pylint: disable=R0201
358     """Kills the local master daemon.
359
360     @raise errors.CommandError: If unable to kill
361
362     """
363     result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
364     if result.failed:
365       raise errors.CommandError("Unable to stop master daemons."
366                                 " Fail reason: %s; output: %s" %
367                                 (result.fail_reason, result.output))
368
369   def _MergeConfig(self):
370     """Merges all foreign config into our own config.
371
372     """
373     my_config = config.ConfigWriter(offline=True)
374     fake_ec_id = 0 # Needs to be uniq over the whole config merge
375
376     for data in self.merger_data:
377       other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
378       self._MergeClusterConfigs(my_config, other_config)
379       self._MergeNodeGroups(my_config, other_config)
380
381       for node in other_config.GetNodeList():
382         node_info = other_config.GetNodeInfo(node)
383         # Offline the node, it will be reonlined later at node readd
384         node_info.master_candidate = False
385         node_info.drained = False
386         node_info.offline = True
387         my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
388         fake_ec_id += 1
389
390       for instance in other_config.GetInstanceList():
391         instance_info = other_config.GetInstanceInfo(instance)
392
393         # Update the DRBD port assignments
394         # This is a little bit hackish
395         for dsk in instance_info.disks:
396           if dsk.dev_type in constants.LDS_DRBD:
397             port = my_config.AllocatePort()
398
399             logical_id = list(dsk.logical_id)
400             logical_id[2] = port
401             dsk.logical_id = tuple(logical_id)
402
403             physical_id = list(dsk.physical_id)
404             physical_id[1] = physical_id[3] = port
405             dsk.physical_id = tuple(physical_id)
406
407         my_config.AddInstance(instance_info,
408                               _CLUSTERMERGE_ECID + str(fake_ec_id))
409         fake_ec_id += 1
410
411   def _MergeClusterConfigs(self, my_config, other_config):
412     """Checks that all relevant cluster parameters are compatible
413
414     """
415     my_cluster = my_config.GetClusterInfo()
416     other_cluster = other_config.GetClusterInfo()
417     err_count = 0
418
419     #
420     # Generic checks
421     #
422     check_params = [
423       "beparams",
424       "default_iallocator",
425       "drbd_usermode_helper",
426       "hidden_os",
427       "maintain_node_health",
428       "master_netdev",
429       "ndparams",
430       "nicparams",
431       "primary_ip_family",
432       "tags",
433       "uid_pool",
434       ]
435     check_params_strict = [
436       "volume_group_name",
437     ]
438     if my_cluster.IsFileStorageEnabled() or \
439         other_cluster.IsFileStorageEnabled():
440       check_params_strict.append("file_storage_dir")
441     if constants.ENABLE_SHARED_FILE_STORAGE:
442       check_params_strict.append("shared_file_storage_dir")
443     check_params.extend(check_params_strict)
444
445     if self.params == _PARAMS_STRICT:
446       params_strict = True
447     else:
448       params_strict = False
449
450     for param_name in check_params:
451       my_param = getattr(my_cluster, param_name)
452       other_param = getattr(other_cluster, param_name)
453       if my_param != other_param:
454         logging.error("The value (%s) of the cluster parameter %s on %s"
455                       " differs to this cluster's value (%s)",
456                       other_param, param_name, other_cluster.cluster_name,
457                       my_param)
458         if params_strict or param_name in check_params_strict:
459           err_count += 1
460
461     #
462     # Custom checks
463     #
464
465     # Check default hypervisor
466     my_defhyp = my_cluster.enabled_hypervisors[0]
467     other_defhyp = other_cluster.enabled_hypervisors[0]
468     if my_defhyp != other_defhyp:
469       logging.warning("The default hypervisor (%s) differs on %s, new"
470                       " instances will be created with this cluster's"
471                       " default hypervisor (%s)", other_defhyp,
472                       other_cluster.cluster_name, my_defhyp)
473
474     if (set(my_cluster.enabled_hypervisors) !=
475         set(other_cluster.enabled_hypervisors)):
476       logging.error("The set of enabled hypervisors (%s) on %s differs to"
477                     " this cluster's set (%s)",
478                     other_cluster.enabled_hypervisors,
479                     other_cluster.cluster_name, my_cluster.enabled_hypervisors)
480       err_count += 1
481
482     # Check hypervisor params for hypervisors we care about
483     for hyp in my_cluster.enabled_hypervisors:
484       for param in my_cluster.hvparams[hyp]:
485         my_value = my_cluster.hvparams[hyp][param]
486         other_value = other_cluster.hvparams[hyp][param]
487         if my_value != other_value:
488           logging.error("The value (%s) of the %s parameter of the %s"
489                         " hypervisor on %s differs to this cluster's parameter"
490                         " (%s)",
491                         other_value, param, hyp, other_cluster.cluster_name,
492                         my_value)
493           if params_strict:
494             err_count += 1
495
496     # Check os hypervisor params for hypervisors we care about
497     for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
498       for hyp in my_cluster.enabled_hypervisors:
499         my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
500         other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
501         if my_os_hvp != other_os_hvp:
502           logging.error("The OS parameters (%s) for the %s OS for the %s"
503                         " hypervisor on %s differs to this cluster's parameters"
504                         " (%s)",
505                         other_os_hvp, os_name, hyp, other_cluster.cluster_name,
506                         my_os_hvp)
507           if params_strict:
508             err_count += 1
509
510     #
511     # Warnings
512     #
513     if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
514       logging.warning("The modify_etc_hosts value (%s) differs on %s,"
515                       " this cluster's value (%s) will take precedence",
516                       other_cluster.modify_etc_hosts,
517                       other_cluster.cluster_name,
518                       my_cluster.modify_etc_hosts)
519
520     if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
521       logging.warning("The modify_ssh_setup value (%s) differs on %s,"
522                       " this cluster's value (%s) will take precedence",
523                       other_cluster.modify_ssh_setup,
524                       other_cluster.cluster_name,
525                       my_cluster.modify_ssh_setup)
526
527     #
528     # Actual merging
529     #
530     my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
531                                        other_cluster.reserved_lvs))
532
533     if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
534       logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
535                       " cluster's value (%s). The least permissive value (%s)"
536                       " will be used", other_cluster.prealloc_wipe_disks,
537                       other_cluster.cluster_name,
538                       my_cluster.prealloc_wipe_disks, True)
539       my_cluster.prealloc_wipe_disks = True
540
541     for os_, osparams in other_cluster.osparams.items():
542       if os_ not in my_cluster.osparams:
543         my_cluster.osparams[os_] = osparams
544       elif my_cluster.osparams[os_] != osparams:
545         logging.error("The OS parameters (%s) for the %s OS on %s differs to"
546                       " this cluster's parameters (%s)",
547                       osparams, os_, other_cluster.cluster_name,
548                       my_cluster.osparams[os_])
549         if params_strict:
550           err_count += 1
551
552     if err_count:
553       raise errors.ConfigurationError("Cluster config for %s has incompatible"
554                                       " values, please fix and re-run" %
555                                       other_cluster.cluster_name)
556
557   # R0201: Method could be a function
558   def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201
559     if os_name in cluster.os_hvp:
560       return cluster.os_hvp[os_name].get(hyp, None)
561     else:
562       return None
563
564   # R0201: Method could be a function
565   def _MergeNodeGroups(self, my_config, other_config):
566     """Adds foreign node groups
567
568     ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
569     """
570     # pylint: disable=R0201
571     logging.info("Node group conflict strategy: %s", self.groups)
572
573     my_grps = my_config.GetAllNodeGroupsInfo().values()
574     other_grps = other_config.GetAllNodeGroupsInfo().values()
575
576     # Check for node group naming conflicts:
577     conflicts = []
578     for other_grp in other_grps:
579       for my_grp in my_grps:
580         if other_grp.name == my_grp.name:
581           conflicts.append(other_grp)
582
583     if conflicts:
584       conflict_names = utils.CommaJoin([g.name for g in conflicts])
585       logging.info("Node groups in both local and remote cluster: %s",
586                    conflict_names)
587
588       # User hasn't specified how to handle conflicts
589       if not self.groups:
590         raise errors.CommandError("The following node group(s) are in both"
591                                   " clusters, and no merge strategy has been"
592                                   " supplied (see the --groups option): %s" %
593                                   conflict_names)
594
595       # User wants to rename conflicts
596       elif self.groups == _GROUPS_RENAME:
597         for grp in conflicts:
598           new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
599           logging.info("Renaming remote node group from %s to %s"
600                        " to resolve conflict", grp.name, new_name)
601           grp.name = new_name
602
603       # User wants to merge conflicting groups
604       elif self.groups == _GROUPS_MERGE:
605         for other_grp in conflicts:
606           logging.info("Merging local and remote '%s' groups", other_grp.name)
607           for node_name in other_grp.members[:]:
608             node = other_config.GetNodeInfo(node_name)
609             # Access to a protected member of a client class
610             # pylint: disable=W0212
611             other_config._UnlockedRemoveNodeFromGroup(node)
612
613             # Access to a protected member of a client class
614             # pylint: disable=W0212
615             my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
616
617             # Access to a protected member of a client class
618             # pylint: disable=W0212
619             my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
620             node.group = my_grp_uuid
621           # Remove from list of groups to add
622           other_grps.remove(other_grp)
623
624     for grp in other_grps:
625       #TODO: handle node group conflicts
626       my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
627
628   # R0201: Method could be a function
629   def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201
630     """Starts the local master daemon.
631
632     @param no_vote: Should the masterd started without voting? default: False
633     @raise errors.CommandError: If unable to start daemon.
634
635     """
636     env = {}
637     if no_vote:
638       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
639
640     result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
641     if result.failed:
642       raise errors.CommandError("Couldn't start ganeti master."
643                                 " Fail reason: %s; output: %s" %
644                                 (result.fail_reason, result.output))
645
646   def _ReaddMergedNodesAndRedist(self):
647     """Readds all merging nodes and make sure their config is up-to-date.
648
649     @raise errors.CommandError: If anything fails.
650
651     """
652     for data in self.merger_data:
653       for node in data.nodes:
654         logging.info("Readding node %s", node)
655         result = utils.RunCmd(["gnt-node", "add", "--readd",
656                                "--no-ssh-key-check", node])
657         if result.failed:
658           logging.error("%s failed to be readded. Reason: %s, output: %s",
659                          node, result.fail_reason, result.output)
660
661     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
662     if result.failed:
663       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
664                                 " output: %s" % (result.fail_reason,
665                                                  result.output))
666
667   # R0201: Method could be a function
668   def _StartupAllInstances(self): # pylint: disable=R0201
669     """Starts up all instances (locally).
670
671     @raise errors.CommandError: If unable to start clusters
672
673     """
674     result = utils.RunCmd(["gnt-instance", "startup", "--all",
675                            "--force-multiple"])
676     if result.failed:
677       raise errors.CommandError("Unable to start all instances."
678                                 " Fail reason: %s; output: %s" %
679                                 (result.fail_reason, result.output))
680
681   # R0201: Method could be a function
682   # TODO: make this overridable, for some verify errors
683   def _VerifyCluster(self): # pylint: disable=R0201
684     """Runs gnt-cluster verify to verify the health.
685
686     @raise errors.ProgrammError: If cluster fails on verification
687
688     """
689     result = utils.RunCmd(["gnt-cluster", "verify"])
690     if result.failed:
691       raise errors.CommandError("Verification of cluster failed."
692                                 " Fail reason: %s; output: %s" %
693                                 (result.fail_reason, result.output))
694
695   def Merge(self):
696     """Does the actual merge.
697
698     It runs all the steps in the right order and updates the user about steps
699     taken. Also it keeps track of rollback_steps to undo everything.
700
701     """
702     rbsteps = []
703     try:
704       logging.info("Pre cluster verification")
705       self._VerifyCluster()
706
707       logging.info("Prepare authorized_keys")
708       rbsteps.append("Remove our key from authorized_keys on nodes:"
709                      " %(nodes)s")
710       self._PrepareAuthorizedKeys()
711
712       rbsteps.append("Start all instances again on the merging"
713                      " clusters: %(clusters)s")
714       if self.stop_instances:
715         logging.info("Stopping merging instances (takes a while)")
716         self._StopMergingInstances()
717       logging.info("Checking that no instances are running on the mergees")
718       instances_running = self._CheckRunningInstances()
719       if instances_running:
720         raise errors.CommandError("Some instances are still running on the"
721                                   " mergees")
722       logging.info("Disable watcher")
723       self._DisableWatcher()
724       logging.info("Merging config")
725       self._FetchRemoteConfig()
726       logging.info("Removing master IPs on mergee master nodes")
727       self._RemoveMasterIps()
728       logging.info("Stop daemons on merging nodes")
729       self._StopDaemons()
730
731       logging.info("Stopping master daemon")
732       self._KillMasterDaemon()
733
734       rbsteps.append("Restore %s from another master candidate"
735                      " and restart master daemon" %
736                      pathutils.CLUSTER_CONF_FILE)
737       self._MergeConfig()
738       self._StartMasterDaemon(no_vote=True)
739
740       # Point of no return, delete rbsteps
741       del rbsteps[:]
742
743       logging.warning("We are at the point of no return. Merge can not easily"
744                       " be undone after this point.")
745       logging.info("Readd nodes")
746       self._ReaddMergedNodesAndRedist()
747
748       logging.info("Merge done, restart master daemon normally")
749       self._KillMasterDaemon()
750       self._StartMasterDaemon()
751
752       if self.restart == _RESTART_ALL:
753         logging.info("Starting instances again")
754         self._StartupAllInstances()
755       else:
756         logging.info("Not starting instances again")
757       logging.info("Post cluster verification")
758       self._VerifyCluster()
759     except errors.GenericError, e:
760       logging.exception(e)
761
762       if rbsteps:
763         nodes = Flatten([data.nodes for data in self.merger_data])
764         info = {
765           "clusters": self.clusters,
766           "nodes": nodes,
767           }
768         logging.critical("In order to rollback do the following:")
769         for step in rbsteps:
770           logging.critical("  * %s", step % info)
771       else:
772         logging.critical("Nothing to rollback.")
773
774       # TODO: Keep track of steps done for a flawless resume?
775
776   def Cleanup(self):
777     """Clean up our environment.
778
779     This cleans up remote private keys and configs and after that
780     deletes the temporary directory.
781
782     """
783     shutil.rmtree(self.work_dir)
784
785
786 def main():
787   """Main routine.
788
789   """
790   program = os.path.basename(sys.argv[0])
791
792   parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
793                                  prog=program)
794   parser.add_option(cli.DEBUG_OPT)
795   parser.add_option(cli.VERBOSE_OPT)
796   parser.add_option(PAUSE_PERIOD_OPT)
797   parser.add_option(GROUPS_OPT)
798   parser.add_option(RESTART_OPT)
799   parser.add_option(PARAMS_OPT)
800   parser.add_option(SKIP_STOP_INSTANCES_OPT)
801
802   (options, args) = parser.parse_args()
803
804   utils.SetupToolLogging(options.debug, options.verbose)
805
806   if not args:
807     parser.error("No clusters specified")
808
809   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
810                           options.groups, options.restart, options.params,
811                           options.stop_instances)
812   try:
813     try:
814       cluster_merger.Setup()
815       cluster_merger.Merge()
816     except errors.GenericError, e:
817       logging.exception(e)
818       return constants.EXIT_FAILURE
819   finally:
820     cluster_merger.Cleanup()
821
822   return constants.EXIT_SUCCESS
823
824
825 if __name__ == "__main__":
826   sys.exit(main())