4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Tool to merge two or more clusters together.

The clusters have to run the same version of Ganeti!

"""
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
import logging
import optparse
import os
import shutil
import sys
import tempfile

from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils
_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
_CLUSTERMERGE_ECID = "clustermerge-ecid"

# dest="pause_period" is required: optparse would otherwise derive
# "watcher_pause_period" from the long option name, but main() reads
# options.pause_period.
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
def Flatten(unflattened_list):
  """Flattens a list recursively.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flattened_list = []

  for item in unflattened_list:
    if isinstance(item, list):
      # Recurse into nested lists and splice their items in place
      flattened_list.extend(Flatten(item))
    else:
      flattened_list.append(item)

  return flattened_list
class MergerData(object):
  """Container class to hold data used for merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param config_path: Path to the merging cluster config

    """
    self.cluster = cluster
    self.key_path = key_path
    self.nodes = nodes
    self.instances = instances
    self.config_path = config_path
class Merger(object):
  """Handling the merge.

  """
  def __init__(self, clusters, pause_period, groups):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    # Group conflict strategy; read later by _MergeNodeGroups
    self.groups = groups
120 """Sets up our end so we can do the merger.
122 This method is setting us up as a preparation for the merger.
123 It makes the initial contact and gathers information needed.
125 @raise errors.RemoteError: for errors in communication/grabbing
128 (remote_path, _, _) = ssh.GetUserFiles("root")
130 if self.cluster_name in self.clusters:
131 raise errors.CommandError("Cannot merge cluster %s with itself" %
134 # Fetch remotes private key
135 for cluster in self.clusters:
136 result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
139 raise errors.RemoteError("There was an error while grabbing ssh private"
140 " key from %s. Fail reason: %s; output: %s" %
141 (cluster, result.fail_reason, result.output))
143 key_path = utils.PathJoin(self.work_dir, cluster)
144 utils.WriteFile(key_path, mode=0600, data=result.stdout)
146 result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
147 private_key=key_path)
149 raise errors.RemoteError("Unable to retrieve list of nodes from %s."
150 " Fail reason: %s; output: %s" %
151 (cluster, result.fail_reason, result.output))
152 nodes = result.stdout.splitlines()
154 result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
155 private_key=key_path)
157 raise errors.RemoteError("Unable to retrieve list of instances from"
158 " %s. Fail reason: %s; output: %s" %
159 (cluster, result.fail_reason, result.output))
160 instances = result.stdout.splitlines()
162 self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
164 def _PrepareAuthorizedKeys(self):
165 """Prepare the authorized_keys on every merging node.
167 This method add our public key to remotes authorized_key for further
171 (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
172 pub_key = utils.ReadFile(pub_key_file)
174 for data in self.merger_data:
175 for node in data.nodes:
176 result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
177 (auth_keys, pub_key)),
178 private_key=data.key_path)
181 raise errors.RemoteError("Unable to add our public key to %s in %s."
182 " Fail reason: %s; output: %s" %
183 (node, data.cluster, result.fail_reason,
186 def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
187 strict_host_check=False, private_key=None, batch=True,
189 """Wrapping SshRunner.Run with default parameters.
191 For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
194 return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
195 use_cluster_key=use_cluster_key,
196 strict_host_check=strict_host_check,
197 private_key=private_key, batch=batch,
200 def _StopMergingInstances(self):
201 """Stop instances on merging clusters.
204 for cluster in self.clusters:
205 result = self._RunCmd(cluster, "gnt-instance shutdown --all"
209 raise errors.RemoteError("Unable to stop instances on %s."
210 " Fail reason: %s; output: %s" %
211 (cluster, result.fail_reason, result.output))
213 def _DisableWatcher(self):
214 """Disable watch on all merging clusters, including ourself.
217 for cluster in ["localhost"] + self.clusters:
218 result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
222 raise errors.RemoteError("Unable to pause watcher on %s."
223 " Fail reason: %s; output: %s" %
224 (cluster, result.fail_reason, result.output))
226 def _StopDaemons(self):
227 """Stop all daemons on merging nodes.
230 cmd = "%s stop-all" % constants.DAEMON_UTIL
231 for data in self.merger_data:
232 for node in data.nodes:
233 result = self._RunCmd(node, cmd)
236 raise errors.RemoteError("Unable to stop daemons on %s."
237 " Fail reason: %s; output: %s." %
238 (node, result.fail_reason, result.output))
240 def _FetchRemoteConfig(self):
241 """Fetches and stores remote cluster config from the master.
243 This step is needed before we can merge the config.
246 for data in self.merger_data:
247 result = self._RunCmd(data.cluster, "cat %s" %
248 constants.CLUSTER_CONF_FILE)
251 raise errors.RemoteError("Unable to retrieve remote config on %s."
252 " Fail reason: %s; output %s" %
253 (data.cluster, result.fail_reason,
256 data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
258 utils.WriteFile(data.config_path, data=result.stdout)
260 # R0201: Method could be a function
261 def _KillMasterDaemon(self): # pylint: disable-msg=R0201
262 """Kills the local master daemon.
264 @raise errors.CommandError: If unable to kill
267 result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
269 raise errors.CommandError("Unable to stop master daemons."
270 " Fail reason: %s; output: %s" %
271 (result.fail_reason, result.output))
273 def _MergeConfig(self):
274 """Merges all foreign config into our own config.
277 my_config = config.ConfigWriter(offline=True)
278 fake_ec_id = 0 # Needs to be uniq over the whole config merge
280 for data in self.merger_data:
281 other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
282 self._MergeNodeGroups(my_config, other_config)
284 for node in other_config.GetNodeList():
285 node_info = other_config.GetNodeInfo(node)
286 my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
289 for instance in other_config.GetInstanceList():
290 instance_info = other_config.GetInstanceInfo(instance)
292 # Update the DRBD port assignments
293 # This is a little bit hackish
294 for dsk in instance_info.disks:
295 if dsk.dev_type in constants.LDS_DRBD:
296 port = my_config.AllocatePort()
298 logical_id = list(dsk.logical_id)
300 dsk.logical_id = tuple(logical_id)
302 physical_id = list(dsk.physical_id)
303 physical_id[1] = physical_id[3] = port
304 dsk.physical_id = tuple(physical_id)
306 my_config.AddInstance(instance_info,
307 _CLUSTERMERGE_ECID + str(fake_ec_id))
310 # R0201: Method could be a function
311 def _MergeNodeGroups(self, my_config, other_config):
312 """Adds foreign node groups
314 ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
316 # pylint: disable-msg=R0201
317 logging.info("Node group conflict strategy: %s" % self.groups)
319 my_grps = my_config.GetAllNodeGroupsInfo().values()
320 other_grps = other_config.GetAllNodeGroupsInfo().values()
322 # Check for node group naming conflicts:
324 for other_grp in other_grps:
325 for my_grp in my_grps:
326 if other_grp.name == my_grp.name:
327 conflicts.append(other_grp)
330 conflict_names = utils.CommaJoin([g.name for g in conflicts])
331 logging.info("Node groups in both local and remote cluster: %s" %
334 # User hasn't specified how to handle conflicts
336 raise errors.CommandError("The following node group(s) are in both"
337 " clusters, and no merge strategy has been"
338 " supplied (see the --groups option): %s" %
341 # User wants to rename conflicts
342 if self.groups == _GROUPS_RENAME:
343 for grp in conflicts:
344 new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
345 logging.info("Renaming remote node group from %s to %s"
346 " to resolve conflict" % (grp.name, new_name))
349 for grp in other_grps:
350 #TODO: handle node group conflicts
351 my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
353 # R0201: Method could be a function
354 def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
355 """Starts the local master daemon.
357 @param no_vote: Should the masterd started without voting? default: False
358 @raise errors.CommandError: If unable to start daemon.
363 env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
365 result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
367 raise errors.CommandError("Couldn't start ganeti master."
368 " Fail reason: %s; output: %s" %
369 (result.fail_reason, result.output))
371 def _ReaddMergedNodesAndRedist(self):
372 """Readds all merging nodes and make sure their config is up-to-date.
374 @raise errors.CommandError: If anything fails.
377 for data in self.merger_data:
378 for node in data.nodes:
379 result = utils.RunCmd(["gnt-node", "add", "--readd",
380 "--no-ssh-key-check", "--force-join", node])
382 raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
383 " output: %s" % (node, result.fail_reason,
386 result = utils.RunCmd(["gnt-cluster", "redist-conf"])
388 raise errors.CommandError("Redistribution failed. Fail reason: %s;"
389 " output: %s" % (result.fail_reason,
392 # R0201: Method could be a function
393 def _StartupAllInstances(self): # pylint: disable-msg=R0201
394 """Starts up all instances (locally).
396 @raise errors.CommandError: If unable to start clusters
399 result = utils.RunCmd(["gnt-instance", "startup", "--all",
402 raise errors.CommandError("Unable to start all instances."
403 " Fail reason: %s; output: %s" %
404 (result.fail_reason, result.output))
406 # R0201: Method could be a function
407 def _VerifyCluster(self): # pylint: disable-msg=R0201
408 """Runs gnt-cluster verify to verify the health.
410 @raise errors.ProgrammError: If cluster fails on verification
413 result = utils.RunCmd(["gnt-cluster", "verify"])
415 raise errors.CommandError("Verification of cluster failed."
416 " Fail reason: %s; output: %s" %
417 (result.fail_reason, result.output))
420 """Does the actual merge.
422 It runs all the steps in the right order and updates the user about steps
423 taken. Also it keeps track of rollback_steps to undo everything.
428 logging.info("Pre cluster verification")
429 self._VerifyCluster()
431 logging.info("Prepare authorized_keys")
432 rbsteps.append("Remove our key from authorized_keys on nodes:"
434 self._PrepareAuthorizedKeys()
436 rbsteps.append("Start all instances again on the merging"
437 " clusters: %(clusters)s")
438 logging.info("Stopping merging instances (takes a while)")
439 self._StopMergingInstances()
441 logging.info("Disable watcher")
442 self._DisableWatcher()
443 logging.info("Stop daemons on merging nodes")
445 logging.info("Merging config")
446 self._FetchRemoteConfig()
448 logging.info("Stopping master daemon")
449 self._KillMasterDaemon()
451 rbsteps.append("Restore %s from another master candidate"
452 " and restart master daemon" %
453 constants.CLUSTER_CONF_FILE)
455 self._StartMasterDaemon(no_vote=True)
457 # Point of no return, delete rbsteps
460 logging.warning("We are at the point of no return. Merge can not easily"
461 " be undone after this point.")
462 logging.info("Readd nodes")
463 self._ReaddMergedNodesAndRedist()
465 logging.info("Merge done, restart master daemon normally")
466 self._KillMasterDaemon()
467 self._StartMasterDaemon()
469 logging.info("Starting instances again")
470 self._StartupAllInstances()
471 logging.info("Post cluster verification")
472 self._VerifyCluster()
473 except errors.GenericError, e:
477 nodes = Flatten([data.nodes for data in self.merger_data])
479 "clusters": self.clusters,
482 logging.critical("In order to rollback do the following:")
484 logging.critical(" * %s", step % info)
486 logging.critical("Nothing to rollback.")
488 # TODO: Keep track of steps done for a flawless resume?
491 """Clean up our environment.
493 This cleans up remote private keys and configs and after that
494 deletes the temporary directory.
497 shutil.rmtree(self.work_dir)
def SetupLogging(options):
  """Setting up logging infrastructure.

  @param options: Parsed command line options

  """
  formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  # --debug shows everything, --verbose informational messages,
  # otherwise only errors are printed
  if options.debug:
    stderr_handler.setLevel(logging.NOTSET)
  elif options.verbose:
    stderr_handler.setLevel(logging.INFO)
  else:
    stderr_handler.setLevel(logging.ERROR)

  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(stderr_handler)
526 program = os.path.basename(sys.argv[0])
528 parser = optparse.OptionParser(usage=("%%prog [--debug|--verbose]"
529 " [--watcher-pause-period SECONDS]"
530 " [--groups [%s|%s]]"
531 " <cluster> [<cluster...>]" %
532 (_GROUPS_MERGE, _GROUPS_RENAME)),
534 parser.add_option(cli.DEBUG_OPT)
535 parser.add_option(cli.VERBOSE_OPT)
536 parser.add_option(PAUSE_PERIOD_OPT)
537 parser.add_option(GROUPS_OPT)
539 (options, args) = parser.parse_args()
541 SetupLogging(options)
544 parser.error("No clusters specified")
546 cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
550 cluster_merger.Setup()
551 cluster_merger.Merge()
552 except errors.GenericError, e:
554 return constants.EXIT_FAILURE
556 cluster_merger.Cleanup()
558 return constants.EXIT_SUCCESS
if __name__ == "__main__":
  sys.exit(main())