4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Run an executable on a list of hosts.
23 Script to serially run an executable on a list of hosts via ssh
24 with password auth as root. If the provided log dir does not yet
25 exist, it will try to create it.
28 - the main process spawns up to batch_size children, which:
29 - connects to the remote host via ssh as root
30 - uploads the executable with a random name to /tmp via sftp
32 - via ssh: chdirs into the upload directory and runs the script
34 - writes status messages and all output to one logfile per host
35 - the main process gathers then the status of the children and
36 reports the success/failure ratio
37 - entire script can be aborted with Ctrl-C
39 Security considerations:
40 - the root password for the remote hosts is stored in memory for the
42 - the executable to be run on the remote host is handled the following way:
43 - try to create a random directory with permissions 700 on the
44 remote host, abort further processing on this host if this fails
45 - upload the executable to a random filename in that directory
46 - set executable permissions to 500
48 - delete the executable and the directory on the remote host
52 # pylint: disable-msg=C0103
53 # C0103: Invalid name ganeti-listrunner
# Common prefix of the scratch directory created on every remote host;
# UploadFiles appends two random components to make the name unique.
REMOTE_PATH_BASE = "/tmp/listrunner"

# Usage text handed to the option parser ("%prog" is expanded by optparse).
# Auxiliary files (-a) are only uploaded together with an executable (-x).
USAGE = ("%prog -l logdir {-c command | -x /path/to/file [-a aux_file ...]}"
         " [-b batch_size] {-f hostfile|-h hosts} [-u username]"
         " [-p password_file | -A]")
def LogDirUseable(logdir):
  """Ensure log file directory is available and usable.

  Creates logdir when it does not exist yet and then checks that a file
  can be created, written to and removed inside it.

  @param logdir: path of the directory that will hold the per-host logfiles
  @return: True if the directory exists (or could be created) and is
      writable, False otherwise

  """
  try:
    os.mkdir(logdir)
  except OSError as err:
    # an already existing directory is fine, anything else makes it unusable
    if err.errno != errno.EEXIST:
      return False

  testfile = "%s/test-%s-%s.deleteme" % (logdir, random.random(),
                                         random.random())
  try:
    # plain append mode: a combined "aw" mode string is rejected by
    # Python 3's open()
    logtest = open(testfile, "a")
    try:
      logtest.writelines("log file writeability test\n")
    finally:
      logtest.close()
    os.unlink(testfile)
  except (OSError, IOError):
    return False

  return True
def GetTimeStamp(timestamp=None):
  """Return ISO8601 timestamp.

  @param timestamp: optional time tuple as returned by time.localtime();
      the current local time is used when it is not supplied
  @return: the time formatted as YYYY-MM-DDTHH:MM:SS (no timezone suffix)

  """
  if timestamp is None:
    timestamp = time.localtime()

  return time.strftime("%Y-%m-%dT%H:%M:%S", timestamp)
def PingByTcp(target, port, timeout=10, live_port_needed=False, source=None):
  """Simple ping implementation using TCP connect(2).

  Try to do a TCP connect(2) from an optional source IP to the
  specified target IP and the specified target port. If the source
  optional argument is not passed, the source address selection is
  left to the kernel, otherwise we try to connect using the passed
  address (failures to bind other than EADDRNOTAVAIL will be ignored).

  @param target: the target IP or hostname
  @param port: the target TCP port
  @param timeout: connect timeout in seconds, defaults to 10
  @param live_port_needed: if True the remote end has to accept the
      connection; if False a refused connection also counts as success
  @param source: optional source address to bind to
  @return: True if the target answered as required, False otherwise

  """
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  try:
    if source is not None:
      try:
        sock.bind((source, 0))
      except socket.error as err:
        # compare the errno carried by the exception, not the exception
        # object itself (which never equals an integer)
        if err.errno == errno.EADDRNOTAVAIL:
          return False

    sock.settimeout(timeout)

    try:
      sock.connect((target, port))
    except socket.timeout:
      return False
    except socket.error as err:
      # a refused connection still proves that the host itself is alive
      return (not live_port_needed) and (err.errno == errno.ECONNREFUSED)

    return True
  finally:
    # release the descriptor on every path, not only after a connect
    sock.close()
