Remove useless code in backend for network hooks
[ganeti-local] / tools / ganeti-listrunner
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Run an executable on a list of hosts.
22
23 Script to serially run an executable on a list of hosts via ssh
24 with password auth as root. If the provided log dir does not yet
25 exist, it will try to create it.
26
27 Implementation:
28  - the main process spawns up to batch_size children, which:
29  - connects to the remote host via ssh as root
30  - uploads the executable with a random name to /tmp via sftp
31  - chmod 500s it
32  - via ssh: chdirs into the upload directory and runs the script
33  - deletes it
34  - writes status messages and all output to one logfile per host
35  - the main process gathers then the status of the children and
36    reports the success/failure ratio
37  - entire script can be aborted with Ctrl-C
38
39 Security considerations:
40  - the root password for the remote hosts is stored in memory for the
41    runtime of the script
42  - the executable to be run on the remote host is handled the following way:
43    - try to create a random directory with permissions 700 on the
44      remote host, abort furter processing on this host if this failes
45    - upload the executable with to a random filename in that directory
46    - set executable permissions to 500
47    - run the executable
48    - delete the execuable and the directory on the remote host
49
50 """
51
52 # pylint: disable=C0103
53 # C0103: Invalid name ganeti-listrunner
54
55 import errno
56 import optparse
57 import getpass
58 import logging
59 import os
60 import random
61 import select
62 import socket
63 import sys
64 import time
65 import traceback
66
67 try:
68   import paramiko
69 except ImportError:
70   print >> sys.stderr, \
71     ("The \"paramiko\" module could not be imported. Install it from your"
72      " distribution's repository. The package is usually named"
73      " \"python-paramiko\".")
74   sys.exit(1)
75
76
77 REMOTE_PATH_BASE = "/tmp/listrunner"
78
79 USAGE = ("%prog -l logdir {-c command | -x /path/to/file} [-b batch_size]"
80          " {-f hostfile|-h hosts} [-u username]"
81          " [-p password_file | -A]")
82
83
84 def LogDirUseable(logdir):
85   """Ensure log file directory is available and usable."""
86   testfile = "%s/test-%s-%s.deleteme" % (logdir, random.random(),
87                                          random.random())
88   try:
89     os.mkdir(logdir)
90   except OSError, err:
91     if err.errno != errno.EEXIST:
92       raise
93   try:
94     logtest = open(testfile, "aw")
95     logtest.writelines("log file writeability test\n")
96     logtest.close()
97     os.unlink(testfile)
98     return True
99   except (OSError, IOError):
100     return False
101
102
103 def GetTimeStamp(timestamp=None):
104   """Return ISO8601 timestamp.
105
106   Returns ISO8601 timestamp, optionally expects a time.localtime() tuple
107   in timestamp, but will use the current time if this argument is not
108   supplied.
109   """
110   if timestamp is None:
111     timestamp = time.localtime()
112
113   isotime = time.strftime("%Y-%m-%dT%H:%M:%S", timestamp)
114   return isotime
115
116
117 def PingByTcp(target, port, timeout=10, live_port_needed=False, source=None):
118   """Simple ping implementation using TCP connect(2).
119
120   Try to do a TCP connect(2) from an optional source IP to the
121   specified target IP and the specified target port. If the optional
122   parameter live_port_needed is set to true, requires the remote end
123   to accept the connection. The timeout is specified in seconds and
124   defaults to 10 seconds. If the source optional argument is not
125   passed, the source address selection is left to the kernel,
126   otherwise we try to connect using the passed address (failures to
127   bind other than EADDRNOTAVAIL will be ignored).
128
129   """
130   sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
131
132   success = False
133
134   if source is not None:
135     try:
136       sock.bind((source, 0))
137     except socket.error, (errcode):
138       if errcode == errno.EADDRNOTAVAIL:
139         success = False
140
141   sock.settimeout(timeout)
142
143   try:
144     sock.connect((target, port))
145     sock.close()
146     success = True
147   except socket.timeout:
148     success = False
149   except socket.error, (errcode):
150     success = (not live_port_needed) and (errcode == errno.ECONNREFUSED)
151
152   return success
153
154
155 def GetHosts(hostsfile):
156   """Return list of hosts from hostfile.
157
158   Reads the hostslist file and returns a list of hosts.
159   Expects the hostslist file to contain one hostname per line.
160
161   """
162   try:
163     datafile = open(hostsfile, "r")
164   except IOError, msg:
165     print "Failed to open hosts file %s: %s" % (hostsfile, msg)
166     sys.exit(2)
167
168   hosts = datafile.readlines()
169   datafile.close()
170
171   return hosts
172
173
174 def WriteLog(message, logfile):
175   """Writes message, terminated by newline, to logfile."""
176   try:
177     logfile = open(logfile, "aw")
178   except IOError, msg:
179     print "failed to open log file %s: %s" % (logfile, msg)
180     print "log message was: %s" % message
181     sys.exit(1)  # no being able to log is critical
182   try:
183     timestamp = GetTimeStamp()
184     logfile.writelines("%s %s\n" % (timestamp, message))
185     logfile.close()
186   except IOError, msg:
187     print "failed to write to logfile %s: %s" % (logfile, msg)
188     print "log message was: %s" % message
189     sys.exit(1)  # no being able to log is critical
190
191
192 def GetAgentKeys():
193   """Tries to get a list of ssh keys from an agent."""
194   try:
195     agent = paramiko.Agent()
196     return list(agent.get_keys())
197   except paramiko.SSHException:
198     return []
199
200
201 def SetupSshConnection(host, username, password, use_agent, logfile):
202   """Setup the ssh connection used for all later steps.
203
204   This function sets up the ssh connection that will be used both
205   for upload and remote command execution.
206
207   On success, it will return paramiko.Transport object with an
208   already logged in session. On failure, False will be returned.
209
210   """
211   # check if target is willing to talk to us at all
212   if not PingByTcp(host, 22, live_port_needed=True):
213     WriteLog("ERROR: FAILURE_NOT_REACHABLE", logfile)
214     print "  - ERROR: host not reachable on 22/tcp"
215     return False
216
217   if use_agent:
218     keys = GetAgentKeys()
219   else:
220     keys = []
221   all_kwargs = [{"pkey": k} for k in keys]
222   all_desc = ["key %d" % d for d in range(len(keys))]
223   if password is not None:
224     all_kwargs.append({"password": password})
225     all_desc.append("password")
226
227   # deal with logging out of paramiko.transport
228   handler = None
229
230   for desc, kwargs in zip(all_desc, all_kwargs):
231     try:
232       transport = paramiko.Transport((host, 22))
233
234       # only try to setup the logging handler once
235       if not handler:
236         handler = logging.StreamHandler()
237         handler.setLevel(logging.ERROR)
238         log = logging.getLogger(transport.get_log_channel())
239         log.addHandler(handler)
240
241       transport.connect(username=username, **kwargs) # pylint: disable=W0142
242       WriteLog("ssh connection established using %s" % desc, logfile)
243       # strange ... when establishing the session and the immediately
244       # setting up the channels for sftp & shell from that, it sometimes
245       # fails, but waiting 1 second after session setup makes it always work
246       # time.sleep(1)
247       # FIXME apparently needfull to give sshd some time
248       return transport
249     except (socket.gaierror, socket.error, paramiko.SSHException):
250       continue
251
252   methods = ", ".join(all_desc)
253   WriteLog("ERROR: FAILURE_CONNECTION_SETUP (tried %s) " % methods, logfile)
254   WriteLog("aborted", logfile)
255   print "  - ERROR: connection setup failed (tried %s)" % methods
256
257   return False
258
259
260 def UploadFiles(connection, executable, filelist, logfile):
261   """Uploads the specified files via sftp.
262
263   Uploads the specified files to a random, freshly created directory with
264   a temporary name under /tmp. All uploaded files are chmod 0400 after upload
265   with the exception of executable, with is chmod 500.
266
267   Upon success, returns the absolute path to the remote upload directory,
268   but will return False upon failure.
269   """
270   remote_dir = "%s.%s-%s" % (REMOTE_PATH_BASE,
271                              random.random(), random.random())
272
273   try:
274     sftp = paramiko.SFTPClient.from_transport(connection)
275     sftp.mkdir(remote_dir, mode=0700)
276     for item in filelist:
277       remote_file = "%s/%s" % (remote_dir, os.path.basename(item))
278       WriteLog("uploading %s to remote %s" % (item, remote_file), logfile)
279       sftp.put(item, remote_file)
280       if item == executable:
281         sftp.chmod(remote_file, 0500)
282       else:
283         sftp.chmod(remote_file, 0400)
284     sftp.close()
285   except IOError, err:
286     WriteLog("ERROR: FAILURE_UPLOAD: %s" % err, logfile)
287     return False
288
289   return remote_dir
290
291
292 def CleanupRemoteDir(connection, upload_dir, filelist, logfile):
293   """Cleanes out and removes the remote work directory."""
294   try:
295     sftp = paramiko.SFTPClient.from_transport(connection)
296     for item in filelist:
297       fullpath = "%s/%s" % (upload_dir, os.path.basename(item))
298       WriteLog("removing remote %s" % fullpath, logfile)
299       sftp.remove(fullpath)
300     sftp.rmdir(upload_dir)
301     sftp.close()
302   except IOError, err:
303     WriteLog("ERROR: FAILURE_CLEANUP: %s" % err, logfile)
304     return False
305
306   return True
307
308
309 def RunRemoteCommand(connection, command, logfile):
310   """Execute the command via ssh on the remote host."""
311   session = connection.open_session()
312   session.setblocking(0)
313
314   # the following dance is needed because paramiko changed APIs:
315   # from returning True/False for success to always returning None
316   # and throwing an exception in case of problems.
317   # And I want to support both the old and the new API.
318   result = True  # being optimistic here, I know
319   message = None
320   try:
321     if session.exec_command("%s 2>&1" % command) is False:
322       result = False
323   except paramiko.SSHException, message:
324     result = False
325
326   if not result:
327     WriteLog("ERROR: FAILURE_COMMAND_EXECUTION: %s" % message, logfile)
328     return False
329
330    ### Read when data is available
331   output = ""
332   while select.select([session], [], []):
333     try:
334       data = session.recv(1024)
335     except socket.timeout, err:
336       data = None
337       WriteLog("FAILED: socket.timeout %s" % err, logfile)
338     except socket.error, err:
339       data = None
340       WriteLog("FAILED: socket.error %s" % err, logfile)
341     if not data:
342       break
343     output += data
344     select.select([], [], [], .1)
345
346   WriteLog("SUCCESS: command output follows", logfile)
347   for line in output.splitlines():
348     WriteLog("output = %s" % line, logfile)
349   WriteLog("command execution completed", logfile)
350   session.close()
351
352   return True
353
354
355 def HostWorker(logdir, username, password, use_agent, hostname,
356                executable, exec_args, command, filelist):
357   """Per-host worker.
358
359   This function does not return - it's the main code of the childs,
360   which exit at the end of this function. The exit code 0 or 1 will be
361   interpreted by the parent.
362
363   @param logdir: the directory where the logfiles must be created
364   @param username: SSH username
365   @param password: SSH password
366   @param use_agent: whether we should instead use an agent
367   @param hostname: the hostname to connect to
368   @param executable: the executable to upload, if not None
369   @param exec_args: Additional arguments for executable
370   @param command: the command to run
371   @param filelist: auxiliary files to upload
372
373   """
374   # in the child/worker process
375   logfile = "%s/%s.log" % (logdir, hostname)
376   print "%s - starting" % hostname
377   result = 0  # optimism, I know
378   try:
379     connection = SetupSshConnection(hostname, username,
380                                     password, use_agent, logfile)
381     if connection is not False:
382       if executable is not None:
383         print "  %s: uploading files" % hostname
384         upload_dir = UploadFiles(connection, executable,
385                                  filelist, logfile)
386         command = ("cd %s && ./%s" %
387                    (upload_dir, os.path.basename(executable)))
388         if exec_args:
389           command += " %s" % exec_args
390       print "  %s: executing remote command" % hostname
391       cmd_result = RunRemoteCommand(connection, command, logfile)
392       if cmd_result is True:
393         print "  %s: remote command execution successful" % hostname
394       else:
395         print ("  %s: remote command execution failed,"
396                " check log for details" % hostname)
397         result = 1
398       if executable is not None:
399         print "  %s: cleaning up remote work dir" % hostname
400         cln_result = CleanupRemoteDir(connection, upload_dir,
401                                       filelist, logfile)
402         if cln_result is False:
403           print ("  %s: remote work dir cleanup failed, check"
404                  " log for details" % hostname)
405           result = 1
406       connection.close()
407     else:
408       print "  %s: connection setup failed, skipping" % hostname
409       result = 1
410   except KeyboardInterrupt:
411     print "  %s: received KeyboardInterrupt, aborting" % hostname
412     WriteLog("ERROR: ABORT_KEYBOARD_INTERRUPT", logfile)
413     result = 1
414   except Exception, err:
415     result = 1
416     trace = traceback.format_exc()
417     msg = "ERROR: UNHANDLED_EXECPTION_ERROR: %s\nTrace: %s" % (err, trace)
418     WriteLog(msg, logfile)
419     print "  %s: %s" % (hostname, msg)
420   # and exit with exit code 0 or 1, so the parent can compute statistics
421   sys.exit(result)
422
423
424 def LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
425                  executable, exec_args, command, filelist):
426   """Launch the per-host worker.
427
428   Arguments are the same as for HostWorker, except for child_pids,
429   which is a dictionary holding the pid-to-hostname mapping.
430
431   """
432   hostname = hostname.rstrip("\n")
433   pid = os.fork()
434   if pid > 0:
435     # controller just record the pids
436     child_pids[pid] = hostname
437   else:
438     HostWorker(logdir, username, password, use_agent, hostname,
439                executable, exec_args, command, filelist)
440
441
442 def ParseOptions():
443   """Parses the command line options.
444
445   In case of command line errors, it will show the usage and exit the
446   program.
447
448   @return: the options in a tuple
449
450   """
451   # resolve because original used -h for hostfile, which conflicts
452   # with -h for help
453   parser = optparse.OptionParser(usage="\n%s" % USAGE,
454                                  conflict_handler="resolve")
455
456   parser.add_option("-l", dest="logdir", default=None,
457                     help="directory to write logfiles to")
458   parser.add_option("-x", dest="executable", default=None,
459                     help="executable to run on remote host(s)",)
460   parser.add_option("-f", dest="hostfile", default=None,
461                     help="hostlist file (one host per line)")
462   parser.add_option("-h", dest="hostlist", default=None, metavar="HOSTS",
463                     help="comma-separated list of hosts or single hostname",)
464   parser.add_option("-a", dest="auxfiles", action="append", default=[],
465                     help="optional auxiliary file to upload"
466                     " (can be given multiple times)",
467                     metavar="FILE")
468   parser.add_option("-c", dest="command", default=None,
469                     help="shell command to run on remote host(s)")
470   parser.add_option("-b", dest="batch_size", default=15, type="int",
471                     help="batch-size, how many hosts to process"
472                     " in parallel [15]")
473   parser.add_option("-u", dest="username", default="root",
474                     help="username used to connect [root]")
475   parser.add_option("-p", dest="password", default=None,
476                     help="password used to authenticate (when not"
477                     " using an agent)")
478   parser.add_option("-A", dest="use_agent", default=False, action="store_true",
479                     help="instead of password, use keys from an SSH agent")
480   parser.add_option("--args", dest="exec_args", default=None,
481                     help="Arguments to be passed to executable (-x)")
482
483   opts, args = parser.parse_args()
484
485   if opts.executable and opts.command:
486     parser.error("Options -x and -c conflict with each other")
487   if not (opts.executable or opts.command):
488     parser.error("One of -x and -c must be given")
489   if opts.command and opts.exec_args:
490     parser.error("Can't specify arguments when using custom command")
491   if not opts.logdir:
492     parser.error("Option -l is required")
493   if opts.hostfile and opts.hostlist:
494     parser.error("Options -f and -h conflict with each other")
495   if not (opts.hostfile or opts.hostlist):
496     parser.error("One of -f or -h must be given")
497   if args:
498     parser.error("This program doesn't take any arguments, passed in: %s" %
499                  ", ".join(args))
500
501   return (opts.logdir, opts.executable, opts.exec_args,
502           opts.hostfile, opts.hostlist,
503           opts.command, opts.use_agent, opts.auxfiles, opts.username,
504           opts.password, opts.batch_size)
505
506
507 def main():
508   """main."""
509   (logdir, executable, exec_args, hostfile, hostlist,
510    command, use_agent, auxfiles, username,
511    password, batch_size) = ParseOptions()
512
513   ### Unbuffered sys.stdout
514   sys.stdout = os.fdopen(1, "w", 0)
515
516   if LogDirUseable(logdir) is False:
517     print "ERROR: cannot create logfiles in dir %s, aborting" % logdir
518     sys.exit(1)
519
520   if use_agent:
521     pass
522   elif password:
523     try:
524       fh = file(password)
525       pwvalue = fh.readline().strip()
526       fh.close()
527     except IOError, e:
528       print "error: can not read in from password file %s: %s" % (password, e)
529       sys.exit(1)
530     password = pwvalue
531   else:
532     password = getpass.getpass("%s's password for all nodes: " % username)
533
534   if hostfile:
535     hosts = GetHosts(hostfile)
536   else:
537     if "," in hostlist:
538       hostlist = hostlist.rstrip(",")  # commandline robustness
539       hosts = hostlist.split(",")
540     else:
541       hosts = [hostlist]
542
543   successes = failures = 0
544
545   filelist = auxfiles[:]
546   filelist.append(executable)
547
548   # initial batch
549   batch = hosts[:batch_size]
550   hosts = hosts[batch_size:]
551   child_pids = {}
552   for hostname in batch:
553     LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
554                  executable, exec_args, command, filelist)
555
556   while child_pids:
557     pid, status = os.wait()
558     hostname = child_pids.pop(pid, "<unknown host>")
559     print "  %s: done (in parent)" % hostname
560     if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
561       successes += 1
562     else:
563       failures += 1
564     if hosts:
565       LaunchWorker(child_pids, logdir, username, password, use_agent,
566                    hosts.pop(0), executable, exec_args, command, filelist)
567
568   print
569   print "All done, %s successful and %s failed hosts" % (successes, failures)
570
571   sys.exit(0)
572
573
574 if __name__ == "__main__":
575   try:
576     main()
577   except KeyboardInterrupt:
578     print "Received KeyboardInterrupt, aborting"
579     sys.exit(1)