Remove useless code in backend for network hooks
[ganeti-local] / tools / cluster-merge
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to merge two or more clusters together.
22
23 The clusters have to run the same version of Ganeti!
24
25 """
26
27 # pylint: disable=C0103
28 # C0103: Invalid name cluster-merge
29
30 import logging
31 import os
32 import optparse
33 import shutil
34 import sys
35 import tempfile
36
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
43 from ganeti import pathutils
44 from ganeti import compat
45
46
47 _GROUPS_MERGE = "merge"
48 _GROUPS_RENAME = "rename"
49 _CLUSTERMERGE_ECID = "clustermerge-ecid"
50 _RESTART_ALL = "all"
51 _RESTART_UP = "up"
52 _RESTART_NONE = "none"
53 _RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
54 _PARAMS_STRICT = "strict"
55 _PARAMS_WARN = "warn"
56 _PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
57
58
59 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
60                                   action="store", type="int",
61                                   dest="pause_period",
62                                   help=("Amount of time in seconds watcher"
63                                         " should be suspended from running"))
64 GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
65                             choices=(_GROUPS_MERGE, _GROUPS_RENAME),
66                             dest="groups",
67                             help=("How to handle groups that have the"
68                                   " same name (One of: %s/%s)" %
69                                   (_GROUPS_MERGE, _GROUPS_RENAME)))
70 PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
71                             metavar="STRATEGY",
72                             choices=_PARAMS_CHOICES,
73                             dest="params",
74                             help=("How to handle params that have"
75                                   " different values (One of: %s/%s)" %
76                                   _PARAMS_CHOICES))
77
78 RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
79                              metavar="STRATEGY",
80                              choices=_RESTART_CHOICES,
81                              dest="restart",
82                              help=("How to handle restarting instances"
83                                    " same name (One of: %s/%s/%s)" %
84                                    _RESTART_CHOICES))
85
86 SKIP_STOP_INSTANCES_OPT = \
87   cli.cli_option("--skip-stop-instances", default=True, action="store_false",
88                  dest="stop_instances",
89                  help=("Don't stop the instances on the clusters, just check "
90                        "that none is running"))
91
92
93 def Flatten(unflattened_list):
94   """Flattens a list.
95
96   @param unflattened_list: A list of unflattened list objects.
97   @return: A flattened list
98
99   """
100   flattened_list = []
101
102   for item in unflattened_list:
103     if isinstance(item, list):
104       flattened_list.extend(Flatten(item))
105     else:
106       flattened_list.append(item)
107   return flattened_list
108
109
110 class MergerData(object):
111   """Container class to hold data used for merger.
112
113   """
114   def __init__(self, cluster, key_path, nodes, instances, master_node,
115                config_path=None):
116     """Initialize the container.
117
118     @param cluster: The name of the cluster
119     @param key_path: Path to the ssh private key used for authentication
120     @param nodes: List of online nodes in the merging cluster
121     @param instances: List of instances running on merging cluster
122     @param master_node: Name of the master node
123     @param config_path: Path to the merging cluster config
124
125     """
126     self.cluster = cluster
127     self.key_path = key_path
128     self.nodes = nodes
129     self.instances = instances
130     self.master_node = master_node
131     self.config_path = config_path
132
133
134 class Merger(object):
135   """Handling the merge.
136
137   """
138   RUNNING_STATUSES = compat.UniqueFrozenset([
139     constants.INSTST_RUNNING,
140     constants.INSTST_ERRORUP,
141     ])
142
143   def __init__(self, clusters, pause_period, groups, restart, params,
144                stop_instances):
145     """Initialize object with sane defaults and infos required.
146
147     @param clusters: The list of clusters to merge in
148     @param pause_period: The time watcher shall be disabled for
149     @param groups: How to handle group conflicts
150     @param restart: How to handle instance restart
151     @param stop_instances: Indicates whether the instances must be stopped
152                            (True) or if the Merger must only check if no
153                            instances are running on the mergee clusters (False)
154
155     """
156     self.merger_data = []
157     self.clusters = clusters
158     self.pause_period = pause_period
159     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
160     (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
161     self.ssh_runner = ssh.SshRunner(self.cluster_name)
162     self.groups = groups
163     self.restart = restart
164     self.params = params
165     self.stop_instances = stop_instances
166     if self.restart == _RESTART_UP:
167       raise NotImplementedError
168
169   def Setup(self):
170     """Sets up our end so we can do the merger.
171
172     This method is setting us up as a preparation for the merger.
173     It makes the initial contact and gathers information needed.
174
175     @raise errors.RemoteError: for errors in communication/grabbing
176
177     """
178     (remote_path, _, _) = ssh.GetUserFiles("root")
179
180     if self.cluster_name in self.clusters:
181       raise errors.CommandError("Cannot merge cluster %s with itself" %
182                                 self.cluster_name)
183
184     # Fetch remotes private key
185     for cluster in self.clusters:
186       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
187                             ask_key=False)
188       if result.failed:
189         raise errors.RemoteError("There was an error while grabbing ssh private"
190                                  " key from %s. Fail reason: %s; output: %s" %
191                                  (cluster, result.fail_reason, result.output))
192
193       key_path = utils.PathJoin(self.work_dir, cluster)
194       utils.WriteFile(key_path, mode=0600, data=result.stdout)
195
196       result = self._RunCmd(cluster, "gnt-node list -o name,offline"
197                             " --no-headers --separator=,", private_key=key_path)
198       if result.failed:
199         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
200                                  " Fail reason: %s; output: %s" %
201                                  (cluster, result.fail_reason, result.output))
202       nodes_statuses = [line.split(",") for line in result.stdout.splitlines()]
203       nodes = [node_status[0] for node_status in nodes_statuses
204                if node_status[1] == "N"]
205
206       result = self._RunCmd(cluster, "gnt-instance list -o name --no-headers",
207                             private_key=key_path)
208       if result.failed:
209         raise errors.RemoteError("Unable to retrieve list of instances from"
210                                  " %s. Fail reason: %s; output: %s" %
211                                  (cluster, result.fail_reason, result.output))
212       instances = result.stdout.splitlines()
213
214       path = utils.PathJoin(pathutils.DATA_DIR, "ssconf_%s" %
215                             constants.SS_MASTER_NODE)
216       result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
217       if result.failed:
218         raise errors.RemoteError("Unable to retrieve the master node name from"
219                                  " %s. Fail reason: %s; output: %s" %
220                                  (cluster, result.fail_reason, result.output))
221       master_node = result.stdout.strip()
222
223       self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
224                                          master_node))
225
226   def _PrepareAuthorizedKeys(self):
227     """Prepare the authorized_keys on every merging node.
228
229     This method add our public key to remotes authorized_key for further
230     communication.
231
232     """
233     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
234     pub_key = utils.ReadFile(pub_key_file)
235
236     for data in self.merger_data:
237       for node in data.nodes:
238         result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
239                                      (auth_keys, pub_key)),
240                               private_key=data.key_path, max_attempts=3)
241
242         if result.failed:
243           raise errors.RemoteError("Unable to add our public key to %s in %s."
244                                    " Fail reason: %s; output: %s" %
245                                    (node, data.cluster, result.fail_reason,
246                                     result.output))
247
248   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
249               strict_host_check=False, private_key=None, batch=True,
250               ask_key=False, max_attempts=1):
251     """Wrapping SshRunner.Run with default parameters.
252
253     For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
254
255     """
256     for _ in range(max_attempts):
257       result = self.ssh_runner.Run(hostname=hostname, command=command,
258                                    user=user, use_cluster_key=use_cluster_key,
259                                    strict_host_check=strict_host_check,
260                                    private_key=private_key, batch=batch,
261                                    ask_key=ask_key)
262       if not result.failed:
263         break
264
265     return result
266
267   def _CheckRunningInstances(self):
268     """Checks if on the clusters to be merged there are running instances
269
270     @rtype: boolean
271     @return: True if there are running instances, False otherwise
272
273     """
274     for cluster in self.clusters:
275       result = self._RunCmd(cluster, "gnt-instance list -o status")
276       if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
277         return True
278
279     return False
280
281   def _StopMergingInstances(self):
282     """Stop instances on merging clusters.
283
284     """
285     for cluster in self.clusters:
286       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
287                                      " --force-multiple")
288
289       if result.failed:
290         raise errors.RemoteError("Unable to stop instances on %s."
291                                  " Fail reason: %s; output: %s" %
292                                  (cluster, result.fail_reason, result.output))
293
294   def _DisableWatcher(self):
295     """Disable watch on all merging clusters, including ourself.
296
297     """
298     for cluster in ["localhost"] + self.clusters:
299       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
300                                      self.pause_period)
301
302       if result.failed:
303         raise errors.RemoteError("Unable to pause watcher on %s."
304                                  " Fail reason: %s; output: %s" %
305                                  (cluster, result.fail_reason, result.output))
306
307   def _RemoveMasterIps(self):
308     """Removes the master IPs from the master nodes of each cluster.
309
310     """
311     for data in self.merger_data:
312       result = self._RunCmd(data.master_node,
313                             "gnt-cluster deactivate-master-ip --yes")
314
315       if result.failed:
316         raise errors.RemoteError("Unable to remove master IP on %s."
317                                  " Fail reason: %s; output: %s" %
318                                  (data.master_node,
319                                   result.fail_reason,
320                                   result.output))
321
322   def _StopDaemons(self):
323     """Stop all daemons on merging nodes.
324
325     """
326     cmd = "%s stop-all" % pathutils.DAEMON_UTIL
327     for data in self.merger_data:
328       for node in data.nodes:
329         result = self._RunCmd(node, cmd, max_attempts=3)
330
331         if result.failed:
332           raise errors.RemoteError("Unable to stop daemons on %s."
333                                    " Fail reason: %s; output: %s." %
334                                    (node, result.fail_reason, result.output))
335
336   def _FetchRemoteConfig(self):
337     """Fetches and stores remote cluster config from the master.
338
339     This step is needed before we can merge the config.
340
341     """
342     for data in self.merger_data:
343       result = self._RunCmd(data.cluster, "cat %s" %
344                                           pathutils.CLUSTER_CONF_FILE)
345
346       if result.failed:
347         raise errors.RemoteError("Unable to retrieve remote config on %s."
348                                  " Fail reason: %s; output %s" %
349                                  (data.cluster, result.fail_reason,
350                                   result.output))
351
352       data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
353                                         data.cluster)
354       utils.WriteFile(data.config_path, data=result.stdout)
355
356   # R0201: Method could be a function
357   def _KillMasterDaemon(self): # pylint: disable=R0201
358     """Kills the local master daemon.
359
360     @raise errors.CommandError: If unable to kill
361
362     """
363     result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
364     if result.failed:
365       raise errors.CommandError("Unable to stop master daemons."
366                                 " Fail reason: %s; output: %s" %
367                                 (result.fail_reason, result.output))
368
369   def _MergeConfig(self):
370     """Merges all foreign config into our own config.
371
372     """
373     my_config = config.ConfigWriter(offline=True)
374     fake_ec_id = 0 # Needs to be uniq over the whole config merge
375
376     for data in self.merger_data:
377       other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
378       self._MergeClusterConfigs(my_config, other_config)
379       self._MergeNodeGroups(my_config, other_config)
380
381       for node in other_config.GetNodeList():
382         node_info = other_config.GetNodeInfo(node)
383         # Offline the node, it will be reonlined later at node readd
384         node_info.master_candidate = False
385         node_info.drained = False
386         node_info.offline = True
387         my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
388         fake_ec_id += 1
389
390       for instance in other_config.GetInstanceList():
391         instance_info = other_config.GetInstanceInfo(instance)
392
393         # Update the DRBD port assignments
394         # This is a little bit hackish
395         for dsk in instance_info.disks:
396           if dsk.dev_type in constants.LDS_DRBD:
397             port = my_config.AllocatePort()
398
399             logical_id = list(dsk.logical_id)
400             logical_id[2] = port
401             dsk.logical_id = tuple(logical_id)
402
403             physical_id = list(dsk.physical_id)
404             physical_id[1] = physical_id[3] = port
405             dsk.physical_id = tuple(physical_id)
406
407         my_config.AddInstance(instance_info,
408                               _CLUSTERMERGE_ECID + str(fake_ec_id))
409         fake_ec_id += 1
410
411   def _MergeClusterConfigs(self, my_config, other_config):
412     """Checks that all relevant cluster parameters are compatible
413
414     """
415     my_cluster = my_config.GetClusterInfo()
416     other_cluster = other_config.GetClusterInfo()
417     err_count = 0
418
419     #
420     # Generic checks
421     #
422     check_params = [
423       "beparams",
424       "default_iallocator",
425       "drbd_usermode_helper",
426       "hidden_os",
427       "maintain_node_health",
428       "master_netdev",
429       "ndparams",
430       "nicparams",
431       "primary_ip_family",
432       "tags",
433       "uid_pool",
434       ]
435     check_params_strict = [
436       "volume_group_name",
437     ]
438     if constants.ENABLE_FILE_STORAGE:
439       check_params_strict.append("file_storage_dir")
440     if constants.ENABLE_SHARED_FILE_STORAGE:
441       check_params_strict.append("shared_file_storage_dir")
442     check_params.extend(check_params_strict)
443
444     if self.params == _PARAMS_STRICT:
445       params_strict = True
446     else:
447       params_strict = False
448
449     for param_name in check_params:
450       my_param = getattr(my_cluster, param_name)
451       other_param = getattr(other_cluster, param_name)
452       if my_param != other_param:
453         logging.error("The value (%s) of the cluster parameter %s on %s"
454                       " differs to this cluster's value (%s)",
455                       other_param, param_name, other_cluster.cluster_name,
456                       my_param)
457         if params_strict or param_name in check_params_strict:
458           err_count += 1
459
460     #
461     # Custom checks
462     #
463
464     # Check default hypervisor
465     my_defhyp = my_cluster.enabled_hypervisors[0]
466     other_defhyp = other_cluster.enabled_hypervisors[0]
467     if my_defhyp != other_defhyp:
468       logging.warning("The default hypervisor (%s) differs on %s, new"
469                       " instances will be created with this cluster's"
470                       " default hypervisor (%s)", other_defhyp,
471                       other_cluster.cluster_name, my_defhyp)
472
473     if (set(my_cluster.enabled_hypervisors) !=
474         set(other_cluster.enabled_hypervisors)):
475       logging.error("The set of enabled hypervisors (%s) on %s differs to"
476                     " this cluster's set (%s)",
477                     other_cluster.enabled_hypervisors,
478                     other_cluster.cluster_name, my_cluster.enabled_hypervisors)
479       err_count += 1
480
481     # Check hypervisor params for hypervisors we care about
482     for hyp in my_cluster.enabled_hypervisors:
483       for param in my_cluster.hvparams[hyp]:
484         my_value = my_cluster.hvparams[hyp][param]
485         other_value = other_cluster.hvparams[hyp][param]
486         if my_value != other_value:
487           logging.error("The value (%s) of the %s parameter of the %s"
488                         " hypervisor on %s differs to this cluster's parameter"
489                         " (%s)",
490                         other_value, param, hyp, other_cluster.cluster_name,
491                         my_value)
492           if params_strict:
493             err_count += 1
494
495     # Check os hypervisor params for hypervisors we care about
496     for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
497       for hyp in my_cluster.enabled_hypervisors:
498         my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
499         other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
500         if my_os_hvp != other_os_hvp:
501           logging.error("The OS parameters (%s) for the %s OS for the %s"
502                         " hypervisor on %s differs to this cluster's parameters"
503                         " (%s)",
504                         other_os_hvp, os_name, hyp, other_cluster.cluster_name,
505                         my_os_hvp)
506           if params_strict:
507             err_count += 1
508
509     #
510     # Warnings
511     #
512     if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
513       logging.warning("The modify_etc_hosts value (%s) differs on %s,"
514                       " this cluster's value (%s) will take precedence",
515                       other_cluster.modify_etc_hosts,
516                       other_cluster.cluster_name,
517                       my_cluster.modify_etc_hosts)
518
519     if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
520       logging.warning("The modify_ssh_setup value (%s) differs on %s,"
521                       " this cluster's value (%s) will take precedence",
522                       other_cluster.modify_ssh_setup,
523                       other_cluster.cluster_name,
524                       my_cluster.modify_ssh_setup)
525
526     #
527     # Actual merging
528     #
529     my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
530                                        other_cluster.reserved_lvs))
531
532     if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
533       logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
534                       " cluster's value (%s). The least permissive value (%s)"
535                       " will be used", other_cluster.prealloc_wipe_disks,
536                       other_cluster.cluster_name,
537                       my_cluster.prealloc_wipe_disks, True)
538       my_cluster.prealloc_wipe_disks = True
539
540     for os_, osparams in other_cluster.osparams.items():
541       if os_ not in my_cluster.osparams:
542         my_cluster.osparams[os_] = osparams
543       elif my_cluster.osparams[os_] != osparams:
544         logging.error("The OS parameters (%s) for the %s OS on %s differs to"
545                       " this cluster's parameters (%s)",
546                       osparams, os_, other_cluster.cluster_name,
547                       my_cluster.osparams[os_])
548         if params_strict:
549           err_count += 1
550
551     if err_count:
552       raise errors.ConfigurationError("Cluster config for %s has incompatible"
553                                       " values, please fix and re-run" %
554                                       other_cluster.cluster_name)
555
556   # R0201: Method could be a function
557   def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201
558     if os_name in cluster.os_hvp:
559       return cluster.os_hvp[os_name].get(hyp, None)
560     else:
561       return None
562
563   # R0201: Method could be a function
564   def _MergeNodeGroups(self, my_config, other_config):
565     """Adds foreign node groups
566
567     ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
568     """
569     # pylint: disable=R0201
570     logging.info("Node group conflict strategy: %s", self.groups)
571
572     my_grps = my_config.GetAllNodeGroupsInfo().values()
573     other_grps = other_config.GetAllNodeGroupsInfo().values()
574
575     # Check for node group naming conflicts:
576     conflicts = []
577     for other_grp in other_grps:
578       for my_grp in my_grps:
579         if other_grp.name == my_grp.name:
580           conflicts.append(other_grp)
581
582     if conflicts:
583       conflict_names = utils.CommaJoin([g.name for g in conflicts])
584       logging.info("Node groups in both local and remote cluster: %s",
585                    conflict_names)
586
587       # User hasn't specified how to handle conflicts
588       if not self.groups:
589         raise errors.CommandError("The following node group(s) are in both"
590                                   " clusters, and no merge strategy has been"
591                                   " supplied (see the --groups option): %s" %
592                                   conflict_names)
593
594       # User wants to rename conflicts
595       elif self.groups == _GROUPS_RENAME:
596         for grp in conflicts:
597           new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
598           logging.info("Renaming remote node group from %s to %s"
599                        " to resolve conflict", grp.name, new_name)
600           grp.name = new_name
601
602       # User wants to merge conflicting groups
603       elif self.groups == _GROUPS_MERGE:
604         for other_grp in conflicts:
605           logging.info("Merging local and remote '%s' groups", other_grp.name)
606           for node_name in other_grp.members[:]:
607             node = other_config.GetNodeInfo(node_name)
608             # Access to a protected member of a client class
609             # pylint: disable=W0212
610             other_config._UnlockedRemoveNodeFromGroup(node)
611
612             # Access to a protected member of a client class
613             # pylint: disable=W0212
614             my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
615
616             # Access to a protected member of a client class
617             # pylint: disable=W0212
618             my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
619             node.group = my_grp_uuid
620           # Remove from list of groups to add
621           other_grps.remove(other_grp)
622
623     for grp in other_grps:
624       #TODO: handle node group conflicts
625       my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
626
627   # R0201: Method could be a function
628   def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201
629     """Starts the local master daemon.
630
631     @param no_vote: Should the masterd started without voting? default: False
632     @raise errors.CommandError: If unable to start daemon.
633
634     """
635     env = {}
636     if no_vote:
637       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
638
639     result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
640     if result.failed:
641       raise errors.CommandError("Couldn't start ganeti master."
642                                 " Fail reason: %s; output: %s" %
643                                 (result.fail_reason, result.output))
644
645   def _ReaddMergedNodesAndRedist(self):
646     """Readds all merging nodes and make sure their config is up-to-date.
647
648     @raise errors.CommandError: If anything fails.
649
650     """
651     for data in self.merger_data:
652       for node in data.nodes:
653         logging.info("Readding node %s", node)
654         result = utils.RunCmd(["gnt-node", "add", "--readd",
655                                "--no-ssh-key-check", node])
656         if result.failed:
657           logging.error("%s failed to be readded. Reason: %s, output: %s",
658                          node, result.fail_reason, result.output)
659
660     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
661     if result.failed:
662       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
663                                 " output: %s" % (result.fail_reason,
664                                                  result.output))
665
666   # R0201: Method could be a function
667   def _StartupAllInstances(self): # pylint: disable=R0201
668     """Starts up all instances (locally).
669
670     @raise errors.CommandError: If unable to start clusters
671
672     """
673     result = utils.RunCmd(["gnt-instance", "startup", "--all",
674                            "--force-multiple"])
675     if result.failed:
676       raise errors.CommandError("Unable to start all instances."
677                                 " Fail reason: %s; output: %s" %
678                                 (result.fail_reason, result.output))
679
680   # R0201: Method could be a function
681   # TODO: make this overridable, for some verify errors
682   def _VerifyCluster(self): # pylint: disable=R0201
683     """Runs gnt-cluster verify to verify the health.
684
685     @raise errors.ProgrammError: If cluster fails on verification
686
687     """
688     result = utils.RunCmd(["gnt-cluster", "verify"])
689     if result.failed:
690       raise errors.CommandError("Verification of cluster failed."
691                                 " Fail reason: %s; output: %s" %
692                                 (result.fail_reason, result.output))
693
694   def Merge(self):
695     """Does the actual merge.
696
697     It runs all the steps in the right order and updates the user about steps
698     taken. Also it keeps track of rollback_steps to undo everything.
699
700     """
701     rbsteps = []
702     try:
703       logging.info("Pre cluster verification")
704       self._VerifyCluster()
705
706       logging.info("Prepare authorized_keys")
707       rbsteps.append("Remove our key from authorized_keys on nodes:"
708                      " %(nodes)s")
709       self._PrepareAuthorizedKeys()
710
711       rbsteps.append("Start all instances again on the merging"
712                      " clusters: %(clusters)s")
713       if self.stop_instances:
714         logging.info("Stopping merging instances (takes a while)")
715         self._StopMergingInstances()
716       logging.info("Checking that no instances are running on the mergees")
717       instances_running = self._CheckRunningInstances()
718       if instances_running:
719         raise errors.CommandError("Some instances are still running on the"
720                                   " mergees")
721       logging.info("Disable watcher")
722       self._DisableWatcher()
723       logging.info("Merging config")
724       self._FetchRemoteConfig()
725       logging.info("Removing master IPs on mergee master nodes")
726       self._RemoveMasterIps()
727       logging.info("Stop daemons on merging nodes")
728       self._StopDaemons()
729
730       logging.info("Stopping master daemon")
731       self._KillMasterDaemon()
732
733       rbsteps.append("Restore %s from another master candidate"
734                      " and restart master daemon" %
735                      pathutils.CLUSTER_CONF_FILE)
736       self._MergeConfig()
737       self._StartMasterDaemon(no_vote=True)
738
739       # Point of no return, delete rbsteps
740       del rbsteps[:]
741
742       logging.warning("We are at the point of no return. Merge can not easily"
743                       " be undone after this point.")
744       logging.info("Readd nodes")
745       self._ReaddMergedNodesAndRedist()
746
747       logging.info("Merge done, restart master daemon normally")
748       self._KillMasterDaemon()
749       self._StartMasterDaemon()
750
751       if self.restart == _RESTART_ALL:
752         logging.info("Starting instances again")
753         self._StartupAllInstances()
754       else:
755         logging.info("Not starting instances again")
756       logging.info("Post cluster verification")
757       self._VerifyCluster()
758     except errors.GenericError, e:
759       logging.exception(e)
760
761       if rbsteps:
762         nodes = Flatten([data.nodes for data in self.merger_data])
763         info = {
764           "clusters": self.clusters,
765           "nodes": nodes,
766           }
767         logging.critical("In order to rollback do the following:")
768         for step in rbsteps:
769           logging.critical("  * %s", step % info)
770       else:
771         logging.critical("Nothing to rollback.")
772
773       # TODO: Keep track of steps done for a flawless resume?
774
775   def Cleanup(self):
776     """Clean up our environment.
777
778     This cleans up remote private keys and configs and after that
779     deletes the temporary directory.
780
781     """
782     shutil.rmtree(self.work_dir)
783
784
785 def main():
786   """Main routine.
787
788   """
789   program = os.path.basename(sys.argv[0])
790
791   parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
792                                  prog=program)
793   parser.add_option(cli.DEBUG_OPT)
794   parser.add_option(cli.VERBOSE_OPT)
795   parser.add_option(PAUSE_PERIOD_OPT)
796   parser.add_option(GROUPS_OPT)
797   parser.add_option(RESTART_OPT)
798   parser.add_option(PARAMS_OPT)
799   parser.add_option(SKIP_STOP_INSTANCES_OPT)
800
801   (options, args) = parser.parse_args()
802
803   utils.SetupToolLogging(options.debug, options.verbose)
804
805   if not args:
806     parser.error("No clusters specified")
807
808   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
809                           options.groups, options.restart, options.params,
810                           options.stop_instances)
811   try:
812     try:
813       cluster_merger.Setup()
814       cluster_merger.Merge()
815     except errors.GenericError, e:
816       logging.exception(e)
817       return constants.EXIT_FAILURE
818   finally:
819     cluster_merger.Cleanup()
820
821   return constants.EXIT_SUCCESS
822
823
824 if __name__ == "__main__":
825   sys.exit(main())