4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Tool to merge two or more clusters together.
23 The clusters have to run the same version of Ganeti!
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
# Command line option controlling for how long the watcher is paused on every
# cluster taking part in the merge. The explicit dest is required because
# main() reads the value as options.pause_period; without it optparse would
# derive the attribute name "watcher_pause_period" from the long option.
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period", metavar="SECONDS",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
# Execution-context id passed to ConfigWriter.AddNodeGroup while merging
# foreign node groups.
_CLUSTERMERGE_ECID = "clustermerge-ecid"
def Flatten(unflattened_list):
  """Flattens an arbitrarily nested list.

  Nested lists are expanded depth-first, left to right. Any element that is
  not a list (tuples and strings included) is kept unchanged.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  def _Walk(items):
    # Yield leaf elements, descending into every list encountered.
    for elem in items:
      if isinstance(elem, list):
        for leaf in _Walk(elem):
          yield leaf
      else:
        yield elem

  return list(_Walk(unflattened_list))
class MergerData(object):
  """Container class to hold data used for merger.

  One instance is kept per cluster being merged in.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param config_path: Path to the merging cluster config

    """
    self.cluster = cluster
    self.key_path = key_path
    # Read back as data.nodes when authorizing our key, stopping daemons,
    # re-adding nodes and printing rollback instructions.
    self.nodes = nodes
    self.instances = instances
    # Stays None until the remote configuration has been fetched.
    self.config_path = config_path
92 """Handling the merge.
95 def __init__(self, clusters, pause_period):
96 """Initialize object with sane defaults and infos required.
98 @param clusters: The list of clusters to merge in
99 @param pause_period: The time watcher shall be disabled for
102 self.merger_data = []
103 self.clusters = clusters
104 self.pause_period = pause_period
105 self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
106 (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
107 self.ssh_runner = ssh.SshRunner(self.cluster_name)
110 """Sets up our end so we can do the merger.
112 This method is setting us up as a preparation for the merger.
113 It makes the initial contact and gathers information needed.
115 @raise errors.RemoteError: for errors in communication/grabbing
118 (remote_path, _, _) = ssh.GetUserFiles("root")
120 if self.cluster_name in self.clusters:
121 raise errors.CommandError("Cannot merge cluster %s with itself" %
124 # Fetch remotes private key
125 for cluster in self.clusters:
126 result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
129 raise errors.RemoteError("There was an error while grabbing ssh private"
130 " key from %s. Fail reason: %s; output: %s" %
131 (cluster, result.fail_reason, result.output))
133 key_path = utils.PathJoin(self.work_dir, cluster)
134 utils.WriteFile(key_path, mode=0600, data=result.stdout)
136 result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
137 private_key=key_path)
139 raise errors.RemoteError("Unable to retrieve list of nodes from %s."
140 " Fail reason: %s; output: %s" %
141 (cluster, result.fail_reason, result.output))
142 nodes = result.stdout.splitlines()
144 result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
145 private_key=key_path)
147 raise errors.RemoteError("Unable to retrieve list of instances from"
148 " %s. Fail reason: %s; output: %s" %
149 (cluster, result.fail_reason, result.output))
150 instances = result.stdout.splitlines()
152 self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
154 def _PrepareAuthorizedKeys(self):
155 """Prepare the authorized_keys on every merging node.
157 This method add our public key to remotes authorized_key for further
161 (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
162 pub_key = utils.ReadFile(pub_key_file)
164 for data in self.merger_data:
165 for node in data.nodes:
166 result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
167 (auth_keys, pub_key)),
168 private_key=data.key_path)
171 raise errors.RemoteError("Unable to add our public key to %s in %s."
172 " Fail reason: %s; output: %s" %
173 (node, data.cluster, result.fail_reason,
176 def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
177 strict_host_check=False, private_key=None, batch=True,
179 """Wrapping SshRunner.Run with default parameters.
181 For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
184 return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
185 use_cluster_key=use_cluster_key,
186 strict_host_check=strict_host_check,
187 private_key=private_key, batch=batch,
190 def _StopMergingInstances(self):
191 """Stop instances on merging clusters.
194 for cluster in self.clusters:
195 result = self._RunCmd(cluster, "gnt-instance shutdown --all"
199 raise errors.RemoteError("Unable to stop instances on %s."
200 " Fail reason: %s; output: %s" %
201 (cluster, result.fail_reason, result.output))
203 def _DisableWatcher(self):
204 """Disable watch on all merging clusters, including ourself.
207 for cluster in ["localhost"] + self.clusters:
208 result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
212 raise errors.RemoteError("Unable to pause watcher on %s."
213 " Fail reason: %s; output: %s" %
214 (cluster, result.fail_reason, result.output))
216 def _StopDaemons(self):
217 """Stop all daemons on merging nodes.
220 cmd = "%s stop-all" % constants.DAEMON_UTIL
221 for data in self.merger_data:
222 for node in data.nodes:
223 result = self._RunCmd(node, cmd)
226 raise errors.RemoteError("Unable to stop daemons on %s."
227 " Fail reason: %s; output: %s." %
228 (node, result.fail_reason, result.output))
230 def _FetchRemoteConfig(self):
231 """Fetches and stores remote cluster config from the master.
233 This step is needed before we can merge the config.
236 for data in self.merger_data:
237 result = self._RunCmd(data.cluster, "cat %s" %
238 constants.CLUSTER_CONF_FILE)
241 raise errors.RemoteError("Unable to retrieve remote config on %s."
242 " Fail reason: %s; output %s" %
243 (data.cluster, result.fail_reason,
246 data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
248 utils.WriteFile(data.config_path, data=result.stdout)
250 # R0201: Method could be a function
251 def _KillMasterDaemon(self): # pylint: disable-msg=R0201
252 """Kills the local master daemon.
254 @raise errors.CommandError: If unable to kill
257 result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
259 raise errors.CommandError("Unable to stop master daemons."
260 " Fail reason: %s; output: %s" %
261 (result.fail_reason, result.output))
263 def _MergeConfig(self):
264 """Merges all foreign config into our own config.
267 my_config = config.ConfigWriter(offline=True)
268 fake_ec_id = 0 # Needs to be uniq over the whole config merge
270 for data in self.merger_data:
271 other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
272 self._MergeNodeGroups(my_config, other_config)
274 for node in other_config.GetNodeList():
275 node_info = other_config.GetNodeInfo(node)
276 node_info.master_candidate = False
277 my_config.AddNode(node_info, str(fake_ec_id))
280 for instance in other_config.GetInstanceList():
281 instance_info = other_config.GetInstanceInfo(instance)
283 # Update the DRBD port assignments
284 # This is a little bit hackish
285 for dsk in instance_info.disks:
286 if dsk.dev_type in constants.LDS_DRBD:
287 port = my_config.AllocatePort()
289 logical_id = list(dsk.logical_id)
291 dsk.logical_id = tuple(logical_id)
293 physical_id = list(dsk.physical_id)
294 physical_id[1] = physical_id[3] = port
295 dsk.physical_id = tuple(physical_id)
297 my_config.AddInstance(instance_info, str(fake_ec_id))
300 # R0201: Method could be a function
301 def _MergeNodeGroups(self, my_config, other_config):
302 """Adds foreign node groups
304 ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
306 # pylint: disable-msg=R0201
307 for grp in other_config.GetAllNodeGroupsInfo().values():
308 #TODO: handle node group conflicts
309 my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
311 # R0201: Method could be a function
312 def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
313 """Starts the local master daemon.
315 @param no_vote: Should the masterd started without voting? default: False
316 @raise errors.CommandError: If unable to start daemon.
321 env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
323 result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
325 raise errors.CommandError("Couldn't start ganeti master."
326 " Fail reason: %s; output: %s" %
327 (result.fail_reason, result.output))
329 def _ReaddMergedNodesAndRedist(self):
330 """Readds all merging nodes and make sure their config is up-to-date.
332 @raise errors.CommandError: If anything fails.
335 for data in self.merger_data:
336 for node in data.nodes:
337 result = utils.RunCmd(["gnt-node", "add", "--readd",
338 "--no-ssh-key-check", "--force-join", node])
340 raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
341 " output: %s" % (node, result.fail_reason,
344 result = utils.RunCmd(["gnt-cluster", "redist-conf"])
346 raise errors.CommandError("Redistribution failed. Fail reason: %s;"
347 " output: %s" % (result.fail_reason,
350 # R0201: Method could be a function
351 def _StartupAllInstances(self): # pylint: disable-msg=R0201
352 """Starts up all instances (locally).
354 @raise errors.CommandError: If unable to start clusters
357 result = utils.RunCmd(["gnt-instance", "startup", "--all",
360 raise errors.CommandError("Unable to start all instances."
361 " Fail reason: %s; output: %s" %
362 (result.fail_reason, result.output))
364 # R0201: Method could be a function
365 def _VerifyCluster(self): # pylint: disable-msg=R0201
366 """Runs gnt-cluster verify to verify the health.
368 @raise errors.ProgrammError: If cluster fails on verification
371 result = utils.RunCmd(["gnt-cluster", "verify"])
373 raise errors.CommandError("Verification of cluster failed."
374 " Fail reason: %s; output: %s" %
375 (result.fail_reason, result.output))
378 """Does the actual merge.
380 It runs all the steps in the right order and updates the user about steps
381 taken. Also it keeps track of rollback_steps to undo everything.
386 logging.info("Pre cluster verification")
387 self._VerifyCluster()
389 logging.info("Prepare authorized_keys")
390 rbsteps.append("Remove our key from authorized_keys on nodes:"
392 self._PrepareAuthorizedKeys()
394 rbsteps.append("Start all instances again on the merging"
395 " clusters: %(clusters)s")
396 logging.info("Stopping merging instances (takes a while)")
397 self._StopMergingInstances()
399 logging.info("Disable watcher")
400 self._DisableWatcher()
401 logging.info("Stop daemons on merging nodes")
403 logging.info("Merging config")
404 self._FetchRemoteConfig()
406 logging.info("Stopping master daemon")
407 self._KillMasterDaemon()
409 rbsteps.append("Restore %s from another master candidate"
410 " and restart master daemon" %
411 constants.CLUSTER_CONF_FILE)
413 self._StartMasterDaemon(no_vote=True)
415 # Point of no return, delete rbsteps
418 logging.warning("We are at the point of no return. Merge can not easily"
419 " be undone after this point.")
420 logging.info("Readd nodes")
421 self._ReaddMergedNodesAndRedist()
423 logging.info("Merge done, restart master daemon normally")
424 self._KillMasterDaemon()
425 self._StartMasterDaemon()
427 logging.info("Starting instances again")
428 self._StartupAllInstances()
429 logging.info("Post cluster verification")
430 self._VerifyCluster()
431 except errors.GenericError, e:
435 nodes = Flatten([data.nodes for data in self.merger_data])
437 "clusters": self.clusters,
440 logging.critical("In order to rollback do the following:")
442 logging.critical(" * %s", step % info)
444 logging.critical("Nothing to rollback.")
446 # TODO: Keep track of steps done for a flawless resume?
449 """Clean up our environment.
451 This cleans up remote private keys and configs and after that
452 deletes the temporary directory.
455 shutil.rmtree(self.work_dir)
def SetupLogging(options):
  """Send log records to stderr with a verbosity taken from the command line.

  The root logger is opened up completely (level NOTSET) and one stderr
  handler is attached to it. The handler's threshold decides what is shown:
  every record with --debug, INFO and above with --verbose, otherwise only
  ERROR and above.

  @param options: Parsed command line options; its C{debug} and C{verbose}
      attributes are read

  """
  if options.debug:
    threshold = logging.NOTSET
  elif options.verbose:
    threshold = logging.INFO
  else:
    threshold = logging.ERROR

  handler = logging.StreamHandler()
  handler.setFormatter(
    logging.Formatter("%(asctime)s: %(levelname)s %(message)s"))
  handler.setLevel(threshold)

  root = logging.getLogger("")
  root.setLevel(logging.NOTSET)
  root.addHandler(handler)
484 program = os.path.basename(sys.argv[0])
486 parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
487 " [--watcher-pause-period SECONDS]"
488 " <cluster> <cluster...>"),
490 parser.add_option(cli.DEBUG_OPT)
491 parser.add_option(cli.VERBOSE_OPT)
492 parser.add_option(PAUSE_PERIOD_OPT)
494 (options, args) = parser.parse_args()
496 SetupLogging(options)
499 parser.error("No clusters specified")
501 cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period)
504 cluster_merger.Setup()
505 cluster_merger.Merge()
506 except errors.GenericError, e:
508 return constants.EXIT_FAILURE
510 cluster_merger.Cleanup()
512 return constants.EXIT_SUCCESS
515 if __name__ == "__main__":