4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Run an executable on a list of hosts.
23 Script to serially run an executable on a list of hosts via ssh
24 with password auth as root. If the provided log dir does not yet
25 exist, it will try to create it.
28 - the main process spawns up to batch_size children, which:
29 - connects to the remote host via ssh as root
30 - uploads the executable with a random name to /tmp via sftp
32 - via ssh: chdirs into the upload directory and runs the script
34 - writes status messages and all output to one logfile per host
35 - the main process then gathers the status of the children and
36 reports the success/failure ratio
37 - entire script can be aborted with Ctrl-C
39 Security considerations:
40 - the root password for the remote hosts is stored in memory for the
42 - the executable to be run on the remote host is handled the following way:
43 - try to create a random directory with permissions 700 on the
44 remote host, abort further processing on this host if this fails
45 - upload the executable to a random filename in that directory
46 - set executable permissions to 500
48 - delete the executable and the directory on the remote host
52 # pylint: disable=C0103
53 # C0103: Invalid name ganeti-listrunner
70 print >> sys.stderr, \
71 ("The \"paramiko\" module could not be imported. Install it from your"
72 " distribution's repository. The package is usually named"
73 " \"python-paramiko\".")
77 REMOTE_PATH_BASE = "/tmp/listrunner"
79 USAGE = ("%prog -l logdir {-c command | -x /path/to/file} [-b batch_size]"
80 " {-f hostfile|-h hosts} [-u username]"
81 " [-p password_file | -A]")
84 def LogDirUseable(logdir):
85 """Ensure log file directory is available and usable."""
86 testfile = "%s/test-%s-%s.deleteme" % (logdir, random.random(),
91 if err.errno != errno.EEXIST:
94 logtest = open(testfile, "aw")
95 logtest.writelines("log file writeability test\n")
99 except (OSError, IOError):
103 def GetTimeStamp(timestamp=None):
104 """Return ISO8601 timestamp.
106 Returns ISO8601 timestamp, optionally expects a time.localtime() tuple
107 in timestamp, but will use the current time if this argument is not
110 if timestamp is None:
111 timestamp = time.localtime()
113 isotime = time.strftime("%Y-%m-%dT%H:%M:%S", timestamp)
117 def PingByTcp(target, port, timeout=10, live_port_needed=False, source=None):
118 """Simple ping implementation using TCP connect(2).
120 Try to do a TCP connect(2) from an optional source IP to the
121 specified target IP and the specified target port. If the optional
122 parameter live_port_needed is set to true, requires the remote end
123 to accept the connection. The timeout is specified in seconds and
124 defaults to 10 seconds. If the source optional argument is not
125 passed, the source address selection is left to the kernel,
126 otherwise we try to connect using the passed address (failures to
127 bind other than EADDRNOTAVAIL will be ignored).
130 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
134 if source is not None:
136 sock.bind((source, 0))
137 except socket.error, (errcode):
138 if errcode == errno.EADDRNOTAVAIL:
141 sock.settimeout(timeout)
144 sock.connect((target, port))
147 except socket.timeout:
149 except socket.error, (errcode):
150 success = (not live_port_needed) and (errcode == errno.ECONNREFUSED)
155 def GetHosts(hostsfile):
156 """Return list of hosts from hostfile.
158 Reads the hostslist file and returns a list of hosts.
159 Expects the hostslist file to contain one hostname per line.
163 datafile = open(hostsfile, "r")
165 print "Failed to open hosts file %s: %s" % (hostsfile, msg)
168 hosts = datafile.readlines()
174 def WriteLog(message, logfile):
175 """Writes message, terminated by newline, to logfile."""
177 logfile = open(logfile, "aw")
179 print "failed to open log file %s: %s" % (logfile, msg)
180 print "log message was: %s" % message
181 sys.exit(1) # no being able to log is critical
183 timestamp = GetTimeStamp()
184 logfile.writelines("%s %s\n" % (timestamp, message))
187 print "failed to write to logfile %s: %s" % (logfile, msg)
188 print "log message was: %s" % message
189 sys.exit(1) # no being able to log is critical
193 """Tries to get a list of ssh keys from an agent."""
195 agent = paramiko.Agent()
196 return list(agent.get_keys())
197 except paramiko.SSHException:
201 def SetupSshConnection(host, username, password, use_agent, logfile):
202 """Setup the ssh connection used for all later steps.
204 This function sets up the ssh connection that will be used both
205 for upload and remote command execution.
207 On success, it will return a paramiko.Transport object with an
208 already logged in session. On failure, False will be returned.
211 # check if target is willing to talk to us at all
212 if not PingByTcp(host, 22, live_port_needed=True):
213 WriteLog("ERROR: FAILURE_NOT_REACHABLE", logfile)
214 print " - ERROR: host not reachable on 22/tcp"
218 keys = GetAgentKeys()
221 all_kwargs = [{"pkey": k} for k in keys]
222 all_desc = ["key %d" % d for d in range(len(keys))]
223 if password is not None:
224 all_kwargs.append({"password": password})
225 all_desc.append("password")
227 # deal with logging out of paramiko.transport
230 for desc, kwargs in zip(all_desc, all_kwargs):
232 transport = paramiko.Transport((host, 22))
234 # only try to setup the logging handler once
236 handler = logging.StreamHandler()
237 handler.setLevel(logging.ERROR)
238 log = logging.getLogger(transport.get_log_channel())
239 log.addHandler(handler)
241 transport.connect(username=username, **kwargs) # pylint: disable=W0142
242 WriteLog("ssh connection established using %s" % desc, logfile)
243 # strange ... when establishing the session and then immediately
244 # setting up the channels for sftp & shell from that, it sometimes
245 # fails, but waiting 1 second after session setup makes it always work
247 # FIXME apparently necessary to give sshd some time
249 except (socket.gaierror, socket.error, paramiko.SSHException):
252 methods = ", ".join(all_desc)
253 WriteLog("ERROR: FAILURE_CONNECTION_SETUP (tried %s) " % methods, logfile)
254 WriteLog("aborted", logfile)
255 print " - ERROR: connection setup failed (tried %s)" % methods
260 def UploadFiles(connection, executable, filelist, logfile):
261 """Uploads the specified files via sftp.
263 Uploads the specified files to a random, freshly created directory with
264 a temporary name under /tmp. All uploaded files are chmod 0400 after upload
265 with the exception of executable, which is chmod 0500.
267 Upon success, returns the absolute path to the remote upload directory,
268 but will return False upon failure.
270 remote_dir = "%s.%s-%s" % (REMOTE_PATH_BASE,
271 random.random(), random.random())
274 sftp = paramiko.SFTPClient.from_transport(connection)
275 sftp.mkdir(remote_dir, mode=0700)
276 for item in filelist:
277 remote_file = "%s/%s" % (remote_dir, os.path.basename(item))
278 WriteLog("uploading %s to remote %s" % (item, remote_file), logfile)
279 sftp.put(item, remote_file)
280 if item == executable:
281 sftp.chmod(remote_file, 0500)
283 sftp.chmod(remote_file, 0400)
286 WriteLog("ERROR: FAILURE_UPLOAD: %s" % err, logfile)
292 def CleanupRemoteDir(connection, upload_dir, filelist, logfile):
293 """Cleans out and removes the remote work directory."""
295 sftp = paramiko.SFTPClient.from_transport(connection)
296 for item in filelist:
297 fullpath = "%s/%s" % (upload_dir, os.path.basename(item))
298 WriteLog("removing remote %s" % fullpath, logfile)
299 sftp.remove(fullpath)
300 sftp.rmdir(upload_dir)
303 WriteLog("ERROR: FAILURE_CLEANUP: %s" % err, logfile)
309 def RunRemoteCommand(connection, command, logfile):
310 """Execute the command via ssh on the remote host."""
311 session = connection.open_session()
312 session.setblocking(0)
314 # the following dance is needed because paramiko changed APIs:
315 # from returning True/False for success to always returning None
316 # and throwing an exception in case of problems.
317 # And I want to support both the old and the new API.
318 result = True # being optimistic here, I know
321 if session.exec_command("%s 2>&1" % command) is False:
323 except paramiko.SSHException, message:
327 WriteLog("ERROR: FAILURE_COMMAND_EXECUTION: %s" % message, logfile)
330 ### Read when data is available
332 while select.select([session], [], []):
334 data = session.recv(1024)
335 except socket.timeout, err:
337 WriteLog("FAILED: socket.timeout %s" % err, logfile)
338 except socket.error, err:
340 WriteLog("FAILED: socket.error %s" % err, logfile)
344 select.select([], [], [], .1)
346 WriteLog("SUCCESS: command output follows", logfile)
347 for line in output.splitlines():
348 WriteLog("output = %s" % line, logfile)
349 WriteLog("command execution completed", logfile)
355 def HostWorker(logdir, username, password, use_agent, hostname,
356 executable, exec_args, command, filelist):
359 This function does not return - it's the main code of the children,
360 which exit at the end of this function. The exit code 0 or 1 will be
361 interpreted by the parent.
363 @param logdir: the directory where the logfiles must be created
364 @param username: SSH username
365 @param password: SSH password
366 @param use_agent: whether we should instead use an agent
367 @param hostname: the hostname to connect to
368 @param executable: the executable to upload, if not None
369 @param exec_args: Additional arguments for executable
370 @param command: the command to run
371 @param filelist: auxiliary files to upload
374 # in the child/worker process
375 logfile = "%s/%s.log" % (logdir, hostname)
376 print "%s - starting" % hostname
377 result = 0 # optimism, I know
379 connection = SetupSshConnection(hostname, username,
380 password, use_agent, logfile)
381 if connection is not False:
382 if executable is not None:
383 print " %s: uploading files" % hostname
384 upload_dir = UploadFiles(connection, executable,
386 command = ("cd %s && ./%s" %
387 (upload_dir, os.path.basename(executable)))
389 command += " %s" % exec_args
390 print " %s: executing remote command" % hostname
391 cmd_result = RunRemoteCommand(connection, command, logfile)
392 if cmd_result is True:
393 print " %s: remote command execution successful" % hostname
395 print (" %s: remote command execution failed,"
396 " check log for details" % hostname)
398 if executable is not None:
399 print " %s: cleaning up remote work dir" % hostname
400 cln_result = CleanupRemoteDir(connection, upload_dir,
402 if cln_result is False:
403 print (" %s: remote work dir cleanup failed, check"
404 " log for details" % hostname)
408 print " %s: connection setup failed, skipping" % hostname
410 except KeyboardInterrupt:
411 print " %s: received KeyboardInterrupt, aborting" % hostname
412 WriteLog("ERROR: ABORT_KEYBOARD_INTERRUPT", logfile)
414 except Exception, err:
416 trace = traceback.format_exc()
417 msg = "ERROR: UNHANDLED_EXECPTION_ERROR: %s\nTrace: %s" % (err, trace)
418 WriteLog(msg, logfile)
419 print " %s: %s" % (hostname, msg)
420 # and exit with exit code 0 or 1, so the parent can compute statistics
424 def LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
425 executable, exec_args, command, filelist):
426 """Launch the per-host worker.
428 Arguments are the same as for HostWorker, except for child_pids,
429 which is a dictionary holding the pid-to-hostname mapping.
432 hostname = hostname.rstrip("\n")
435 # controller just record the pids
436 child_pids[pid] = hostname
438 HostWorker(logdir, username, password, use_agent, hostname,
439 executable, exec_args, command, filelist)
443 """Parses the command line options.
445 In case of command line errors, it will show the usage and exit the
448 @return: the options in a tuple
451 # resolve because original used -h for hostfile, which conflicts
453 parser = optparse.OptionParser(usage="\n%s" % USAGE,
454 conflict_handler="resolve")
456 parser.add_option("-l", dest="logdir", default=None,
457 help="directory to write logfiles to")
458 parser.add_option("-x", dest="executable", default=None,
459 help="executable to run on remote host(s)",)
460 parser.add_option("-f", dest="hostfile", default=None,
461 help="hostlist file (one host per line)")
462 parser.add_option("-h", dest="hostlist", default=None, metavar="HOSTS",
463 help="comma-separated list of hosts or single hostname",)
464 parser.add_option("-a", dest="auxfiles", action="append", default=[],
465 help="optional auxiliary file to upload"
466 " (can be given multiple times)",
468 parser.add_option("-c", dest="command", default=None,
469 help="shell command to run on remote host(s)")
470 parser.add_option("-b", dest="batch_size", default=15, type="int",
471 help="batch-size, how many hosts to process"
473 parser.add_option("-u", dest="username", default="root",
474 help="username used to connect [root]")
475 parser.add_option("-p", dest="password", default=None,
476 help="password used to authenticate (when not"
478 parser.add_option("-A", dest="use_agent", default=False, action="store_true",
479 help="instead of password, use keys from an SSH agent")
480 parser.add_option("--args", dest="exec_args", default=None,
481 help="Arguments to be passed to executable (-x)")
483 opts, args = parser.parse_args()
485 if opts.executable and opts.command:
486 parser.error("Options -x and -c conflict with each other")
487 if not (opts.executable or opts.command):
488 parser.error("One of -x and -c must be given")
489 if opts.command and opts.exec_args:
490 parser.error("Can't specify arguments when using custom command")
492 parser.error("Option -l is required")
493 if opts.hostfile and opts.hostlist:
494 parser.error("Options -f and -h conflict with each other")
495 if not (opts.hostfile or opts.hostlist):
496 parser.error("One of -f or -h must be given")
498 parser.error("This program doesn't take any arguments, passed in: %s" %
501 return (opts.logdir, opts.executable, opts.exec_args,
502 opts.hostfile, opts.hostlist,
503 opts.command, opts.use_agent, opts.auxfiles, opts.username,
504 opts.password, opts.batch_size)
509 (logdir, executable, exec_args, hostfile, hostlist,
510 command, use_agent, auxfiles, username,
511 password, batch_size) = ParseOptions()
513 ### Unbuffered sys.stdout
514 sys.stdout = os.fdopen(1, "w", 0)
516 if LogDirUseable(logdir) is False:
517 print "ERROR: cannot create logfiles in dir %s, aborting" % logdir
525 pwvalue = fh.readline().strip()
528 print "error: can not read in from password file %s: %s" % (password, e)
532 password = getpass.getpass("%s's password for all nodes: " % username)
535 hosts = GetHosts(hostfile)
538 hostlist = hostlist.rstrip(",") # commandline robustness
539 hosts = hostlist.split(",")
543 successes = failures = 0
545 filelist = auxfiles[:]
546 filelist.append(executable)
549 batch = hosts[:batch_size]
550 hosts = hosts[batch_size:]
552 for hostname in batch:
553 LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
554 executable, exec_args, command, filelist)
557 pid, status = os.wait()
558 hostname = child_pids.pop(pid, "<unknown host>")
559 print " %s: done (in parent)" % hostname
560 if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
565 LaunchWorker(child_pids, logdir, username, password, use_agent,
566 hosts.pop(0), executable, exec_args, command, filelist)
569 print "All done, %s successful and %s failed hosts" % (successes, failures)
574 if __name__ == "__main__":
577 except KeyboardInterrupt:
578 print "Received KeyboardInterrupt, aborting"