def GetHosts(hostsfile):
  """Return list of hosts from hostfile.

  Reads the hostslist file and returns a list of hosts.
  Expects the hostslist file to contain one hostname per line; lines
  that are empty or contain only whitespace are skipped.

  @param hostsfile: path of the file to read
  @return: list of the non-blank lines, line terminators included
      (LaunchWorker strips them)

  """
  try:
    datafile = open(hostsfile, "r")
  except IOError as msg:
    print("Failed to open hosts file %s: %s" % (hostsfile, msg))
    sys.exit(2)

  try:
    # a blank line would otherwise be treated as a host with an empty name
    return [line for line in datafile if line.strip()]
  finally:
    datafile.close()
def WriteLog(message, logfile):
  """Writes message, terminated by newline, to logfile.

  The message is prefixed with the current timestamp and appended to the
  file. Not being able to log is treated as fatal: the process exits with
  status 1 after printing the message that could not be logged.

  @param message: the text to log
  @param logfile: path of the logfile to append to

  """
  try:
    # keep the path in "logfile" so the error messages below can show it
    handle = open(logfile, "a")
  except IOError as msg:
    print("failed to open log file %s: %s" % (logfile, msg))
    print("log message was: %s" % message)
    sys.exit(1)  # no being able to log is critical

  try:
    try:
      timestamp = GetTimeStamp()
      handle.writelines("%s %s\n" % (timestamp, message))
    finally:
      handle.close()
  except IOError as msg:
    print("failed to write to logfile %s: %s" % (logfile, msg))
    print("log message was: %s" % message)
    sys.exit(1)  # no being able to log is critical
def GetAgentKeys():
  """Tries to get a list of ssh keys from an agent.

  @return: list of the keys offered by the SSH agent; an empty list if
      querying the agent raises paramiko.SSHException

  """
  try:
    agent = paramiko.Agent()
    return list(agent.get_keys())
  except paramiko.SSHException:
    return []
def SetupSshConnection(host, username, password, use_agent, logfile):
  """Setup the ssh connection used for all later steps.

  This function sets up the ssh connection that will be used both
  for upload and remote command execution. The keys offered by the SSH
  agent (if requested) are tried first, then the password (if any).

  @param host: the host to connect to (port 22)
  @param username: SSH username
  @param password: SSH password, or None to skip password authentication
  @param use_agent: whether keys from an SSH agent should be tried
  @param logfile: path of the per-host logfile
  @return: a paramiko.Transport object with an already logged in session
      on success, False on failure

  """
  # check if target is willing to talk to us at all
  if not PingByTcp(host, 22, live_port_needed=True):
    WriteLog("ERROR: FAILURE_NOT_REACHABLE", logfile)
    print(" - ERROR: host not reachable on 22/tcp")
    return False

  if use_agent:
    keys = GetAgentKeys()
  else:
    keys = []

  # one (description, connect() keyword arguments) pair per credential
  attempts = [("key %d" % idx, {"pkey": key}) for idx, key in enumerate(keys)]
  if password is not None:
    attempts.append(("password", {"password": password}))

  # deal with logging out of paramiko.transport
  handler = None

  for desc, kwargs in attempts:
    transport = None
    try:
      transport = paramiko.Transport((host, 22))

      # only try to setup the logging handler once
      if handler is None:
        handler = logging.StreamHandler()
        handler.setLevel(logging.ERROR)
        log = logging.getLogger(transport.get_log_channel())
        log.addHandler(handler)

      transport.connect(username=username, **kwargs) # pylint: disable-msg=W0142
      WriteLog("ssh connection established using %s" % desc, logfile)
      # strange ... when establishing the session and the immediately
      # setting up the channels for sftp & shell from that, it sometimes
      # fails, but waiting 1 second after session setup makes it always work
      time.sleep(1)
      # FIXME apparently needed to give sshd some time
      return transport
    except (socket.gaierror, socket.error, paramiko.SSHException):
      # do not leave a half-open transport behind before trying the
      # next credential
      if transport is not None:
        transport.close()

  methods = ", ".join([desc for desc, _ in attempts])
  WriteLog("ERROR: FAILURE_CONNECTION_SETUP (tried %s) " % methods, logfile)
  WriteLog("aborted", logfile)
  print(" - ERROR: connection setup failed (tried %s)" % methods)

  return False
