Hs2Py constants: network related
[ganeti-local] / tools / cluster-merge
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to merge two or more clusters together.
22
23 The clusters have to run the same version of Ganeti!
24
25 """
26
27 # pylint: disable=C0103
28 # C0103: Invalid name cluster-merge
29
30 import logging
31 import os
32 import optparse
33 import shutil
34 import sys
35 import tempfile
36
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
43 from ganeti import pathutils
44 from ganeti import compat
45
46
47 _GROUPS_MERGE = "merge"
48 _GROUPS_RENAME = "rename"
49 _CLUSTERMERGE_ECID = "clustermerge-ecid"
50 _RESTART_ALL = "all"
51 _RESTART_UP = "up"
52 _RESTART_NONE = "none"
53 _RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
54 _PARAMS_STRICT = "strict"
55 _PARAMS_WARN = "warn"
56 _PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
57
58
59 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
60                                   action="store", type="int",
61                                   dest="pause_period",
62                                   help=("Amount of time in seconds watcher"
63                                         " should be suspended from running"))
64 GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
65                             choices=(_GROUPS_MERGE, _GROUPS_RENAME),
66                             dest="groups",
67                             help=("How to handle groups that have the"
68                                   " same name (One of: %s/%s)" %
69                                   (_GROUPS_MERGE, _GROUPS_RENAME)))
70 PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
71                             metavar="STRATEGY",
72                             choices=_PARAMS_CHOICES,
73                             dest="params",
74                             help=("How to handle params that have"
75                                   " different values (One of: %s/%s)" %
76                                   _PARAMS_CHOICES))
77
78 RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
79                              metavar="STRATEGY",
80                              choices=_RESTART_CHOICES,
81                              dest="restart",
82                              help=("How to handle restarting instances"
83                                    " same name (One of: %s/%s/%s)" %
84                                    _RESTART_CHOICES))
85
86 SKIP_STOP_INSTANCES_OPT = \
87   cli.cli_option("--skip-stop-instances", default=True, action="store_false",
88                  dest="stop_instances",
89                  help=("Don't stop the instances on the clusters, just check "
90                        "that none is running"))
91
92
93 def Flatten(unflattened_list):
94   """Flattens a list.
95
96   @param unflattened_list: A list of unflattened list objects.
97   @return: A flattened list
98
99   """
100   flattened_list = []
101
102   for item in unflattened_list:
103     if isinstance(item, list):
104       flattened_list.extend(Flatten(item))
105     else:
106       flattened_list.append(item)
107   return flattened_list
108
109
110 class MergerData(object):
111   """Container class to hold data used for merger.
112
113   """
114   def __init__(self, cluster, key_path, nodes, instances, master_node,
115                config_path=None):
116     """Initialize the container.
117
118     @param cluster: The name of the cluster
119     @param key_path: Path to the ssh private key used for authentication
120     @param nodes: List of online nodes in the merging cluster
121     @param instances: List of instances running on merging cluster
122     @param master_node: Name of the master node
123     @param config_path: Path to the merging cluster config
124
125     """
126     self.cluster = cluster
127     self.key_path = key_path
128     self.nodes = nodes
129     self.instances = instances
130     self.master_node = master_node
131     self.config_path = config_path
132
133
134 class Merger(object):
135   """Handling the merge.
136
137   """
138   RUNNING_STATUSES = compat.UniqueFrozenset([
139     constants.INSTST_RUNNING,
140     constants.INSTST_ERRORUP,
141     ])
142
143   def __init__(self, clusters, pause_period, groups, restart, params,
144                stop_instances):
145     """Initialize object with sane defaults and infos required.
146
147     @param clusters: The list of clusters to merge in
148     @param pause_period: The time watcher shall be disabled for
149     @param groups: How to handle group conflicts
150     @param restart: How to handle instance restart
151     @param stop_instances: Indicates whether the instances must be stopped
152                            (True) or if the Merger must only check if no
153                            instances are running on the mergee clusters (False)
154
155     """
156     self.merger_data = []
157     self.clusters = clusters
158     self.pause_period = pause_period
159     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
160     (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
161     self.ssh_runner = ssh.SshRunner(self.cluster_name)
162     self.groups = groups
163     self.restart = restart
164     self.params = params
165     self.stop_instances = stop_instances
166     if self.restart == _RESTART_UP:
167       raise NotImplementedError
168
169   def Setup(self):
170     """Sets up our end so we can do the merger.
171
172     This method is setting us up as a preparation for the merger.
173     It makes the initial contact and gathers information needed.
174
175     @raise errors.RemoteError: for errors in communication/grabbing
176
177     """
178     (remote_path, _, _) = ssh.GetUserFiles("root")
179
180     if self.cluster_name in self.clusters:
181       raise errors.CommandError("Cannot merge cluster %s with itself" %
182                                 self.cluster_name)
183
184     # Fetch remotes private key
185     for cluster in self.clusters:
186       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
187                             ask_key=False)
188       if result.failed:
189         raise errors.RemoteError("There was an error while grabbing ssh private"
190                                  " key from %s. Fail reason: %s; output: %s" %
191                                  (cluster, result.fail_reason, result.output))
192
193       key_path = utils.PathJoin(self.work_dir, cluster)
194       utils.WriteFile(key_path, mode=0600, data=result.stdout)
195
196       result = self._RunCmd(cluster, "gnt-node list -o name,offline"
197                             " --no-headers --separator=,", private_key=key_path)
198       if result.failed:
199         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
200                                  " Fail reason: %s; output: %s" %
201                                  (cluster, result.fail_reason, result.output))
202       nodes_statuses = [line.split(",") for line in result.stdout.splitlines()]
203       nodes = [node_status[0] for node_status in nodes_statuses
204                if node_status[1] == "N"]
205
206       result = self._RunCmd(cluster, "gnt-instance list -o name --no-headers",
207                             private_key=key_path)
208       if result.failed:
209         raise errors.RemoteError("Unable to retrieve list of instances from"
210                                  " %s. Fail reason: %s; output: %s" %
211                                  (cluster, result.fail_reason, result.output))
212       instances = result.stdout.splitlines()
213
214       path = utils.PathJoin(pathutils.DATA_DIR, "ssconf_%s" %
215                             constants.SS_MASTER_NODE)
216       result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
217       if result.failed:
218         raise errors.RemoteError("Unable to retrieve the master node name from"
219                                  " %s. Fail reason: %s; output: %s" %
220                                  (cluster, result.fail_reason, result.output))
221       master_node = result.stdout.strip()
222
223       self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
224                                          master_node))
225
226   def _PrepareAuthorizedKeys(self):
227     """Prepare the authorized_keys on every merging node.
228
229     This method add our public key to remotes authorized_key for further
230     communication.
231
232     """
233     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
234     pub_key = utils.ReadFile(pub_key_file)
235
236     for data in self.merger_data:
237       for node in data.nodes:
238         result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
239                                      (auth_keys, pub_key)),
240                               private_key=data.key_path, max_attempts=3)
241
242         if result.failed:
243           raise errors.RemoteError("Unable to add our public key to %s in %s."
244                                    " Fail reason: %s; output: %s" %
245                                    (node, data.cluster, result.fail_reason,
246                                     result.output))
247
248   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
249               strict_host_check=False, private_key=None, batch=True,
250               ask_key=False, max_attempts=1):
251     """Wrapping SshRunner.Run with default parameters.
252
253     For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
254
255     """
256     for _ in range(max_attempts):
257       result = self.ssh_runner.Run(hostname=hostname, command=command,
258                                    user=user, use_cluster_key=use_cluster_key,
259                                    strict_host_check=strict_host_check,
260                                    private_key=private_key, batch=batch,
261                                    ask_key=ask_key)
262       if not result.failed:
263         break
264
265     return result
266
267   def _CheckRunningInstances(self):
268     """Checks if on the clusters to be merged there are running instances
269
270     @rtype: boolean
271     @return: True if there are running instances, False otherwise
272
273     """
274     for cluster in self.clusters:
275       result = self._RunCmd(cluster, "gnt-instance list -o status")
276       if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
277         return True
278
279     return False
280
281   def _StopMergingInstances(self):
282     """Stop instances on merging clusters.
283
284     """
285     for cluster in self.clusters:
286       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
287                                      " --force-multiple")
288
289       if result.failed:
290         raise errors.RemoteError("Unable to stop instances on %s."
291                                  " Fail reason: %s; output: %s" %
292                                  (cluster, result.fail_reason, result.output))
293
294   def _DisableWatcher(self):
295     """Disable watch on all merging clusters, including ourself.
296
297     """
298     for cluster in ["localhost"] + self.clusters:
299       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
300                                      self.pause_period)
301
302       if result.failed:
303         raise errors.RemoteError("Unable to pause watcher on %s."
304                                  " Fail reason: %s; output: %s" %
305                                  (cluster, result.fail_reason, result.output))
306
307   def _RemoveMasterIps(self):
308     """Removes the master IPs from the master nodes of each cluster.
309
310     """
311     for data in self.merger_data:
312       result = self._RunCmd(data.master_node,
313                             "gnt-cluster deactivate-master-ip --yes")
314
315       if result.failed:
316         raise errors.RemoteError("Unable to remove master IP on %s."
317                                  " Fail reason: %s; output: %s" %
318                                  (data.master_node,
319                                   result.fail_reason,
320                                   result.output))
321
322   def _StopDaemons(self):
323     """Stop all daemons on merging nodes.
324
325     """
326     cmd = "%s stop-all" % pathutils.DAEMON_UTIL
327     for data in self.merger_data:
328       for node in data.nodes:
329         result = self._RunCmd(node, cmd, max_attempts=3)
330
331         if result.failed:
332           raise errors.RemoteError("Unable to stop daemons on %s."
333                                    " Fail reason: %s; output: %s." %
334                                    (node, result.fail_reason, result.output))
335
336   def _FetchRemoteConfig(self):
337     """Fetches and stores remote cluster config from the master.
338
339     This step is needed before we can merge the config.
340
341     """
342     for data in self.merger_data:
343       result = self._RunCmd(data.cluster, "cat %s" %
344                                           pathutils.CLUSTER_CONF_FILE)
345
346       if result.failed:
347         raise errors.RemoteError("Unable to retrieve remote config on %s."
348                                  " Fail reason: %s; output %s" %
349                                  (data.cluster, result.fail_reason,
350                                   result.output))
351
352       data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
353                                         data.cluster)
354       utils.WriteFile(data.config_path, data=result.stdout)
355
356   # R0201: Method could be a function
357   def _KillMasterDaemon(self): # pylint: disable=R0201
358     """Kills the local master daemon.
359
360     @raise errors.CommandError: If unable to kill
361
362     """
363     result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
364     if result.failed:
365       raise errors.CommandError("Unable to stop master daemons."
366                                 " Fail reason: %s; output: %s" %
367                                 (result.fail_reason, result.output))
368
369   def _MergeConfig(self):
370     """Merges all foreign config into our own config.
371
372     """
373     my_config = config.ConfigWriter(offline=True)
374     fake_ec_id = 0 # Needs to be uniq over the whole config merge
375
376     for data in self.merger_data:
377       other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
378       self._MergeClusterConfigs(my_config, other_config)
379       self._MergeNodeGroups(my_config, other_config)
380
381       for node in other_config.GetNodeList():
382         node_info = other_config.GetNodeInfo(node)
383         # Offline the node, it will be reonlined later at node readd
384         node_info.master_candidate = False
385         node_info.drained = False
386         node_info.offline = True
387         my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
388         fake_ec_id += 1
389
390       for instance in other_config.GetInstanceList():
391         instance_info = other_config.GetInstanceInfo(instance)
392
393         # Update the DRBD port assignments
394         # This is a little bit hackish
395         for dsk in instance_info.disks:
396           if dsk.dev_type in constants.LDS_DRBD:
397             port = my_config.AllocatePort()
398
399             logical_id = list(dsk.logical_id)
400             logical_id[2] = port
401             dsk.logical_id = tuple(logical_id)
402
403         my_config.AddInstance(instance_info,
404                               _CLUSTERMERGE_ECID + str(fake_ec_id))
405         fake_ec_id += 1
406
407   def _MergeClusterConfigs(self, my_config, other_config):
408     """Checks that all relevant cluster parameters are compatible
409
410     """
411     my_cluster = my_config.GetClusterInfo()
412     other_cluster = other_config.GetClusterInfo()
413     err_count = 0
414
415     #
416     # Generic checks
417     #
418     check_params = [
419       "beparams",
420       "default_iallocator",
421       "drbd_usermode_helper",
422       "hidden_os",
423       "maintain_node_health",
424       "master_netdev",
425       "ndparams",
426       "nicparams",
427       "primary_ip_family",
428       "tags",
429       "uid_pool",
430       ]
431     check_params_strict = [
432       "volume_group_name",
433     ]
434     if my_cluster.IsFileStorageEnabled() or \
435         other_cluster.IsFileStorageEnabled():
436       check_params_strict.append("file_storage_dir")
437     if my_cluster.IsSharedFileStorageEnabled() or \
438         other_cluster.IsSharedFileStorageEnabled():
439       check_params_strict.append("shared_file_storage_dir")
440     check_params.extend(check_params_strict)
441
442     if self.params == _PARAMS_STRICT:
443       params_strict = True
444     else:
445       params_strict = False
446
447     for param_name in check_params:
448       my_param = getattr(my_cluster, param_name)
449       other_param = getattr(other_cluster, param_name)
450       if my_param != other_param:
451         logging.error("The value (%s) of the cluster parameter %s on %s"
452                       " differs to this cluster's value (%s)",
453                       other_param, param_name, other_cluster.cluster_name,
454                       my_param)
455         if params_strict or param_name in check_params_strict:
456           err_count += 1
457
458     #
459     # Custom checks
460     #
461
462     # Check default hypervisor
463     my_defhyp = my_cluster.enabled_hypervisors[0]
464     other_defhyp = other_cluster.enabled_hypervisors[0]
465     if my_defhyp != other_defhyp:
466       logging.warning("The default hypervisor (%s) differs on %s, new"
467                       " instances will be created with this cluster's"
468                       " default hypervisor (%s)", other_defhyp,
469                       other_cluster.cluster_name, my_defhyp)
470
471     if (set(my_cluster.enabled_hypervisors) !=
472         set(other_cluster.enabled_hypervisors)):
473       logging.error("The set of enabled hypervisors (%s) on %s differs to"
474                     " this cluster's set (%s)",
475                     other_cluster.enabled_hypervisors,
476                     other_cluster.cluster_name, my_cluster.enabled_hypervisors)
477       err_count += 1
478
479     # Check hypervisor params for hypervisors we care about
480     for hyp in my_cluster.enabled_hypervisors:
481       for param in my_cluster.hvparams[hyp]:
482         my_value = my_cluster.hvparams[hyp][param]
483         other_value = other_cluster.hvparams[hyp][param]
484         if my_value != other_value:
485           logging.error("The value (%s) of the %s parameter of the %s"
486                         " hypervisor on %s differs to this cluster's parameter"
487                         " (%s)",
488                         other_value, param, hyp, other_cluster.cluster_name,
489                         my_value)
490           if params_strict:
491             err_count += 1
492
493     # Check os hypervisor params for hypervisors we care about
494     for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
495       for hyp in my_cluster.enabled_hypervisors:
496         my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
497         other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
498         if my_os_hvp != other_os_hvp:
499           logging.error("The OS parameters (%s) for the %s OS for the %s"
500                         " hypervisor on %s differs to this cluster's parameters"
501                         " (%s)",
502                         other_os_hvp, os_name, hyp, other_cluster.cluster_name,
503                         my_os_hvp)
504           if params_strict:
505             err_count += 1
506
507     #
508     # Warnings
509     #
510     if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
511       logging.warning("The modify_etc_hosts value (%s) differs on %s,"
512                       " this cluster's value (%s) will take precedence",
513                       other_cluster.modify_etc_hosts,
514                       other_cluster.cluster_name,
515                       my_cluster.modify_etc_hosts)
516
517     if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
518       logging.warning("The modify_ssh_setup value (%s) differs on %s,"
519                       " this cluster's value (%s) will take precedence",
520                       other_cluster.modify_ssh_setup,
521                       other_cluster.cluster_name,
522                       my_cluster.modify_ssh_setup)
523
524     #
525     # Actual merging
526     #
527     my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
528                                        other_cluster.reserved_lvs))
529
530     if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
531       logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
532                       " cluster's value (%s). The least permissive value (%s)"
533                       " will be used", other_cluster.prealloc_wipe_disks,
534                       other_cluster.cluster_name,
535                       my_cluster.prealloc_wipe_disks, True)
536       my_cluster.prealloc_wipe_disks = True
537
538     for os_, osparams in other_cluster.osparams.items():
539       if os_ not in my_cluster.osparams:
540         my_cluster.osparams[os_] = osparams
541       elif my_cluster.osparams[os_] != osparams:
542         logging.error("The OS parameters (%s) for the %s OS on %s differs to"
543                       " this cluster's parameters (%s)",
544                       osparams, os_, other_cluster.cluster_name,
545                       my_cluster.osparams[os_])
546         if params_strict:
547           err_count += 1
548
549     if err_count:
550       raise errors.ConfigurationError("Cluster config for %s has incompatible"
551                                       " values, please fix and re-run" %
552                                       other_cluster.cluster_name)
553
554   # R0201: Method could be a function
555   def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201
556     if os_name in cluster.os_hvp:
557       return cluster.os_hvp[os_name].get(hyp, None)
558     else:
559       return None
560
561   # R0201: Method could be a function
562   def _MergeNodeGroups(self, my_config, other_config):
563     """Adds foreign node groups
564
565     ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
566     """
567     # pylint: disable=R0201
568     logging.info("Node group conflict strategy: %s", self.groups)
569
570     my_grps = my_config.GetAllNodeGroupsInfo().values()
571     other_grps = other_config.GetAllNodeGroupsInfo().values()
572
573     # Check for node group naming conflicts:
574     conflicts = []
575     for other_grp in other_grps:
576       for my_grp in my_grps:
577         if other_grp.name == my_grp.name:
578           conflicts.append(other_grp)
579
580     if conflicts:
581       conflict_names = utils.CommaJoin([g.name for g in conflicts])
582       logging.info("Node groups in both local and remote cluster: %s",
583                    conflict_names)
584
585       # User hasn't specified how to handle conflicts
586       if not self.groups:
587         raise errors.CommandError("The following node group(s) are in both"
588                                   " clusters, and no merge strategy has been"
589                                   " supplied (see the --groups option): %s" %
590                                   conflict_names)
591
592       # User wants to rename conflicts
593       elif self.groups == _GROUPS_RENAME:
594         for grp in conflicts:
595           new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
596           logging.info("Renaming remote node group from %s to %s"
597                        " to resolve conflict", grp.name, new_name)
598           grp.name = new_name
599
600       # User wants to merge conflicting groups
601       elif self.groups == _GROUPS_MERGE:
602         for other_grp in conflicts:
603           logging.info("Merging local and remote '%s' groups", other_grp.name)
604           for node_name in other_grp.members[:]:
605             node = other_config.GetNodeInfo(node_name)
606             # Access to a protected member of a client class
607             # pylint: disable=W0212
608             other_config._UnlockedRemoveNodeFromGroup(node)
609
610             # Access to a protected member of a client class
611             # pylint: disable=W0212
612             my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
613
614             # Access to a protected member of a client class
615             # pylint: disable=W0212
616             my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
617             node.group = my_grp_uuid
618           # Remove from list of groups to add
619           other_grps.remove(other_grp)
620
621     for grp in other_grps:
622       #TODO: handle node group conflicts
623       my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
624
625   # R0201: Method could be a function
626   def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201
627     """Starts the local master daemon.
628
629     @param no_vote: Should the masterd started without voting? default: False
630     @raise errors.CommandError: If unable to start daemon.
631
632     """
633     env = {}
634     if no_vote:
635       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
636
637     result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
638     if result.failed:
639       raise errors.CommandError("Couldn't start ganeti master."
640                                 " Fail reason: %s; output: %s" %
641                                 (result.fail_reason, result.output))
642
643   def _ReaddMergedNodesAndRedist(self):
644     """Readds all merging nodes and make sure their config is up-to-date.
645
646     @raise errors.CommandError: If anything fails.
647
648     """
649     for data in self.merger_data:
650       for node in data.nodes:
651         logging.info("Readding node %s", node)
652         result = utils.RunCmd(["gnt-node", "add", "--readd",
653                                "--no-ssh-key-check", node])
654         if result.failed:
655           logging.error("%s failed to be readded. Reason: %s, output: %s",
656                          node, result.fail_reason, result.output)
657
658     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
659     if result.failed:
660       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
661                                 " output: %s" % (result.fail_reason,
662                                                  result.output))
663
664   # R0201: Method could be a function
665   def _StartupAllInstances(self): # pylint: disable=R0201
666     """Starts up all instances (locally).
667
668     @raise errors.CommandError: If unable to start clusters
669
670     """
671     result = utils.RunCmd(["gnt-instance", "startup", "--all",
672                            "--force-multiple"])
673     if result.failed:
674       raise errors.CommandError("Unable to start all instances."
675                                 " Fail reason: %s; output: %s" %
676                                 (result.fail_reason, result.output))
677
678   # R0201: Method could be a function
679   # TODO: make this overridable, for some verify errors
680   def _VerifyCluster(self): # pylint: disable=R0201
681     """Runs gnt-cluster verify to verify the health.
682
683     @raise errors.ProgrammError: If cluster fails on verification
684
685     """
686     result = utils.RunCmd(["gnt-cluster", "verify"])
687     if result.failed:
688       raise errors.CommandError("Verification of cluster failed."
689                                 " Fail reason: %s; output: %s" %
690                                 (result.fail_reason, result.output))
691
692   def Merge(self):
693     """Does the actual merge.
694
695     It runs all the steps in the right order and updates the user about steps
696     taken. Also it keeps track of rollback_steps to undo everything.
697
698     """
699     rbsteps = []
700     try:
701       logging.info("Pre cluster verification")
702       self._VerifyCluster()
703
704       logging.info("Prepare authorized_keys")
705       rbsteps.append("Remove our key from authorized_keys on nodes:"
706                      " %(nodes)s")
707       self._PrepareAuthorizedKeys()
708
709       rbsteps.append("Start all instances again on the merging"
710                      " clusters: %(clusters)s")
711       if self.stop_instances:
712         logging.info("Stopping merging instances (takes a while)")
713         self._StopMergingInstances()
714       logging.info("Checking that no instances are running on the mergees")
715       instances_running = self._CheckRunningInstances()
716       if instances_running:
717         raise errors.CommandError("Some instances are still running on the"
718                                   " mergees")
719       logging.info("Disable watcher")
720       self._DisableWatcher()
721       logging.info("Merging config")
722       self._FetchRemoteConfig()
723       logging.info("Removing master IPs on mergee master nodes")
724       self._RemoveMasterIps()
725       logging.info("Stop daemons on merging nodes")
726       self._StopDaemons()
727
728       logging.info("Stopping master daemon")
729       self._KillMasterDaemon()
730
731       rbsteps.append("Restore %s from another master candidate"
732                      " and restart master daemon" %
733                      pathutils.CLUSTER_CONF_FILE)
734       self._MergeConfig()
735       self._StartMasterDaemon(no_vote=True)
736
737       # Point of no return, delete rbsteps
738       del rbsteps[:]
739
740       logging.warning("We are at the point of no return. Merge can not easily"
741                       " be undone after this point.")
742       logging.info("Readd nodes")
743       self._ReaddMergedNodesAndRedist()
744
745       logging.info("Merge done, restart master daemon normally")
746       self._KillMasterDaemon()
747       self._StartMasterDaemon()
748
749       if self.restart == _RESTART_ALL:
750         logging.info("Starting instances again")
751         self._StartupAllInstances()
752       else:
753         logging.info("Not starting instances again")
754       logging.info("Post cluster verification")
755       self._VerifyCluster()
756     except errors.GenericError, e:
757       logging.exception(e)
758
759       if rbsteps:
760         nodes = Flatten([data.nodes for data in self.merger_data])
761         info = {
762           "clusters": self.clusters,
763           "nodes": nodes,
764           }
765         logging.critical("In order to rollback do the following:")
766         for step in rbsteps:
767           logging.critical("  * %s", step % info)
768       else:
769         logging.critical("Nothing to rollback.")
770
771       # TODO: Keep track of steps done for a flawless resume?
772
773   def Cleanup(self):
774     """Clean up our environment.
775
776     This cleans up remote private keys and configs and after that
777     deletes the temporary directory.
778
779     """
780     shutil.rmtree(self.work_dir)
781
782
783 def main():
784   """Main routine.
785
786   """
787   program = os.path.basename(sys.argv[0])
788
789   parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
790                                  prog=program)
791   parser.add_option(cli.DEBUG_OPT)
792   parser.add_option(cli.VERBOSE_OPT)
793   parser.add_option(PAUSE_PERIOD_OPT)
794   parser.add_option(GROUPS_OPT)
795   parser.add_option(RESTART_OPT)
796   parser.add_option(PARAMS_OPT)
797   parser.add_option(SKIP_STOP_INSTANCES_OPT)
798
799   (options, args) = parser.parse_args()
800
801   utils.SetupToolLogging(options.debug, options.verbose)
802
803   if not args:
804     parser.error("No clusters specified")
805
806   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
807                           options.groups, options.restart, options.params,
808                           options.stop_instances)
809   try:
810     try:
811       cluster_merger.Setup()
812       cluster_merger.Merge()
813     except errors.GenericError, e:
814       logging.exception(e)
815       return constants.EXIT_FAILURE
816   finally:
817     cluster_merger.Cleanup()
818
819   return constants.EXIT_SUCCESS
820
821
822 if __name__ == "__main__":
823   sys.exit(main())