4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Tool to merge two or more clusters together.
23 The clusters have to run the same version of Ganeti!
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
45 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
46 action="store", type="int",
48 help=("Amount of time in seconds watcher"
49 " should be suspended from running"))
def Flatten(unflatten_list):
    """Flattens an arbitrarily nested list into a single flat list.

    Only C{list} instances are descended into; any other item (including
    tuples and strings) is copied to the result unchanged.

    @param unflatten_list: A list whose items may themselves be lists.
    @return: A new flat list holding every non-list item in original order.

    """
    flatten_list = []

    for item in unflatten_list:
        if isinstance(item, list):
            # Recurse so that lists nested at any depth are expanded in place.
            flatten_list.extend(Flatten(item))
        else:
            flatten_list.append(item)

    return flatten_list
class MergerData(object):
    """Container class to hold data used for merger.

    One instance describes a single cluster that is being merged in.

    """
    def __init__(self, cluster, key_path, nodes, instances, config_path=None):
        """Initialize the container.

        @param cluster: The name of the cluster
        @param key_path: Path to the ssh private key used for authentication
        @param nodes: List of nodes in the merging cluster
        @param instances: List of instances running on merging cluster
        @param config_path: Path to the merging cluster config

        """
        self.cluster = cluster
        self.key_path = key_path
        # The node list is read back through the "nodes" attribute by the
        # merge steps, so it has to be stored alongside the other fields.
        self.nodes = nodes
        self.instances = instances
        self.config_path = config_path
