Document spindles in TSPEC
[ganeti-local] / tools / cluster-merge
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to merge two or more clusters together.
22
23 The clusters have to run the same version of Ganeti!
24
25 """
26
27 # pylint: disable=C0103
28 # C0103: Invalid name cluster-merge
29
30 import logging
31 import os
32 import optparse
33 import shutil
34 import sys
35 import tempfile
36
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
43 from ganeti import pathutils
44 from ganeti import compat
45
46
47 _GROUPS_MERGE = "merge"
48 _GROUPS_RENAME = "rename"
49 _CLUSTERMERGE_ECID = "clustermerge-ecid"
50 _RESTART_ALL = "all"
51 _RESTART_UP = "up"
52 _RESTART_NONE = "none"
53 _RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
54 _PARAMS_STRICT = "strict"
55 _PARAMS_WARN = "warn"
56 _PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
57
58
59 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
60                                   action="store", type="int",
61                                   dest="pause_period",
62                                   help=("Amount of time in seconds watcher"
63                                         " should be suspended from running"))
64 GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
65                             choices=(_GROUPS_MERGE, _GROUPS_RENAME),
66                             dest="groups",
67                             help=("How to handle groups that have the"
68                                   " same name (One of: %s/%s)" %
69                                   (_GROUPS_MERGE, _GROUPS_RENAME)))
70 PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
71                             metavar="STRATEGY",
72                             choices=_PARAMS_CHOICES,
73                             dest="params",
74                             help=("How to handle params that have"
75                                   " different values (One of: %s/%s)" %
76                                   _PARAMS_CHOICES))
77
78 RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
79                              metavar="STRATEGY",
80                              choices=_RESTART_CHOICES,
81                              dest="restart",
82                              help=("How to handle restarting instances"
83                                    " same name (One of: %s/%s/%s)" %
84                                    _RESTART_CHOICES))
85
86 SKIP_STOP_INSTANCES_OPT = \
87   cli.cli_option("--skip-stop-instances", default=True, action="store_false",
88                  dest="stop_instances",
89                  help=("Don't stop the instances on the clusters, just check "
90                        "that none is running"))
91
92
93 def Flatten(unflattened_list):
94   """Flattens a list.
95
96   @param unflattened_list: A list of unflattened list objects.
97   @return: A flattened list
98
99   """
100   flattened_list = []
101
102   for item in unflattened_list:
103     if isinstance(item, list):
104       flattened_list.extend(Flatten(item))
105     else:
106       flattened_list.append(item)
107   return flattened_list
108
109
110 class MergerData(object):
111   """Container class to hold data used for merger.
112
113   """
114   def __init__(self, cluster, key_path, nodes, instances, master_node,
115                config_path=None):
116     """Initialize the container.
117
118     @param cluster: The name of the cluster
119     @param key_path: Path to the ssh private key used for authentication
120     @param nodes: List of online nodes in the merging cluster
121     @param instances: List of instances running on merging cluster
122     @param master_node: Name of the master node
123     @param config_path: Path to the merging cluster config
124
125     """
126     self.cluster = cluster
127     self.key_path = key_path
128     self.nodes = nodes
129     self.instances = instances
130     self.master_node = master_node
131     self.config_path = config_path
132
133
134 class Merger(object):
135   """Handling the merge.
136
137   """
138   RUNNING_STATUSES = compat.UniqueFrozenset([
139     constants.INSTST_RUNNING,
140     constants.INSTST_ERRORUP,
141     ])
142
143   def __init__(self, clusters, pause_period, groups, restart, params,
144                stop_instances):
145     """Initialize object with sane defaults and infos required.
146
147     @param clusters: The list of clusters to merge in
148     @param pause_period: The time watcher shall be disabled for
149     @param groups: How to handle group conflicts
150     @param restart: How to handle instance restart
151     @param stop_instances: Indicates whether the instances must be stopped
152                            (True) or if the Merger must only check if no
153                            instances are running on the mergee clusters (False)
154
155     """
156     self.merger_data = []
157     self.clusters = clusters
158     self.pause_period = pause_period
159     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
160     (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
161     self.ssh_runner = ssh.SshRunner(self.cluster_name)
162     self.groups = groups
163     self.restart = restart
164     self.params = params
165     self.stop_instances = stop_instances
166     if self.restart == _RESTART_UP:
167       raise NotImplementedError
168
169   def Setup(self):
170     """Sets up our end so we can do the merger.
171
172     This method is setting us up as a preparation for the merger.
173     It makes the initial contact and gathers information needed.
174
175     @raise errors.RemoteError: for errors in communication/grabbing
176
177     """
178     (remote_path, _, _) = ssh.GetUserFiles("root")
179
180     if self.cluster_name in self.clusters:
181       raise errors.CommandError("Cannot merge cluster %s with itself" %
182                                 self.cluster_name)
183
184     # Fetch remotes private key
185     for cluster in self.clusters:
186       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
187                             ask_key=False)
188       if result.failed:
189         raise errors.RemoteError("There was an error while grabbing ssh private"
190                                  " key from %s. Fail reason: %s; output: %s" %
191                                  (cluster, result.fail_reason, result.output))
192
193       key_path = utils.PathJoin(self.work_dir, cluster)
194       utils.WriteFile(key_path, mode=0600, data=result.stdout)
195
196       result = self._RunCmd(cluster, "gnt-node list -o name,offline"
197                             " --no-headers --separator=,", private_key=key_path)
198       if result.failed:
199         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
200                                  " Fail reason: %s; output: %s" %
201                                  (cluster, result.fail_reason, result.output))
202       nodes_statuses = [line.split(",") for line in result.stdout.splitlines()]
203       nodes = [node_status[0] for node_status in nodes_statuses
204                if node_status[1] == "N"]
205
206       result = self._RunCmd(cluster, "gnt-instance list -o name --no-headers",
207                             private_key=key_path)
208       if result.failed:
209         raise errors.RemoteError("Unable to retrieve list of instances from"
210                                  " %s. Fail reason: %s; output: %s" %
211                                  (cluster, result.fail_reason, result.output))
212       instances = result.stdout.splitlines()
213
214       path = utils.PathJoin(pathutils.DATA_DIR, "ssconf_%s" %
215                             constants.SS_MASTER_NODE)
216       result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
217       if result.failed:
218         raise errors.RemoteError("Unable to retrieve the master node name from"
219                                  " %s. Fail reason: %s; output: %s" %
220                                  (cluster, result.fail_reason, result.output))
221       master_node = result.stdout.strip()
222
223       self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
224                                          master_node))
225
226   def _PrepareAuthorizedKeys(self):
227     """Prepare the authorized_keys on every merging node.
228
229     This method add our public key to remotes authorized_key for further
230     communication.
231
232     """
233     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
234     pub_key = utils.ReadFile(pub_key_file)
235
236     for data in self.merger_data:
237       for node in data.nodes:
238         result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
239                                      (auth_keys, pub_key)),
240                               private_key=data.key_path, max_attempts=3)
241
242         if result.failed:
243           raise errors.RemoteError("Unable to add our public key to %s in %s."
244                                    " Fail reason: %s; output: %s" %
245                                    (node, data.cluster, result.fail_reason,
246                                     result.output))
247
248   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
249               strict_host_check=False, private_key=None, batch=True,
250               ask_key=False, max_attempts=1):
251     """Wrapping SshRunner.Run with default parameters.
252
253     For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
254
255     """
256     for _ in range(max_attempts):
257       result = self.ssh_runner.Run(hostname=hostname, command=command,
258                                    user=user, use_cluster_key=use_cluster_key,
259                                    strict_host_check=strict_host_check,
260                                    private_key=private_key, batch=batch,
261                                    ask_key=ask_key)
262       if not result.failed:
263         break
264
265     return result
266
267   def _CheckRunningInstances(self):
268     """Checks if on the clusters to be merged there are running instances
269
270     @rtype: boolean
271     @return: True if there are running instances, False otherwise
272
273     """
274     for cluster in self.clusters:
275       result = self._RunCmd(cluster, "gnt-instance list -o status")
276       if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
277         return True
278
279     return False
280
281   def _StopMergingInstances(self):
282     """Stop instances on merging clusters.
283
284     """
285     for cluster in self.clusters:
286       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
287                                      " --force-multiple")
288
289       if result.failed:
290         raise errors.RemoteError("Unable to stop instances on %s."
291                                  " Fail reason: %s; output: %s" %
292                                  (cluster, result.fail_reason, result.output))
293
294   def _DisableWatcher(self):
295     """Disable watch on all merging clusters, including ourself.
296
297     """
298     for cluster in ["localhost"] + self.clusters:
299       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
300                                      self.pause_period)
301
302       if result.failed:
303         raise errors.RemoteError("Unable to pause watcher on %s."
304                                  " Fail reason: %s; output: %s" %
305                                  (cluster, result.fail_reason, result.output))
306
307   def _RemoveMasterIps(self):
308     """Removes the master IPs from the master nodes of each cluster.
309
310     """
311     for data in self.merger_data:
312       result = self._RunCmd(data.master_node,
313                             "gnt-cluster deactivate-master-ip --yes")
314
315       if result.failed:
316         raise errors.RemoteError("Unable to remove master IP on %s."
317                                  " Fail reason: %s; output: %s" %
318                                  (data.master_node,
319                                   result.fail_reason,
320                                   result.output))
321
322   def _StopDaemons(self):
323     """Stop all daemons on merging nodes.
324
325     """
326     cmd = "%s stop-all" % pathutils.DAEMON_UTIL
327     for data in self.merger_data:
328       for node in data.nodes:
329         result = self._RunCmd(node, cmd, max_attempts=3)
330
331         if result.failed:
332           raise errors.RemoteError("Unable to stop daemons on %s."
333                                    " Fail reason: %s; output: %s." %
334                                    (node, result.fail_reason, result.output))
335
336   def _FetchRemoteConfig(self):
337     """Fetches and stores remote cluster config from the master.
338
339     This step is needed before we can merge the config.
340
341     """
342     for data in self.merger_data:
343       result = self._RunCmd(data.cluster, "cat %s" %
344                                           pathutils.CLUSTER_CONF_FILE)
345
346       if result.failed:
347         raise errors.RemoteError("Unable to retrieve remote config on %s."
348                                  " Fail reason: %s; output %s" %
349                                  (data.cluster, result.fail_reason,
350                                   result.output))
351
352       data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
353                                         data.cluster)
354       utils.WriteFile(data.config_path, data=result.stdout)
355
356   # R0201: Method could be a function
357   def _KillMasterDaemon(self): # pylint: disable=R0201
358     """Kills the local master daemon.
359
360     @raise errors.CommandError: If unable to kill
361
362     """
363     result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
364     if result.failed:
365       raise errors.CommandError("Unable to stop master daemons."
366                                 " Fail reason: %s; output: %s" %
367                                 (result.fail_reason, result.output))
368
369   def _MergeConfig(self):
370     """Merges all foreign config into our own config.
371
372     """
373     my_config = config.ConfigWriter(offline=True)
374     fake_ec_id = 0 # Needs to be uniq over the whole config merge
375
376     for data in self.merger_data:
377       other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
378       self._MergeClusterConfigs(my_config, other_config)
379       self._MergeNodeGroups(my_config, other_config)
380
381       for node in other_config.GetNodeList():
382         node_info = other_config.GetNodeInfo(node)
383         # Offline the node, it will be reonlined later at node readd
384         node_info.master_candidate = False
385         node_info.drained = False
386         node_info.offline = True
387         my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
388         fake_ec_id += 1
389
390       for instance in other_config.GetInstanceList():
391         instance_info = other_config.GetInstanceInfo(instance)
392
393         # Update the DRBD port assignments
394         # This is a little bit hackish
395         for dsk in instance_info.disks:
396           if dsk.dev_type in constants.DTS_DRBD:
397             port = my_config.AllocatePort()
398
399             logical_id = list(dsk.logical_id)
400             logical_id[2] = port
401             dsk.logical_id = tuple(logical_id)
402
403             physical_id = list(dsk.physical_id)
404             physical_id[1] = physical_id[3] = port
405             dsk.physical_id = tuple(physical_id)
406
407         my_config.AddInstance(instance_info,
408                               _CLUSTERMERGE_ECID + str(fake_ec_id))
409         fake_ec_id += 1
410
411   def _MergeClusterConfigs(self, my_config, other_config):
412     """Checks that all relevant cluster parameters are compatible
413
414     """
415     my_cluster = my_config.GetClusterInfo()
416     other_cluster = other_config.GetClusterInfo()
417     err_count = 0
418
419     #
420     # Generic checks
421     #
422     check_params = [
423       "beparams",
424       "default_iallocator",
425       "drbd_usermode_helper",
426       "hidden_os",
427       "maintain_node_health",
428       "master_netdev",
429       "ndparams",
430       "nicparams",
431       "primary_ip_family",
432       "tags",
433       "uid_pool",
434       ]
435     check_params_strict = [
436       "volume_group_name",
437     ]
438     if my_cluster.IsFileStorageEnabled() or \
439         other_cluster.IsFileStorageEnabled():
440       check_params_strict.append("file_storage_dir")
441     if my_cluster.IsSharedFileStorageEnabled() or \
442         other_cluster.IsSharedFileStorageEnabled():
443       check_params_strict.append("shared_file_storage_dir")
444     check_params.extend(check_params_strict)
445
446     if self.params == _PARAMS_STRICT:
447       params_strict = True
448     else:
449       params_strict = False
450
451     for param_name in check_params:
452       my_param = getattr(my_cluster, param_name)
453       other_param = getattr(other_cluster, param_name)
454       if my_param != other_param:
455         logging.error("The value (%s) of the cluster parameter %s on %s"
456                       " differs to this cluster's value (%s)",
457                       other_param, param_name, other_cluster.cluster_name,
458                       my_param)
459         if params_strict or param_name in check_params_strict:
460           err_count += 1
461
462     #
463     # Custom checks
464     #
465
466     # Check default hypervisor
467     my_defhyp = my_cluster.enabled_hypervisors[0]
468     other_defhyp = other_cluster.enabled_hypervisors[0]
469     if my_defhyp != other_defhyp:
470       logging.warning("The default hypervisor (%s) differs on %s, new"
471                       " instances will be created with this cluster's"
472                       " default hypervisor (%s)", other_defhyp,
473                       other_cluster.cluster_name, my_defhyp)
474
475     if (set(my_cluster.enabled_hypervisors) !=
476         set(other_cluster.enabled_hypervisors)):
477       logging.error("The set of enabled hypervisors (%s) on %s differs to"
478                     " this cluster's set (%s)",
479                     other_cluster.enabled_hypervisors,
480                     other_cluster.cluster_name, my_cluster.enabled_hypervisors)
481       err_count += 1
482
483     # Check hypervisor params for hypervisors we care about
484     for hyp in my_cluster.enabled_hypervisors:
485       for param in my_cluster.hvparams[hyp]:
486         my_value = my_cluster.hvparams[hyp][param]
487         other_value = other_cluster.hvparams[hyp][param]
488         if my_value != other_value:
489           logging.error("The value (%s) of the %s parameter of the %s"
490                         " hypervisor on %s differs to this cluster's parameter"
491                         " (%s)",
492                         other_value, param, hyp, other_cluster.cluster_name,
493                         my_value)
494           if params_strict:
495             err_count += 1
496
497     # Check os hypervisor params for hypervisors we care about
498     for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
499       for hyp in my_cluster.enabled_hypervisors:
500         my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
501         other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
502         if my_os_hvp != other_os_hvp:
503           logging.error("The OS parameters (%s) for the %s OS for the %s"
504                         " hypervisor on %s differs to this cluster's parameters"
505                         " (%s)",
506                         other_os_hvp, os_name, hyp, other_cluster.cluster_name,
507                         my_os_hvp)
508           if params_strict:
509             err_count += 1
510
511     #
512     # Warnings
513     #
514     if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
515       logging.warning("The modify_etc_hosts value (%s) differs on %s,"
516                       " this cluster's value (%s) will take precedence",
517                       other_cluster.modify_etc_hosts,
518                       other_cluster.cluster_name,
519                       my_cluster.modify_etc_hosts)
520
521     if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
522       logging.warning("The modify_ssh_setup value (%s) differs on %s,"
523                       " this cluster's value (%s) will take precedence",
524                       other_cluster.modify_ssh_setup,
525                       other_cluster.cluster_name,
526                       my_cluster.modify_ssh_setup)
527
528     #
529     # Actual merging
530     #
531     my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
532                                        other_cluster.reserved_lvs))
533
534     if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
535       logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
536                       " cluster's value (%s). The least permissive value (%s)"
537                       " will be used", other_cluster.prealloc_wipe_disks,
538                       other_cluster.cluster_name,
539                       my_cluster.prealloc_wipe_disks, True)
540       my_cluster.prealloc_wipe_disks = True
541
542     for os_, osparams in other_cluster.osparams.items():
543       if os_ not in my_cluster.osparams:
544         my_cluster.osparams[os_] = osparams
545       elif my_cluster.osparams[os_] != osparams:
546         logging.error("The OS parameters (%s) for the %s OS on %s differs to"
547                       " this cluster's parameters (%s)",
548                       osparams, os_, other_cluster.cluster_name,
549                       my_cluster.osparams[os_])
550         if params_strict:
551           err_count += 1
552
553     if err_count:
554       raise errors.ConfigurationError("Cluster config for %s has incompatible"
555                                       " values, please fix and re-run" %
556                                       other_cluster.cluster_name)
557
558   # R0201: Method could be a function
559   def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201
560     if os_name in cluster.os_hvp:
561       return cluster.os_hvp[os_name].get(hyp, None)
562     else:
563       return None
564
565   # R0201: Method could be a function
566   def _MergeNodeGroups(self, my_config, other_config):
567     """Adds foreign node groups
568
569     ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
570     """
571     # pylint: disable=R0201
572     logging.info("Node group conflict strategy: %s", self.groups)
573
574     my_grps = my_config.GetAllNodeGroupsInfo().values()
575     other_grps = other_config.GetAllNodeGroupsInfo().values()
576
577     # Check for node group naming conflicts:
578     conflicts = []
579     for other_grp in other_grps:
580       for my_grp in my_grps:
581         if other_grp.name == my_grp.name:
582           conflicts.append(other_grp)
583
584     if conflicts:
585       conflict_names = utils.CommaJoin([g.name for g in conflicts])
586       logging.info("Node groups in both local and remote cluster: %s",
587                    conflict_names)
588
589       # User hasn't specified how to handle conflicts
590       if not self.groups:
591         raise errors.CommandError("The following node group(s) are in both"
592                                   " clusters, and no merge strategy has been"
593                                   " supplied (see the --groups option): %s" %
594                                   conflict_names)
595
596       # User wants to rename conflicts
597       elif self.groups == _GROUPS_RENAME:
598         for grp in conflicts:
599           new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
600           logging.info("Renaming remote node group from %s to %s"
601                        " to resolve conflict", grp.name, new_name)
602           grp.name = new_name
603
604       # User wants to merge conflicting groups
605       elif self.groups == _GROUPS_MERGE:
606         for other_grp in conflicts:
607           logging.info("Merging local and remote '%s' groups", other_grp.name)
608           for node_name in other_grp.members[:]:
609             node = other_config.GetNodeInfo(node_name)
610             # Access to a protected member of a client class
611             # pylint: disable=W0212
612             other_config._UnlockedRemoveNodeFromGroup(node)
613
614             # Access to a protected member of a client class
615             # pylint: disable=W0212
616             my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
617
618             # Access to a protected member of a client class
619             # pylint: disable=W0212
620             my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
621             node.group = my_grp_uuid
622           # Remove from list of groups to add
623           other_grps.remove(other_grp)
624
625     for grp in other_grps:
626       #TODO: handle node group conflicts
627       my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
628
629   # R0201: Method could be a function
630   def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201
631     """Starts the local master daemon.
632
633     @param no_vote: Should the masterd started without voting? default: False
634     @raise errors.CommandError: If unable to start daemon.
635
636     """
637     env = {}
638     if no_vote:
639       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
640
641     result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
642     if result.failed:
643       raise errors.CommandError("Couldn't start ganeti master."
644                                 " Fail reason: %s; output: %s" %
645                                 (result.fail_reason, result.output))
646
647   def _ReaddMergedNodesAndRedist(self):
648     """Readds all merging nodes and make sure their config is up-to-date.
649
650     @raise errors.CommandError: If anything fails.
651
652     """
653     for data in self.merger_data:
654       for node in data.nodes:
655         logging.info("Readding node %s", node)
656         result = utils.RunCmd(["gnt-node", "add", "--readd",
657                                "--no-ssh-key-check", node])
658         if result.failed:
659           logging.error("%s failed to be readded. Reason: %s, output: %s",
660                          node, result.fail_reason, result.output)
661
662     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
663     if result.failed:
664       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
665                                 " output: %s" % (result.fail_reason,
666                                                  result.output))
667
668   # R0201: Method could be a function
669   def _StartupAllInstances(self): # pylint: disable=R0201
670     """Starts up all instances (locally).
671
672     @raise errors.CommandError: If unable to start clusters
673
674     """
675     result = utils.RunCmd(["gnt-instance", "startup", "--all",
676                            "--force-multiple"])
677     if result.failed:
678       raise errors.CommandError("Unable to start all instances."
679                                 " Fail reason: %s; output: %s" %
680                                 (result.fail_reason, result.output))
681
682   # R0201: Method could be a function
683   # TODO: make this overridable, for some verify errors
684   def _VerifyCluster(self): # pylint: disable=R0201
685     """Runs gnt-cluster verify to verify the health.
686
687     @raise errors.ProgrammError: If cluster fails on verification
688
689     """
690     result = utils.RunCmd(["gnt-cluster", "verify"])
691     if result.failed:
692       raise errors.CommandError("Verification of cluster failed."
693                                 " Fail reason: %s; output: %s" %
694                                 (result.fail_reason, result.output))
695
696   def Merge(self):
697     """Does the actual merge.
698
699     It runs all the steps in the right order and updates the user about steps
700     taken. Also it keeps track of rollback_steps to undo everything.
701
702     """
703     rbsteps = []
704     try:
705       logging.info("Pre cluster verification")
706       self._VerifyCluster()
707
708       logging.info("Prepare authorized_keys")
709       rbsteps.append("Remove our key from authorized_keys on nodes:"
710                      " %(nodes)s")
711       self._PrepareAuthorizedKeys()
712
713       rbsteps.append("Start all instances again on the merging"
714                      " clusters: %(clusters)s")
715       if self.stop_instances:
716         logging.info("Stopping merging instances (takes a while)")
717         self._StopMergingInstances()
718       logging.info("Checking that no instances are running on the mergees")
719       instances_running = self._CheckRunningInstances()
720       if instances_running:
721         raise errors.CommandError("Some instances are still running on the"
722                                   " mergees")
723       logging.info("Disable watcher")
724       self._DisableWatcher()
725       logging.info("Merging config")
726       self._FetchRemoteConfig()
727       logging.info("Removing master IPs on mergee master nodes")
728       self._RemoveMasterIps()
729       logging.info("Stop daemons on merging nodes")
730       self._StopDaemons()
731
732       logging.info("Stopping master daemon")
733       self._KillMasterDaemon()
734
735       rbsteps.append("Restore %s from another master candidate"
736                      " and restart master daemon" %
737                      pathutils.CLUSTER_CONF_FILE)
738       self._MergeConfig()
739       self._StartMasterDaemon(no_vote=True)
740
741       # Point of no return, delete rbsteps
742       del rbsteps[:]
743
744       logging.warning("We are at the point of no return. Merge can not easily"
745                       " be undone after this point.")
746       logging.info("Readd nodes")
747       self._ReaddMergedNodesAndRedist()
748
749       logging.info("Merge done, restart master daemon normally")
750       self._KillMasterDaemon()
751       self._StartMasterDaemon()
752
753       if self.restart == _RESTART_ALL:
754         logging.info("Starting instances again")
755         self._StartupAllInstances()
756       else:
757         logging.info("Not starting instances again")
758       logging.info("Post cluster verification")
759       self._VerifyCluster()
760     except errors.GenericError, e:
761       logging.exception(e)
762
763       if rbsteps:
764         nodes = Flatten([data.nodes for data in self.merger_data])
765         info = {
766           "clusters": self.clusters,
767           "nodes": nodes,
768           }
769         logging.critical("In order to rollback do the following:")
770         for step in rbsteps:
771           logging.critical("  * %s", step % info)
772       else:
773         logging.critical("Nothing to rollback.")
774
775       # TODO: Keep track of steps done for a flawless resume?
776
777   def Cleanup(self):
778     """Clean up our environment.
779
780     This cleans up remote private keys and configs and after that
781     deletes the temporary directory.
782
783     """
784     shutil.rmtree(self.work_dir)
785
786
787 def main():
788   """Main routine.
789
790   """
791   program = os.path.basename(sys.argv[0])
792
793   parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
794                                  prog=program)
795   parser.add_option(cli.DEBUG_OPT)
796   parser.add_option(cli.VERBOSE_OPT)
797   parser.add_option(PAUSE_PERIOD_OPT)
798   parser.add_option(GROUPS_OPT)
799   parser.add_option(RESTART_OPT)
800   parser.add_option(PARAMS_OPT)
801   parser.add_option(SKIP_STOP_INSTANCES_OPT)
802
803   (options, args) = parser.parse_args()
804
805   utils.SetupToolLogging(options.debug, options.verbose)
806
807   if not args:
808     parser.error("No clusters specified")
809
810   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
811                           options.groups, options.restart, options.params,
812                           options.stop_instances)
813   try:
814     try:
815       cluster_merger.Setup()
816       cluster_merger.Merge()
817     except errors.GenericError, e:
818       logging.exception(e)
819       return constants.EXIT_FAILURE
820   finally:
821     cluster_merger.Cleanup()
822
823   return constants.EXIT_SUCCESS
824
825
826 if __name__ == "__main__":
827   sys.exit(main())