4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Run an executable on a list of hosts.
23 Script to serially run an executable on a list of hosts via ssh
24 with password auth as root. If the provided log dir does not yet
25 exist, it will try to create it.
28 - the main process spawns up to batch_size children, which:
29 - connects to the remote host via ssh as root
30 - uploads the executable with a random name to /tmp via sftp
32 - via ssh: chdirs into the upload directory and runs the script
34 - writes status messages and all output to one logfile per host
35 - the main process then gathers the status of the children and
36 reports the success/failure ratio
37 - entire script can be aborted with Ctrl-C
39 Security considerations:
40 - the root password for the remote hosts is stored in memory for the
42 - the executable to be run on the remote host is handled the following way:
43 - try to create a random directory with permissions 700 on the
44 remote host, abort further processing on this host if this fails
45 - upload the executable (keeping its file name) into that directory
46 - set executable permissions to 500
48 - delete the executable and the directory on the remote host
52 # pylint: disable=C0103
53 # C0103: Invalid name ganeti-listrunner
# Prefix of the remote work directory; UploadFiles appends a random suffix
# before creating the directory on the target host.
REMOTE_PATH_BASE = "/tmp/listrunner"

# Usage text handed to optparse, which substitutes %prog.
USAGE = ("%prog -l logdir {-c command | -x /path/to/file} [-b batch_size]"
         " {-f hostfile|-h hosts} [-u username]"
         " [-p password_file | -A]")
def LogDirUseable(logdir):
  """Ensure log file directory is available and usable.

  Creates logdir if it does not exist yet, then verifies that a file can be
  created, written and removed inside it.

  @param logdir: path of the directory that will hold the log files
  @return: True if the test file could be written and removed, False if not
  @raise OSError: if creating logdir fails for any reason other than the
      path already existing

  """
  testfile = "%s/test-%s-%s.deleteme" % (logdir, random.random(),
                                         random.random())
  try:
    os.mkdir(logdir)
  except OSError as err:
    # an already existing path is fine, everything else is fatal
    if err.errno != errno.EEXIST:
      raise

  try:
    # plain append mode: the combined "aw" mode is rejected by Python 3
    logtest = open(testfile, "a")
    try:
      logtest.write("log file writeability test\n")
    finally:
      logtest.close()
    os.unlink(testfile)
  except (OSError, IOError):
    return False

  return True
def GetTimeStamp(timestamp=None):
  """Return ISO8601 timestamp.

  @param timestamp: optional time tuple as returned by time.localtime();
      the current local time is used when it is None
  @return: the time formatted as YYYY-MM-DDTHH:MM:SS

  """
  if timestamp is None:
    timestamp = time.localtime()

  return time.strftime("%Y-%m-%dT%H:%M:%S", timestamp)
def PingByTcp(target, port, timeout=10, live_port_needed=False, source=None):
  """Simple ping implementation using TCP connect(2).

  Try to do a TCP connect(2) from an optional source IP to the
  specified target IP and the specified target port.

  @param target: address to connect to
  @param port: TCP port to connect to
  @param timeout: connect timeout in seconds
  @param live_port_needed: if True, the remote end must accept the
      connection; if False, a refused connection also counts as success
  @param source: optional local address to bind to before connecting; when
      None the source address selection is left to the kernel. Failures to
      bind other than EADDRNOTAVAIL are ignored
  @return: True if the target counts as reachable, False otherwise

  """
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  try:
    if source is not None:
      try:
        sock.bind((source, 0))
      except socket.error as err:
        # compare the errno attribute: the exception object itself never
        # equals an errno constant
        if err.errno == errno.EADDRNOTAVAIL:
          return False

    sock.settimeout(timeout)

    try:
      sock.connect((target, port))
    except socket.timeout:
      return False
    except socket.error as err:
      return (not live_port_needed) and (err.errno == errno.ECONNREFUSED)

    return True
  finally:
    # release the descriptor on every path, not only after a successful connect
    sock.close()
