Fix wrong method name in cluster-merge
[ganeti-local] / tools / cluster-merge
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to merge two or more clusters together.
22
23 The clusters have to run the same version of Ganeti!
24
25 """
26
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
29
30 import logging
31 import os
32 import optparse
33 import shutil
34 import sys
35 import tempfile
36
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
43
44
45 _GROUPS_MERGE = "merge"
46 _GROUPS_RENAME = "rename"
47 _CLUSTERMERGE_ECID = "clustermerge-ecid"
48 _RESTART_ALL = "all"
49 _RESTART_UP = "up"
50 _RESTART_NONE = "none"
51 _RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
52 _PARAMS_STRICT = "strict"
53 _PARAMS_WARN = "warn"
54 _PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
55
56
57 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
58                                   action="store", type="int",
59                                   dest="pause_period",
60                                   help=("Amount of time in seconds watcher"
61                                         " should be suspended from running"))
62 GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
63                             choices=(_GROUPS_MERGE, _GROUPS_RENAME),
64                             dest="groups",
65                             help=("How to handle groups that have the"
66                                   " same name (One of: %s/%s)" %
67                                   (_GROUPS_MERGE, _GROUPS_RENAME)))
68 PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
69                             metavar="STRATEGY",
70                             choices=_PARAMS_CHOICES,
71                             dest="params",
72                             help=("How to handle params that have"
73                                   " different values (One of: %s/%s)" %
74                                   _PARAMS_CHOICES))
75
76 RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
77                              metavar="STRATEGY",
78                              choices=_RESTART_CHOICES,
79                              dest="restart",
80                              help=("How to handle restarting instances"
81                                    " same name (One of: %s/%s/%s)" %
82                                    _RESTART_CHOICES))
83
84 SKIP_STOP_INSTANCES_OPT = cli.cli_option("--skip-stop-instances", default=True,
85                                          action="store_false", type="boolean",
86                                          dest="stop_instances",
87                                          help=("Don't stop the instances on the"
88                                                " clusters, just check that none"
89                                                " is running"))
90
91
92 def Flatten(unflattened_list):
93   """Flattens a list.
94
95   @param unflattened_list: A list of unflattened list objects.
96   @return: A flattened list
97
98   """
99   flattened_list = []
100
101   for item in unflattened_list:
102     if isinstance(item, list):
103       flattened_list.extend(Flatten(item))
104     else:
105       flattened_list.append(item)
106   return flattened_list
107
108
109 class MergerData(object):
110   """Container class to hold data used for merger.
111
112   """
113   def __init__(self, cluster, key_path, nodes, instances, config_path=None):
114     """Initialize the container.
115
116     @param cluster: The name of the cluster
117     @param key_path: Path to the ssh private key used for authentication
118     @param nodes: List of online nodes in the merging cluster
119     @param instances: List of instances running on merging cluster
120     @param config_path: Path to the merging cluster config
121
122     """
123     self.cluster = cluster
124     self.key_path = key_path
125     self.nodes = nodes
126     self.instances = instances
127     self.config_path = config_path
128
129
130 class Merger(object):
131   """Handling the merge.
132
133   """
134   RUNNING_STATUSES = frozenset([
135     constants.INSTST_RUNNING,
136     constants.INSTST_ERRORUP,
137     ])
138   def __init__(self, clusters, pause_period, groups, restart, params,
139                stop_instances):
140     """Initialize object with sane defaults and infos required.
141
142     @param clusters: The list of clusters to merge in
143     @param pause_period: The time watcher shall be disabled for
144     @param groups: How to handle group conflicts
145     @param restart: How to handle instance restart
146     @param stop_instances: Indicates whether the instances must be stopped
147                            (True) or if the Merger must only check if no
148                            instances are running on the mergee clusters (False)
149
150     """
151     self.merger_data = []
152     self.clusters = clusters
153     self.pause_period = pause_period
154     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
155     (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
156     self.ssh_runner = ssh.SshRunner(self.cluster_name)
157     self.groups = groups
158     self.restart = restart
159     self.params = params
160     self.stop_instances = stop_instances
161     if self.restart == _RESTART_UP:
162       raise NotImplementedError
163
164
165   def Setup(self):
166     """Sets up our end so we can do the merger.
167
168     This method is setting us up as a preparation for the merger.
169     It makes the initial contact and gathers information needed.
170
171     @raise errors.RemoteError: for errors in communication/grabbing
172
173     """
174     (remote_path, _, _) = ssh.GetUserFiles("root")
175
176     if self.cluster_name in self.clusters:
177       raise errors.CommandError("Cannot merge cluster %s with itself" %
178                                 self.cluster_name)
179
180     # Fetch remotes private key
181     for cluster in self.clusters:
182       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
183                             ask_key=False)
184       if result.failed:
185         raise errors.RemoteError("There was an error while grabbing ssh private"
186                                  " key from %s. Fail reason: %s; output: %s" %
187                                  (cluster, result.fail_reason, result.output))
188
189       key_path = utils.PathJoin(self.work_dir, cluster)
190       utils.WriteFile(key_path, mode=0600, data=result.stdout)
191
192       result = self._RunCmd(cluster, "gnt-node list -o name,offline"
193                             " --no-header --separator=,", private_key=key_path)
194       if result.failed:
195         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
196                                  " Fail reason: %s; output: %s" %
197                                  (cluster, result.fail_reason, result.output))
198       nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
199       nodes = [node_status[0] for node_status in nodes_statuses
200                if node_status[1] == "N"]
201
202       result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
203                             private_key=key_path)
204       if result.failed:
205         raise errors.RemoteError("Unable to retrieve list of instances from"
206                                  " %s. Fail reason: %s; output: %s" %
207                                  (cluster, result.fail_reason, result.output))
208       instances = result.stdout.splitlines()
209
210       self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
211
212   def _PrepareAuthorizedKeys(self):
213     """Prepare the authorized_keys on every merging node.
214
215     This method add our public key to remotes authorized_key for further
216     communication.
217
218     """
219     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
220     pub_key = utils.ReadFile(pub_key_file)
221
222     for data in self.merger_data:
223       for node in data.nodes:
224         result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
225                                      (auth_keys, pub_key)),
226                               private_key=data.key_path, max_attempts=3)
227
228         if result.failed:
229           raise errors.RemoteError("Unable to add our public key to %s in %s."
230                                    " Fail reason: %s; output: %s" %
231                                    (node, data.cluster, result.fail_reason,
232                                     result.output))
233
234   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
235               strict_host_check=False, private_key=None, batch=True,
236               ask_key=False, max_attempts=1):
237     """Wrapping SshRunner.Run with default parameters.
238
239     For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
240
241     """
242     for _ in range(max_attempts):
243       result = self.ssh_runner.Run(hostname=hostname, command=command,
244                                  user=user, use_cluster_key=use_cluster_key,
245                                  strict_host_check=strict_host_check,
246                                  private_key=private_key, batch=batch,
247                                  ask_key=ask_key)
248       if not result.failed:
249         break
250
251     return result
252
253   def _CheckRunningInstances(self):
254     """Checks if on the clusters to be merged there are running instances
255
256     @rtype: boolean
257     @return: True if there are running instances, False otherwise
258
259     """
260     for cluster in self.clusters:
261       result = self._RunCmd(cluster, "gnt-instance list -o status")
262       if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
263         return True
264
265     return False
266
267   def _StopMergingInstances(self):
268     """Stop instances on merging clusters.
269
270     """
271     for cluster in self.clusters:
272       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
273                                      " --force-multiple")
274
275       if result.failed:
276         raise errors.RemoteError("Unable to stop instances on %s."
277                                  " Fail reason: %s; output: %s" %
278                                  (cluster, result.fail_reason, result.output))
279
280   def _DisableWatcher(self):
281     """Disable watch on all merging clusters, including ourself.
282
283     """
284     for cluster in ["localhost"] + self.clusters:
285       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
286                                      self.pause_period)
287
288       if result.failed:
289         raise errors.RemoteError("Unable to pause watcher on %s."
290                                  " Fail reason: %s; output: %s" %
291                                  (cluster, result.fail_reason, result.output))
292
293   def _StopDaemons(self):
294     """Stop all daemons on merging nodes.
295
296     """
297     cmd = "%s stop-all" % constants.DAEMON_UTIL
298     for data in self.merger_data:
299       for node in data.nodes:
300         result = self._RunCmd(node, cmd, max_attempts=3)
301
302         if result.failed:
303           raise errors.RemoteError("Unable to stop daemons on %s."
304                                    " Fail reason: %s; output: %s." %
305                                    (node, result.fail_reason, result.output))
306
307   def _FetchRemoteConfig(self):
308     """Fetches and stores remote cluster config from the master.
309
310     This step is needed before we can merge the config.
311
312     """
313     for data in self.merger_data:
314       result = self._RunCmd(data.cluster, "cat %s" %
315                                           constants.CLUSTER_CONF_FILE)
316
317       if result.failed:
318         raise errors.RemoteError("Unable to retrieve remote config on %s."
319                                  " Fail reason: %s; output %s" %
320                                  (data.cluster, result.fail_reason,
321                                   result.output))
322
323       data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
324                                         data.cluster)
325       utils.WriteFile(data.config_path, data=result.stdout)
326
327   # R0201: Method could be a function
328   def _KillMasterDaemon(self): # pylint: disable-msg=R0201
329     """Kills the local master daemon.
330
331     @raise errors.CommandError: If unable to kill
332
333     """
334     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
335     if result.failed:
336       raise errors.CommandError("Unable to stop master daemons."
337                                 " Fail reason: %s; output: %s" %
338                                 (result.fail_reason, result.output))
339
340   def _MergeConfig(self):
341     """Merges all foreign config into our own config.
342
343     """
344     my_config = config.ConfigWriter(offline=True)
345     fake_ec_id = 0 # Needs to be uniq over the whole config merge
346
347     for data in self.merger_data:
348       other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
349       self._MergeClusterConfigs(my_config, other_config)
350       self._MergeNodeGroups(my_config, other_config)
351
352       for node in other_config.GetNodeList():
353         node_info = other_config.GetNodeInfo(node)
354         # Offline the node, it will be reonlined later at node readd
355         node_info.master_candidate = False
356         node_info.drained = False
357         node_info.offline = True
358         my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
359         fake_ec_id += 1
360
361       for instance in other_config.GetInstanceList():
362         instance_info = other_config.GetInstanceInfo(instance)
363
364         # Update the DRBD port assignments
365         # This is a little bit hackish
366         for dsk in instance_info.disks:
367           if dsk.dev_type in constants.LDS_DRBD:
368             port = my_config.AllocatePort()
369
370             logical_id = list(dsk.logical_id)
371             logical_id[2] = port
372             dsk.logical_id = tuple(logical_id)
373
374             physical_id = list(dsk.physical_id)
375             physical_id[1] = physical_id[3] = port
376             dsk.physical_id = tuple(physical_id)
377
378         my_config.AddInstance(instance_info,
379                               _CLUSTERMERGE_ECID + str(fake_ec_id))
380         fake_ec_id += 1
381
382   def _MergeClusterConfigs(self, my_config, other_config):
383     """Checks that all relevant cluster parameters are compatible
384
385     """
386     my_cluster = my_config.GetClusterInfo()
387     other_cluster = other_config.GetClusterInfo()
388     err_count = 0
389
390     #
391     # Generic checks
392     #
393     check_params = [
394       "beparams",
395       "default_iallocator",
396       "drbd_usermode_helper",
397       "hidden_os",
398       "maintain_node_health",
399       "master_netdev",
400       "ndparams",
401       "nicparams",
402       "primary_ip_family",
403       "tags",
404       "uid_pool",
405       ]
406     check_params_strict = [
407       "volume_group_name",
408     ]
409     if constants.ENABLE_FILE_STORAGE:
410       check_params_strict.append("file_storage_dir")
411     if constants.ENABLE_SHARED_FILE_STORAGE:
412       check_params_strict.append("shared_file_storage_dir")
413     check_params.extend(check_params_strict)
414
415     if self.params == _PARAMS_STRICT:
416       params_strict = True
417     else:
418       params_strict = False
419
420     for param_name in check_params:
421       my_param = getattr(my_cluster, param_name)
422       other_param = getattr(other_cluster, param_name)
423       if my_param != other_param:
424         logging.error("The value (%s) of the cluster parameter %s on %s"
425                       " differs to this cluster's value (%s)",
426                       other_param, param_name, other_cluster.cluster_name,
427                       my_param)
428         if params_strict or param_name in check_params_strict:
429           err_count += 1
430
431     #
432     # Custom checks
433     #
434
435     # Check default hypervisor
436     my_defhyp = my_cluster.enabled_hypervisors[0]
437     other_defhyp = other_cluster.enabled_hypervisors[0]
438     if my_defhyp != other_defhyp:
439       logging.warning("The default hypervisor (%s) differs on %s, new"
440                       " instances will be created with this cluster's"
441                       " default hypervisor (%s)", other_defhyp,
442                       other_cluster.cluster_name, my_defhyp)
443
444     if (set(my_cluster.enabled_hypervisors) !=
445         set(other_cluster.enabled_hypervisors)):
446       logging.error("The set of enabled hypervisors (%s) on %s differs to"
447                     " this cluster's set (%s)",
448                     other_cluster.enabled_hypervisors,
449                     other_cluster.cluster_name, my_cluster.enabled_hypervisors)
450       err_count += 1
451
452     # Check hypervisor params for hypervisors we care about
453     for hyp in my_cluster.enabled_hypervisors:
454       for param in my_cluster.hvparams[hyp]:
455         my_value = my_cluster.hvparams[hyp][param]
456         other_value = other_cluster.hvparams[hyp][param]
457         if my_value != other_value:
458           logging.error("The value (%s) of the %s parameter of the %s"
459                         " hypervisor on %s differs to this cluster's parameter"
460                         " (%s)",
461                         other_value, param, hyp, other_cluster.cluster_name,
462                         my_value)
463           if params_strict:
464             err_count += 1
465
466     # Check os hypervisor params for hypervisors we care about
467     for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
468       for hyp in my_cluster.enabled_hypervisors:
469         my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
470         other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
471         if my_os_hvp != other_os_hvp:
472           logging.error("The OS parameters (%s) for the %s OS for the %s"
473                         " hypervisor on %s differs to this cluster's parameters"
474                         " (%s)",
475                         other_os_hvp, os_name, hyp, other_cluster.cluster_name,
476                         my_os_hvp)
477           if params_strict:
478             err_count += 1
479
480     #
481     # Warnings
482     #
483     if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
484       logging.warning("The modify_etc_hosts value (%s) differs on %s,"
485                       " this cluster's value (%s) will take precedence",
486                       other_cluster.modify_etc_hosts,
487                       other_cluster.cluster_name,
488                       my_cluster.modify_etc_hosts)
489
490     if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
491       logging.warning("The modify_ssh_setup value (%s) differs on %s,"
492                       " this cluster's value (%s) will take precedence",
493                       other_cluster.modify_ssh_setup,
494                       other_cluster.cluster_name,
495                       my_cluster.modify_ssh_setup)
496
497     #
498     # Actual merging
499     #
500     my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
501                                        other_cluster.reserved_lvs))
502
503     if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
504       logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
505                       " cluster's value (%s). The least permissive value (%s)"
506                       " will be used", other_cluster.prealloc_wipe_disks,
507                       other_cluster.cluster_name,
508                       my_cluster.prealloc_wipe_disks, True)
509       my_cluster.prealloc_wipe_disks = True
510
511     for os_, osparams in other_cluster.osparams.items():
512       if os_ not in my_cluster.osparams:
513         my_cluster.osparams[os_] = osparams
514       elif my_cluster.osparams[os_] != osparams:
515         logging.error("The OS parameters (%s) for the %s OS on %s differs to"
516                       " this cluster's parameters (%s)",
517                       osparams, os_, other_cluster.cluster_name,
518                       my_cluster.osparams[os_])
519         if params_strict:
520           err_count += 1
521
522     if err_count:
523       raise errors.ConfigurationError("Cluster config for %s has incompatible"
524                                       " values, please fix and re-run" %
525                                       other_cluster.cluster_name)
526
527   # R0201: Method could be a function
528   def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
529     if os_name in cluster.os_hvp:
530       return cluster.os_hvp[os_name].get(hyp, None)
531     else:
532       return None
533
534   # R0201: Method could be a function
535   def _MergeNodeGroups(self, my_config, other_config):
536     """Adds foreign node groups
537
538     ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
539     """
540     # pylint: disable-msg=R0201
541     logging.info("Node group conflict strategy: %s", self.groups)
542
543     my_grps = my_config.GetAllNodeGroupsInfo().values()
544     other_grps = other_config.GetAllNodeGroupsInfo().values()
545
546     # Check for node group naming conflicts:
547     conflicts = []
548     for other_grp in other_grps:
549       for my_grp in my_grps:
550         if other_grp.name == my_grp.name:
551           conflicts.append(other_grp)
552
553     if conflicts:
554       conflict_names = utils.CommaJoin([g.name for g in conflicts])
555       logging.info("Node groups in both local and remote cluster: %s",
556                    conflict_names)
557
558       # User hasn't specified how to handle conflicts
559       if not self.groups:
560         raise errors.CommandError("The following node group(s) are in both"
561                                   " clusters, and no merge strategy has been"
562                                   " supplied (see the --groups option): %s" %
563                                   conflict_names)
564
565       # User wants to rename conflicts
566       elif self.groups == _GROUPS_RENAME:
567         for grp in conflicts:
568           new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
569           logging.info("Renaming remote node group from %s to %s"
570                        " to resolve conflict", grp.name, new_name)
571           grp.name = new_name
572
573       # User wants to merge conflicting groups
574       elif self.groups == _GROUPS_MERGE:
575         for other_grp in conflicts:
576           logging.info("Merging local and remote '%s' groups", other_grp.name)
577           for node_name in other_grp.members[:]:
578             node = other_config.GetNodeInfo(node_name)
579             # Access to a protected member of a client class
580             # pylint: disable-msg=W0212
581             other_config._UnlockedRemoveNodeFromGroup(node)
582
583             # Access to a protected member of a client class
584             # pylint: disable-msg=W0212
585             my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
586
587             # Access to a protected member of a client class
588             # pylint: disable-msg=W0212
589             my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
590             node.group = my_grp_uuid
591           # Remove from list of groups to add
592           other_grps.remove(other_grp)
593
594     for grp in other_grps:
595       #TODO: handle node group conflicts
596       my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
597
598   # R0201: Method could be a function
599   def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
600     """Starts the local master daemon.
601
602     @param no_vote: Should the masterd started without voting? default: False
603     @raise errors.CommandError: If unable to start daemon.
604
605     """
606     env = {}
607     if no_vote:
608       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
609
610     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
611     if result.failed:
612       raise errors.CommandError("Couldn't start ganeti master."
613                                 " Fail reason: %s; output: %s" %
614                                 (result.fail_reason, result.output))
615
616   def _ReaddMergedNodesAndRedist(self):
617     """Readds all merging nodes and make sure their config is up-to-date.
618
619     @raise errors.CommandError: If anything fails.
620
621     """
622     for data in self.merger_data:
623       for node in data.nodes:
624         result = utils.RunCmd(["gnt-node", "add", "--readd",
625                                "--no-ssh-key-check", "--force-join", node])
626         if result.failed:
627           logging.error("%s failed to be readded. Reason: %s, output: %s",
628                          node, result.fail_reason, result.output)
629
630     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
631     if result.failed:
632       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
633                                 " output: %s" % (result.fail_reason,
634                                                 result.output))
635
636   # R0201: Method could be a function
637   def _StartupAllInstances(self): # pylint: disable-msg=R0201
638     """Starts up all instances (locally).
639
640     @raise errors.CommandError: If unable to start clusters
641
642     """
643     result = utils.RunCmd(["gnt-instance", "startup", "--all",
644                            "--force-multiple"])
645     if result.failed:
646       raise errors.CommandError("Unable to start all instances."
647                                 " Fail reason: %s; output: %s" %
648                                 (result.fail_reason, result.output))
649
650   # R0201: Method could be a function
651   # TODO: make this overridable, for some verify errors
652   def _VerifyCluster(self): # pylint: disable-msg=R0201
653     """Runs gnt-cluster verify to verify the health.
654
655     @raise errors.ProgrammError: If cluster fails on verification
656
657     """
658     result = utils.RunCmd(["gnt-cluster", "verify"])
659     if result.failed:
660       raise errors.CommandError("Verification of cluster failed."
661                                 " Fail reason: %s; output: %s" %
662                                 (result.fail_reason, result.output))
663
664   def Merge(self):
665     """Does the actual merge.
666
667     It runs all the steps in the right order and updates the user about steps
668     taken. Also it keeps track of rollback_steps to undo everything.
669
670     """
671     rbsteps = []
672     try:
673       logging.info("Pre cluster verification")
674       self._VerifyCluster()
675
676       logging.info("Prepare authorized_keys")
677       rbsteps.append("Remove our key from authorized_keys on nodes:"
678                      " %(nodes)s")
679       self._PrepareAuthorizedKeys()
680
681       rbsteps.append("Start all instances again on the merging"
682                      " clusters: %(clusters)s")
683       if self.stop_instances:
684         logging.info("Stopping merging instances (takes a while)")
685         self._StopMergingInstances()
686       logging.info("Checking that no instances are running on the mergees")
687       instances_running = self._CheckRunningInstances()
688       if instances_running:
689         raise errors.CommandError("Some instances are still running on the"
690                                   " mergees")
691       logging.info("Disable watcher")
692       self._DisableWatcher()
693       logging.info("Stop daemons on merging nodes")
694       self._StopDaemons()
695       logging.info("Merging config")
696       self._FetchRemoteConfig()
697
698       logging.info("Stopping master daemon")
699       self._KillMasterDaemon()
700
701       rbsteps.append("Restore %s from another master candidate"
702                      " and restart master daemon" %
703                      constants.CLUSTER_CONF_FILE)
704       self._MergeConfig()
705       self._StartMasterDaemon(no_vote=True)
706
707       # Point of no return, delete rbsteps
708       del rbsteps[:]
709
710       logging.warning("We are at the point of no return. Merge can not easily"
711                       " be undone after this point.")
712       logging.info("Readd nodes")
713       self._ReaddMergedNodesAndRedist()
714
715       logging.info("Merge done, restart master daemon normally")
716       self._KillMasterDaemon()
717       self._StartMasterDaemon()
718
719       if self.restart == _RESTART_ALL:
720         logging.info("Starting instances again")
721         self._StartupAllInstances()
722       else:
723         logging.info("Not starting instances again")
724       logging.info("Post cluster verification")
725       self._VerifyCluster()
726     except errors.GenericError, e:
727       logging.exception(e)
728
729       if rbsteps:
730         nodes = Flatten([data.nodes for data in self.merger_data])
731         info = {
732           "clusters": self.clusters,
733           "nodes": nodes,
734           }
735         logging.critical("In order to rollback do the following:")
736         for step in rbsteps:
737           logging.critical("  * %s", step % info)
738       else:
739         logging.critical("Nothing to rollback.")
740
741       # TODO: Keep track of steps done for a flawless resume?
742
743   def Cleanup(self):
744     """Clean up our environment.
745
746     This cleans up remote private keys and configs and after that
747     deletes the temporary directory.
748
749     """
750     shutil.rmtree(self.work_dir)
751
752
753 def SetupLogging(options):
754   """Setting up logging infrastructure.
755
756   @param options: Parsed command line options
757
758   """
759   formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
760
761   stderr_handler = logging.StreamHandler()
762   stderr_handler.setFormatter(formatter)
763   if options.debug:
764     stderr_handler.setLevel(logging.NOTSET)
765   elif options.verbose:
766     stderr_handler.setLevel(logging.INFO)
767   else:
768     stderr_handler.setLevel(logging.WARNING)
769
770   root_logger = logging.getLogger("")
771   root_logger.setLevel(logging.NOTSET)
772   root_logger.addHandler(stderr_handler)
773
774
775 def main():
776   """Main routine.
777
778   """
779   program = os.path.basename(sys.argv[0])
780
781   parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
782                                  prog=program)
783   parser.add_option(cli.DEBUG_OPT)
784   parser.add_option(cli.VERBOSE_OPT)
785   parser.add_option(PAUSE_PERIOD_OPT)
786   parser.add_option(GROUPS_OPT)
787   parser.add_option(RESTART_OPT)
788   parser.add_option(PARAMS_OPT)
789   parser.add_option(SKIP_STOP_INSTANCES_OPT)
790
791   (options, args) = parser.parse_args()
792
793   SetupLogging(options)
794
795   if not args:
796     parser.error("No clusters specified")
797
798   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
799                           options.groups, options.restart, options.params,
800                           options.stop_instances)
801   try:
802     try:
803       cluster_merger.Setup()
804       cluster_merger.Merge()
805     except errors.GenericError, e:
806       logging.exception(e)
807       return constants.EXIT_FAILURE
808   finally:
809     cluster_merger.Cleanup()
810
811   return constants.EXIT_SUCCESS
812
813
814 if __name__ == "__main__":
815   sys.exit(main())