root / tools / cluster-merge @ 3dc66ebc
History | View | Annotate | Download (16.6 kB)
1 |
#!/usr/bin/python |
---|---|
2 |
# |
3 |
|
4 |
# Copyright (C) 2010 Google Inc. |
5 |
# |
6 |
# This program is free software; you can redistribute it and/or modify |
7 |
# it under the terms of the GNU General Public License as published by |
8 |
# the Free Software Foundation; either version 2 of the License, or |
9 |
# (at your option) any later version. |
10 |
# |
11 |
# This program is distributed in the hope that it will be useful, but |
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of |
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
14 |
# General Public License for more details. |
15 |
# |
16 |
# You should have received a copy of the GNU General Public License |
17 |
# along with this program; if not, write to the Free Software |
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
19 |
# 02110-1301, USA. |
20 |
|
21 |
"""Tool to merge two or more clusters together. |
22 |
|
23 |
The clusters have to run the same version of Ganeti! |
24 |
|
25 |
""" |
26 |
|
27 |
# pylint: disable-msg=C0103 |
28 |
# C0103: Invalid name cluster-merge |
29 |
|
30 |
import logging |
31 |
import os |
32 |
import optparse |
33 |
import shutil |
34 |
import sys |
35 |
import tempfile |
36 |
|
37 |
from ganeti import cli |
38 |
from ganeti import config |
39 |
from ganeti import constants |
40 |
from ganeti import errors |
41 |
from ganeti import ssh |
42 |
from ganeti import utils |
43 |
|
44 |
|
45 |
#: Command line option controlling how long (seconds) the watcher is paused
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period",
                                  dest="pause_period", action="store",
                                  type="int", default=1800,
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
50 |
|
51 |
|
52 |
def Flatten(unflatten_list):
  """Flattens a list.

  Recursively expands any nested lists so the result contains only
  non-list elements, in their original left-to-right order.

  @param unflatten_list: A list of unflatten list objects.
  @return: A flatten list

  """
  result = []
  for element in unflatten_list:
    if not isinstance(element, list):
      result.append(element)
    else:
      result.extend(Flatten(element))
  return result
67 |
|
68 |
|
69 |
class MergerData(object):
  """Container class holding the state gathered for one merging cluster.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Store the per-cluster merge data.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param config_path: Path to the merging cluster config (filled in later
        once the remote config has been fetched)

    """
    self.cluster = cluster
    self.key_path = key_path
    self.nodes = nodes
    self.instances = instances
    self.config_path = config_path