def GetHosts(hostsfile):
  """Return list of hosts from hostfile.

  Reads the hostslist file and returns a list of hosts.
  Expects the hostslist file to contain one hostname per line; blank lines
  are skipped. The returned entries keep their line terminator.

  If the file cannot be opened, a message is printed and the process exits
  with status 2.

  @param hostsfile: path of the file listing the hosts
  @return: list of the non-blank lines of the file

  """
  try:
    datafile = open(hostsfile, "r")
  except IOError as msg:
    print("Failed to open hosts file %s: %s" % (hostsfile, msg))
    sys.exit(2)

  try:
    # an empty line would otherwise be treated as a host with an empty name
    return [line for line in datafile if line.strip()]
  finally:
    datafile.close()
def WriteLog(message, logfile):
  """Writes message, terminated by newline, to logfile.

  The entry is prefixed with the timestamp returned by GetTimeStamp(). If
  the log file cannot be opened or written, the problem and the message are
  printed and the process exits with status 1.

  @param message: text of the log entry
  @param logfile: path of the file to append to

  """
  try:
    # plain append mode: the combined "aw" mode is rejected by Python 3
    handle = open(logfile, "a")
  except IOError as msg:
    print("failed to open log file %s: %s" % (logfile, msg))
    print("log message was: %s" % message)
    sys.exit(1)  # not being able to log is critical

  try:
    try:
      handle.write("%s %s\n" % (GetTimeStamp(), message))
    finally:
      handle.close()
  except IOError as msg:
    # logfile is still the path here, so the message names the file
    print("failed to write to logfile %s: %s" % (logfile, msg))
    print("log message was: %s" % message)
    sys.exit(1)  # not being able to log is critical
def GetAgentKeys():
  """Tries to get a list of ssh keys from an agent.

  @return: list of the keys offered by the ssh agent; empty if talking to
      the agent raises paramiko.SSHException

  """
  try:
    agent = paramiko.Agent()
    return list(agent.get_keys())
  except paramiko.SSHException:
    return []
def SetupSshConnection(host, username, password, use_agent, logfile):
  """Setup the ssh connection used for all later steps.

  This function sets up the ssh connection that will be used both
  for upload and remote command execution. Agent keys (if requested) are
  tried first, in order, followed by the password (if given).

  On success, it will return paramiko.Transport object with an
  already logged in session. On failure, False will be returned.

  @param host: name or address of the target host
  @param username: SSH username
  @param password: SSH password, or None to skip password authentication
  @param use_agent: whether keys from an SSH agent should be tried
  @param logfile: path of the per-host log file

  """
  # check if target is willing to talk to us at all
  if not PingByTcp(host, 22, live_port_needed=True):
    WriteLog("ERROR: FAILURE_NOT_REACHABLE", logfile)
    print(" - ERROR: host not reachable on 22/tcp")
    return False

  if use_agent:
    keys = GetAgentKeys()
  else:
    keys = []

  all_kwargs = [{"pkey": key} for key in keys]
  all_desc = ["key %d" % idx for idx, _ in enumerate(keys)]
  if password is not None:
    all_kwargs.append({"password": password})
    all_desc.append("password")

  # deal with logging out of paramiko.transport
  handler = None

  for desc, kwargs in zip(all_desc, all_kwargs):
    transport = None
    try:
      transport = paramiko.Transport((host, 22))

      # only try to setup the logging handler once
      if not handler:
        handler = logging.StreamHandler()
        handler.setLevel(logging.ERROR)
        log = logging.getLogger(transport.get_log_channel())
        log.addHandler(handler)

      transport.connect(username=username, **kwargs) # pylint: disable=W0142
      WriteLog("ssh connection established using %s" % desc, logfile)
      # Setting up the sftp and shell channels immediately after the
      # session is established sometimes fails; waiting one second first
      # makes it work reliably.
      # FIXME apparently needed to give sshd some time
      time.sleep(1)
      return transport
    except (socket.gaierror, socket.error, paramiko.SSHException):
      # drop the half-open transport before trying the next credential
      if transport is not None:
        transport.close()
      continue

  methods = ", ".join(all_desc)
  WriteLog("ERROR: FAILURE_CONNECTION_SETUP (tried %s) " % methods, logfile)
  WriteLog("aborted", logfile)
  print(" - ERROR: connection setup failed (tried %s)" % methods)

  return False
