Make Paramiko an optional dependency for listrunner
[ganeti-local] / tools / cluster-merge
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to merge two or more clusters together.
22
23 The clusters have to run the same version of Ganeti!
24
25 """
26
27 # pylint: disable=C0103
28 # C0103: Invalid name cluster-merge
29
30 import logging
31 import os
32 import optparse
33 import shutil
34 import sys
35 import tempfile
36
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
43 from ganeti import pathutils
44
45
46 _GROUPS_MERGE = "merge"
47 _GROUPS_RENAME = "rename"
48 _CLUSTERMERGE_ECID = "clustermerge-ecid"
49 _RESTART_ALL = "all"
50 _RESTART_UP = "up"
51 _RESTART_NONE = "none"
52 _RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
53 _PARAMS_STRICT = "strict"
54 _PARAMS_WARN = "warn"
55 _PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
56
57
58 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
59                                   action="store", type="int",
60                                   dest="pause_period",
61                                   help=("Amount of time in seconds watcher"
62                                         " should be suspended from running"))
63 GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
64                             choices=(_GROUPS_MERGE, _GROUPS_RENAME),
65                             dest="groups",
66                             help=("How to handle groups that have the"
67                                   " same name (One of: %s/%s)" %
68                                   (_GROUPS_MERGE, _GROUPS_RENAME)))
69 PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
70                             metavar="STRATEGY",
71                             choices=_PARAMS_CHOICES,
72                             dest="params",
73                             help=("How to handle params that have"
74                                   " different values (One of: %s/%s)" %
75                                   _PARAMS_CHOICES))
76
77 RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
78                              metavar="STRATEGY",
79                              choices=_RESTART_CHOICES,
80                              dest="restart",
81                              help=("How to handle restarting instances"
82                                    " same name (One of: %s/%s/%s)" %
83                                    _RESTART_CHOICES))
84
85 SKIP_STOP_INSTANCES_OPT = \
86   cli.cli_option("--skip-stop-instances", default=True, action="store_false",
87                  dest="stop_instances",
88                  help=("Don't stop the instances on the clusters, just check "
89                        "that none is running"))
90
91
92 def Flatten(unflattened_list):
93   """Flattens a list.
94
95   @param unflattened_list: A list of unflattened list objects.
96   @return: A flattened list
97
98   """
99   flattened_list = []
100
101   for item in unflattened_list:
102     if isinstance(item, list):
103       flattened_list.extend(Flatten(item))
104     else:
105       flattened_list.append(item)
106   return flattened_list
107
108
109 class MergerData(object):
110   """Container class to hold data used for merger.
111
112   """
113   def __init__(self, cluster, key_path, nodes, instances, master_node,
114                config_path=None):
115     """Initialize the container.
116
117     @param cluster: The name of the cluster
118     @param key_path: Path to the ssh private key used for authentication
119     @param nodes: List of online nodes in the merging cluster
120     @param instances: List of instances running on merging cluster
121     @param master_node: Name of the master node
122     @param config_path: Path to the merging cluster config
123
124     """
125     self.cluster = cluster
126     self.key_path = key_path
127     self.nodes = nodes
128     self.instances = instances
129     self.master_node = master_node
130     self.config_path = config_path
131
132
133 class Merger(object):
134   """Handling the merge.
135
136   """
137   RUNNING_STATUSES = frozenset([
138     constants.INSTST_RUNNING,
139     constants.INSTST_ERRORUP,
140     ])
141
142   def __init__(self, clusters, pause_period, groups, restart, params,
143                stop_instances):
144     """Initialize object with sane defaults and infos required.
145
146     @param clusters: The list of clusters to merge in
147     @param pause_period: The time watcher shall be disabled for
148     @param groups: How to handle group conflicts
149     @param restart: How to handle instance restart
150     @param stop_instances: Indicates whether the instances must be stopped
151                            (True) or if the Merger must only check if no
152                            instances are running on the mergee clusters (False)
153
154     """
155     self.merger_data = []
156     self.clusters = clusters
157     self.pause_period = pause_period
158     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
159     (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
160     self.ssh_runner = ssh.SshRunner(self.cluster_name)
161     self.groups = groups
162     self.restart = restart
163     self.params = params
164     self.stop_instances = stop_instances
165     if self.restart == _RESTART_UP:
166       raise NotImplementedError
167
168   def Setup(self):
169     """Sets up our end so we can do the merger.
170
171     This method is setting us up as a preparation for the merger.
172     It makes the initial contact and gathers information needed.
173
174     @raise errors.RemoteError: for errors in communication/grabbing
175
176     """
177     (remote_path, _, _) = ssh.GetUserFiles("root")
178
179     if self.cluster_name in self.clusters:
180       raise errors.CommandError("Cannot merge cluster %s with itself" %
181                                 self.cluster_name)
182
183     # Fetch remotes private key
184     for cluster in self.clusters:
185       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
186                             ask_key=False)
187       if result.failed:
188         raise errors.RemoteError("There was an error while grabbing ssh private"
189                                  " key from %s. Fail reason: %s; output: %s" %
190                                  (cluster, result.fail_reason, result.output))
191
192       key_path = utils.PathJoin(self.work_dir, cluster)
193       utils.WriteFile(key_path, mode=0600, data=result.stdout)
194
195       result = self._RunCmd(cluster, "gnt-node list -o name,offline"
196                             " --no-headers --separator=,", private_key=key_path)
197       if result.failed:
198         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
199                                  " Fail reason: %s; output: %s" %
200                                  (cluster, result.fail_reason, result.output))
201       nodes_statuses = [line.split(",") for line in result.stdout.splitlines()]
202       nodes = [node_status[0] for node_status in nodes_statuses
203                if node_status[1] == "N"]
204
205       result = self._RunCmd(cluster, "gnt-instance list -o name --no-headers",
206                             private_key=key_path)
207       if result.failed:
208         raise errors.RemoteError("Unable to retrieve list of instances from"
209                                  " %s. Fail reason: %s; output: %s" %
210                                  (cluster, result.fail_reason, result.output))
211       instances = result.stdout.splitlines()
212
213       path = utils.PathJoin(pathutils.DATA_DIR, "ssconf_%s" %
214                             constants.SS_MASTER_NODE)
215       result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
216       if result.failed:
217         raise errors.RemoteError("Unable to retrieve the master node name from"
218                                  " %s. Fail reason: %s; output: %s" %
219                                  (cluster, result.fail_reason, result.output))
220       master_node = result.stdout.strip()
221
222       self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
223                                          master_node))
224
225   def _PrepareAuthorizedKeys(self):
226     """Prepare the authorized_keys on every merging node.
227
228     This method add our public key to remotes authorized_key for further
229     communication.
230
231     """
232     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
233     pub_key = utils.ReadFile(pub_key_file)
234
235     for data in self.merger_data:
236       for node in data.nodes:
237         result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
238                                      (auth_keys, pub_key)),
239                               private_key=data.key_path, max_attempts=3)
240
241         if result.failed:
242           raise errors.RemoteError("Unable to add our public key to %s in %s."
243                                    " Fail reason: %s; output: %s" %
244                                    (node, data.cluster, result.fail_reason,
245                                     result.output))
246
247   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
248               strict_host_check=False, private_key=None, batch=True,
249               ask_key=False, max_attempts=1):
250     """Wrapping SshRunner.Run with default parameters.
251
252     For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
253
254     """
255     for _ in range(max_attempts):
256       result = self.ssh_runner.Run(hostname=hostname, command=command,
257                                    user=user, use_cluster_key=use_cluster_key,
258                                    strict_host_check=strict_host_check,
259                                    private_key=private_key, batch=batch,
260                                    ask_key=ask_key)
261       if not result.failed:
262         break
263
264     return result
265
266   def _CheckRunningInstances(self):
267     """Checks if on the clusters to be merged there are running instances
268
269     @rtype: boolean
270     @return: True if there are running instances, False otherwise
271
272     """
273     for cluster in self.clusters:
274       result = self._RunCmd(cluster, "gnt-instance list -o status")
275       if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
276         return True
277
278     return False
279
280   def _StopMergingInstances(self):
281     """Stop instances on merging clusters.
282
283     """
284     for cluster in self.clusters:
285       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
286                                      " --force-multiple")
287
288       if result.failed:
289         raise errors.RemoteError("Unable to stop instances on %s."
290                                  " Fail reason: %s; output: %s" %
291                                  (cluster, result.fail_reason, result.output))
292
293   def _DisableWatcher(self):
294     """Disable watch on all merging clusters, including ourself.
295
296     """
297     for cluster in ["localhost"] + self.clusters:
298       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
299                                      self.pause_period)
300
301       if result.failed:
302         raise errors.RemoteError("Unable to pause watcher on %s."
303                                  " Fail reason: %s; output: %s" %
304                                  (cluster, result.fail_reason, result.output))
305
306   def _RemoveMasterIps(self):
307     """Removes the master IPs from the master nodes of each cluster.
308
309     """
310     for data in self.merger_data:
311       result = self._RunCmd(data.master_node,
312                             "gnt-cluster deactivate-master-ip --yes")
313
314       if result.failed:
315         raise errors.RemoteError("Unable to remove master IP on %s."
316                                  " Fail reason: %s; output: %s" %
317                                  (data.master_node,
318                                   result.fail_reason,
319                                   result.output))
320
321   def _StopDaemons(self):
322     """Stop all daemons on merging nodes.
323
324     """
325     cmd = "%s stop-all" % pathutils.DAEMON_UTIL
326     for data in self.merger_data:
327       for node in data.nodes:
328         result = self._RunCmd(node, cmd, max_attempts=3)
329
330         if result.failed:
331           raise errors.RemoteError("Unable to stop daemons on %s."
332                                    " Fail reason: %s; output: %s." %
333                                    (node, result.fail_reason, result.output))
334
335   def _FetchRemoteConfig(self):
336     """Fetches and stores remote cluster config from the master.
337
338     This step is needed before we can merge the config.
339
340     """
341     for data in self.merger_data:
342       result = self._RunCmd(data.cluster, "cat %s" %
343                                           pathutils.CLUSTER_CONF_FILE)
344
345       if result.failed:
346         raise errors.RemoteError("Unable to retrieve remote config on %s."
347                                  " Fail reason: %s; output %s" %
348                                  (data.cluster, result.fail_reason,
349                                   result.output))
350
351       data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
352                                         data.cluster)
353       utils.WriteFile(data.config_path, data=result.stdout)
354
355   # R0201: Method could be a function
356   def _KillMasterDaemon(self): # pylint: disable=R0201
357     """Kills the local master daemon.
358
359     @raise errors.CommandError: If unable to kill
360
361     """
362     result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
363     if result.failed:
364       raise errors.CommandError("Unable to stop master daemons."
365                                 " Fail reason: %s; output: %s" %
366                                 (result.fail_reason, result.output))
367
368   def _MergeConfig(self):
369     """Merges all foreign config into our own config.
370
371     """
372     my_config = config.ConfigWriter(offline=True)
373     fake_ec_id = 0 # Needs to be uniq over the whole config merge
374
375     for data in self.merger_data:
376       other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
377       self._MergeClusterConfigs(my_config, other_config)
378       self._MergeNodeGroups(my_config, other_config)
379
380       for node in other_config.GetNodeList():
381         node_info = other_config.GetNodeInfo(node)
382         # Offline the node, it will be reonlined later at node readd
383         node_info.master_candidate = False
384         node_info.drained = False
385         node_info.offline = True
386         my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
387         fake_ec_id += 1
388
389       for instance in other_config.GetInstanceList():
390         instance_info = other_config.GetInstanceInfo(instance)
391
392         # Update the DRBD port assignments
393         # This is a little bit hackish
394         for dsk in instance_info.disks:
395           if dsk.dev_type in constants.LDS_DRBD:
396             port = my_config.AllocatePort()
397
398             logical_id = list(dsk.logical_id)
399             logical_id[2] = port
400             dsk.logical_id = tuple(logical_id)
401
402             physical_id = list(dsk.physical_id)
403             physical_id[1] = physical_id[3] = port
404             dsk.physical_id = tuple(physical_id)
405
406         my_config.AddInstance(instance_info,
407                               _CLUSTERMERGE_ECID + str(fake_ec_id))
408         fake_ec_id += 1
409
410   def _MergeClusterConfigs(self, my_config, other_config):
411     """Checks that all relevant cluster parameters are compatible
412
413     """
414     my_cluster = my_config.GetClusterInfo()
415     other_cluster = other_config.GetClusterInfo()
416     err_count = 0
417
418     #
419     # Generic checks
420     #
421     check_params = [
422       "beparams",
423       "default_iallocator",
424       "drbd_usermode_helper",
425       "hidden_os",
426       "maintain_node_health",
427       "master_netdev",
428       "ndparams",
429       "nicparams",
430       "primary_ip_family",
431       "tags",
432       "uid_pool",
433       ]
434     check_params_strict = [
435       "volume_group_name",
436     ]
437     if constants.ENABLE_FILE_STORAGE:
438       check_params_strict.append("file_storage_dir")
439     if constants.ENABLE_SHARED_FILE_STORAGE:
440       check_params_strict.append("shared_file_storage_dir")
441     check_params.extend(check_params_strict)
442
443     if self.params == _PARAMS_STRICT:
444       params_strict = True
445     else:
446       params_strict = False
447
448     for param_name in check_params:
449       my_param = getattr(my_cluster, param_name)
450       other_param = getattr(other_cluster, param_name)
451       if my_param != other_param:
452         logging.error("The value (%s) of the cluster parameter %s on %s"
453                       " differs to this cluster's value (%s)",
454                       other_param, param_name, other_cluster.cluster_name,
455                       my_param)
456         if params_strict or param_name in check_params_strict:
457           err_count += 1
458
459     #
460     # Custom checks
461     #
462
463     # Check default hypervisor
464     my_defhyp = my_cluster.enabled_hypervisors[0]
465     other_defhyp = other_cluster.enabled_hypervisors[0]
466     if my_defhyp != other_defhyp:
467       logging.warning("The default hypervisor (%s) differs on %s, new"
468                       " instances will be created with this cluster's"
469                       " default hypervisor (%s)", other_defhyp,
470                       other_cluster.cluster_name, my_defhyp)
471
472     if (set(my_cluster.enabled_hypervisors) !=
473         set(other_cluster.enabled_hypervisors)):
474       logging.error("The set of enabled hypervisors (%s) on %s differs to"
475                     " this cluster's set (%s)",
476                     other_cluster.enabled_hypervisors,
477                     other_cluster.cluster_name, my_cluster.enabled_hypervisors)
478       err_count += 1
479
480     # Check hypervisor params for hypervisors we care about
481     for hyp in my_cluster.enabled_hypervisors:
482       for param in my_cluster.hvparams[hyp]:
483         my_value = my_cluster.hvparams[hyp][param]
484         other_value = other_cluster.hvparams[hyp][param]
485         if my_value != other_value:
486           logging.error("The value (%s) of the %s parameter of the %s"
487                         " hypervisor on %s differs to this cluster's parameter"
488                         " (%s)",
489                         other_value, param, hyp, other_cluster.cluster_name,
490                         my_value)
491           if params_strict:
492             err_count += 1
493
494     # Check os hypervisor params for hypervisors we care about
495     for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
496       for hyp in my_cluster.enabled_hypervisors:
497         my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
498         other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
499         if my_os_hvp != other_os_hvp:
500           logging.error("The OS parameters (%s) for the %s OS for the %s"
501                         " hypervisor on %s differs to this cluster's parameters"
502                         " (%s)",
503                         other_os_hvp, os_name, hyp, other_cluster.cluster_name,
504                         my_os_hvp)
505           if params_strict:
506             err_count += 1
507
508     #
509     # Warnings
510     #
511     if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
512       logging.warning("The modify_etc_hosts value (%s) differs on %s,"
513                       " this cluster's value (%s) will take precedence",
514                       other_cluster.modify_etc_hosts,
515                       other_cluster.cluster_name,
516                       my_cluster.modify_etc_hosts)
517
518     if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
519       logging.warning("The modify_ssh_setup value (%s) differs on %s,"
520                       " this cluster's value (%s) will take precedence",
521                       other_cluster.modify_ssh_setup,
522                       other_cluster.cluster_name,
523                       my_cluster.modify_ssh_setup)
524
525     #
526     # Actual merging
527     #
528     my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
529                                        other_cluster.reserved_lvs))
530
531     if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
532       logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
533                       " cluster's value (%s). The least permissive value (%s)"
534                       " will be used", other_cluster.prealloc_wipe_disks,
535                       other_cluster.cluster_name,
536                       my_cluster.prealloc_wipe_disks, True)
537       my_cluster.prealloc_wipe_disks = True
538
539     for os_, osparams in other_cluster.osparams.items():
540       if os_ not in my_cluster.osparams:
541         my_cluster.osparams[os_] = osparams
542       elif my_cluster.osparams[os_] != osparams:
543         logging.error("The OS parameters (%s) for the %s OS on %s differs to"
544                       " this cluster's parameters (%s)",
545                       osparams, os_, other_cluster.cluster_name,
546                       my_cluster.osparams[os_])
547         if params_strict:
548           err_count += 1
549
550     if err_count:
551       raise errors.ConfigurationError("Cluster config for %s has incompatible"
552                                       " values, please fix and re-run" %
553                                       other_cluster.cluster_name)
554
555   # R0201: Method could be a function
556   def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201
557     if os_name in cluster.os_hvp:
558       return cluster.os_hvp[os_name].get(hyp, None)
559     else:
560       return None
561
562   # R0201: Method could be a function
563   def _MergeNodeGroups(self, my_config, other_config):
564     """Adds foreign node groups
565
566     ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
567     """
568     # pylint: disable=R0201
569     logging.info("Node group conflict strategy: %s", self.groups)
570
571     my_grps = my_config.GetAllNodeGroupsInfo().values()
572     other_grps = other_config.GetAllNodeGroupsInfo().values()
573
574     # Check for node group naming conflicts:
575     conflicts = []
576     for other_grp in other_grps:
577       for my_grp in my_grps:
578         if other_grp.name == my_grp.name:
579           conflicts.append(other_grp)
580
581     if conflicts:
582       conflict_names = utils.CommaJoin([g.name for g in conflicts])
583       logging.info("Node groups in both local and remote cluster: %s",
584                    conflict_names)
585
586       # User hasn't specified how to handle conflicts
587       if not self.groups:
588         raise errors.CommandError("The following node group(s) are in both"
589                                   " clusters, and no merge strategy has been"
590                                   " supplied (see the --groups option): %s" %
591                                   conflict_names)
592
593       # User wants to rename conflicts
594       elif self.groups == _GROUPS_RENAME:
595         for grp in conflicts:
596           new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
597           logging.info("Renaming remote node group from %s to %s"
598                        " to resolve conflict", grp.name, new_name)
599           grp.name = new_name
600
601       # User wants to merge conflicting groups
602       elif self.groups == _GROUPS_MERGE:
603         for other_grp in conflicts:
604           logging.info("Merging local and remote '%s' groups", other_grp.name)
605           for node_name in other_grp.members[:]:
606             node = other_config.GetNodeInfo(node_name)
607             # Access to a protected member of a client class
608             # pylint: disable=W0212
609             other_config._UnlockedRemoveNodeFromGroup(node)
610
611             # Access to a protected member of a client class
612             # pylint: disable=W0212
613             my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
614
615             # Access to a protected member of a client class
616             # pylint: disable=W0212
617             my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
618             node.group = my_grp_uuid
619           # Remove from list of groups to add
620           other_grps.remove(other_grp)
621
622     for grp in other_grps:
623       #TODO: handle node group conflicts
624       my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
625
626   # R0201: Method could be a function
627   def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201
628     """Starts the local master daemon.
629
630     @param no_vote: Should the masterd started without voting? default: False
631     @raise errors.CommandError: If unable to start daemon.
632
633     """
634     env = {}
635     if no_vote:
636       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
637
638     result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
639     if result.failed:
640       raise errors.CommandError("Couldn't start ganeti master."
641                                 " Fail reason: %s; output: %s" %
642                                 (result.fail_reason, result.output))
643
644   def _ReaddMergedNodesAndRedist(self):
645     """Readds all merging nodes and make sure their config is up-to-date.
646
647     @raise errors.CommandError: If anything fails.
648
649     """
650     for data in self.merger_data:
651       for node in data.nodes:
652         logging.info("Readding node %s", node)
653         result = utils.RunCmd(["gnt-node", "add", "--readd",
654                                "--no-ssh-key-check", node])
655         if result.failed:
656           logging.error("%s failed to be readded. Reason: %s, output: %s",
657                          node, result.fail_reason, result.output)
658
659     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
660     if result.failed:
661       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
662                                 " output: %s" % (result.fail_reason,
663                                                  result.output))
664
665   # R0201: Method could be a function
666   def _StartupAllInstances(self): # pylint: disable=R0201
667     """Starts up all instances (locally).
668
669     @raise errors.CommandError: If unable to start clusters
670
671     """
672     result = utils.RunCmd(["gnt-instance", "startup", "--all",
673                            "--force-multiple"])
674     if result.failed:
675       raise errors.CommandError("Unable to start all instances."
676                                 " Fail reason: %s; output: %s" %
677                                 (result.fail_reason, result.output))
678
679   # R0201: Method could be a function
680   # TODO: make this overridable, for some verify errors
681   def _VerifyCluster(self): # pylint: disable=R0201
682     """Runs gnt-cluster verify to verify the health.
683
684     @raise errors.ProgrammError: If cluster fails on verification
685
686     """
687     result = utils.RunCmd(["gnt-cluster", "verify"])
688     if result.failed:
689       raise errors.CommandError("Verification of cluster failed."
690                                 " Fail reason: %s; output: %s" %
691                                 (result.fail_reason, result.output))
692
693   def Merge(self):
694     """Does the actual merge.
695
696     It runs all the steps in the right order and updates the user about steps
697     taken. Also it keeps track of rollback_steps to undo everything.
698
699     """
700     rbsteps = []
701     try:
702       logging.info("Pre cluster verification")
703       self._VerifyCluster()
704
705       logging.info("Prepare authorized_keys")
706       rbsteps.append("Remove our key from authorized_keys on nodes:"
707                      " %(nodes)s")
708       self._PrepareAuthorizedKeys()
709
710       rbsteps.append("Start all instances again on the merging"
711                      " clusters: %(clusters)s")
712       if self.stop_instances:
713         logging.info("Stopping merging instances (takes a while)")
714         self._StopMergingInstances()
715       logging.info("Checking that no instances are running on the mergees")
716       instances_running = self._CheckRunningInstances()
717       if instances_running:
718         raise errors.CommandError("Some instances are still running on the"
719                                   " mergees")
720       logging.info("Disable watcher")
721       self._DisableWatcher()
722       logging.info("Merging config")
723       self._FetchRemoteConfig()
724       logging.info("Removing master IPs on mergee master nodes")
725       self._RemoveMasterIps()
726       logging.info("Stop daemons on merging nodes")
727       self._StopDaemons()
728
729       logging.info("Stopping master daemon")
730       self._KillMasterDaemon()
731
732       rbsteps.append("Restore %s from another master candidate"
733                      " and restart master daemon" %
734                      pathutils.CLUSTER_CONF_FILE)
735       self._MergeConfig()
736       self._StartMasterDaemon(no_vote=True)
737
738       # Point of no return, delete rbsteps
739       del rbsteps[:]
740
741       logging.warning("We are at the point of no return. Merge can not easily"
742                       " be undone after this point.")
743       logging.info("Readd nodes")
744       self._ReaddMergedNodesAndRedist()
745
746       logging.info("Merge done, restart master daemon normally")
747       self._KillMasterDaemon()
748       self._StartMasterDaemon()
749
750       if self.restart == _RESTART_ALL:
751         logging.info("Starting instances again")
752         self._StartupAllInstances()
753       else:
754         logging.info("Not starting instances again")
755       logging.info("Post cluster verification")
756       self._VerifyCluster()
757     except errors.GenericError, e:
758       logging.exception(e)
759
760       if rbsteps:
761         nodes = Flatten([data.nodes for data in self.merger_data])
762         info = {
763           "clusters": self.clusters,
764           "nodes": nodes,
765           }
766         logging.critical("In order to rollback do the following:")
767         for step in rbsteps:
768           logging.critical("  * %s", step % info)
769       else:
770         logging.critical("Nothing to rollback.")
771
772       # TODO: Keep track of steps done for a flawless resume?
773
774   def Cleanup(self):
775     """Clean up our environment.
776
777     This cleans up remote private keys and configs and after that
778     deletes the temporary directory.
779
780     """
781     shutil.rmtree(self.work_dir)
782
783
784 def SetupLogging(options):
785   """Setting up logging infrastructure.
786
787   @param options: Parsed command line options
788
789   """
790   formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
791
792   stderr_handler = logging.StreamHandler()
793   stderr_handler.setFormatter(formatter)
794   if options.debug:
795     stderr_handler.setLevel(logging.NOTSET)
796   elif options.verbose:
797     stderr_handler.setLevel(logging.INFO)
798   else:
799     stderr_handler.setLevel(logging.WARNING)
800
801   root_logger = logging.getLogger("")
802   root_logger.setLevel(logging.NOTSET)
803   root_logger.addHandler(stderr_handler)
804
805
806 def main():
807   """Main routine.
808
809   """
810   program = os.path.basename(sys.argv[0])
811
812   parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
813                                  prog=program)
814   parser.add_option(cli.DEBUG_OPT)
815   parser.add_option(cli.VERBOSE_OPT)
816   parser.add_option(PAUSE_PERIOD_OPT)
817   parser.add_option(GROUPS_OPT)
818   parser.add_option(RESTART_OPT)
819   parser.add_option(PARAMS_OPT)
820   parser.add_option(SKIP_STOP_INSTANCES_OPT)
821
822   (options, args) = parser.parse_args()
823
824   SetupLogging(options)
825
826   if not args:
827     parser.error("No clusters specified")
828
829   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
830                           options.groups, options.restart, options.params,
831                           options.stop_instances)
832   try:
833     try:
834       cluster_merger.Setup()
835       cluster_merger.Merge()
836     except errors.GenericError, e:
837       logging.exception(e)
838       return constants.EXIT_FAILURE
839   finally:
840     cluster_merger.Cleanup()
841
842   return constants.EXIT_SUCCESS
843
844
845 if __name__ == "__main__":
846   sys.exit(main())