Use --yes to deactivate master ip in cluster merge
[ganeti-local] / tools / cluster-merge
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to merge two or more clusters together.
22
23 The clusters have to run the same version of Ganeti!
24
25 """
26
27 # pylint: disable=C0103
28 # C0103: Invalid name cluster-merge
29
30 import logging
31 import os
32 import optparse
33 import shutil
34 import sys
35 import tempfile
36
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
43
44
45 _GROUPS_MERGE = "merge"
46 _GROUPS_RENAME = "rename"
47 _CLUSTERMERGE_ECID = "clustermerge-ecid"
48 _RESTART_ALL = "all"
49 _RESTART_UP = "up"
50 _RESTART_NONE = "none"
51 _RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
52 _PARAMS_STRICT = "strict"
53 _PARAMS_WARN = "warn"
54 _PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
55
56
57 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
58                                   action="store", type="int",
59                                   dest="pause_period",
60                                   help=("Amount of time in seconds watcher"
61                                         " should be suspended from running"))
62 GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
63                             choices=(_GROUPS_MERGE, _GROUPS_RENAME),
64                             dest="groups",
65                             help=("How to handle groups that have the"
66                                   " same name (One of: %s/%s)" %
67                                   (_GROUPS_MERGE, _GROUPS_RENAME)))
68 PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
69                             metavar="STRATEGY",
70                             choices=_PARAMS_CHOICES,
71                             dest="params",
72                             help=("How to handle params that have"
73                                   " different values (One of: %s/%s)" %
74                                   _PARAMS_CHOICES))
75
76 RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
77                              metavar="STRATEGY",
78                              choices=_RESTART_CHOICES,
79                              dest="restart",
80                              help=("How to handle restarting instances"
81                                    " same name (One of: %s/%s/%s)" %
82                                    _RESTART_CHOICES))
83
84 SKIP_STOP_INSTANCES_OPT = \
85   cli.cli_option("--skip-stop-instances", default=True, action="store_false",
86                  dest="stop_instances",
87                  help=("Don't stop the instances on the clusters, just check "
88                        "that none is running"))
89
90
91 def Flatten(unflattened_list):
92   """Flattens a list.
93
94   @param unflattened_list: A list of unflattened list objects.
95   @return: A flattened list
96
97   """
98   flattened_list = []
99
100   for item in unflattened_list:
101     if isinstance(item, list):
102       flattened_list.extend(Flatten(item))
103     else:
104       flattened_list.append(item)
105   return flattened_list
106
107
108 class MergerData(object):
109   """Container class to hold data used for merger.
110
111   """
112   def __init__(self, cluster, key_path, nodes, instances, master_node,
113                config_path=None):
114     """Initialize the container.
115
116     @param cluster: The name of the cluster
117     @param key_path: Path to the ssh private key used for authentication
118     @param nodes: List of online nodes in the merging cluster
119     @param instances: List of instances running on merging cluster
120     @param master_node: Name of the master node
121     @param config_path: Path to the merging cluster config
122
123     """
124     self.cluster = cluster
125     self.key_path = key_path
126     self.nodes = nodes
127     self.instances = instances
128     self.master_node = master_node
129     self.config_path = config_path
130
131
132 class Merger(object):
133   """Handling the merge.
134
135   """
136   RUNNING_STATUSES = frozenset([
137     constants.INSTST_RUNNING,
138     constants.INSTST_ERRORUP,
139     ])
140
141   def __init__(self, clusters, pause_period, groups, restart, params,
142                stop_instances):
143     """Initialize object with sane defaults and infos required.
144
145     @param clusters: The list of clusters to merge in
146     @param pause_period: The time watcher shall be disabled for
147     @param groups: How to handle group conflicts
148     @param restart: How to handle instance restart
149     @param stop_instances: Indicates whether the instances must be stopped
150                            (True) or if the Merger must only check if no
151                            instances are running on the mergee clusters (False)
152
153     """
154     self.merger_data = []
155     self.clusters = clusters
156     self.pause_period = pause_period
157     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
158     (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
159     self.ssh_runner = ssh.SshRunner(self.cluster_name)
160     self.groups = groups
161     self.restart = restart
162     self.params = params
163     self.stop_instances = stop_instances
164     if self.restart == _RESTART_UP:
165       raise NotImplementedError
166
167   def Setup(self):
168     """Sets up our end so we can do the merger.
169
170     This method is setting us up as a preparation for the merger.
171     It makes the initial contact and gathers information needed.
172
173     @raise errors.RemoteError: for errors in communication/grabbing
174
175     """
176     (remote_path, _, _) = ssh.GetUserFiles("root")
177
178     if self.cluster_name in self.clusters:
179       raise errors.CommandError("Cannot merge cluster %s with itself" %
180                                 self.cluster_name)
181
182     # Fetch remotes private key
183     for cluster in self.clusters:
184       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
185                             ask_key=False)
186       if result.failed:
187         raise errors.RemoteError("There was an error while grabbing ssh private"
188                                  " key from %s. Fail reason: %s; output: %s" %
189                                  (cluster, result.fail_reason, result.output))
190
191       key_path = utils.PathJoin(self.work_dir, cluster)
192       utils.WriteFile(key_path, mode=0600, data=result.stdout)
193
194       result = self._RunCmd(cluster, "gnt-node list -o name,offline"
195                             " --no-header --separator=,", private_key=key_path)
196       if result.failed:
197         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
198                                  " Fail reason: %s; output: %s" %
199                                  (cluster, result.fail_reason, result.output))
200       nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
201       nodes = [node_status[0] for node_status in nodes_statuses
202                if node_status[1] == "N"]
203
204       result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
205                             private_key=key_path)
206       if result.failed:
207         raise errors.RemoteError("Unable to retrieve list of instances from"
208                                  " %s. Fail reason: %s; output: %s" %
209                                  (cluster, result.fail_reason, result.output))
210       instances = result.stdout.splitlines()
211
212       path = utils.PathJoin(constants.DATA_DIR, "ssconf_%s" %
213                             constants.SS_MASTER_NODE)
214       result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
215       if result.failed:
216         raise errors.RemoteError("Unable to retrieve the master node name from"
217                                  " %s. Fail reason: %s; output: %s" %
218                                  (cluster, result.fail_reason, result.output))
219       master_node = result.stdout.strip()
220
221       self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
222                                          master_node))
223
224   def _PrepareAuthorizedKeys(self):
225     """Prepare the authorized_keys on every merging node.
226
227     This method add our public key to remotes authorized_key for further
228     communication.
229
230     """
231     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
232     pub_key = utils.ReadFile(pub_key_file)
233
234     for data in self.merger_data:
235       for node in data.nodes:
236         result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
237                                      (auth_keys, pub_key)),
238                               private_key=data.key_path, max_attempts=3)
239
240         if result.failed:
241           raise errors.RemoteError("Unable to add our public key to %s in %s."
242                                    " Fail reason: %s; output: %s" %
243                                    (node, data.cluster, result.fail_reason,
244                                     result.output))
245
246   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
247               strict_host_check=False, private_key=None, batch=True,
248               ask_key=False, max_attempts=1):
249     """Wrapping SshRunner.Run with default parameters.
250
251     For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
252
253     """
254     for _ in range(max_attempts):
255       result = self.ssh_runner.Run(hostname=hostname, command=command,
256                                  user=user, use_cluster_key=use_cluster_key,
257                                  strict_host_check=strict_host_check,
258                                  private_key=private_key, batch=batch,
259                                  ask_key=ask_key)
260       if not result.failed:
261         break
262
263     return result
264
265   def _CheckRunningInstances(self):
266     """Checks if on the clusters to be merged there are running instances
267
268     @rtype: boolean
269     @return: True if there are running instances, False otherwise
270
271     """
272     for cluster in self.clusters:
273       result = self._RunCmd(cluster, "gnt-instance list -o status")
274       if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
275         return True
276
277     return False
278
279   def _StopMergingInstances(self):
280     """Stop instances on merging clusters.
281
282     """
283     for cluster in self.clusters:
284       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
285                                      " --force-multiple")
286
287       if result.failed:
288         raise errors.RemoteError("Unable to stop instances on %s."
289                                  " Fail reason: %s; output: %s" %
290                                  (cluster, result.fail_reason, result.output))
291
292   def _DisableWatcher(self):
293     """Disable watch on all merging clusters, including ourself.
294
295     """
296     for cluster in ["localhost"] + self.clusters:
297       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
298                                      self.pause_period)
299
300       if result.failed:
301         raise errors.RemoteError("Unable to pause watcher on %s."
302                                  " Fail reason: %s; output: %s" %
303                                  (cluster, result.fail_reason, result.output))
304
305   def _RemoveMasterIps(self):
306     """Removes the master IPs from the master nodes of each cluster.
307
308     """
309     for data in self.merger_data:
310       result = self._RunCmd(data.master_node,
311                             "gnt-cluster deactivate-master-ip --yes")
312
313       if result.failed:
314         raise errors.RemoteError("Unable to remove master IP on %s."
315                                  " Fail reason: %s; output: %s" %
316                                  (data.master_node,
317                                   result.fail_reason,
318                                   result.output))
319
320   def _StopDaemons(self):
321     """Stop all daemons on merging nodes.
322
323     """
324     cmd = "%s stop-all" % constants.DAEMON_UTIL
325     for data in self.merger_data:
326       for node in data.nodes:
327         result = self._RunCmd(node, cmd, max_attempts=3)
328
329         if result.failed:
330           raise errors.RemoteError("Unable to stop daemons on %s."
331                                    " Fail reason: %s; output: %s." %
332                                    (node, result.fail_reason, result.output))
333
334   def _FetchRemoteConfig(self):
335     """Fetches and stores remote cluster config from the master.
336
337     This step is needed before we can merge the config.
338
339     """
340     for data in self.merger_data:
341       result = self._RunCmd(data.cluster, "cat %s" %
342                                           constants.CLUSTER_CONF_FILE)
343
344       if result.failed:
345         raise errors.RemoteError("Unable to retrieve remote config on %s."
346                                  " Fail reason: %s; output %s" %
347                                  (data.cluster, result.fail_reason,
348                                   result.output))
349
350       data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
351                                         data.cluster)
352       utils.WriteFile(data.config_path, data=result.stdout)
353
354   # R0201: Method could be a function
355   def _KillMasterDaemon(self): # pylint: disable=R0201
356     """Kills the local master daemon.
357
358     @raise errors.CommandError: If unable to kill
359
360     """
361     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
362     if result.failed:
363       raise errors.CommandError("Unable to stop master daemons."
364                                 " Fail reason: %s; output: %s" %
365                                 (result.fail_reason, result.output))
366
367   def _MergeConfig(self):
368     """Merges all foreign config into our own config.
369
370     """
371     my_config = config.ConfigWriter(offline=True)
372     fake_ec_id = 0 # Needs to be uniq over the whole config merge
373
374     for data in self.merger_data:
375       other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
376       self._MergeClusterConfigs(my_config, other_config)
377       self._MergeNodeGroups(my_config, other_config)
378
379       for node in other_config.GetNodeList():
380         node_info = other_config.GetNodeInfo(node)
381         # Offline the node, it will be reonlined later at node readd
382         node_info.master_candidate = False
383         node_info.drained = False
384         node_info.offline = True
385         my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
386         fake_ec_id += 1
387
388       for instance in other_config.GetInstanceList():
389         instance_info = other_config.GetInstanceInfo(instance)
390
391         # Update the DRBD port assignments
392         # This is a little bit hackish
393         for dsk in instance_info.disks:
394           if dsk.dev_type in constants.LDS_DRBD:
395             port = my_config.AllocatePort()
396
397             logical_id = list(dsk.logical_id)
398             logical_id[2] = port
399             dsk.logical_id = tuple(logical_id)
400
401             physical_id = list(dsk.physical_id)
402             physical_id[1] = physical_id[3] = port
403             dsk.physical_id = tuple(physical_id)
404
405         my_config.AddInstance(instance_info,
406                               _CLUSTERMERGE_ECID + str(fake_ec_id))
407         fake_ec_id += 1
408
409   def _MergeClusterConfigs(self, my_config, other_config):
410     """Checks that all relevant cluster parameters are compatible
411
412     """
413     my_cluster = my_config.GetClusterInfo()
414     other_cluster = other_config.GetClusterInfo()
415     err_count = 0
416
417     #
418     # Generic checks
419     #
420     check_params = [
421       "beparams",
422       "default_iallocator",
423       "drbd_usermode_helper",
424       "hidden_os",
425       "maintain_node_health",
426       "master_netdev",
427       "ndparams",
428       "nicparams",
429       "primary_ip_family",
430       "tags",
431       "uid_pool",
432       ]
433     check_params_strict = [
434       "volume_group_name",
435     ]
436     if constants.ENABLE_FILE_STORAGE:
437       check_params_strict.append("file_storage_dir")
438     if constants.ENABLE_SHARED_FILE_STORAGE:
439       check_params_strict.append("shared_file_storage_dir")
440     check_params.extend(check_params_strict)
441
442     if self.params == _PARAMS_STRICT:
443       params_strict = True
444     else:
445       params_strict = False
446
447     for param_name in check_params:
448       my_param = getattr(my_cluster, param_name)
449       other_param = getattr(other_cluster, param_name)
450       if my_param != other_param:
451         logging.error("The value (%s) of the cluster parameter %s on %s"
452                       " differs to this cluster's value (%s)",
453                       other_param, param_name, other_cluster.cluster_name,
454                       my_param)
455         if params_strict or param_name in check_params_strict:
456           err_count += 1
457
458     #
459     # Custom checks
460     #
461
462     # Check default hypervisor
463     my_defhyp = my_cluster.enabled_hypervisors[0]
464     other_defhyp = other_cluster.enabled_hypervisors[0]
465     if my_defhyp != other_defhyp:
466       logging.warning("The default hypervisor (%s) differs on %s, new"
467                       " instances will be created with this cluster's"
468                       " default hypervisor (%s)", other_defhyp,
469                       other_cluster.cluster_name, my_defhyp)
470
471     if (set(my_cluster.enabled_hypervisors) !=
472         set(other_cluster.enabled_hypervisors)):
473       logging.error("The set of enabled hypervisors (%s) on %s differs to"
474                     " this cluster's set (%s)",
475                     other_cluster.enabled_hypervisors,
476                     other_cluster.cluster_name, my_cluster.enabled_hypervisors)
477       err_count += 1
478
479     # Check hypervisor params for hypervisors we care about
480     for hyp in my_cluster.enabled_hypervisors:
481       for param in my_cluster.hvparams[hyp]:
482         my_value = my_cluster.hvparams[hyp][param]
483         other_value = other_cluster.hvparams[hyp][param]
484         if my_value != other_value:
485           logging.error("The value (%s) of the %s parameter of the %s"
486                         " hypervisor on %s differs to this cluster's parameter"
487                         " (%s)",
488                         other_value, param, hyp, other_cluster.cluster_name,
489                         my_value)
490           if params_strict:
491             err_count += 1
492
493     # Check os hypervisor params for hypervisors we care about
494     for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
495       for hyp in my_cluster.enabled_hypervisors:
496         my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
497         other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
498         if my_os_hvp != other_os_hvp:
499           logging.error("The OS parameters (%s) for the %s OS for the %s"
500                         " hypervisor on %s differs to this cluster's parameters"
501                         " (%s)",
502                         other_os_hvp, os_name, hyp, other_cluster.cluster_name,
503                         my_os_hvp)
504           if params_strict:
505             err_count += 1
506
507     #
508     # Warnings
509     #
510     if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
511       logging.warning("The modify_etc_hosts value (%s) differs on %s,"
512                       " this cluster's value (%s) will take precedence",
513                       other_cluster.modify_etc_hosts,
514                       other_cluster.cluster_name,
515                       my_cluster.modify_etc_hosts)
516
517     if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
518       logging.warning("The modify_ssh_setup value (%s) differs on %s,"
519                       " this cluster's value (%s) will take precedence",
520                       other_cluster.modify_ssh_setup,
521                       other_cluster.cluster_name,
522                       my_cluster.modify_ssh_setup)
523
524     #
525     # Actual merging
526     #
527     my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
528                                        other_cluster.reserved_lvs))
529
530     if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
531       logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
532                       " cluster's value (%s). The least permissive value (%s)"
533                       " will be used", other_cluster.prealloc_wipe_disks,
534                       other_cluster.cluster_name,
535                       my_cluster.prealloc_wipe_disks, True)
536       my_cluster.prealloc_wipe_disks = True
537
538     for os_, osparams in other_cluster.osparams.items():
539       if os_ not in my_cluster.osparams:
540         my_cluster.osparams[os_] = osparams
541       elif my_cluster.osparams[os_] != osparams:
542         logging.error("The OS parameters (%s) for the %s OS on %s differs to"
543                       " this cluster's parameters (%s)",
544                       osparams, os_, other_cluster.cluster_name,
545                       my_cluster.osparams[os_])
546         if params_strict:
547           err_count += 1
548
549     if err_count:
550       raise errors.ConfigurationError("Cluster config for %s has incompatible"
551                                       " values, please fix and re-run" %
552                                       other_cluster.cluster_name)
553
554   # R0201: Method could be a function
555   def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201
556     if os_name in cluster.os_hvp:
557       return cluster.os_hvp[os_name].get(hyp, None)
558     else:
559       return None
560
561   # R0201: Method could be a function
562   def _MergeNodeGroups(self, my_config, other_config):
563     """Adds foreign node groups
564
565     ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
566     """
567     # pylint: disable=R0201
568     logging.info("Node group conflict strategy: %s", self.groups)
569
570     my_grps = my_config.GetAllNodeGroupsInfo().values()
571     other_grps = other_config.GetAllNodeGroupsInfo().values()
572
573     # Check for node group naming conflicts:
574     conflicts = []
575     for other_grp in other_grps:
576       for my_grp in my_grps:
577         if other_grp.name == my_grp.name:
578           conflicts.append(other_grp)
579
580     if conflicts:
581       conflict_names = utils.CommaJoin([g.name for g in conflicts])
582       logging.info("Node groups in both local and remote cluster: %s",
583                    conflict_names)
584
585       # User hasn't specified how to handle conflicts
586       if not self.groups:
587         raise errors.CommandError("The following node group(s) are in both"
588                                   " clusters, and no merge strategy has been"
589                                   " supplied (see the --groups option): %s" %
590                                   conflict_names)
591
592       # User wants to rename conflicts
593       elif self.groups == _GROUPS_RENAME:
594         for grp in conflicts:
595           new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
596           logging.info("Renaming remote node group from %s to %s"
597                        " to resolve conflict", grp.name, new_name)
598           grp.name = new_name
599
600       # User wants to merge conflicting groups
601       elif self.groups == _GROUPS_MERGE:
602         for other_grp in conflicts:
603           logging.info("Merging local and remote '%s' groups", other_grp.name)
604           for node_name in other_grp.members[:]:
605             node = other_config.GetNodeInfo(node_name)
606             # Access to a protected member of a client class
607             # pylint: disable=W0212
608             other_config._UnlockedRemoveNodeFromGroup(node)
609
610             # Access to a protected member of a client class
611             # pylint: disable=W0212
612             my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
613
614             # Access to a protected member of a client class
615             # pylint: disable=W0212
616             my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
617             node.group = my_grp_uuid
618           # Remove from list of groups to add
619           other_grps.remove(other_grp)
620
621     for grp in other_grps:
622       #TODO: handle node group conflicts
623       my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
624
625   # R0201: Method could be a function
626   def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201
627     """Starts the local master daemon.
628
629     @param no_vote: Should the masterd started without voting? default: False
630     @raise errors.CommandError: If unable to start daemon.
631
632     """
633     env = {}
634     if no_vote:
635       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
636
637     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
638     if result.failed:
639       raise errors.CommandError("Couldn't start ganeti master."
640                                 " Fail reason: %s; output: %s" %
641                                 (result.fail_reason, result.output))
642
643   def _ReaddMergedNodesAndRedist(self):
644     """Readds all merging nodes and make sure their config is up-to-date.
645
646     @raise errors.CommandError: If anything fails.
647
648     """
649     for data in self.merger_data:
650       for node in data.nodes:
651         result = utils.RunCmd(["gnt-node", "add", "--readd",
652                                "--no-ssh-key-check", "--force-join", node])
653         if result.failed:
654           logging.error("%s failed to be readded. Reason: %s, output: %s",
655                          node, result.fail_reason, result.output)
656
657     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
658     if result.failed:
659       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
660                                 " output: %s" % (result.fail_reason,
661                                                 result.output))
662
663   # R0201: Method could be a function
664   def _StartupAllInstances(self): # pylint: disable=R0201
665     """Starts up all instances (locally).
666
667     @raise errors.CommandError: If unable to start clusters
668
669     """
670     result = utils.RunCmd(["gnt-instance", "startup", "--all",
671                            "--force-multiple"])
672     if result.failed:
673       raise errors.CommandError("Unable to start all instances."
674                                 " Fail reason: %s; output: %s" %
675                                 (result.fail_reason, result.output))
676
677   # R0201: Method could be a function
678   # TODO: make this overridable, for some verify errors
679   def _VerifyCluster(self): # pylint: disable=R0201
680     """Runs gnt-cluster verify to verify the health.
681
682     @raise errors.ProgrammError: If cluster fails on verification
683
684     """
685     result = utils.RunCmd(["gnt-cluster", "verify"])
686     if result.failed:
687       raise errors.CommandError("Verification of cluster failed."
688                                 " Fail reason: %s; output: %s" %
689                                 (result.fail_reason, result.output))
690
691   def Merge(self):
692     """Does the actual merge.
693
694     It runs all the steps in the right order and updates the user about steps
695     taken. Also it keeps track of rollback_steps to undo everything.
696
697     """
698     rbsteps = []
699     try:
700       logging.info("Pre cluster verification")
701       self._VerifyCluster()
702
703       logging.info("Prepare authorized_keys")
704       rbsteps.append("Remove our key from authorized_keys on nodes:"
705                      " %(nodes)s")
706       self._PrepareAuthorizedKeys()
707
708       rbsteps.append("Start all instances again on the merging"
709                      " clusters: %(clusters)s")
710       if self.stop_instances:
711         logging.info("Stopping merging instances (takes a while)")
712         self._StopMergingInstances()
713       logging.info("Checking that no instances are running on the mergees")
714       instances_running = self._CheckRunningInstances()
715       if instances_running:
716         raise errors.CommandError("Some instances are still running on the"
717                                   " mergees")
718       logging.info("Disable watcher")
719       self._DisableWatcher()
720       logging.info("Merging config")
721       self._FetchRemoteConfig()
722       logging.info("Removing master IPs on mergee master nodes")
723       self._RemoveMasterIps()
724       logging.info("Stop daemons on merging nodes")
725       self._StopDaemons()
726
727       logging.info("Stopping master daemon")
728       self._KillMasterDaemon()
729
730       rbsteps.append("Restore %s from another master candidate"
731                      " and restart master daemon" %
732                      constants.CLUSTER_CONF_FILE)
733       self._MergeConfig()
734       self._StartMasterDaemon(no_vote=True)
735
736       # Point of no return, delete rbsteps
737       del rbsteps[:]
738
739       logging.warning("We are at the point of no return. Merge can not easily"
740                       " be undone after this point.")
741       logging.info("Readd nodes")
742       self._ReaddMergedNodesAndRedist()
743
744       logging.info("Merge done, restart master daemon normally")
745       self._KillMasterDaemon()
746       self._StartMasterDaemon()
747
748       if self.restart == _RESTART_ALL:
749         logging.info("Starting instances again")
750         self._StartupAllInstances()
751       else:
752         logging.info("Not starting instances again")
753       logging.info("Post cluster verification")
754       self._VerifyCluster()
755     except errors.GenericError, e:
756       logging.exception(e)
757
758       if rbsteps:
759         nodes = Flatten([data.nodes for data in self.merger_data])
760         info = {
761           "clusters": self.clusters,
762           "nodes": nodes,
763           }
764         logging.critical("In order to rollback do the following:")
765         for step in rbsteps:
766           logging.critical("  * %s", step % info)
767       else:
768         logging.critical("Nothing to rollback.")
769
770       # TODO: Keep track of steps done for a flawless resume?
771
772   def Cleanup(self):
773     """Clean up our environment.
774
775     This cleans up remote private keys and configs and after that
776     deletes the temporary directory.
777
778     """
779     shutil.rmtree(self.work_dir)
780
781
782 def SetupLogging(options):
783   """Setting up logging infrastructure.
784
785   @param options: Parsed command line options
786
787   """
788   formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
789
790   stderr_handler = logging.StreamHandler()
791   stderr_handler.setFormatter(formatter)
792   if options.debug:
793     stderr_handler.setLevel(logging.NOTSET)
794   elif options.verbose:
795     stderr_handler.setLevel(logging.INFO)
796   else:
797     stderr_handler.setLevel(logging.WARNING)
798
799   root_logger = logging.getLogger("")
800   root_logger.setLevel(logging.NOTSET)
801   root_logger.addHandler(stderr_handler)
802
803
804 def main():
805   """Main routine.
806
807   """
808   program = os.path.basename(sys.argv[0])
809
810   parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
811                                  prog=program)
812   parser.add_option(cli.DEBUG_OPT)
813   parser.add_option(cli.VERBOSE_OPT)
814   parser.add_option(PAUSE_PERIOD_OPT)
815   parser.add_option(GROUPS_OPT)
816   parser.add_option(RESTART_OPT)
817   parser.add_option(PARAMS_OPT)
818   parser.add_option(SKIP_STOP_INSTANCES_OPT)
819
820   (options, args) = parser.parse_args()
821
822   SetupLogging(options)
823
824   if not args:
825     parser.error("No clusters specified")
826
827   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
828                           options.groups, options.restart, options.params,
829                           options.stop_instances)
830   try:
831     try:
832       cluster_merger.Setup()
833       cluster_merger.Merge()
834     except errors.GenericError, e:
835       logging.exception(e)
836       return constants.EXIT_FAILURE
837   finally:
838     cluster_merger.Cleanup()
839
840   return constants.EXIT_SUCCESS
841
842
843 if __name__ == "__main__":
844   sys.exit(main())