def UploadFiles(connection, executable, filelist, logfile):
  """Uploads the specified files via sftp.

  Uploads the specified files to a random, freshly created directory with
  a temporary name under /tmp. All uploaded files are chmod 0400 after upload
  with the exception of executable, which is chmod 500. Each file keeps its
  base name on the remote side.

  Upon success, returns the absolute path to the remote upload directory,
  but will return False upon failure.

  @param connection: transport to create the sftp client from
  @param executable: entry of filelist that must be made executable
  @param filelist: local paths of all files to upload
  @param logfile: path of the per-host log file

  """
  remote_dir = "%s.%s-%s" % (REMOTE_PATH_BASE,
                             random.random(), random.random())

  sftp = None
  try:
    sftp = paramiko.SFTPClient.from_transport(connection)
    sftp.mkdir(remote_dir, mode=0o700)
    for item in filelist:
      remote_file = "%s/%s" % (remote_dir, os.path.basename(item))
      WriteLog("uploading %s to remote %s" % (item, remote_file), logfile)
      sftp.put(item, remote_file)
      if item == executable:
        mode = 0o500
      else:
        mode = 0o400
      sftp.chmod(remote_file, mode)
  except IOError as err:
    WriteLog("ERROR: FAILURE_UPLOAD: %s" % err, logfile)
    return False
  finally:
    # close the sftp channel on failure as well
    if sftp is not None:
      sftp.close()

  return remote_dir
def CleanupRemoteDir(connection, upload_dir, filelist, logfile):
  """Cleans out and removes the remote work directory.

  Removes the remote copy of every file in filelist (looked up by base
  name inside upload_dir) and then the directory itself.

  @param connection: transport to create the sftp client from
  @param upload_dir: remote directory the files were uploaded to
  @param filelist: local paths of the files that were uploaded
  @param logfile: path of the per-host log file
  @return: True on success, False if an IOError occurred

  """
  sftp = None
  try:
    sftp = paramiko.SFTPClient.from_transport(connection)
    for item in filelist:
      fullpath = "%s/%s" % (upload_dir, os.path.basename(item))
      WriteLog("removing remote %s" % fullpath, logfile)
      sftp.remove(fullpath)
    sftp.rmdir(upload_dir)
  except IOError as err:
    WriteLog("ERROR: FAILURE_CLEANUP: %s" % err, logfile)
    return False
  finally:
    # close the sftp channel on failure as well
    if sftp is not None:
      sftp.close()

  return True
def RunRemoteCommand(connection, command, logfile):
  """Execute the command via ssh on the remote host.

  stderr of the remote command is redirected into its stdout. The collected
  output is written to the log, one entry per line, once no more data
  arrives.

  @param connection: transport providing open_session()
  @param command: shell command line to run remotely
  @param logfile: path of the per-host log file
  @return: False if the command could not be started, True otherwise

  """
  session = connection.open_session()
  session.setblocking(0)

  # paramiko changed APIs: from returning True/False for success to always
  # returning None and throwing an exception in case of problems. Both the
  # old and the new API are supported here.
  result = True
  message = None
  try:
    if session.exec_command("%s 2>&1" % command) is False:
      result = False
  except paramiko.SSHException as err:
    message = err
    result = False

  if not result:
    WriteLog("ERROR: FAILURE_COMMAND_EXECUTION: %s" % message, logfile)
    session.close()
    return False

  # Read when data is available
  chunks = []
  while True:
    # blocks until the channel becomes readable
    select.select([session], [], [])
    try:
      data = session.recv(1024)
    except socket.timeout as err:
      data = None
      WriteLog("FAILED: socket.timeout %s" % err, logfile)
    except socket.error as err:
      data = None
      WriteLog("FAILED: socket.error %s" % err, logfile)
    if not data:
      break
    chunks.append(data)
    # short pause before polling the channel again
    select.select([], [], [], .1)

  output = b"".join(chunks)
  if not isinstance(output, str):
    # byte chunks (Python 3) must become text before being logged
    output = output.decode("utf-8", "replace")

  WriteLog("SUCCESS: command output follows", logfile)
  for line in output.splitlines():
    WriteLog("output = %s" % line, logfile)
  WriteLog("command execution completed", logfile)
  session.close()

  return True