def UploadFiles(connection, executable, filelist, logfile):
  """Uploads the specified files via sftp.

  Uploads the specified files to a random, freshly created directory with
  a temporary name under /tmp. All uploaded files are chmod 0400 after upload
  with the exception of executable, which is chmod 500.

  @param connection: the logged in paramiko.Transport to use
  @param executable: path of the file that must end up executable
  @param filelist: paths of all files to upload (executable included)
  @param logfile: path of the per-host logfile
  @return: the absolute path to the remote upload directory on success,
      False upon failure

  """
  remote_dir = "%s.%s-%s" % (REMOTE_PATH_BASE,
                             random.random(), random.random())

  sftp = None
  try:
    try:
      sftp = paramiko.SFTPClient.from_transport(connection)
      # mkdir fails if the name already exists, so a pre-created directory
      # is never reused
      sftp.mkdir(remote_dir, mode=0o700)
      for item in filelist:
        remote_file = "%s/%s" % (remote_dir, os.path.basename(item))
        WriteLog("uploading %s to remote %s" % (item, remote_file), logfile)
        sftp.put(item, remote_file)
        if item == executable:
          sftp.chmod(remote_file, 0o500)
        else:
          sftp.chmod(remote_file, 0o400)
    finally:
      # close the sftp channel on failure as well
      if sftp is not None:
        sftp.close()
  except IOError as err:
    WriteLog("ERROR: FAILURE_UPLOAD: %s" % err, logfile)
    return False

  return remote_dir
def CleanupRemoteDir(connection, upload_dir, filelist, logfile):
  """Cleans out and removes the remote work directory.

  @param connection: the logged in paramiko.Transport to use
  @param upload_dir: the remote directory the files were uploaded to
  @param filelist: local paths of the uploaded files; only their base
      names are used to locate the remote copies
  @param logfile: path of the per-host logfile
  @return: True on success, False if any removal failed

  """
  sftp = None
  try:
    try:
      sftp = paramiko.SFTPClient.from_transport(connection)
      for item in filelist:
        fullpath = "%s/%s" % (upload_dir, os.path.basename(item))
        WriteLog("removing remote %s" % fullpath, logfile)
        sftp.remove(fullpath)
      sftp.rmdir(upload_dir)
    finally:
      # close the sftp channel on failure as well
      if sftp is not None:
        sftp.close()
  except IOError as err:
    WriteLog("ERROR: FAILURE_CLEANUP: %s" % err, logfile)
    return False

  return True
def RunRemoteCommand(connection, command, logfile):
  """Execute the command via ssh on the remote host.

  The command is run with stderr redirected to stdout; everything it
  prints is written line by line to the logfile once the remote end stops
  sending data.

  @param connection: the logged in paramiko.Transport to use
  @param command: the shell command to run
  @param logfile: path of the per-host logfile
  @return: True if the command could be started and its output was
      collected, False if starting it failed (the exit status of the
      command itself is not evaluated)

  """
  session = connection.open_session()
  try:
    session.setblocking(0)

    # the following dance is needed because paramiko changed APIs:
    # from returning True/False for success to always returning None
    # and throwing an exception in case of problems.
    # And I want to support both the old and the new API.
    result = True  # being optimistic here, I know
    message = None
    try:
      if session.exec_command("%s 2>&1" % command) is False:
        result = False
    except paramiko.SSHException as err:
      # copy to a separate name: the "as" target does not outlive the
      # except clause on Python 3
      message = err
      result = False

    if not result:
      WriteLog("ERROR: FAILURE_COMMAND_EXECUTION: %s" % message, logfile)
      return False

    ### Read when data is available
    chunks = []
    while True:
      # block until the channel has something for us
      select.select([session], [], [])
      try:
        data = session.recv(1024)
      except socket.timeout as err:
        data = None
        WriteLog("FAILED: socket.timeout %s" % err, logfile)
      except socket.error as err:
        data = None
        WriteLog("FAILED: socket.error %s" % err, logfile)
      if not data:
        break
      chunks.append(data)
      select.select([], [], [], .1)

    # join once instead of growing a string chunk by chunk
    output = b"".join(chunks)
    if not isinstance(output, str):
      # recv() hands out bytes where str is a text type
      output = output.decode("utf-8", "replace")

    WriteLog("SUCCESS: command output follows", logfile)
    for line in output.splitlines():
      WriteLog("output = %s" % line, logfile)
    WriteLog("command execution completed", logfile)

    return True
  finally:
    # close the channel on the failure paths as well
    session.close()