91 """Handling the merge.
94 def __init__(self, clusters, pause_period):
95 """Initialize object with sane defaults and infos required.
97 @param clusters: The list of clusters to merge in
98 @param pause_period: The time watcher shall be disabled for
101 self.merger_data = []
102 self.clusters = clusters
103 self.pause_period = pause_period
104 self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
105 self.cluster_name = cli.GetClient().QueryConfigValues(["cluster_name"])
106 self.ssh_runner = ssh.SshRunner(self.cluster_name)
109 """Sets up our end so we can do the merger.
111 This method is setting us up as a preparation for the merger.
112 It makes the initial contact and gathers information needed.
114 @raise errors.RemoteError: for errors in communication/grabbing
117 (remote_path, _, _) = ssh.GetUserFiles("root")
119 # Fetch remotes private key
120 for cluster in self.clusters:
121 result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
124 raise errors.RemoteError("There was an error while grabbing ssh private"
125 " key from %s. Fail reason: %s; output: %s" %
126 (cluster, result.fail_reason, result.output))
128 key_path = utils.PathJoin(self.work_dir, cluster)
129 utils.WriteFile(key_path, mode=0600, data=result.stdout)
131 result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
132 private_key=key_path)
134 raise errors.RemoteError("Unable to retrieve list of nodes from %s."
135 " Fail reason: %s; output: %s" %
136 (cluster, result.fail_reason, result.output))
137 nodes = result.stdout.splitlines()
139 result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
140 private_key=key_path)
142 raise errors.RemoteError("Unable to retrieve list of instances from"
143 " %s. Fail reason: %s; output: %s" %
144 (cluster, result.fail_reason, result.output))
145 instances = result.stdout.splitlines()
147 self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
149 def _PrepareAuthorizedKeys(self):
150 """Prepare the authorized_keys on every merging node.
152 This method adds our public key to the remotes' authorized_keys for further
156 (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
157 pub_key = utils.ReadFile(pub_key_file)
159 for data in self.merger_data:
160 for node in data.nodes:
161 result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
162 (auth_keys, pub_key)),
163 private_key=data.key_path)
166 raise errors.RemoteError("Unable to add our public key to %s in %s."
167 " Fail reason: %s; output: %s" %
168 (node, data.cluster, result.fail_reason,
171 def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
172 strict_host_check=False, private_key=None, batch=True,
174 """Wrapping SshRunner.Run with default parameters.
176 For explanation of parameters see L{ssh.SshRunner.Run}.
179 return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
180 use_cluster_key=use_cluster_key,
181 strict_host_check=strict_host_check,
182 private_key=private_key, batch=batch,
185 def _StopMergingInstances(self):
186 """Stop instances on merging clusters.
189 for cluster in self.clusters:
190 result = self._RunCmd(cluster, "gnt-instance shutdown --all"
194 raise errors.RemoteError("Unable to stop instances on %s."
195 " Fail reason: %s; output: %s" %
196 (cluster, result.fail_reason, result.output))
198 def _DisableWatcher(self):
199 """Pause the watcher on all merging clusters, including ourselves.
202 for cluster in ["localhost"] + self.clusters:
203 result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
207 raise errors.RemoteError("Unable to pause watcher on %s."
208 " Fail reason: %s; output: %s" %
209 (cluster, result.fail_reason, result.output))
212 # R0201: Method could be a function
213 def _EnableWatcher(self): # pylint: disable-msg=R0201
214 """Reenable watcher (locally).
217 result = utils.RunCmd(["gnt-cluster", "watcher", "continue"])
220 logging.warning("Unable to continue watcher. Fail reason: %s;"
221 " output: %s", result.fail_reason, result.output)
223 def _StopDaemons(self):
224 """Stop the RAPI, master, node and conf daemons on all merging nodes.
227 # FIXME: Worth putting this into constants?
229 for daemon in (constants.RAPI, constants.MASTERD,
230 constants.NODED, constants.CONFD):
231 cmds.append("%s stop %s" % (constants.DAEMON_UTIL, daemon))
232 for data in self.merger_data:
233 for node in data.nodes:
234 result = self._RunCmd(node, " && ".join(cmds))
237 raise errors.RemoteError("Unable to stop daemons on %s."
238 " Fail reason: %s; output: %s." %
239 (node, result.fail_reason, result.output))
241 def _FetchRemoteConfig(self):
242 """Fetches and stores remote cluster config from the master.
244 This step is needed before we can merge the config.
247 for data in self.merger_data:
248 result = self._RunCmd(data.cluster, "cat %s" %
249 constants.CLUSTER_CONF_FILE)
252 raise errors.RemoteError("Unable to retrieve remote config on %s."
253 " Fail reason: %s; output %s" %
254 (data.cluster, result.fail_reason,
257 data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
259 utils.WriteFile(data.config_path, data=result.stdout)
261 # R0201: Method could be a function
262 def _KillMasterDaemon(self): # pylint: disable-msg=R0201
263 """Kills the local master daemon.
265 @raise errors.CommandError: If unable to kill
268 result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
270 raise errors.CommandError("Unable to stop master daemons."
271 " Fail reason: %s; output: %s" %
272 (result.fail_reason, result.output))
274 def _MergeConfig(self):
275 """Merges all foreign config into our own config.
278 my_config = config.ConfigWriter(offline=True)
279 fake_ec_id = 0 # Needs to be uniq over the whole config merge
281 for data in self.merger_data:
282 other_config = config.ConfigWriter(data.config_path)
284 for node in other_config.GetNodeList():
285 node_info = other_config.GetNodeInfo(node)
286 node_info.master_candidate = False
287 my_config.AddNode(node_info, str(fake_ec_id))
290 for instance in other_config.GetInstanceList():
291 instance_info = other_config.GetInstanceInfo(instance)
293 # Update the DRBD port assignments
294 # This is a little bit hackish
295 for dsk in instance_info.disks:
296 if dsk.dev_type in constants.LDS_DRBD:
297 port = my_config.AllocatePort()
299 logical_id = list(dsk.logical_id)
301 dsk.logical_id = tuple(logical_id)
303 physical_id = list(dsk.physical_id)
304 physical_id[1] = physical_id[3] = port
305 dsk.physical_id = tuple(physical_id)
307 my_config.AddInstance(instance_info, str(fake_ec_id))
310 # R0201: Method could be a function
311 def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
312 """Starts the local master daemon.
314 @param no_vote: Should masterd be started without voting? default: False
315 @raise errors.CommandError: If unable to start daemon.
320 env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
322 result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
324 raise errors.CommandError("Couldn't start ganeti master."
325 " Fail reason: %s; output: %s" %
326 (result.fail_reason, result.output))
328 def _ReaddMergedNodesAndRedist(self):
329 """Readds all merging nodes and makes sure their config is up-to-date.
331 @raise errors.CommandError: If anything fails.
334 for data in self.merger_data:
335 for node in data.nodes:
336 result = utils.RunCmd(["gnt-node", "add", "--readd",
337 "--no-ssh-key-check", node])
339 raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
340 " output: %s" % (node, result.fail_reason,
343 result = utils.RunCmd(["gnt-cluster", "redist-conf"])
345 raise errors.CommandError("Redistribution failed. Fail reason: %s;"
346 " output: %s" % (result.fail_reason,
349 # R0201: Method could be a function
350 def _StartupAllInstances(self): # pylint: disable-msg=R0201
351 """Starts up all instances (locally).
353 @raise errors.CommandError: If unable to start instances
356 result = utils.RunCmd(["gnt-instance", "startup", "--all",
359 raise errors.CommandError("Unable to start all instances."
360 " Fail reason: %s; output: %s" %
361 (result.fail_reason, result.output))
363 # R0201: Method could be a function
364 def _VerifyCluster(self): # pylint: disable-msg=R0201
365 """Runs gnt-cluster verify to verify the health.
367 @raise errors.CommandError: If cluster fails on verification
370 result = utils.RunCmd(["gnt-cluster", "verify"])
372 raise errors.CommandError("Verification of cluster failed."
373 " Fail reason: %s; output: %s" %
374 (result.fail_reason, result.output))
377 """Does the actual merge.
379 It runs all the steps in the right order and updates the user about steps
380 taken. Also it keeps track of rollback_steps to undo everything.
385 logging.info("Pre cluster verification")
386 self._VerifyCluster()
388 logging.info("Prepare authorized_keys")
389 rbsteps.append("Remove our key from authorized_keys on nodes:"
391 self._PrepareAuthorizedKeys()
393 rbsteps.append("Start all instances again on the merging"
394 " clusters: %(clusters)s")
395 logging.info("Stopping merging instances (takes a while)")
396 self._StopMergingInstances()
398 logging.info("Disable watcher")
399 self._DisableWatcher()
400 logging.info("Stop daemons on merging nodes")
402 logging.info("Merging config")
403 self._FetchRemoteConfig()
404 self._KillMasterDaemon()
406 rbsteps.append("Restore %s from another master candidate" %
407 constants.CLUSTER_CONF_FILE)
409 self._StartMasterDaemon(no_vote=True)
411 # Point of no return, delete rbsteps
414 logging.warning("We are at the point of no return. Merge can not easily"
415 " be undone after this point.")
416 logging.info("Readd nodes and redistribute config")
417 self._ReaddMergedNodesAndRedist()
418 self._KillMasterDaemon()
419 self._StartMasterDaemon()
420 logging.info("Starting instances again")
421 self._StartupAllInstances()
422 logging.info("Post cluster verification")
423 self._VerifyCluster()
424 except errors.GenericError, e:
428 nodes = Flatten([data.nodes for data in self.merger_data])
430 "clusters": self.clusters,
433 logging.critical("In order to rollback do the following:")
435 logging.critical(" * %s", step % info)
437 logging.critical("Nothing to rollback.")
439 # TODO: Keep track of steps done for a flawless resume?
442 """Clean up our environment.
444 This cleans up remote private keys and configs and after that
445 deletes the temporary directory.
448 shutil.rmtree(self.work_dir)
451 def SetupLogging(options):
452 """Setting up logging infrastructure.
454 @param options: Parsed command line options
457 formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
459 stderr_handler = logging.StreamHandler()
460 stderr_handler.setFormatter(formatter)
462 stderr_handler.setLevel(logging.NOTSET)
463 elif options.verbose:
464 stderr_handler.setLevel(logging.INFO)
466 stderr_handler.setLevel(logging.ERROR)
468 root_logger = logging.getLogger("")
469 root_logger.setLevel(logging.NOTSET)
470 root_logger.addHandler(stderr_handler)
477 program = os.path.basename(sys.argv[0])
479 parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
480 " [--watcher-pause-period SECONDS]"
481 " <cluster> <cluster...>"),
483 parser.add_option(cli.DEBUG_OPT)
484 parser.add_option(cli.VERBOSE_OPT)
485 parser.add_option(PAUSE_PERIOD_OPT)
487 (options, args) = parser.parse_args()
489 SetupLogging(options)
492 parser.error("No clusters specified")
494 cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period)
497 cluster_merger.Setup()
498 cluster_merger.Merge()
499 except errors.GenericError, e:
501 return constants.EXIT_FAILURE
503 cluster_merger.Cleanup()
505 return constants.EXIT_SUCCESS
508 if __name__ == "__main__":