def _RunOnHost(connection, hostname, executable, exec_args, command,
               filelist, logfile):
  """Upload files if requested, run the command and clean up on one host.

  @return: 0 if every step succeeded, 1 otherwise

  """
  upload_dir = None
  if executable is not None:
    print(" %s: uploading files" % hostname)
    upload_dir = UploadFiles(connection, executable, filelist, logfile)
    if upload_dir is False:
      # without a remote directory there is nothing to run or to clean up
      print(" %s: file upload failed, check log for details" % hostname)
      return 1
    # exec_args may be None when no extra arguments were requested
    command = ("cd %s && ./%s %s" %
               (upload_dir, os.path.basename(executable), exec_args or ""))

  status = 0
  print(" %s: executing remote command" % hostname)
  cmd_result = RunRemoteCommand(connection, command, logfile)
  if cmd_result is True:
    print(" %s: remote command execution successful" % hostname)
  else:
    print(" %s: remote command execution failed,"
          " check log for details" % hostname)
    status = 1

  if executable is not None:
    print(" %s: cleaning up remote work dir" % hostname)
    cln_result = CleanupRemoteDir(connection, upload_dir, filelist, logfile)
    if cln_result is False:
      print(" %s: remote work dir cleanup failed, check"
            " log for details" % hostname)
      status = 1

  return status


def HostWorker(logdir, username, password, use_agent, hostname,
               executable, exec_args, command, filelist):
  """Per-host worker.

  This function does not return - it's the main code of the children,
  which exit at the end of this function. The exit code 0 or 1 will be
  interpreted by the parent.

  @param logdir: the directory where the logfiles must be created
  @param username: SSH username
  @param password: SSH password
  @param use_agent: whether we should instead use an agent
  @param hostname: the hostname to connect to
  @param executable: the executable to upload, if not None
  @param exec_args: Additional arguments for executable
  @param command: the command to run
  @param filelist: auxiliary files to upload

  """
  # in the child/worker process
  logfile = "%s/%s.log" % (logdir, hostname)
  print("%s - starting" % hostname)
  result = 0  # optimism, I know
  try:
    connection = SetupSshConnection(hostname, username,
                                    password, use_agent, logfile)
    if connection is False:
      print(" %s: connection setup failed, skipping" % hostname)
      result = 1
    else:
      try:
        result = _RunOnHost(connection, hostname, executable, exec_args,
                            command, filelist, logfile)
      finally:
        connection.close()
  except KeyboardInterrupt:
    print(" %s: received KeyboardInterrupt, aborting" % hostname)
    WriteLog("ERROR: ABORT_KEYBOARD_INTERRUPT", logfile)
    result = 1
  except Exception as err:
    result = 1
    trace = traceback.format_exc()
    msg = "ERROR: UNHANDLED_EXECPTION_ERROR: %s\nTrace: %s" % (err, trace)
    WriteLog(msg, logfile)
    print(" %s: %s" % (hostname, msg))
  # and exit with exit code 0 or 1, so the parent can compute statistics
  sys.exit(result)
def LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
                 executable, exec_args, command, filelist):
  """Launch the per-host worker.

  Forks; the parent records the child's pid, the child runs HostWorker.

  Arguments are the same as for HostWorker, except for child_pids,
  which is a dictionary holding the pid-to-hostname mapping.

  """
  # also drops a stray "\r" or blanks left over from the hosts file
  hostname = hostname.strip()
  pid = os.fork()
  if pid > 0:
    # controller just record the pids
    child_pids[pid] = hostname
  else:
    HostWorker(logdir, username, password, use_agent, hostname,
               executable, exec_args, command, filelist)
