listrunner: Replace str.split with library functions
[ganeti-local] / tools / ganeti-listrunner
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Run an executable on a list of hosts.
22
23 Script to serially run an executable on a list of hosts via ssh
24 with password auth as root. If the provided log dir does not yet
25 exist, it will try to create it.
26
27 Implementation:
28  - the main process spawns up to batch_size children, which:
29  - connects to the remote host via ssh as root
30  - uploads the executable with a random name to /tmp via sftp
31  - chmod 500s it
32  - via ssh: chdirs into the upload directory and runs the script
33  - deletes it
34  - writes status messages and all output to one logfile per host
35  - the main process gathers then the status of the children and
36    reports the success/failure ratio
37  - entire script can be aborted with Ctrl-C
38
39 Security considerations:
40  - the root password for the remote hosts is stored in memory for the
41    runtime of the script
42  - the executable to be run on the remote host is handled the following way:
43    - try to create a random directory with permissions 700 on the
44      remote host, abort furter processing on this host if this failes
45    - upload the executable with to a random filename in that directory
46    - set executable permissions to 500
47    - run the executable
48    - delete the execuable and the directory on the remote host
49
50 """
51
52 # pylint: disable-msg=C0103
53 # C0103: Invalid name ganeti-listrunner
54
55 import errno
56 import optparse
57 import getpass
58 import logging
59 import os
60 import random
61 import select
62 import socket
63 import sys
64 import time
65 import traceback
66
67 import paramiko
68
69
70 REMOTE_PATH_BASE = "/tmp/listrunner"
71
72 USAGE = ("%prog -l logdir {-c command | -x /path/to/file} [-b batch_size]"
73          " {-f hostfile|-h hosts} [-u username]"
74          " [-p password_file | -A]")
75
76
77 def LogDirUseable(logdir):
78   """Ensure log file directory is available and usable."""
79   testfile = "%s/test-%s-%s.deleteme" % (logdir, random.random(),
80                                          random.random())
81   try:
82     os.mkdir(logdir)
83   except OSError, err:
84     if err.errno != errno.EEXIST:
85       raise
86   try:
87     logtest = open(testfile, "aw")
88     logtest.writelines("log file writeability test\n")
89     logtest.close()
90     os.unlink(testfile)
91     return True
92   except (OSError, IOError):
93     return False
94
95
96 def GetTimeStamp(timestamp=None):
97   """Return ISO8601 timestamp.
98
99   Returns ISO8601 timestamp, optionally expects a time.localtime() tuple
100   in timestamp, but will use the current time if this argument is not
101   supplied.
102   """
103   if timestamp is None:
104     timestamp = time.localtime()
105
106   isotime = time.strftime("%Y-%m-%dT%H:%M:%S", timestamp)
107   return isotime
108
109
110 def PingByTcp(target, port, timeout=10, live_port_needed=False, source=None):
111   """Simple ping implementation using TCP connect(2).
112
113   Try to do a TCP connect(2) from an optional source IP to the
114   specified target IP and the specified target port. If the optional
115   parameter live_port_needed is set to true, requires the remote end
116   to accept the connection. The timeout is specified in seconds and
117   defaults to 10 seconds. If the source optional argument is not
118   passed, the source address selection is left to the kernel,
119   otherwise we try to connect using the passed address (failures to
120   bind other than EADDRNOTAVAIL will be ignored).
121
122   """
123   sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
124
125   success = False
126
127   if source is not None:
128     try:
129       sock.bind((source, 0))
130     except socket.error, (errcode):
131       if errcode == errno.EADDRNOTAVAIL:
132         success = False
133
134   sock.settimeout(timeout)
135
136   try:
137     sock.connect((target, port))
138     sock.close()
139     success = True
140   except socket.timeout:
141     success = False
142   except socket.error, (errcode):
143     success = (not live_port_needed) and (errcode == errno.ECONNREFUSED)
144
145   return success
146
147
148 def GetHosts(hostsfile):
149   """Return list of hosts from hostfile.
150
151   Reads the hostslist file and returns a list of hosts.
152   Expects the hostslist file to contain one hostname per line.
153
154   """
155   try:
156     datafile = open(hostsfile, "r")
157   except IOError, msg:
158     print "Failed to open hosts file %s: %s" % (hostsfile, msg)
159     sys.exit(2)
160
161   hosts = datafile.readlines()
162   datafile.close()
163
164   return hosts
165
166
167 def WriteLog(message, logfile):
168   """Writes message, terminated by newline, to logfile."""
169   try:
170     logfile = open(logfile, "aw")
171   except IOError, msg:
172     print "failed to open log file %s: %s" % (logfile, msg)
173     print "log message was: %s" % message
174     sys.exit(1)  # no being able to log is critical
175   try:
176     timestamp = GetTimeStamp()
177     logfile.writelines("%s %s\n" % (timestamp, message))
178     logfile.close()
179   except IOError, msg:
180     print "failed to write to logfile %s: %s" % (logfile, msg)
181     print "log message was: %s" % message
182     sys.exit(1)  # no being able to log is critical
183
184
185 def GetAgentKeys():
186   """Tries to get a list of ssh keys from an agent."""
187   try:
188     agent = paramiko.Agent()
189     return list(agent.get_keys())
190   except paramiko.SSHException:
191     return []
192
193
194 def SetupSshConnection(host, username, password, use_agent, logfile):
195   """Setup the ssh connection used for all later steps.
196
197   This function sets up the ssh connection that will be used both
198   for upload and remote command execution.
199
200   On success, it will return paramiko.Transport object with an
201   already logged in session. On failure, False will be returned.
202
203   """
204   # check if target is willing to talk to us at all
205   if not PingByTcp(host, 22, live_port_needed=True):
206     WriteLog("ERROR: FAILURE_NOT_REACHABLE", logfile)
207     print "  - ERROR: host not reachable on 22/tcp"
208     return False
209
210   if use_agent:
211     keys = GetAgentKeys()
212   else:
213     keys = []
214   all_kwargs = [{"pkey": k} for k in keys]
215   all_desc = ["key %d" % d for d in range(len(keys))]
216   if password is not None:
217     all_kwargs.append({"password": password})
218     all_desc.append("password")
219
220   # deal with logging out of paramiko.transport
221   handler = None
222
223   for desc, kwargs in zip(all_desc, all_kwargs):
224     try:
225       transport = paramiko.Transport((host, 22))
226
227       # only try to setup the logging handler once
228       if not handler:
229         handler = logging.StreamHandler()
230         handler.setLevel(logging.ERROR)
231         log = logging.getLogger(transport.get_log_channel())
232         log.addHandler(handler)
233
234       transport.connect(username=username, **kwargs) # pylint: disable-msg=W0142
235       WriteLog("ssh connection established using %s" % desc, logfile)
236       # strange ... when establishing the session and the immediately
237       # setting up the channels for sftp & shell from that, it sometimes
238       # fails, but waiting 1 second after session setup makes it always work
239       # time.sleep(1)
240       # FIXME apparently needfull to give sshd some time
241       return transport
242     except (socket.gaierror, socket.error, paramiko.SSHException):
243       continue
244
245   methods = ", ".join(all_desc)
246   WriteLog("ERROR: FAILURE_CONNECTION_SETUP (tried %s) " % methods, logfile)
247   WriteLog("aborted", logfile)
248   print "  - ERROR: connection setup failed (tried %s)" % methods
249
250   return False
251
252
253 def UploadFiles(connection, executable, filelist, logfile):
254   """Uploads the specified files via sftp.
255
256   Uploads the specified files to a random, freshly created directory with
257   a temporary name under /tmp. All uploaded files are chmod 0400 after upload
258   with the exception of executable, with is chmod 500.
259
260   Upon success, returns the absolute path to the remote upload directory,
261   but will return False upon failure.
262   """
263   remote_dir = "%s.%s-%s" % (REMOTE_PATH_BASE,
264                              random.random(), random.random())
265
266   try:
267     sftp = paramiko.SFTPClient.from_transport(connection)
268     sftp.mkdir(remote_dir, mode=0700)
269     for item in filelist:
270       remote_file = "%s/%s" % (remote_dir, os.path.basename(item))
271       WriteLog("uploading %s to remote %s" % (item, remote_file), logfile)
272       sftp.put(item, remote_file)
273       if item == executable:
274         sftp.chmod(remote_file, 0500)
275       else:
276         sftp.chmod(remote_file, 0400)
277     sftp.close()
278   except IOError, err:
279     WriteLog("ERROR: FAILURE_UPLOAD: %s" % err, logfile)
280     return False
281
282   return remote_dir
283
284
285 def CleanupRemoteDir(connection, upload_dir, filelist, logfile):
286   """Cleanes out and removes the remote work directory."""
287   try:
288     sftp = paramiko.SFTPClient.from_transport(connection)
289     for item in filelist:
290       fullpath = "%s/%s" % (upload_dir, os.path.basename(item))
291       WriteLog("removing remote %s" % fullpath, logfile)
292       sftp.remove(fullpath)
293     sftp.rmdir(upload_dir)
294     sftp.close()
295   except IOError, err:
296     WriteLog("ERROR: FAILURE_CLEANUP: %s" % err, logfile)
297     return False
298
299   return True
300
301
302 def RunRemoteCommand(connection, command, logfile):
303   """Execute the command via ssh on the remote host."""
304   session = connection.open_session()
305   session.setblocking(0)
306
307   # the following dance is needed because paramiko changed APIs:
308   # from returning True/False for success to always returning None
309   # and throwing an exception in case of problems.
310   # And I want to support both the old and the new API.
311   result = True  # being optimistic here, I know
312   message = None
313   try:
314     if session.exec_command("%s 2>&1" % command) is False:
315       result = False
316   except paramiko.SSHException, message:
317     result = False
318
319   if not result:
320     WriteLog("ERROR: FAILURE_COMMAND_EXECUTION: %s" % message, logfile)
321     return False
322
323    ### Read when data is available
324   output = ""
325   while select.select([session], [], []):
326     try:
327       data = session.recv(1024)
328     except socket.timeout, err:
329       data = None
330       WriteLog("FAILED: socket.timeout %s" % err, logfile)
331     except socket.error, err:
332       data = None
333       WriteLog("FAILED: socket.error %s" % err, logfile)
334     if not data:
335       break
336     output += data
337     select.select([], [], [], .1)
338
339   WriteLog("SUCCESS: command output follows", logfile)
340   for line in output.splitlines():
341     WriteLog("output = %s" % line, logfile)
342   WriteLog("command execution completed", logfile)
343   session.close()
344
345   return True
346
347
348 def HostWorker(logdir, username, password, use_agent, hostname,
349                executable, command, filelist):
350   """Per-host worker.
351
352   This function does not return - it's the main code of the childs,
353   which exit at the end of this function. The exit code 0 or 1 will be
354   interpreted by the parent.
355
356   @param logdir: the directory where the logfiles must be created
357   @param username: SSH username
358   @param password: SSH password
359   @param use_agent: whether we should instead use an agent
360   @param hostname: the hostname to connect to
361   @param executable: the executable to upload, if not None
362   @param command: the command to run
363   @param filelist: auxiliary files to upload
364
365   """
366   # in the child/worker process
367   logfile = "%s/%s.log" % (logdir, hostname)
368   print "%s - starting" % hostname
369   result = 0  # optimism, I know
370   try:
371     connection = SetupSshConnection(hostname, username,
372                                     password, use_agent, logfile)
373     if connection is not False:
374       if executable is not None:
375         print "  %s: uploading files" % hostname
376         upload_dir = UploadFiles(connection, executable,
377                                  filelist, logfile)
378         command = "cd %s && ./%s" % (upload_dir, os.path.basename(executable))
379       print "  %s: executing remote command" % hostname
380       cmd_result = RunRemoteCommand(connection, command, logfile)
381       if cmd_result is True:
382         print "  %s: remote command execution successful" % hostname
383       else:
384         print ("  %s: remote command execution failed,"
385                " check log for details" % hostname)
386         result = 1
387       if executable is not None:
388         print "  %s: cleaning up remote work dir" % hostname
389         cln_result = CleanupRemoteDir(connection, upload_dir,
390                                       filelist, logfile)
391         if cln_result is False:
392           print ("  %s: remote work dir cleanup failed, check"
393                  " log for details" % hostname)
394           result = 1
395       connection.close()
396     else:
397       print "  %s: connection setup failed, skipping" % hostname
398       result = 1
399   except KeyboardInterrupt:
400     print "  %s: received KeyboardInterrupt, aborting" % hostname
401     WriteLog("ERROR: ABORT_KEYBOARD_INTERRUPT", logfile)
402     result = 1
403   except Exception, err:
404     result = 1
405     trace = traceback.format_exc()
406     msg = "ERROR: UNHANDLED_EXECPTION_ERROR: %s\nTrace: %s" % (err, trace)
407     WriteLog(msg, logfile)
408     print "  %s: %s" % (hostname, msg)
409   # and exit with exit code 0 or 1, so the parent can compute statistics
410   sys.exit(result)
411
412
413 def LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
414                  executable, command, filelist):
415   """Launch the per-host worker.
416
417   Arguments are the same as for HostWorker, except for child_pids,
418   which is a dictionary holding the pid-to-hostname mapping.
419
420   """
421   hostname = hostname.rstrip("\n")
422   pid = os.fork()
423   if pid > 0:
424     # controller just record the pids
425     child_pids[pid] = hostname
426   else:
427     HostWorker(logdir, username, password, use_agent, hostname,
428                executable, command, filelist)
429
430
431 def ParseOptions():
432   """Parses the command line options.
433
434   In case of command line errors, it will show the usage and exit the
435   program.
436
437   @return: the options in a tuple
438
439   """
440   # resolve because original used -h for hostfile, which conflicts
441   # with -h for help
442   parser = optparse.OptionParser(usage="\n%s" % USAGE,
443                                  conflict_handler="resolve")
444
445   parser.add_option("-l", dest="logdir", default=None,
446                     help="directory to write logfiles to")
447   parser.add_option("-x", dest="executable", default=None,
448                     help="executable to run on remote host(s)",)
449   parser.add_option("-f", dest="hostfile", default=None,
450                     help="hostlist file (one host per line)")
451   parser.add_option("-h", dest="hostlist", default=None, metavar="HOSTS",
452                     help="comma-separated list of hosts or single hostname",)
453   parser.add_option("-a", dest="auxfiles", action="append", default=[],
454                     help="optional auxiliary file to upload"
455                     " (can be given multiple times",
456                     metavar="FILE")
457   parser.add_option("-c", dest="command", default=None,
458                     help="shell command to run on remote host(s)")
459   parser.add_option("-b", dest="batch_size", default=15, type="int",
460                     help="batch-size, how many hosts to process"
461                     " in parallel [15]")
462   parser.add_option("-u", dest="username", default="root",
463                     help="username used to connect [root]")
464   parser.add_option("-p", dest="password", default=None,
465                     help="password used to authenticate (when not"
466                     " using an agent)")
467   parser.add_option("-A", dest="use_agent", default=False, action="store_true",
468                     help="instead of password, use keys from an SSH agent")
469
470   opts, args = parser.parse_args()
471
472   if opts.executable and opts.command:
473     parser.error("Options -x and -c conflict with each other")
474   if not (opts.executable or opts.command):
475     parser.error("One of -x and -c must be given")
476   if not opts.logdir:
477     parser.error("Option -l is required")
478   if opts.hostfile and opts.hostlist:
479     parser.error("Options -f and -h conflict with each other")
480   if not (opts.hostfile or opts.hostlist):
481     parser.error("One of -f or -h must be given")
482   if args:
483     parser.error("This program doesn't take any arguments, passed in: %s" %
484                  ", ".join(args))
485
486   return (opts.logdir, opts.executable, opts.hostfile, opts.hostlist,
487           opts.command, opts.use_agent, opts.auxfiles, opts.username,
488           opts.password, opts.batch_size)
489
490
491 def main():
492   """main."""
493   (logdir, executable, hostfile, hostlist,
494    command, use_agent, auxfiles, username,
495    password, batch_size) = ParseOptions()
496
497   ### Unbuffered sys.stdout
498   sys.stdout = os.fdopen(1, "w", 0)
499
500   if LogDirUseable(logdir) is False:
501     print "ERROR: cannot create logfiles in dir %s, aborting" % logdir
502     sys.exit(1)
503
504   if use_agent:
505     pass
506   elif password:
507     try:
508       fh = file(password)
509       pwvalue = fh.readline().strip()
510       fh.close()
511     except IOError, e:
512       print "error: can not read in from password file %s: %s" % (password, e)
513       sys.exit(1)
514     password = pwvalue
515   else:
516     password = getpass.getpass("%s's password for all nodes: " % username)
517
518   if hostfile:
519     hosts = GetHosts(hostfile)
520   else:
521     if "," in hostlist:
522       hostlist = hostlist.rstrip(",")  # commandline robustness
523       hosts = hostlist.split(",")
524     else:
525       hosts = [hostlist]
526
527   successes = failures = 0
528
529   filelist = auxfiles[:]
530   filelist.append(executable)
531
532   # initial batch
533   batch = hosts[:batch_size]
534   hosts = hosts[batch_size:]
535   child_pids = {}
536   for hostname in batch:
537     LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
538                  executable, command, filelist)
539
540   while child_pids:
541     pid, status = os.wait()
542     hostname = child_pids.pop(pid, "<unknown host>")
543     print "  %s: done (in parent)" % hostname
544     if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
545       successes += 1
546     else:
547       failures += 1
548     if hosts:
549       LaunchWorker(child_pids, logdir, username, password, use_agent,
550                    hosts.pop(0), executable, command, filelist)
551
552   print
553   print "All done, %s successful and %s failed hosts" % (successes, failures)
554
555   sys.exit(0)
556
557
558 if __name__ == "__main__":
559   try:
560     main()
561   except KeyboardInterrupt:
562     print "Received KeyboardInterrupt, aborting"
563     sys.exit(1)