88 |
|
89 |
|
90 |
class Merger(object):
  """Handling the merge.

  """
  def __init__(self, clusters, pause_period):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    # QueryConfigValues returns a list of values, one per requested field;
    # unpack the single element so cluster_name is the name string itself
    # and not a one-element list (SshRunner expects the plain name)
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
107 |
|
108 |
  def Setup(self):
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact and gathers information needed.

    For every merging cluster it fetches the remote root ssh private key,
    stores it under our working directory and uses it to collect the node
    and instance lists, all recorded in a L{MergerData} entry.

    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    # Fetch remotes private key
    for cluster in self.clusters:
      # batch=False, ask_key=False for this very first contact with the
      # remote cluster -- NOTE(review): presumably so ssh can interact if
      # host keys/passwords are needed; confirm against ssh.SshRunner.Run
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      # Store the remote key locally; mode 0600 since it is sensitive material
      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      # From here on authenticate with the fetched private key
      result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes = result.stdout.splitlines()

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
148 |
|
149 |
def _PrepareAuthorizedKeys(self): |
150 |
"""Prepare the authorized_keys on every merging node. |
151 |
|
152 |
This method add our public key to remotes authorized_key for further |
153 |
communication. |
154 |
|
155 |
""" |
156 |
(_, pub_key_file, auth_keys) = ssh.GetUserFiles("root") |
157 |
pub_key = utils.ReadFile(pub_key_file) |
158 |
|
159 |
for data in self.merger_data: |
160 |
for node in data.nodes: |
161 |
result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" % |
162 |
(auth_keys, pub_key)), |
163 |
private_key=data.key_path) |
164 |
|
165 |
if result.failed: |
166 |
raise errors.RemoteError("Unable to add our public key to %s in %s." |
167 |
" Fail reason: %s; output: %s" % |
168 |
(node, data.cluster, result.fail_reason, |
169 |
result.output)) |
170 |
|
171 |
def _RunCmd(self, hostname, command, user="root", use_cluster_key=False, |
172 |
strict_host_check=False, private_key=None, batch=True, |
173 |
ask_key=False): |
174 |
"""Wrapping SshRunner.Run with default parameters. |
175 |
|
176 |
For explanation of parameters see L{ganeti.ssh.SshRunner.Run}. |
177 |
|
178 |
""" |
179 |
return self.ssh_runner.Run(hostname=hostname, command=command, user=user, |
180 |
use_cluster_key=use_cluster_key, |
181 |
strict_host_check=strict_host_check, |
182 |
private_key=private_key, batch=batch, |
183 |
ask_key=ask_key) |
184 |
|
185 |
def _StopMergingInstances(self): |
186 |
"""Stop instances on merging clusters. |
187 |
|
188 |
""" |
189 |
for cluster in self.clusters: |
190 |
result = self._RunCmd(cluster, "gnt-instance shutdown --all" |
191 |
" --force-multiple") |
192 |
|
193 |
if result.failed: |
194 |
raise errors.RemoteError("Unable to stop instances on %s." |
195 |
" Fail reason: %s; output: %s" % |
196 |
(cluster, result.fail_reason, result.output)) |
197 |
|
198 |
def _DisableWatcher(self): |
199 |
"""Disable watch on all merging clusters, including ourself. |
200 |
|
201 |
""" |
202 |
for cluster in ["localhost"] + self.clusters: |
203 |
result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" % |
204 |
self.pause_period) |
205 |
|
206 |
if result.failed: |
207 |
raise errors.RemoteError("Unable to pause watcher on %s." |
208 |
" Fail reason: %s; output: %s" % |
209 |
(cluster, result.fail_reason, result.output)) |
210 |
|
211 |
def _StopDaemons(self): |
212 |
"""Stop all daemons on merging nodes. |
213 |
|
214 |
""" |
215 |
cmd = "%s stop-all" % constants.DAEMON_UTIL |
216 |
for data in self.merger_data: |
217 |
for node in data.nodes: |
218 |
result = self._RunCmd(node, cmd) |
219 |
|
220 |
if result.failed: |
221 |
raise errors.RemoteError("Unable to stop daemons on %s." |
222 |
" Fail reason: %s; output: %s." % |
223 |
(node, result.fail_reason, result.output)) |
224 |
|
225 |
def _FetchRemoteConfig(self): |
226 |
"""Fetches and stores remote cluster config from the master. |
227 |
|
228 |
This step is needed before we can merge the config. |
229 |
|
230 |
""" |
231 |
for data in self.merger_data: |
232 |
result = self._RunCmd(data.cluster, "cat %s" % |
233 |
constants.CLUSTER_CONF_FILE) |
234 |
|
235 |
if result.failed: |
236 |
raise errors.RemoteError("Unable to retrieve remote config on %s." |
237 |
" Fail reason: %s; output %s" % |
238 |
(data.cluster, result.fail_reason, |
239 |
result.output)) |
240 |
|
241 |
data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" % |
242 |
data.cluster) |
243 |
utils.WriteFile(data.config_path, data=result.stdout) |
244 |
|
245 |
# R0201: Method could be a function |
246 |
def _KillMasterDaemon(self): # pylint: disable-msg=R0201 |
247 |
"""Kills the local master daemon. |
248 |
|
249 |
@raise errors.CommandError: If unable to kill |
250 |
|
251 |
""" |
252 |
result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"]) |
253 |
if result.failed: |
254 |
raise errors.CommandError("Unable to stop master daemons." |
255 |
" Fail reason: %s; output: %s" % |
256 |
(result.fail_reason, result.output)) |
257 |
|
258 |
  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    Imports every node and instance from the fetched remote configs into
    the local (offline) configuration, reallocating DRBD ports on the way.

    """
    my_config = config.ConfigWriter(offline=True)
    # Each Add* call needs a unique execution-context id over the whole merge
    fake_ec_id = 0 # Needs to be uniq over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        # Imported nodes are added with the master_candidate flag cleared
        node_info.master_candidate = False
        my_config.AddNode(node_info, str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            # Take a fresh port from our own pool so it cannot collide with
            # ports already assigned in the merged configuration
            port = my_config.AllocatePort()

            # logical_id[2] holds the DRBD port -- NOTE(review): index
            # semantics assumed from this rewrite; confirm against
            # ganeti.objects.Disk
            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            # physical_id[1] and [3] appear to carry the port for both
            # sides of the DRBD pair -- NOTE(review): confirm
            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info, str(fake_ec_id))
        fake_ec_id += 1
293 |
|
294 |
# R0201: Method could be a function |
295 |
def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201 |
296 |
"""Starts the local master daemon. |
297 |
|
298 |
@param no_vote: Should the masterd started without voting? default: False |
299 |
@raise errors.CommandError: If unable to start daemon. |
300 |
|
301 |
""" |
302 |
env = {} |
303 |
if no_vote: |
304 |
env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it" |
305 |
|
306 |
result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env) |
307 |
if result.failed: |
308 |
raise errors.CommandError("Couldn't start ganeti master." |
309 |
" Fail reason: %s; output: %s" % |
310 |
(result.fail_reason, result.output)) |
311 |
|
312 |
def _ReaddMergedNodesAndRedist(self): |
313 |
"""Readds all merging nodes and make sure their config is up-to-date. |
314 |
|
315 |
@raise errors.CommandError: If anything fails. |
316 |
|
317 |
""" |
318 |
for data in self.merger_data: |
319 |
for node in data.nodes: |
320 |
result = utils.RunCmd(["gnt-node", "add", "--readd", |
321 |
"--no-ssh-key-check", node]) |
322 |
if result.failed: |
323 |
raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;" |
324 |
" output: %s" % (node, result.fail_reason, |
325 |
result.output)) |
326 |
|
327 |
result = utils.RunCmd(["gnt-cluster", "redist-conf"]) |
328 |
if result.failed: |
329 |
raise errors.CommandError("Redistribution failed. Fail reason: %s;" |
330 |
" output: %s" % (result.fail_reason, |
331 |
result.output)) |
332 |
|
333 |
# R0201: Method could be a function |
334 |
def _StartupAllInstances(self): # pylint: disable-msg=R0201 |
335 |
"""Starts up all instances (locally). |
336 |
|
337 |
@raise errors.CommandError: If unable to start clusters |
338 |
|
339 |
""" |
340 |
result = utils.RunCmd(["gnt-instance", "startup", "--all", |
341 |
"--force-multiple"]) |
342 |
if result.failed: |
343 |
raise errors.CommandError("Unable to start all instances." |
344 |
" Fail reason: %s; output: %s" % |
345 |
(result.fail_reason, result.output)) |
346 |
|
347 |
# R0201: Method could be a function |
348 |
def _VerifyCluster(self): # pylint: disable-msg=R0201 |
349 |
"""Runs gnt-cluster verify to verify the health. |
350 |
|
351 |
@raise errors.ProgrammError: If cluster fails on verification |
352 |
|
353 |
""" |
354 |
result = utils.RunCmd(["gnt-cluster", "verify"]) |
355 |
if result.failed: |
356 |
raise errors.CommandError("Verification of cluster failed." |
357 |
" Fail reason: %s; output: %s" % |
358 |
(result.fail_reason, result.output)) |
359 |
|
360 |
  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_steps to undo everything.

    """
    # Human-readable rollback instructions; the %(nodes)s / %(clusters)s
    # placeholders are filled from the info dict only if we have to print
    # them after a failure
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      logging.info("Stopping merging instances (takes a while)")
      self._StopMergingInstances()

      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()
      logging.info("Merging config")
      self._FetchRemoteConfig()

      def _OfflineClusterMerge():
        """Closure run when master daemons stopped

        """
        rbsteps.append("Restore %s from another master candidate" %
                       constants.CLUSTER_CONF_FILE)
        self._MergeConfig()
        # Our config now contains foreign nodes, so masterd cannot win a
        # vote; start it without voting
        self._StartMasterDaemon(no_vote=True)

        # Point of no return, delete rbsteps
        del rbsteps[:]

        logging.warning("We are at the point of no return. Merge can not easily"
                        " be undone after this point.")
        logging.info("Readd nodes and redistribute config")
        self._ReaddMergedNodesAndRedist()
        self._KillMasterDaemon()

      # Runs the closure with the cluster's daemons stopped -- NOTE(review):
      # presumably cli.RunWhileClusterStopped also restarts them afterwards;
      # confirm against ganeti.cli
      cli.RunWhileClusterStopped(logging.info, _OfflineClusterMerge)

      logging.info("Starting instances again")
      self._StartupAllInstances()
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      # rbsteps is emptied at the point of no return; if anything is left
      # we can still tell the admin how to roll back by hand
      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
          logging.critical(" * %s", step % info)
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?
429 |
|
430 |
def Cleanup(self): |
431 |
"""Clean up our environment. |
432 |
|
433 |
This cleans up remote private keys and configs and after that |
434 |
deletes the temporary directory. |
435 |
|
436 |
""" |
437 |
shutil.rmtree(self.work_dir) |
438 |
|
439 |
|
440 |
def SetupLogging(options):
  """Setting up logging infrastructure.

  Attaches a stderr handler to the root logger whose level is derived
  from the --debug/--verbose command line flags.

  @param options: Parsed command line options

  """
  fmt = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")

  handler = logging.StreamHandler()
  handler.setFormatter(fmt)
  if options.debug:
    level = logging.NOTSET
  elif options.verbose:
    level = logging.INFO
  else:
    level = logging.ERROR
  handler.setLevel(level)

  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(handler)