434 """Parses the command line options.
436 In case of command line errors, it will show the usage and exit the
439 @return: the options in a tuple
442 # resolve because original used -h for hostfile, which conflicts
444 parser = optparse.OptionParser(usage="\n%s" % USAGE,
445 conflict_handler="resolve")
447 parser.add_option("-l", dest="logdir", default=None,
448 help="directory to write logfiles to")
449 parser.add_option("-x", dest="executable", default=None,
450 help="executable to run on remote host(s)",)
451 parser.add_option("-f", dest="hostfile", default=None,
452 help="hostlist file (one host per line)")
453 parser.add_option("-h", dest="hostlist", default=None, metavar="HOSTS",
454 help="comma-separated list of hosts or single hostname",)
455 parser.add_option("-a", dest="auxfiles", action="append", default=[],
456 help="optional auxiliary file to upload"
457 " (can be given multiple times)",
459 parser.add_option("-c", dest="command", default=None,
460 help="shell command to run on remote host(s)")
461 parser.add_option("-b", dest="batch_size", default=15, type="int",
462 help="batch-size, how many hosts to process"
464 parser.add_option("-u", dest="username", default="root",
465 help="username used to connect [root]")
466 parser.add_option("-p", dest="password", default=None,
467 help="password used to authenticate (when not"
469 parser.add_option("-A", dest="use_agent", default=False, action="store_true",
470 help="instead of password, use keys from an SSH agent")
471 parser.add_option("--args", dest="exec_args", default=None,
472 help="Arguments to be passed to executable (-x)")
474 opts, args = parser.parse_args()
476 if opts.executable and opts.command:
477 parser.error("Options -x and -c conflict with each other")
478 if not (opts.executable or opts.command):
479 parser.error("One of -x and -c must be given")
480 if opts.command and opts.exec_args:
481 parser.error("Can't specify arguments when using custom command")
483 parser.error("Option -l is required")
484 if opts.hostfile and opts.hostlist:
485 parser.error("Options -f and -h conflict with each other")
486 if not (opts.hostfile or opts.hostlist):
487 parser.error("One of -f or -h must be given")
489 parser.error("This program doesn't take any arguments, passed in: %s" %
492 return (opts.logdir, opts.executable, opts.exec_args,
493 opts.hostfile, opts.hostlist,
494 opts.command, opts.use_agent, opts.auxfiles, opts.username,
495 opts.password, opts.batch_size)
500 (logdir, executable, exec_args, hostfile, hostlist,
501 command, use_agent, auxfiles, username,
502 password, batch_size) = ParseOptions()
504 ### Unbuffered sys.stdout
505 sys.stdout = os.fdopen(1, "w", 0)
507 if LogDirUseable(logdir) is False:
508 print "ERROR: cannot create logfiles in dir %s, aborting" % logdir
516 pwvalue = fh.readline().strip()
519 print "error: can not read in from password file %s: %s" % (password, e)
523 password = getpass.getpass("%s's password for all nodes: " % username)
526 hosts = GetHosts(hostfile)
529 hostlist = hostlist.rstrip(",") # commandline robustness
530 hosts = hostlist.split(",")
534 successes = failures = 0
536 filelist = auxfiles[:]
537 filelist.append(executable)
540 batch = hosts[:batch_size]
541 hosts = hosts[batch_size:]
543 for hostname in batch:
544 LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
545 executable, exec_args, command, filelist)
548 pid, status = os.wait()
549 hostname = child_pids.pop(pid, "<unknown host>")
550 print " %s: done (in parent)" % hostname
551 if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
556 LaunchWorker(child_pids, logdir, username, password, use_agent,
557 hosts.pop(0), executable, exec_args, command, filelist)
560 print "All done, %s successful and %s failed hosts" % (successes, failures)
565 if __name__ == "__main__":
568 except KeyboardInterrupt:
569 print "Received KeyboardInterrupt, aborting"