def _RunOnConnection(connection, hostname, executable, command, filelist,
                     logfile):
  """Upload (if needed), execute and clean up on one connected host.

  @return: 0 if every step succeeded, 1 otherwise

  """
  upload_dir = None
  if executable is not None:
    print(" %s: uploading files" % hostname)
    upload_dir = UploadFiles(connection, executable,
                             filelist, logfile)
    if upload_dir is False:
      # without this check the "directory" False would be used to build
      # the remote command and the cleanup paths
      print(" %s: file upload failed, check log for details" % hostname)
      return 1
    command = "cd %s && ./%s" % (upload_dir, os.path.basename(executable))

  result = 0
  print(" %s: executing remote command" % hostname)
  cmd_result = RunRemoteCommand(connection, command, logfile)
  if cmd_result is True:
    print(" %s: remote command execution successful" % hostname)
  else:
    print(" %s: remote command execution failed,"
          " check log for details" % hostname)
    result = 1

  if executable is not None:
    print(" %s: cleaning up remote work dir" % hostname)
    cln_result = CleanupRemoteDir(connection, upload_dir,
                                  filelist, logfile)
    if cln_result is False:
      print(" %s: remote work dir cleanup failed, check"
            " log for details" % hostname)
      result = 1

  return result


def HostWorker(logdir, username, password, use_agent, hostname,
               executable, command, filelist):
  """Per-host worker.

  This function does not return - it's the main code of the children,
  which exit at the end of this function. The exit code 0 or 1 will be
  interpreted by the parent.

  @param logdir: the directory where the logfiles must be created
  @param username: SSH username
  @param password: SSH password
  @param use_agent: whether we should instead use an agent
  @param hostname: the hostname to connect to
  @param executable: the executable to upload, if not None
  @param command: the command to run
  @param filelist: auxiliary files to upload

  """
  # in the child/worker process
  logfile = "%s/%s.log" % (logdir, hostname)
  print("%s - starting" % hostname)
  result = 0  # optimism, I know
  try:
    connection = SetupSshConnection(hostname, username,
                                    password, use_agent, logfile)
    if connection is not False:
      try:
        result = _RunOnConnection(connection, hostname, executable, command,
                                  filelist, logfile)
      finally:
        connection.close()
    else:
      print(" %s: connection setup failed, skipping" % hostname)
      result = 1
  except KeyboardInterrupt:
    print(" %s: received KeyboardInterrupt, aborting" % hostname)
    WriteLog("ERROR: ABORT_KEYBOARD_INTERRUPT", logfile)
    result = 1
  except Exception as err:
    result = 1
    trace = traceback.format_exc()
    msg = "ERROR: UNHANDLED_EXECPTION_ERROR: %s\nTrace: %s" % (err, trace)
    WriteLog(msg, logfile)
    print(" %s: %s" % (hostname, msg))
  # and exit with exit code 0 or 1, so the parent can compute statistics
  sys.exit(result)
def LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
                 executable, command, filelist):
  """Launch the per-host worker.

  Forks; the child runs HostWorker (which exits the process), the parent
  records the pid of the child.

  Arguments are the same as for HostWorker, except for child_pids,
  which is a dictionary holding the pid-to-hostname mapping.

  """
  # also drops "\r" and stray blanks left over from a hosts file
  hostname = hostname.strip()
  pid = os.fork()
  if pid > 0:
    # controller just record the pids
    child_pids[pid] = hostname
  else:
    HostWorker(logdir, username, password, use_agent, hostname,
               executable, command, filelist)
