bash_completion: Enable extglob while parsing file
[ganeti-local] / tools / ganeti-listrunner
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Run an executable on a list of hosts.
22
23 Script to serially run an executable on a list of hosts via ssh
24 with password auth as root. If the provided log dir does not yet
25 exist, it will try to create it.
26
27 Implementation:
28  - the main process spawns up to batch_size children, which:
29  - connects to the remote host via ssh as root
30  - uploads the executable with a random name to /tmp via sftp
31  - chmod 500s it
32  - via ssh: chdirs into the upload directory and runs the script
33  - deletes it
34  - writes status messages and all output to one logfile per host
35  - the main process gathers then the status of the children and
36    reports the success/failure ratio
37  - entire script can be aborted with Ctrl-C
38
39 Security considerations:
40  - the root password for the remote hosts is stored in memory for the
41    runtime of the script
42  - the executable to be run on the remote host is handled the following way:
43    - try to create a random directory with permissions 700 on the
44      remote host, abort furter processing on this host if this failes
45    - upload the executable with to a random filename in that directory
46    - set executable permissions to 500
47    - run the executable
48    - delete the execuable and the directory on the remote host
49
50 """
51
52 # pylint: disable=C0103
53 # C0103: Invalid name ganeti-listrunner
54
55 import errno
56 import optparse
57 import getpass
58 import logging
59 import os
60 import random
61 import select
62 import socket
63 import sys
64 import time
65 import traceback
66
67 import paramiko
68
69
70 REMOTE_PATH_BASE = "/tmp/listrunner"
71
72 USAGE = ("%prog -l logdir {-c command | -x /path/to/file} [-b batch_size]"
73          " {-f hostfile|-h hosts} [-u username]"
74          " [-p password_file | -A]")
75
76
77 def LogDirUseable(logdir):
78   """Ensure log file directory is available and usable."""
79   testfile = "%s/test-%s-%s.deleteme" % (logdir, random.random(),
80                                          random.random())
81   try:
82     os.mkdir(logdir)
83   except OSError, err:
84     if err.errno != errno.EEXIST:
85       raise
86   try:
87     logtest = open(testfile, "aw")
88     logtest.writelines("log file writeability test\n")
89     logtest.close()
90     os.unlink(testfile)
91     return True
92   except (OSError, IOError):
93     return False
94
95
96 def GetTimeStamp(timestamp=None):
97   """Return ISO8601 timestamp.
98
99   Returns ISO8601 timestamp, optionally expects a time.localtime() tuple
100   in timestamp, but will use the current time if this argument is not
101   supplied.
102   """
103   if timestamp is None:
104     timestamp = time.localtime()
105
106   isotime = time.strftime("%Y-%m-%dT%H:%M:%S", timestamp)
107   return isotime
108
109
110 def PingByTcp(target, port, timeout=10, live_port_needed=False, source=None):
111   """Simple ping implementation using TCP connect(2).
112
113   Try to do a TCP connect(2) from an optional source IP to the
114   specified target IP and the specified target port. If the optional
115   parameter live_port_needed is set to true, requires the remote end
116   to accept the connection. The timeout is specified in seconds and
117   defaults to 10 seconds. If the source optional argument is not
118   passed, the source address selection is left to the kernel,
119   otherwise we try to connect using the passed address (failures to
120   bind other than EADDRNOTAVAIL will be ignored).
121
122   """
123   sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
124
125   success = False
126
127   if source is not None:
128     try:
129       sock.bind((source, 0))
130     except socket.error, (errcode):
131       if errcode == errno.EADDRNOTAVAIL:
132         success = False
133
134   sock.settimeout(timeout)
135
136   try:
137     sock.connect((target, port))
138     sock.close()
139     success = True
140   except socket.timeout:
141     success = False
142   except socket.error, (errcode):
143     success = (not live_port_needed) and (errcode == errno.ECONNREFUSED)
144
145   return success
146
147
148 def GetHosts(hostsfile):
149   """Return list of hosts from hostfile.
150
151   Reads the hostslist file and returns a list of hosts.
152   Expects the hostslist file to contain one hostname per line.
153
154   """
155   try:
156     datafile = open(hostsfile, "r")
157   except IOError, msg:
158     print "Failed to open hosts file %s: %s" % (hostsfile, msg)
159     sys.exit(2)
160
161   hosts = datafile.readlines()
162   datafile.close()
163
164   return hosts
165
166
167 def WriteLog(message, logfile):
168   """Writes message, terminated by newline, to logfile."""
169   try:
170     logfile = open(logfile, "aw")
171   except IOError, msg:
172     print "failed to open log file %s: %s" % (logfile, msg)
173     print "log message was: %s" % message
174     sys.exit(1)  # no being able to log is critical
175   try:
176     timestamp = GetTimeStamp()
177     logfile.writelines("%s %s\n" % (timestamp, message))
178     logfile.close()
179   except IOError, msg:
180     print "failed to write to logfile %s: %s" % (logfile, msg)
181     print "log message was: %s" % message
182     sys.exit(1)  # no being able to log is critical
183
184
185 def GetAgentKeys():
186   """Tries to get a list of ssh keys from an agent."""
187   try:
188     agent = paramiko.Agent()
189     return list(agent.get_keys())
190   except paramiko.SSHException:
191     return []
192
193
194 def SetupSshConnection(host, username, password, use_agent, logfile):
195   """Setup the ssh connection used for all later steps.
196
197   This function sets up the ssh connection that will be used both
198   for upload and remote command execution.
199
200   On success, it will return paramiko.Transport object with an
201   already logged in session. On failure, False will be returned.
202
203   """
204   # check if target is willing to talk to us at all
205   if not PingByTcp(host, 22, live_port_needed=True):
206     WriteLog("ERROR: FAILURE_NOT_REACHABLE", logfile)
207     print "  - ERROR: host not reachable on 22/tcp"
208     return False
209
210   if use_agent:
211     keys = GetAgentKeys()
212   else:
213     keys = []
214   all_kwargs = [{"pkey": k} for k in keys]
215   all_desc = ["key %d" % d for d in range(len(keys))]
216   if password is not None:
217     all_kwargs.append({"password": password})
218     all_desc.append("password")
219
220   # deal with logging out of paramiko.transport
221   handler = None
222
223   for desc, kwargs in zip(all_desc, all_kwargs):
224     try:
225       transport = paramiko.Transport((host, 22))
226
227       # only try to setup the logging handler once
228       if not handler:
229         handler = logging.StreamHandler()
230         handler.setLevel(logging.ERROR)
231         log = logging.getLogger(transport.get_log_channel())
232         log.addHandler(handler)
233
234       transport.connect(username=username, **kwargs) # pylint: disable=W0142
235       WriteLog("ssh connection established using %s" % desc, logfile)
236       # strange ... when establishing the session and the immediately
237       # setting up the channels for sftp & shell from that, it sometimes
238       # fails, but waiting 1 second after session setup makes it always work
239       # time.sleep(1)
240       # FIXME apparently needfull to give sshd some time
241       return transport
242     except (socket.gaierror, socket.error, paramiko.SSHException):
243       continue
244
245   methods = ", ".join(all_desc)
246   WriteLog("ERROR: FAILURE_CONNECTION_SETUP (tried %s) " % methods, logfile)
247   WriteLog("aborted", logfile)
248   print "  - ERROR: connection setup failed (tried %s)" % methods
249
250   return False
251
252
253 def UploadFiles(connection, executable, filelist, logfile):
254   """Uploads the specified files via sftp.
255
256   Uploads the specified files to a random, freshly created directory with
257   a temporary name under /tmp. All uploaded files are chmod 0400 after upload
258   with the exception of executable, with is chmod 500.
259
260   Upon success, returns the absolute path to the remote upload directory,
261   but will return False upon failure.
262   """
263   remote_dir = "%s.%s-%s" % (REMOTE_PATH_BASE,
264                              random.random(), random.random())
265
266   try:
267     sftp = paramiko.SFTPClient.from_transport(connection)
268     sftp.mkdir(remote_dir, mode=0700)
269     for item in filelist:
270       remote_file = "%s/%s" % (remote_dir, os.path.basename(item))
271       WriteLog("uploading %s to remote %s" % (item, remote_file), logfile)
272       sftp.put(item, remote_file)
273       if item == executable:
274         sftp.chmod(remote_file, 0500)
275       else:
276         sftp.chmod(remote_file, 0400)
277     sftp.close()
278   except IOError, err:
279     WriteLog("ERROR: FAILURE_UPLOAD: %s" % err, logfile)
280     return False
281
282   return remote_dir
283
284
285 def CleanupRemoteDir(connection, upload_dir, filelist, logfile):
286   """Cleanes out and removes the remote work directory."""
287   try:
288     sftp = paramiko.SFTPClient.from_transport(connection)
289     for item in filelist:
290       fullpath = "%s/%s" % (upload_dir, os.path.basename(item))
291       WriteLog("removing remote %s" % fullpath, logfile)
292       sftp.remove(fullpath)
293     sftp.rmdir(upload_dir)
294     sftp.close()
295   except IOError, err:
296     WriteLog("ERROR: FAILURE_CLEANUP: %s" % err, logfile)
297     return False
298
299   return True
300
301
302 def RunRemoteCommand(connection, command, logfile):
303   """Execute the command via ssh on the remote host."""
304   session = connection.open_session()
305   session.setblocking(0)
306
307   # the following dance is needed because paramiko changed APIs:
308   # from returning True/False for success to always returning None
309   # and throwing an exception in case of problems.
310   # And I want to support both the old and the new API.
311   result = True  # being optimistic here, I know
312   message = None
313   try:
314     if session.exec_command("%s 2>&1" % command) is False:
315       result = False
316   except paramiko.SSHException, message:
317     result = False
318
319   if not result:
320     WriteLog("ERROR: FAILURE_COMMAND_EXECUTION: %s" % message, logfile)
321     return False
322
323    ### Read when data is available
324   output = ""
325   while select.select([session], [], []):
326     try:
327       data = session.recv(1024)
328     except socket.timeout, err:
329       data = None
330       WriteLog("FAILED: socket.timeout %s" % err, logfile)
331     except socket.error, err:
332       data = None
333       WriteLog("FAILED: socket.error %s" % err, logfile)
334     if not data:
335       break
336     output += data
337     select.select([], [], [], .1)
338
339   WriteLog("SUCCESS: command output follows", logfile)
340   for line in output.splitlines():
341     WriteLog("output = %s" % line, logfile)
342   WriteLog("command execution completed", logfile)
343   session.close()
344
345   return True
346
347
348 def HostWorker(logdir, username, password, use_agent, hostname,
349                executable, exec_args, command, filelist):
350   """Per-host worker.
351
352   This function does not return - it's the main code of the childs,
353   which exit at the end of this function. The exit code 0 or 1 will be
354   interpreted by the parent.
355
356   @param logdir: the directory where the logfiles must be created
357   @param username: SSH username
358   @param password: SSH password
359   @param use_agent: whether we should instead use an agent
360   @param hostname: the hostname to connect to
361   @param executable: the executable to upload, if not None
362   @param exec_args: Additional arguments for executable
363   @param command: the command to run
364   @param filelist: auxiliary files to upload
365
366   """
367   # in the child/worker process
368   logfile = "%s/%s.log" % (logdir, hostname)
369   print "%s - starting" % hostname
370   result = 0  # optimism, I know
371   try:
372     connection = SetupSshConnection(hostname, username,
373                                     password, use_agent, logfile)
374     if connection is not False:
375       if executable is not None:
376         print "  %s: uploading files" % hostname
377         upload_dir = UploadFiles(connection, executable,
378                                  filelist, logfile)
379         command = ("cd %s && ./%s" %
380                    (upload_dir, os.path.basename(executable)))
381         if exec_args:
382           command += " %s" % exec_args
383       print "  %s: executing remote command" % hostname
384       cmd_result = RunRemoteCommand(connection, command, logfile)
385       if cmd_result is True:
386         print "  %s: remote command execution successful" % hostname
387       else:
388         print ("  %s: remote command execution failed,"
389                " check log for details" % hostname)
390         result = 1
391       if executable is not None:
392         print "  %s: cleaning up remote work dir" % hostname
393         cln_result = CleanupRemoteDir(connection, upload_dir,
394                                       filelist, logfile)
395         if cln_result is False:
396           print ("  %s: remote work dir cleanup failed, check"
397                  " log for details" % hostname)
398           result = 1
399       connection.close()
400     else:
401       print "  %s: connection setup failed, skipping" % hostname
402       result = 1
403   except KeyboardInterrupt:
404     print "  %s: received KeyboardInterrupt, aborting" % hostname
405     WriteLog("ERROR: ABORT_KEYBOARD_INTERRUPT", logfile)
406     result = 1
407   except Exception, err:
408     result = 1
409     trace = traceback.format_exc()
410     msg = "ERROR: UNHANDLED_EXECPTION_ERROR: %s\nTrace: %s" % (err, trace)
411     WriteLog(msg, logfile)
412     print "  %s: %s" % (hostname, msg)
413   # and exit with exit code 0 or 1, so the parent can compute statistics
414   sys.exit(result)
415
416
417 def LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
418                  executable, exec_args, command, filelist):
419   """Launch the per-host worker.
420
421   Arguments are the same as for HostWorker, except for child_pids,
422   which is a dictionary holding the pid-to-hostname mapping.
423
424   """
425   hostname = hostname.rstrip("\n")
426   pid = os.fork()
427   if pid > 0:
428     # controller just record the pids
429     child_pids[pid] = hostname
430   else:
431     HostWorker(logdir, username, password, use_agent, hostname,
432                executable, exec_args, command, filelist)
433
434
435 def ParseOptions():
436   """Parses the command line options.
437
438   In case of command line errors, it will show the usage and exit the
439   program.
440
441   @return: the options in a tuple
442
443   """
444   # resolve because original used -h for hostfile, which conflicts
445   # with -h for help
446   parser = optparse.OptionParser(usage="\n%s" % USAGE,
447                                  conflict_handler="resolve")
448
449   parser.add_option("-l", dest="logdir", default=None,
450                     help="directory to write logfiles to")
451   parser.add_option("-x", dest="executable", default=None,
452                     help="executable to run on remote host(s)",)
453   parser.add_option("-f", dest="hostfile", default=None,
454                     help="hostlist file (one host per line)")
455   parser.add_option("-h", dest="hostlist", default=None, metavar="HOSTS",
456                     help="comma-separated list of hosts or single hostname",)
457   parser.add_option("-a", dest="auxfiles", action="append", default=[],
458                     help="optional auxiliary file to upload"
459                     " (can be given multiple times)",
460                     metavar="FILE")
461   parser.add_option("-c", dest="command", default=None,
462                     help="shell command to run on remote host(s)")
463   parser.add_option("-b", dest="batch_size", default=15, type="int",
464                     help="batch-size, how many hosts to process"
465                     " in parallel [15]")
466   parser.add_option("-u", dest="username", default="root",
467                     help="username used to connect [root]")
468   parser.add_option("-p", dest="password", default=None,
469                     help="password used to authenticate (when not"
470                     " using an agent)")
471   parser.add_option("-A", dest="use_agent", default=False, action="store_true",
472                     help="instead of password, use keys from an SSH agent")
473   parser.add_option("--args", dest="exec_args", default=None,
474                     help="Arguments to be passed to executable (-x)")
475
476   opts, args = parser.parse_args()
477
478   if opts.executable and opts.command:
479     parser.error("Options -x and -c conflict with each other")
480   if not (opts.executable or opts.command):
481     parser.error("One of -x and -c must be given")
482   if opts.command and opts.exec_args:
483     parser.error("Can't specify arguments when using custom command")
484   if not opts.logdir:
485     parser.error("Option -l is required")
486   if opts.hostfile and opts.hostlist:
487     parser.error("Options -f and -h conflict with each other")
488   if not (opts.hostfile or opts.hostlist):
489     parser.error("One of -f or -h must be given")
490   if args:
491     parser.error("This program doesn't take any arguments, passed in: %s" %
492                  ", ".join(args))
493
494   return (opts.logdir, opts.executable, opts.exec_args,
495           opts.hostfile, opts.hostlist,
496           opts.command, opts.use_agent, opts.auxfiles, opts.username,
497           opts.password, opts.batch_size)
498
499
500 def main():
501   """main."""
502   (logdir, executable, exec_args, hostfile, hostlist,
503    command, use_agent, auxfiles, username,
504    password, batch_size) = ParseOptions()
505
506   ### Unbuffered sys.stdout
507   sys.stdout = os.fdopen(1, "w", 0)
508
509   if LogDirUseable(logdir) is False:
510     print "ERROR: cannot create logfiles in dir %s, aborting" % logdir
511     sys.exit(1)
512
513   if use_agent:
514     pass
515   elif password:
516     try:
517       fh = file(password)
518       pwvalue = fh.readline().strip()
519       fh.close()
520     except IOError, e:
521       print "error: can not read in from password file %s: %s" % (password, e)
522       sys.exit(1)
523     password = pwvalue
524   else:
525     password = getpass.getpass("%s's password for all nodes: " % username)
526
527   if hostfile:
528     hosts = GetHosts(hostfile)
529   else:
530     if "," in hostlist:
531       hostlist = hostlist.rstrip(",")  # commandline robustness
532       hosts = hostlist.split(",")
533     else:
534       hosts = [hostlist]
535
536   successes = failures = 0
537
538   filelist = auxfiles[:]
539   filelist.append(executable)
540
541   # initial batch
542   batch = hosts[:batch_size]
543   hosts = hosts[batch_size:]
544   child_pids = {}
545   for hostname in batch:
546     LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
547                  executable, exec_args, command, filelist)
548
549   while child_pids:
550     pid, status = os.wait()
551     hostname = child_pids.pop(pid, "<unknown host>")
552     print "  %s: done (in parent)" % hostname
553     if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
554       successes += 1
555     else:
556       failures += 1
557     if hosts:
558       LaunchWorker(child_pids, logdir, username, password, use_agent,
559                    hosts.pop(0), executable, exec_args, command, filelist)
560
561   print
562   print "All done, %s successful and %s failed hosts" % (successes, failures)
563
564   sys.exit(0)
565
566
567 if __name__ == "__main__":
568   try:
569     main()
570   except KeyboardInterrupt:
571     print "Received KeyboardInterrupt, aborting"
572     sys.exit(1)