root / tools / ganeti-listrunner @ da5f09ef
History | View | Annotate | Download (18.7 kB)
1 |
#!/usr/bin/python |
---|---|
2 |
# |
3 |
|
4 |
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc. |
5 |
# |
6 |
# This program is free software; you can redistribute it and/or modify |
7 |
# it under the terms of the GNU General Public License as published by |
8 |
# the Free Software Foundation; either version 2 of the License, or |
9 |
# (at your option) any later version. |
10 |
# |
11 |
# This program is distributed in the hope that it will be useful, but |
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of |
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
14 |
# General Public License for more details. |
15 |
# |
16 |
# You should have received a copy of the GNU General Public License |
17 |
# along with this program; if not, write to the Free Software |
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
19 |
# 02110-1301, USA. |
20 |
|
21 |
"""Run an executable on a list of hosts. |
22 |
|
23 |
Script to serially run an executable on a list of hosts via ssh |
24 |
with password auth as root. If the provided log dir does not yet |
25 |
exist, it will try to create it. |
26 |
|
27 |
Implementation: |
28 |
- the main process spawns up to batch_size children, which: |
29 |
- connects to the remote host via ssh as root |
30 |
- uploads the executable with a random name to /tmp via sftp |
31 |
- chmod 500s it |
32 |
- via ssh: chdirs into the upload directory and runs the script |
33 |
- deletes it |
34 |
- writes status messages and all output to one logfile per host |
35 |
- the main process gathers then the status of the children and |
36 |
reports the success/failure ratio |
37 |
- entire script can be aborted with Ctrl-C |
38 |
|
39 |
Security considerations: |
40 |
- the root password for the remote hosts is stored in memory for the |
41 |
runtime of the script |
42 |
- the executable to be run on the remote host is handled the following way: |
43 |
- try to create a random directory with permissions 700 on the |
44 |
remote host, abort furter processing on this host if this failes |
45 |
- upload the executable with to a random filename in that directory |
46 |
- set executable permissions to 500 |
47 |
- run the executable |
48 |
- delete the execuable and the directory on the remote host |
49 |
|
50 |
""" |
51 |
|
52 |
# pylint: disable=C0103 |
53 |
# C0103: Invalid name ganeti-listrunner |
54 |
|
55 |
import errno |
56 |
import optparse |
57 |
import getpass |
58 |
import logging |
59 |
import os |
60 |
import random |
61 |
import select |
62 |
import socket |
63 |
import sys |
64 |
import time |
65 |
import traceback |
66 |
|
67 |
try: |
68 |
import paramiko |
69 |
except ImportError: |
70 |
print >> sys.stderr, \ |
71 |
("The \"paramiko\" module could not be imported. Install it from your" |
72 |
" distribution's repository. The package is usually named" |
73 |
" \"python-paramiko\".") |
74 |
sys.exit(1) |
75 |
|
76 |
|
77 |
REMOTE_PATH_BASE = "/tmp/listrunner" |
78 |
|
79 |
USAGE = ("%prog -l logdir {-c command | -x /path/to/file} [-b batch_size]" |
80 |
" {-f hostfile|-h hosts} [-u username]" |
81 |
" [-p password_file | -A]") |
82 |
|
83 |
|
84 |
def LogDirUseable(logdir): |
85 |
"""Ensure log file directory is available and usable.""" |
86 |
testfile = "%s/test-%s-%s.deleteme" % (logdir, random.random(), |
87 |
random.random()) |
88 |
try: |
89 |
os.mkdir(logdir) |
90 |
except OSError, err: |
91 |
if err.errno != errno.EEXIST: |
92 |
raise |
93 |
try: |
94 |
logtest = open(testfile, "aw") |
95 |
logtest.writelines("log file writeability test\n") |
96 |
logtest.close() |
97 |
os.unlink(testfile) |
98 |
return True |
99 |
except (OSError, IOError): |
100 |
return False |
101 |
|
102 |
|
103 |
def GetTimeStamp(timestamp=None): |
104 |
"""Return ISO8601 timestamp. |
105 |
|
106 |
Returns ISO8601 timestamp, optionally expects a time.localtime() tuple |
107 |
in timestamp, but will use the current time if this argument is not |
108 |
supplied. |
109 |
""" |
110 |
if timestamp is None: |
111 |
timestamp = time.localtime() |
112 |
|
113 |
isotime = time.strftime("%Y-%m-%dT%H:%M:%S", timestamp) |
114 |
return isotime |
115 |
|
116 |
|
117 |
def PingByTcp(target, port, timeout=10, live_port_needed=False, source=None): |
118 |
"""Simple ping implementation using TCP connect(2). |
119 |
|
120 |
Try to do a TCP connect(2) from an optional source IP to the |
121 |
specified target IP and the specified target port. If the optional |
122 |
parameter live_port_needed is set to true, requires the remote end |
123 |
to accept the connection. The timeout is specified in seconds and |
124 |
defaults to 10 seconds. If the source optional argument is not |
125 |
passed, the source address selection is left to the kernel, |
126 |
otherwise we try to connect using the passed address (failures to |
127 |
bind other than EADDRNOTAVAIL will be ignored). |
128 |
|
129 |
""" |
130 |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
131 |
|
132 |
success = False |
133 |
|
134 |
if source is not None: |
135 |
try: |
136 |
sock.bind((source, 0)) |
137 |
except socket.error, (errcode): |
138 |
if errcode == errno.EADDRNOTAVAIL: |
139 |
success = False |
140 |
|
141 |
sock.settimeout(timeout) |
142 |
|
143 |
try: |
144 |
sock.connect((target, port)) |
145 |
sock.close() |
146 |
success = True |
147 |
except socket.timeout: |
148 |
success = False |
149 |
except socket.error, (errcode): |
150 |
success = (not live_port_needed) and (errcode == errno.ECONNREFUSED) |
151 |
|
152 |
return success |
153 |
|
154 |
|
155 |
def GetHosts(hostsfile): |
156 |
"""Return list of hosts from hostfile. |
157 |
|
158 |
Reads the hostslist file and returns a list of hosts. |
159 |
Expects the hostslist file to contain one hostname per line. |
160 |
|
161 |
""" |
162 |
try: |
163 |
datafile = open(hostsfile, "r") |
164 |
except IOError, msg: |
165 |
print "Failed to open hosts file %s: %s" % (hostsfile, msg) |
166 |
sys.exit(2) |
167 |
|
168 |
hosts = datafile.readlines() |
169 |
datafile.close() |
170 |
|
171 |
return hosts |
172 |
|
173 |
|
174 |
def WriteLog(message, logfile): |
175 |
"""Writes message, terminated by newline, to logfile.""" |
176 |
try: |
177 |
logfile = open(logfile, "aw") |
178 |
except IOError, msg: |
179 |
print "failed to open log file %s: %s" % (logfile, msg) |
180 |
print "log message was: %s" % message |
181 |
sys.exit(1) # no being able to log is critical |
182 |
try: |
183 |
timestamp = GetTimeStamp() |
184 |
logfile.writelines("%s %s\n" % (timestamp, message)) |
185 |
logfile.close() |
186 |
except IOError, msg: |
187 |
print "failed to write to logfile %s: %s" % (logfile, msg) |
188 |
print "log message was: %s" % message |
189 |
sys.exit(1) # no being able to log is critical |
190 |
|
191 |
|
192 |
def GetAgentKeys(): |
193 |
"""Tries to get a list of ssh keys from an agent.""" |
194 |
try: |
195 |
agent = paramiko.Agent() |
196 |
return list(agent.get_keys()) |
197 |
except paramiko.SSHException: |
198 |
return [] |
199 |
|
200 |
|
201 |
def SetupSshConnection(host, username, password, use_agent, logfile): |
202 |
"""Setup the ssh connection used for all later steps. |
203 |
|
204 |
This function sets up the ssh connection that will be used both |
205 |
for upload and remote command execution. |
206 |
|
207 |
On success, it will return paramiko.Transport object with an |
208 |
already logged in session. On failure, False will be returned. |
209 |
|
210 |
""" |
211 |
# check if target is willing to talk to us at all |
212 |
if not PingByTcp(host, 22, live_port_needed=True): |
213 |
WriteLog("ERROR: FAILURE_NOT_REACHABLE", logfile) |
214 |
print " - ERROR: host not reachable on 22/tcp" |
215 |
return False |
216 |
|
217 |
if use_agent: |
218 |
keys = GetAgentKeys() |
219 |
else: |
220 |
keys = [] |
221 |
all_kwargs = [{"pkey": k} for k in keys] |
222 |
all_desc = ["key %d" % d for d in range(len(keys))] |
223 |
if password is not None: |
224 |
all_kwargs.append({"password": password}) |
225 |
all_desc.append("password") |
226 |
|
227 |
# deal with logging out of paramiko.transport |
228 |
handler = None |
229 |
|
230 |
for desc, kwargs in zip(all_desc, all_kwargs): |
231 |
try: |
232 |
transport = paramiko.Transport((host, 22)) |
233 |
|
234 |
# only try to setup the logging handler once |
235 |
if not handler: |
236 |
handler = logging.StreamHandler() |
237 |
handler.setLevel(logging.ERROR) |
238 |
log = logging.getLogger(transport.get_log_channel()) |
239 |
log.addHandler(handler) |
240 |
|
241 |
transport.connect(username=username, **kwargs) # pylint: disable=W0142 |
242 |
WriteLog("ssh connection established using %s" % desc, logfile) |
243 |
# strange ... when establishing the session and the immediately |
244 |
# setting up the channels for sftp & shell from that, it sometimes |
245 |
# fails, but waiting 1 second after session setup makes it always work |
246 |
# time.sleep(1) |
247 |
# FIXME apparently needfull to give sshd some time |
248 |
return transport |
249 |
except (socket.gaierror, socket.error, paramiko.SSHException): |
250 |
continue |
251 |
|
252 |
methods = ", ".join(all_desc) |
253 |
WriteLog("ERROR: FAILURE_CONNECTION_SETUP (tried %s) " % methods, logfile) |
254 |
WriteLog("aborted", logfile) |
255 |
print " - ERROR: connection setup failed (tried %s)" % methods |
256 |
|
257 |
return False |
258 |
|
259 |
|
260 |
def UploadFiles(connection, executable, filelist, logfile): |
261 |
"""Uploads the specified files via sftp. |
262 |
|
263 |
Uploads the specified files to a random, freshly created directory with |
264 |
a temporary name under /tmp. All uploaded files are chmod 0400 after upload |
265 |
with the exception of executable, with is chmod 500. |
266 |
|
267 |
Upon success, returns the absolute path to the remote upload directory, |
268 |
but will return False upon failure. |
269 |
""" |
270 |
remote_dir = "%s.%s-%s" % (REMOTE_PATH_BASE, |
271 |
random.random(), random.random()) |
272 |
|
273 |
try: |
274 |
sftp = paramiko.SFTPClient.from_transport(connection) |
275 |
sftp.mkdir(remote_dir, mode=0700) |
276 |
for item in filelist: |
277 |
remote_file = "%s/%s" % (remote_dir, os.path.basename(item)) |
278 |
WriteLog("uploading %s to remote %s" % (item, remote_file), logfile) |
279 |
sftp.put(item, remote_file) |
280 |
if item == executable: |
281 |
sftp.chmod(remote_file, 0500) |
282 |
else: |
283 |
sftp.chmod(remote_file, 0400) |
284 |
sftp.close() |
285 |
except IOError, err: |
286 |
WriteLog("ERROR: FAILURE_UPLOAD: %s" % err, logfile) |
287 |
return False |
288 |
|
289 |
return remote_dir |
290 |
|
291 |
|
292 |
def CleanupRemoteDir(connection, upload_dir, filelist, logfile): |
293 |
"""Cleanes out and removes the remote work directory.""" |
294 |
try: |
295 |
sftp = paramiko.SFTPClient.from_transport(connection) |
296 |
for item in filelist: |
297 |
fullpath = "%s/%s" % (upload_dir, os.path.basename(item)) |
298 |
WriteLog("removing remote %s" % fullpath, logfile) |
299 |
sftp.remove(fullpath) |
300 |
sftp.rmdir(upload_dir) |
301 |
sftp.close() |
302 |
except IOError, err: |
303 |
WriteLog("ERROR: FAILURE_CLEANUP: %s" % err, logfile) |
304 |
return False |
305 |
|
306 |
return True |
307 |
|
308 |
|
309 |
def RunRemoteCommand(connection, command, logfile): |
310 |
"""Execute the command via ssh on the remote host.""" |
311 |
session = connection.open_session() |
312 |
session.setblocking(0) |
313 |
|
314 |
# the following dance is needed because paramiko changed APIs: |
315 |
# from returning True/False for success to always returning None |
316 |
# and throwing an exception in case of problems. |
317 |
# And I want to support both the old and the new API. |
318 |
result = True # being optimistic here, I know |
319 |
message = None |
320 |
try: |
321 |
if session.exec_command("%s 2>&1" % command) is False: |
322 |
result = False |
323 |
except paramiko.SSHException, message: |
324 |
result = False |
325 |
|
326 |
if not result: |
327 |
WriteLog("ERROR: FAILURE_COMMAND_EXECUTION: %s" % message, logfile) |
328 |
return False |
329 |
|
330 |
### Read when data is available |
331 |
output = "" |
332 |
while select.select([session], [], []): |
333 |
try: |
334 |
data = session.recv(1024) |
335 |
except socket.timeout, err: |
336 |
data = None |
337 |
WriteLog("FAILED: socket.timeout %s" % err, logfile) |
338 |
except socket.error, err: |
339 |
data = None |
340 |
WriteLog("FAILED: socket.error %s" % err, logfile) |
341 |
if not data: |
342 |
break |
343 |
output += data |
344 |
select.select([], [], [], .1) |
345 |
|
346 |
WriteLog("SUCCESS: command output follows", logfile) |
347 |
for line in output.splitlines(): |
348 |
WriteLog("output = %s" % line, logfile) |
349 |
WriteLog("command execution completed", logfile) |
350 |
session.close() |
351 |
|
352 |
return True |
353 |
|
354 |
|
355 |
def HostWorker(logdir, username, password, use_agent, hostname, |
356 |
executable, exec_args, command, filelist): |
357 |
"""Per-host worker. |
358 |
|
359 |
This function does not return - it's the main code of the childs, |
360 |
which exit at the end of this function. The exit code 0 or 1 will be |
361 |
interpreted by the parent. |
362 |
|
363 |
@param logdir: the directory where the logfiles must be created |
364 |
@param username: SSH username |
365 |
@param password: SSH password |
366 |
@param use_agent: whether we should instead use an agent |
367 |
@param hostname: the hostname to connect to |
368 |
@param executable: the executable to upload, if not None |
369 |
@param exec_args: Additional arguments for executable |
370 |
@param command: the command to run |
371 |
@param filelist: auxiliary files to upload |
372 |
|
373 |
""" |
374 |
# in the child/worker process |
375 |
logfile = "%s/%s.log" % (logdir, hostname) |
376 |
print "%s - starting" % hostname |
377 |
result = 0 # optimism, I know |
378 |
try: |
379 |
connection = SetupSshConnection(hostname, username, |
380 |
password, use_agent, logfile) |
381 |
if connection is not False: |
382 |
if executable is not None: |
383 |
print " %s: uploading files" % hostname |
384 |
upload_dir = UploadFiles(connection, executable, |
385 |
filelist, logfile) |
386 |
command = ("cd %s && ./%s" % |
387 |
(upload_dir, os.path.basename(executable))) |
388 |
if exec_args: |
389 |
command += " %s" % exec_args |
390 |
print " %s: executing remote command" % hostname |
391 |
cmd_result = RunRemoteCommand(connection, command, logfile) |
392 |
if cmd_result is True: |
393 |
print " %s: remote command execution successful" % hostname |
394 |
else: |
395 |
print (" %s: remote command execution failed," |
396 |
" check log for details" % hostname) |
397 |
result = 1 |
398 |
if executable is not None: |
399 |
print " %s: cleaning up remote work dir" % hostname |
400 |
cln_result = CleanupRemoteDir(connection, upload_dir, |
401 |
filelist, logfile) |
402 |
if cln_result is False: |
403 |
print (" %s: remote work dir cleanup failed, check" |
404 |
" log for details" % hostname) |
405 |
result = 1 |
406 |
connection.close() |
407 |
else: |
408 |
print " %s: connection setup failed, skipping" % hostname |
409 |
result = 1 |
410 |
except KeyboardInterrupt: |
411 |
print " %s: received KeyboardInterrupt, aborting" % hostname |
412 |
WriteLog("ERROR: ABORT_KEYBOARD_INTERRUPT", logfile) |
413 |
result = 1 |
414 |
except Exception, err: |
415 |
result = 1 |
416 |
trace = traceback.format_exc() |
417 |
msg = "ERROR: UNHANDLED_EXECPTION_ERROR: %s\nTrace: %s" % (err, trace) |
418 |
WriteLog(msg, logfile) |
419 |
print " %s: %s" % (hostname, msg) |
420 |
# and exit with exit code 0 or 1, so the parent can compute statistics |
421 |
sys.exit(result) |
422 |
|
423 |
|
424 |
def LaunchWorker(child_pids, logdir, username, password, use_agent, hostname, |
425 |
executable, exec_args, command, filelist): |
426 |
"""Launch the per-host worker. |
427 |
|
428 |
Arguments are the same as for HostWorker, except for child_pids, |
429 |
which is a dictionary holding the pid-to-hostname mapping. |
430 |
|
431 |
""" |
432 |
hostname = hostname.rstrip("\n") |
433 |
pid = os.fork() |
434 |
if pid > 0: |
435 |
# controller just record the pids |
436 |
child_pids[pid] = hostname |
437 |
else: |
438 |
HostWorker(logdir, username, password, use_agent, hostname, |
439 |
executable, exec_args, command, filelist) |
440 |
|
441 |
|
442 |
def ParseOptions(): |
443 |
"""Parses the command line options. |
444 |
|
445 |
In case of command line errors, it will show the usage and exit the |
446 |
program. |
447 |
|
448 |
@return: the options in a tuple |
449 |
|
450 |
""" |
451 |
# resolve because original used -h for hostfile, which conflicts |
452 |
# with -h for help |
453 |
parser = optparse.OptionParser(usage="\n%s" % USAGE, |
454 |
conflict_handler="resolve") |
455 |
|
456 |
parser.add_option("-l", dest="logdir", default=None, |
457 |
help="directory to write logfiles to") |
458 |
parser.add_option("-x", dest="executable", default=None, |
459 |
help="executable to run on remote host(s)",) |
460 |
parser.add_option("-f", dest="hostfile", default=None, |
461 |
help="hostlist file (one host per line)") |
462 |
parser.add_option("-h", dest="hostlist", default=None, metavar="HOSTS", |
463 |
help="comma-separated list of hosts or single hostname",) |
464 |
parser.add_option("-a", dest="auxfiles", action="append", default=[], |
465 |
help="optional auxiliary file to upload" |
466 |
" (can be given multiple times)", |
467 |
metavar="FILE") |
468 |
parser.add_option("-c", dest="command", default=None, |
469 |
help="shell command to run on remote host(s)") |
470 |
parser.add_option("-b", dest="batch_size", default=15, type="int", |
471 |
help="batch-size, how many hosts to process" |
472 |
" in parallel [15]") |
473 |
parser.add_option("-u", dest="username", default="root", |
474 |
help="username used to connect [root]") |
475 |
parser.add_option("-p", dest="password", default=None, |
476 |
help="password used to authenticate (when not" |
477 |
" using an agent)") |
478 |
parser.add_option("-A", dest="use_agent", default=False, action="store_true", |
479 |
help="instead of password, use keys from an SSH agent") |
480 |
parser.add_option("--args", dest="exec_args", default=None, |
481 |
help="Arguments to be passed to executable (-x)") |
482 |
|
483 |
opts, args = parser.parse_args() |
484 |
|
485 |
if opts.executable and opts.command: |
486 |
parser.error("Options -x and -c conflict with each other") |
487 |
if not (opts.executable or opts.command): |
488 |
parser.error("One of -x and -c must be given") |
489 |
if opts.command and opts.exec_args: |
490 |
parser.error("Can't specify arguments when using custom command") |
491 |
if not opts.logdir: |
492 |
parser.error("Option -l is required") |
493 |
if opts.hostfile and opts.hostlist: |
494 |
parser.error("Options -f and -h conflict with each other") |
495 |
if not (opts.hostfile or opts.hostlist): |
496 |
parser.error("One of -f or -h must be given") |
497 |
if args: |
498 |
parser.error("This program doesn't take any arguments, passed in: %s" % |
499 |
", ".join(args)) |
500 |
|
501 |
return (opts.logdir, opts.executable, opts.exec_args, |
502 |
opts.hostfile, opts.hostlist, |
503 |
opts.command, opts.use_agent, opts.auxfiles, opts.username, |
504 |
opts.password, opts.batch_size) |
505 |
|
506 |
|
507 |
def main(): |
508 |
"""main.""" |
509 |
(logdir, executable, exec_args, hostfile, hostlist, |
510 |
command, use_agent, auxfiles, username, |
511 |
password, batch_size) = ParseOptions() |
512 |
|
513 |
### Unbuffered sys.stdout |
514 |
sys.stdout = os.fdopen(1, "w", 0) |
515 |
|
516 |
if LogDirUseable(logdir) is False: |
517 |
print "ERROR: cannot create logfiles in dir %s, aborting" % logdir |
518 |
sys.exit(1) |
519 |
|
520 |
if use_agent: |
521 |
pass |
522 |
elif password: |
523 |
try: |
524 |
fh = file(password) |
525 |
pwvalue = fh.readline().strip() |
526 |
fh.close() |
527 |
except IOError, e: |
528 |
print "error: can not read in from password file %s: %s" % (password, e) |
529 |
sys.exit(1) |
530 |
password = pwvalue |
531 |
else: |
532 |
password = getpass.getpass("%s's password for all nodes: " % username) |
533 |
|
534 |
if hostfile: |
535 |
hosts = GetHosts(hostfile) |
536 |
else: |
537 |
if "," in hostlist: |
538 |
hostlist = hostlist.rstrip(",") # commandline robustness |
539 |
hosts = hostlist.split(",") |
540 |
else: |
541 |
hosts = [hostlist] |
542 |
|
543 |
successes = failures = 0 |
544 |
|
545 |
filelist = auxfiles[:] |
546 |
filelist.append(executable) |
547 |
|
548 |
# initial batch |
549 |
batch = hosts[:batch_size] |
550 |
hosts = hosts[batch_size:] |
551 |
child_pids = {} |
552 |
for hostname in batch: |
553 |
LaunchWorker(child_pids, logdir, username, password, use_agent, hostname, |
554 |
executable, exec_args, command, filelist) |
555 |
|
556 |
while child_pids: |
557 |
pid, status = os.wait() |
558 |
hostname = child_pids.pop(pid, "<unknown host>") |
559 |
print " %s: done (in parent)" % hostname |
560 |
if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0: |
561 |
successes += 1 |
562 |
else: |
563 |
failures += 1 |
564 |
if hosts: |
565 |
LaunchWorker(child_pids, logdir, username, password, use_agent, |
566 |
hosts.pop(0), executable, exec_args, command, filelist) |
567 |
|
568 |
|
569 |
print "All done, %s successful and %s failed hosts" % (successes, failures) |
570 |
|
571 |
sys.exit(0) |
572 |
|
573 |
|
574 |
if __name__ == "__main__": |
575 |
try: |
576 |
main() |
577 |
except KeyboardInterrupt: |
578 |
print "Received KeyboardInterrupt, aborting" |
579 |
sys.exit(1) |