Delete master IPs from mergee master nodes
[ganeti-local] / tools / cluster-merge
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to merge two or more clusters together.
22
23 The clusters have to run the same version of Ganeti!
24
25 """
26
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
29
30 import logging
31 import os
32 import optparse
33 import shutil
34 import sys
35 import tempfile
36
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
43 from ganeti import netutils
44
45
46 _GROUPS_MERGE = "merge"
47 _GROUPS_RENAME = "rename"
48 _CLUSTERMERGE_ECID = "clustermerge-ecid"
49 _RESTART_ALL = "all"
50 _RESTART_UP = "up"
51 _RESTART_NONE = "none"
52 _RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
53 _PARAMS_STRICT = "strict"
54 _PARAMS_WARN = "warn"
55 _PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
56
57
58 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
59                                   action="store", type="int",
60                                   dest="pause_period",
61                                   help=("Amount of time in seconds watcher"
62                                         " should be suspended from running"))
63 GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
64                             choices=(_GROUPS_MERGE, _GROUPS_RENAME),
65                             dest="groups",
66                             help=("How to handle groups that have the"
67                                   " same name (One of: %s/%s)" %
68                                   (_GROUPS_MERGE, _GROUPS_RENAME)))
69 PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
70                             metavar="STRATEGY",
71                             choices=_PARAMS_CHOICES,
72                             dest="params",
73                             help=("How to handle params that have"
74                                   " different values (One of: %s/%s)" %
75                                   _PARAMS_CHOICES))
76
77 RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
78                              metavar="STRATEGY",
79                              choices=_RESTART_CHOICES,
80                              dest="restart",
81                              help=("How to handle restarting instances"
82                                    " same name (One of: %s/%s/%s)" %
83                                    _RESTART_CHOICES))
84
85 SKIP_STOP_INSTANCES_OPT = \
86   cli.cli_option("--skip-stop-instances", default=True, action="store_false",
87                  dest="stop_instances",
88                  help=("Don't stop the instances on the clusters, just check "
89                        "that none is running"))
90
91
92 def Flatten(unflattened_list):
93   """Flattens a list.
94
95   @param unflattened_list: A list of unflattened list objects.
96   @return: A flattened list
97
98   """
99   flattened_list = []
100
101   for item in unflattened_list:
102     if isinstance(item, list):
103       flattened_list.extend(Flatten(item))
104     else:
105       flattened_list.append(item)
106   return flattened_list
107
108
109 class MergerData(object):
110   """Container class to hold data used for merger.
111
112   """
113   def __init__(self, cluster, key_path, nodes, instances, master_node,
114                master_ip, config_path=None):
115     """Initialize the container.
116
117     @param cluster: The name of the cluster
118     @param key_path: Path to the ssh private key used for authentication
119     @param nodes: List of online nodes in the merging cluster
120     @param instances: List of instances running on merging cluster
121     @param master_node: Name of the master node
122     @param master_ip: Cluster IP
123     @param config_path: Path to the merging cluster config
124
125     """
126     self.cluster = cluster
127     self.key_path = key_path
128     self.nodes = nodes
129     self.instances = instances
130     self.master_node = master_node
131     self.master_ip = master_ip
132     self.config_path = config_path
133
134
135 class Merger(object):
136   """Handling the merge.
137
138   """
139   RUNNING_STATUSES = frozenset([
140     constants.INSTST_RUNNING,
141     constants.INSTST_ERRORUP,
142     ])
143
144   def __init__(self, clusters, pause_period, groups, restart, params,
145                stop_instances):
146     """Initialize object with sane defaults and infos required.
147
148     @param clusters: The list of clusters to merge in
149     @param pause_period: The time watcher shall be disabled for
150     @param groups: How to handle group conflicts
151     @param restart: How to handle instance restart
152     @param stop_instances: Indicates whether the instances must be stopped
153                            (True) or if the Merger must only check if no
154                            instances are running on the mergee clusters (False)
155
156     """
157     self.merger_data = []
158     self.clusters = clusters
159     self.pause_period = pause_period
160     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
161     (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
162     self.ssh_runner = ssh.SshRunner(self.cluster_name)
163     self.groups = groups
164     self.restart = restart
165     self.params = params
166     self.stop_instances = stop_instances
167     if self.restart == _RESTART_UP:
168       raise NotImplementedError
169
170   def Setup(self):
171     """Sets up our end so we can do the merger.
172
173     This method is setting us up as a preparation for the merger.
174     It makes the initial contact and gathers information needed.
175
176     @raise errors.RemoteError: for errors in communication/grabbing
177
178     """
179     (remote_path, _, _) = ssh.GetUserFiles("root")
180
181     if self.cluster_name in self.clusters:
182       raise errors.CommandError("Cannot merge cluster %s with itself" %
183                                 self.cluster_name)
184
185     # Fetch remotes private key
186     for cluster in self.clusters:
187       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
188                             ask_key=False)
189       if result.failed:
190         raise errors.RemoteError("There was an error while grabbing ssh private"
191                                  " key from %s. Fail reason: %s; output: %s" %
192                                  (cluster, result.fail_reason, result.output))
193
194       key_path = utils.PathJoin(self.work_dir, cluster)
195       utils.WriteFile(key_path, mode=0600, data=result.stdout)
196
197       result = self._RunCmd(cluster, "gnt-node list -o name,offline"
198                             " --no-header --separator=,", private_key=key_path)
199       if result.failed:
200         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
201                                  " Fail reason: %s; output: %s" %
202                                  (cluster, result.fail_reason, result.output))
203       nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
204       nodes = [node_status[0] for node_status in nodes_statuses
205                if node_status[1] == "N"]
206
207       result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
208                             private_key=key_path)
209       if result.failed:
210         raise errors.RemoteError("Unable to retrieve list of instances from"
211                                  " %s. Fail reason: %s; output: %s" %
212                                  (cluster, result.fail_reason, result.output))
213       instances = result.stdout.splitlines()
214
215       path = utils.PathJoin(constants.DATA_DIR, "ssconf_%s" %
216                             constants.SS_MASTER_NODE)
217       result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
218       if result.failed:
219         raise errors.RemoteError("Unable to retrieve the master node name from"
220                                  " %s. Fail reason: %s; output: %s" %
221                                  (cluster, result.fail_reason, result.output))
222       master_node = result.stdout.strip()
223
224       path = utils.PathJoin(constants.DATA_DIR, "ssconf_%s" %
225                             constants.SS_MASTER_IP)
226       result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
227       if result.failed:
228         raise errors.RemoteError("Unable to retrieve the master IP from"
229                                  " %s. Fail reason: %s; output: %s" %
230                                  (cluster, result.fail_reason, result.output))
231       master_ip = result.stdout.strip()
232
233       self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
234                                          master_node, master_ip))
235
236   def _PrepareAuthorizedKeys(self):
237     """Prepare the authorized_keys on every merging node.
238
239     This method add our public key to remotes authorized_key for further
240     communication.
241
242     """
243     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
244     pub_key = utils.ReadFile(pub_key_file)
245
246     for data in self.merger_data:
247       for node in data.nodes:
248         result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
249                                      (auth_keys, pub_key)),
250                               private_key=data.key_path, max_attempts=3)
251
252         if result.failed:
253           raise errors.RemoteError("Unable to add our public key to %s in %s."
254                                    " Fail reason: %s; output: %s" %
255                                    (node, data.cluster, result.fail_reason,
256                                     result.output))
257
258   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
259               strict_host_check=False, private_key=None, batch=True,
260               ask_key=False, max_attempts=1):
261     """Wrapping SshRunner.Run with default parameters.
262
263     For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
264
265     """
266     for _ in range(max_attempts):
267       result = self.ssh_runner.Run(hostname=hostname, command=command,
268                                  user=user, use_cluster_key=use_cluster_key,
269                                  strict_host_check=strict_host_check,
270                                  private_key=private_key, batch=batch,
271                                  ask_key=ask_key)
272       if not result.failed:
273         break
274
275     return result
276
277   def _CheckRunningInstances(self):
278     """Checks if on the clusters to be merged there are running instances
279
280     @rtype: boolean
281     @return: True if there are running instances, False otherwise
282
283     """
284     for cluster in self.clusters:
285       result = self._RunCmd(cluster, "gnt-instance list -o status")
286       if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
287         return True
288
289     return False
290
291   def _StopMergingInstances(self):
292     """Stop instances on merging clusters.
293
294     """
295     for cluster in self.clusters:
296       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
297                                      " --force-multiple")
298
299       if result.failed:
300         raise errors.RemoteError("Unable to stop instances on %s."
301                                  " Fail reason: %s; output: %s" %
302                                  (cluster, result.fail_reason, result.output))
303
304   def _DisableWatcher(self):
305     """Disable watch on all merging clusters, including ourself.
306
307     """
308     for cluster in ["localhost"] + self.clusters:
309       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
310                                      self.pause_period)
311
312       if result.failed:
313         raise errors.RemoteError("Unable to pause watcher on %s."
314                                  " Fail reason: %s; output: %s" %
315                                  (cluster, result.fail_reason, result.output))
316
317   def _RemoveMasterIps(self):
318     """Removes the master IPs from the master nodes of each cluster.
319
320     """
321     for data in self.merger_data:
322       master_ip_family = netutils.IPAddress.GetAddressFamily(data.master_ip)
323       master_ip_len = netutils.IP4Address.iplen
324       if master_ip_family == netutils.IP6Address.family:
325         master_ip_len = netutils.IP6Address.iplen
326       # Not using constants.IP_COMMAND_PATH because the command might run on a
327       # machine in which the ip path is different, so it's better to rely on
328       # $PATH.
329       cmd = "ip address del %s/%s dev $(cat %s)" % (
330              data.master_ip,
331              master_ip_len,
332              utils.PathJoin(constants.DATA_DIR, "ssconf_%s" %
333                             constants.SS_MASTER_NETDEV))
334       result = self._RunCmd(data.master_node, cmd, max_attempts=3)
335       if result.failed:
336         raise errors.RemoteError("Unable to remove master IP on %s."
337                                  " Fail reason: %s; output: %s" %
338                                  (data.master_node,
339                                   result.fail_reason,
340                                   result.output))
341
342   def _StopDaemons(self):
343     """Stop all daemons on merging nodes.
344
345     """
346     cmd = "%s stop-all" % constants.DAEMON_UTIL
347     for data in self.merger_data:
348       for node in data.nodes:
349         result = self._RunCmd(node, cmd, max_attempts=3)
350
351         if result.failed:
352           raise errors.RemoteError("Unable to stop daemons on %s."
353                                    " Fail reason: %s; output: %s." %
354                                    (node, result.fail_reason, result.output))
355
356   def _FetchRemoteConfig(self):
357     """Fetches and stores remote cluster config from the master.
358
359     This step is needed before we can merge the config.
360
361     """
362     for data in self.merger_data:
363       result = self._RunCmd(data.cluster, "cat %s" %
364                                           constants.CLUSTER_CONF_FILE)
365
366       if result.failed:
367         raise errors.RemoteError("Unable to retrieve remote config on %s."
368                                  " Fail reason: %s; output %s" %
369                                  (data.cluster, result.fail_reason,
370                                   result.output))
371
372       data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
373                                         data.cluster)
374       utils.WriteFile(data.config_path, data=result.stdout)
375
376   # R0201: Method could be a function
377   def _KillMasterDaemon(self): # pylint: disable-msg=R0201
378     """Kills the local master daemon.
379
380     @raise errors.CommandError: If unable to kill
381
382     """
383     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
384     if result.failed:
385       raise errors.CommandError("Unable to stop master daemons."
386                                 " Fail reason: %s; output: %s" %
387                                 (result.fail_reason, result.output))
388
389   def _MergeConfig(self):
390     """Merges all foreign config into our own config.
391
392     """
393     my_config = config.ConfigWriter(offline=True)
394     fake_ec_id = 0 # Needs to be uniq over the whole config merge
395
396     for data in self.merger_data:
397       other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
398       self._MergeClusterConfigs(my_config, other_config)
399       self._MergeNodeGroups(my_config, other_config)
400
401       for node in other_config.GetNodeList():
402         node_info = other_config.GetNodeInfo(node)
403         # Offline the node, it will be reonlined later at node readd
404         node_info.master_candidate = False
405         node_info.drained = False
406         node_info.offline = True
407         my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
408         fake_ec_id += 1
409
410       for instance in other_config.GetInstanceList():
411         instance_info = other_config.GetInstanceInfo(instance)
412
413         # Update the DRBD port assignments
414         # This is a little bit hackish
415         for dsk in instance_info.disks:
416           if dsk.dev_type in constants.LDS_DRBD:
417             port = my_config.AllocatePort()
418
419             logical_id = list(dsk.logical_id)
420             logical_id[2] = port
421             dsk.logical_id = tuple(logical_id)
422
423             physical_id = list(dsk.physical_id)
424             physical_id[1] = physical_id[3] = port
425             dsk.physical_id = tuple(physical_id)
426
427         my_config.AddInstance(instance_info,
428                               _CLUSTERMERGE_ECID + str(fake_ec_id))
429         fake_ec_id += 1
430
431   def _MergeClusterConfigs(self, my_config, other_config):
432     """Checks that all relevant cluster parameters are compatible
433
434     """
435     my_cluster = my_config.GetClusterInfo()
436     other_cluster = other_config.GetClusterInfo()
437     err_count = 0
438
439     #
440     # Generic checks
441     #
442     check_params = [
443       "beparams",
444       "default_iallocator",
445       "drbd_usermode_helper",
446       "hidden_os",
447       "maintain_node_health",
448       "master_netdev",
449       "ndparams",
450       "nicparams",
451       "primary_ip_family",
452       "tags",
453       "uid_pool",
454       ]
455     check_params_strict = [
456       "volume_group_name",
457     ]
458     if constants.ENABLE_FILE_STORAGE:
459       check_params_strict.append("file_storage_dir")
460     if constants.ENABLE_SHARED_FILE_STORAGE:
461       check_params_strict.append("shared_file_storage_dir")
462     check_params.extend(check_params_strict)
463
464     if self.params == _PARAMS_STRICT:
465       params_strict = True
466     else:
467       params_strict = False
468
469     for param_name in check_params:
470       my_param = getattr(my_cluster, param_name)
471       other_param = getattr(other_cluster, param_name)
472       if my_param != other_param:
473         logging.error("The value (%s) of the cluster parameter %s on %s"
474                       " differs to this cluster's value (%s)",
475                       other_param, param_name, other_cluster.cluster_name,
476                       my_param)
477         if params_strict or param_name in check_params_strict:
478           err_count += 1
479
480     #
481     # Custom checks
482     #
483
484     # Check default hypervisor
485     my_defhyp = my_cluster.enabled_hypervisors[0]
486     other_defhyp = other_cluster.enabled_hypervisors[0]
487     if my_defhyp != other_defhyp:
488       logging.warning("The default hypervisor (%s) differs on %s, new"
489                       " instances will be created with this cluster's"
490                       " default hypervisor (%s)", other_defhyp,
491                       other_cluster.cluster_name, my_defhyp)
492
493     if (set(my_cluster.enabled_hypervisors) !=
494         set(other_cluster.enabled_hypervisors)):
495       logging.error("The set of enabled hypervisors (%s) on %s differs to"
496                     " this cluster's set (%s)",
497                     other_cluster.enabled_hypervisors,
498                     other_cluster.cluster_name, my_cluster.enabled_hypervisors)
499       err_count += 1
500
501     # Check hypervisor params for hypervisors we care about
502     for hyp in my_cluster.enabled_hypervisors:
503       for param in my_cluster.hvparams[hyp]:
504         my_value = my_cluster.hvparams[hyp][param]
505         other_value = other_cluster.hvparams[hyp][param]
506         if my_value != other_value:
507           logging.error("The value (%s) of the %s parameter of the %s"
508                         " hypervisor on %s differs to this cluster's parameter"
509                         " (%s)",
510                         other_value, param, hyp, other_cluster.cluster_name,
511                         my_value)
512           if params_strict:
513             err_count += 1
514
515     # Check os hypervisor params for hypervisors we care about
516     for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
517       for hyp in my_cluster.enabled_hypervisors:
518         my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
519         other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
520         if my_os_hvp != other_os_hvp:
521           logging.error("The OS parameters (%s) for the %s OS for the %s"
522                         " hypervisor on %s differs to this cluster's parameters"
523                         " (%s)",
524                         other_os_hvp, os_name, hyp, other_cluster.cluster_name,
525                         my_os_hvp)
526           if params_strict:
527             err_count += 1
528
529     #
530     # Warnings
531     #
532     if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
533       logging.warning("The modify_etc_hosts value (%s) differs on %s,"
534                       " this cluster's value (%s) will take precedence",
535                       other_cluster.modify_etc_hosts,
536                       other_cluster.cluster_name,
537                       my_cluster.modify_etc_hosts)
538
539     if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
540       logging.warning("The modify_ssh_setup value (%s) differs on %s,"
541                       " this cluster's value (%s) will take precedence",
542                       other_cluster.modify_ssh_setup,
543                       other_cluster.cluster_name,
544                       my_cluster.modify_ssh_setup)
545
546     #
547     # Actual merging
548     #
549     my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
550                                        other_cluster.reserved_lvs))
551
552     if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
553       logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
554                       " cluster's value (%s). The least permissive value (%s)"
555                       " will be used", other_cluster.prealloc_wipe_disks,
556                       other_cluster.cluster_name,
557                       my_cluster.prealloc_wipe_disks, True)
558       my_cluster.prealloc_wipe_disks = True
559
560     for os_, osparams in other_cluster.osparams.items():
561       if os_ not in my_cluster.osparams:
562         my_cluster.osparams[os_] = osparams
563       elif my_cluster.osparams[os_] != osparams:
564         logging.error("The OS parameters (%s) for the %s OS on %s differs to"
565                       " this cluster's parameters (%s)",
566                       osparams, os_, other_cluster.cluster_name,
567                       my_cluster.osparams[os_])
568         if params_strict:
569           err_count += 1
570
571     if err_count:
572       raise errors.ConfigurationError("Cluster config for %s has incompatible"
573                                       " values, please fix and re-run" %
574                                       other_cluster.cluster_name)
575
576   # R0201: Method could be a function
577   def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
578     if os_name in cluster.os_hvp:
579       return cluster.os_hvp[os_name].get(hyp, None)
580     else:
581       return None
582
583   # R0201: Method could be a function
584   def _MergeNodeGroups(self, my_config, other_config):
585     """Adds foreign node groups
586
587     ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
588     """
589     # pylint: disable-msg=R0201
590     logging.info("Node group conflict strategy: %s", self.groups)
591
592     my_grps = my_config.GetAllNodeGroupsInfo().values()
593     other_grps = other_config.GetAllNodeGroupsInfo().values()
594
595     # Check for node group naming conflicts:
596     conflicts = []
597     for other_grp in other_grps:
598       for my_grp in my_grps:
599         if other_grp.name == my_grp.name:
600           conflicts.append(other_grp)
601
602     if conflicts:
603       conflict_names = utils.CommaJoin([g.name for g in conflicts])
604       logging.info("Node groups in both local and remote cluster: %s",
605                    conflict_names)
606
607       # User hasn't specified how to handle conflicts
608       if not self.groups:
609         raise errors.CommandError("The following node group(s) are in both"
610                                   " clusters, and no merge strategy has been"
611                                   " supplied (see the --groups option): %s" %
612                                   conflict_names)
613
614       # User wants to rename conflicts
615       elif self.groups == _GROUPS_RENAME:
616         for grp in conflicts:
617           new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
618           logging.info("Renaming remote node group from %s to %s"
619                        " to resolve conflict", grp.name, new_name)
620           grp.name = new_name
621
622       # User wants to merge conflicting groups
623       elif self.groups == _GROUPS_MERGE:
624         for other_grp in conflicts:
625           logging.info("Merging local and remote '%s' groups", other_grp.name)
626           for node_name in other_grp.members[:]:
627             node = other_config.GetNodeInfo(node_name)
628             # Access to a protected member of a client class
629             # pylint: disable-msg=W0212
630             other_config._UnlockedRemoveNodeFromGroup(node)
631
632             # Access to a protected member of a client class
633             # pylint: disable-msg=W0212
634             my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
635
636             # Access to a protected member of a client class
637             # pylint: disable-msg=W0212
638             my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
639             node.group = my_grp_uuid
640           # Remove from list of groups to add
641           other_grps.remove(other_grp)
642
643     for grp in other_grps:
644       #TODO: handle node group conflicts
645       my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
646
647   # R0201: Method could be a function
648   def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
649     """Starts the local master daemon.
650
651     @param no_vote: Should the masterd started without voting? default: False
652     @raise errors.CommandError: If unable to start daemon.
653
654     """
655     env = {}
656     if no_vote:
657       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
658
659     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
660     if result.failed:
661       raise errors.CommandError("Couldn't start ganeti master."
662                                 " Fail reason: %s; output: %s" %
663                                 (result.fail_reason, result.output))
664
665   def _ReaddMergedNodesAndRedist(self):
666     """Readds all merging nodes and make sure their config is up-to-date.
667
668     @raise errors.CommandError: If anything fails.
669
670     """
671     for data in self.merger_data:
672       for node in data.nodes:
673         result = utils.RunCmd(["gnt-node", "add", "--readd",
674                                "--no-ssh-key-check", "--force-join", node])
675         if result.failed:
676           logging.error("%s failed to be readded. Reason: %s, output: %s",
677                          node, result.fail_reason, result.output)
678
679     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
680     if result.failed:
681       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
682                                 " output: %s" % (result.fail_reason,
683                                                 result.output))
684
685   # R0201: Method could be a function
686   def _StartupAllInstances(self): # pylint: disable-msg=R0201
687     """Starts up all instances (locally).
688
689     @raise errors.CommandError: If unable to start clusters
690
691     """
692     result = utils.RunCmd(["gnt-instance", "startup", "--all",
693                            "--force-multiple"])
694     if result.failed:
695       raise errors.CommandError("Unable to start all instances."
696                                 " Fail reason: %s; output: %s" %
697                                 (result.fail_reason, result.output))
698
699   # R0201: Method could be a function
700   # TODO: make this overridable, for some verify errors
701   def _VerifyCluster(self): # pylint: disable-msg=R0201
702     """Runs gnt-cluster verify to verify the health.
703
704     @raise errors.ProgrammError: If cluster fails on verification
705
706     """
707     result = utils.RunCmd(["gnt-cluster", "verify"])
708     if result.failed:
709       raise errors.CommandError("Verification of cluster failed."
710                                 " Fail reason: %s; output: %s" %
711                                 (result.fail_reason, result.output))
712
713   def Merge(self):
714     """Does the actual merge.
715
716     It runs all the steps in the right order and updates the user about steps
717     taken. Also it keeps track of rollback_steps to undo everything.
718
719     """
720     rbsteps = []
721     try:
722       logging.info("Pre cluster verification")
723       self._VerifyCluster()
724
725       logging.info("Prepare authorized_keys")
726       rbsteps.append("Remove our key from authorized_keys on nodes:"
727                      " %(nodes)s")
728       self._PrepareAuthorizedKeys()
729
730       rbsteps.append("Start all instances again on the merging"
731                      " clusters: %(clusters)s")
732       if self.stop_instances:
733         logging.info("Stopping merging instances (takes a while)")
734         self._StopMergingInstances()
735       logging.info("Checking that no instances are running on the mergees")
736       instances_running = self._CheckRunningInstances()
737       if instances_running:
738         raise errors.CommandError("Some instances are still running on the"
739                                   " mergees")
740       logging.info("Disable watcher")
741       self._DisableWatcher()
742       logging.info("Stop daemons on merging nodes")
743       self._StopDaemons()
744       logging.info("Merging config")
745       self._FetchRemoteConfig()
746       logging.info("Removing master IPs on mergee master nodes")
747       self._RemoveMasterIps()
748
749       logging.info("Stopping master daemon")
750       self._KillMasterDaemon()
751
752       rbsteps.append("Restore %s from another master candidate"
753                      " and restart master daemon" %
754                      constants.CLUSTER_CONF_FILE)
755       self._MergeConfig()
756       self._StartMasterDaemon(no_vote=True)
757
758       # Point of no return, delete rbsteps
759       del rbsteps[:]
760
761       logging.warning("We are at the point of no return. Merge can not easily"
762                       " be undone after this point.")
763       logging.info("Readd nodes")
764       self._ReaddMergedNodesAndRedist()
765
766       logging.info("Merge done, restart master daemon normally")
767       self._KillMasterDaemon()
768       self._StartMasterDaemon()
769
770       if self.restart == _RESTART_ALL:
771         logging.info("Starting instances again")
772         self._StartupAllInstances()
773       else:
774         logging.info("Not starting instances again")
775       logging.info("Post cluster verification")
776       self._VerifyCluster()
777     except errors.GenericError, e:
778       logging.exception(e)
779
780       if rbsteps:
781         nodes = Flatten([data.nodes for data in self.merger_data])
782         info = {
783           "clusters": self.clusters,
784           "nodes": nodes,
785           }
786         logging.critical("In order to rollback do the following:")
787         for step in rbsteps:
788           logging.critical("  * %s", step % info)
789       else:
790         logging.critical("Nothing to rollback.")
791
792       # TODO: Keep track of steps done for a flawless resume?
793
794   def Cleanup(self):
795     """Clean up our environment.
796
797     This cleans up remote private keys and configs and after that
798     deletes the temporary directory.
799
800     """
801     shutil.rmtree(self.work_dir)
802
803
804 def SetupLogging(options):
805   """Setting up logging infrastructure.
806
807   @param options: Parsed command line options
808
809   """
810   formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
811
812   stderr_handler = logging.StreamHandler()
813   stderr_handler.setFormatter(formatter)
814   if options.debug:
815     stderr_handler.setLevel(logging.NOTSET)
816   elif options.verbose:
817     stderr_handler.setLevel(logging.INFO)
818   else:
819     stderr_handler.setLevel(logging.WARNING)
820
821   root_logger = logging.getLogger("")
822   root_logger.setLevel(logging.NOTSET)
823   root_logger.addHandler(stderr_handler)
824
825
826 def main():
827   """Main routine.
828
829   """
830   program = os.path.basename(sys.argv[0])
831
832   parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
833                                  prog=program)
834   parser.add_option(cli.DEBUG_OPT)
835   parser.add_option(cli.VERBOSE_OPT)
836   parser.add_option(PAUSE_PERIOD_OPT)
837   parser.add_option(GROUPS_OPT)
838   parser.add_option(RESTART_OPT)
839   parser.add_option(PARAMS_OPT)
840   parser.add_option(SKIP_STOP_INSTANCES_OPT)
841
842   (options, args) = parser.parse_args()
843
844   SetupLogging(options)
845
846   if not args:
847     parser.error("No clusters specified")
848
849   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
850                           options.groups, options.restart, options.params,
851                           options.stop_instances)
852   try:
853     try:
854       cluster_merger.Setup()
855       cluster_merger.Merge()
856     except errors.GenericError, e:
857       logging.exception(e)
858       return constants.EXIT_FAILURE
859   finally:
860     cluster_merger.Cleanup()
861
862   return constants.EXIT_SUCCESS
863
864
865 if __name__ == "__main__":
866   sys.exit(main())