def ParseOptions():
  """Parses the command line options.

  In case of command line errors, it will show the usage and exit the
  program.

  @return: the options in a tuple: (logdir, executable, hostfile,
      hostlist, command, use_agent, auxfiles, username, password,
      batch_size)

  """
  # resolve because original used -h for hostfile, which conflicts
  # with the -h that optparse adds for help
  parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                 conflict_handler="resolve")

  parser.add_option("-l", dest="logdir", default=None,
                    help="directory to write logfiles to")
  parser.add_option("-x", dest="executable", default=None,
                    help="executable to run on remote host(s)")
  parser.add_option("-f", dest="hostfile", default=None,
                    help="hostlist file (one host per line)")
  parser.add_option("-h", dest="hostlist", default=None, metavar="HOSTS",
                    help="comma-separated list of hosts or single hostname")
  # default None (normalized below) so no list object is shared between
  # the parser and the caller
  parser.add_option("-a", dest="auxfiles", action="append", default=None,
                    help="optional auxiliary file to upload"
                    " (can be given multiple times)",
                    metavar="FILE")
  parser.add_option("-c", dest="command", default=None,
                    help="shell command to run on remote host(s)")
  parser.add_option("-b", dest="batch_size", default=15, type="int",
                    help="batch-size, how many hosts to process"
                    " in parallel [15]")
  parser.add_option("-u", dest="username", default="root",
                    help="username used to connect [root]")
  parser.add_option("-p", dest="password", default=None,
                    help="file containing the password used to authenticate"
                    " (when not using an agent)")
  parser.add_option("-A", dest="use_agent", default=False, action="store_true",
                    help="instead of password, use keys from an SSH agent")

  opts, args = parser.parse_args()

  if opts.executable and opts.command:
    parser.error("Options -x and -c conflict with each other")
  if not (opts.executable or opts.command):
    parser.error("One of -x and -c must be given")
  if not opts.logdir:
    parser.error("Option -l is required")
  if opts.hostfile and opts.hostlist:
    parser.error("Options -f and -h conflict with each other")
  if not (opts.hostfile or opts.hostlist):
    parser.error("One of -f or -h must be given")
  if opts.batch_size < 1:
    # a batch size of zero would silently process no host at all
    parser.error("Option -b requires a positive number")
  if args:
    parser.error("This program doesn't take any arguments, passed in: %s" %
                 ", ".join(args))

  return (opts.logdir, opts.executable, opts.hostfile, opts.hostlist,
          opts.command, opts.use_agent, opts.auxfiles or [], opts.username,
          opts.password, opts.batch_size)
def main():
  """Run the requested command or executable on all hosts.

  Parses the options, obtains the password if one is needed, then keeps up
  to batch_size worker processes running until every host was processed
  and finally prints the number of successful and failed hosts.

  """
  (logdir, executable, hostfile, hostlist,
   command, use_agent, auxfiles, username,
   password, batch_size) = ParseOptions()

  ### Unbuffered sys.stdout
  try:
    sys.stdout = os.fdopen(1, "w", 0)
  except ValueError:
    # unbuffered text streams are refused by Python 3; line buffering is
    # the closest available behaviour
    sys.stdout = os.fdopen(1, "w", 1)

  if LogDirUseable(logdir) is False:
    print("ERROR: cannot create logfiles in dir %s, aborting" % logdir)
    sys.exit(1)

  if password:
    # -p names a file whose first line is the password; read it even when
    # an agent is used, so the file name itself is never tried as password
    try:
      with open(password) as fh:
        pwvalue = fh.readline().strip()
    except IOError as e:
      print("error: can not read in from password file %s: %s" % (password, e))
      sys.exit(1)
    password = pwvalue
  elif not use_agent:
    password = getpass.getpass("%s's password for all nodes: " % username)

  if hostfile:
    hosts = GetHosts(hostfile)
  else:
    # commandline robustness: ignore empty entries such as in "a,,b" or "a,"
    hosts = [name for name in hostlist.split(",") if name.strip()]

  successes = failures = 0

  filelist = auxfiles[:]
  if executable is not None:
    filelist.append(executable)

  # initial batch
  batch = hosts[:batch_size]
  hosts = hosts[batch_size:]
  child_pids = {}
  for hostname in batch:
    LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
                 executable, command, filelist)

  # every finished child frees a slot for the next waiting host
  while child_pids:
    pid, status = os.wait()
    hostname = child_pids.pop(pid, "<unknown host>")
    print(" %s: done (in parent)" % hostname)
    if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
      successes += 1
    else:
      failures += 1
    if hosts:
      LaunchWorker(child_pids, logdir, username, password, use_agent,
                   hosts.pop(0), executable, command, filelist)

  print("All done, %s successful and %s failed hosts" % (successes, failures))
558 if __name__ == "__main__":
561 except KeyboardInterrupt:
562 print "Received KeyboardInterrupt, aborting"