460 |
|
461 |
|
462 |
def main(): |
463 |
"""Main routine. |
464 |
|
465 |
""" |
466 |
program = os.path.basename(sys.argv[0]) |
467 |
|
468 |
parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]" |
469 |
" [--watcher-pause-period SECONDS]" |
470 |
" <cluster> <cluster...>"), |
471 |
prog=program) |
472 |
parser.add_option(cli.DEBUG_OPT) |
473 |
parser.add_option(cli.VERBOSE_OPT) |
474 |
parser.add_option(PAUSE_PERIOD_OPT) |
475 |
|
476 |
(options, args) = parser.parse_args() |
477 |
|
478 |
SetupLogging(options) |
479 |
|
480 |
if not args: |
481 |
parser.error("No clusters specified") |
482 |
|
483 |
cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period) |
484 |
try: |
485 |
try: |
486 |
cluster_merger.Setup() |
487 |
cluster_merger.Merge() |
488 |
except errors.GenericError, e: |
489 |
logging.exception(e) |
490 |
return constants.EXIT_FAILURE |
491 |
finally: |
492 |
cluster_merger.Cleanup() |
493 |
|
494 |
return constants.EXIT_SUCCESS |
495 |
|
496 |
|
497 |
if __name__ == "__main__": |
498 |
sys.exit(main()) |