IAllocator changes to work with shared storage
[ganeti-local] / tools / ganeti-listrunner
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Run an executable on a list of hosts.
22
23 Script to serially run an executable on a list of hosts via ssh
24 with password auth as root. If the provided log dir does not yet
25 exist, it will try to create it.
26
27 Implementation:
28  - the main process spawns up to batch_size children, which:
29  - connects to the remote host via ssh as root
30  - uploads the executable with a random name to /tmp via sftp
31  - chmod 500s it
32  - via ssh: chdirs into the upload directory and runs the script
33  - deletes it
34  - writes status messages and all output to one logfile per host
35  - the main process gathers then the status of the children and
36    reports the success/failure ratio
37  - entire script can be aborted with Ctrl-C
38
39 Security considerations:
40  - the root password for the remote hosts is stored in memory for the
41    runtime of the script
42  - the executable to be run on the remote host is handled the following way:
43    - try to create a random directory with permissions 700 on the
44      remote host, abort furter processing on this host if this failes
45    - upload the executable with to a random filename in that directory
46    - set executable permissions to 500
47    - run the executable
48    - delete the execuable and the directory on the remote host
49
50 """
51
52 # pylint: disable-msg=C0103
53 # C0103: Invalid name ganeti-listrunner
54
55 import errno
56 import optparse
57 import getpass
58 import logging
59 import os
60 import random
61 import select
62 import socket
63 import sys
64 import time
65 import traceback
66
67 import paramiko
68
69
70 REMOTE_PATH_BASE = "/tmp/listrunner"
71
72 USAGE = ("%prog -l logdir {-c command | -x /path/to/file} [-b batch_size]"
73          " {-f hostfile|-h hosts} [-u username]"
74          " [-p password_file | -A]")
75
76
77 def LogDirUseable(logdir):
78   """Ensure log file directory is available and usable."""
79   testfile = "%s/test-%s-%s.deleteme" % (logdir, random.random(),
80                                          random.random())
81   try:
82     os.mkdir(logdir)
83   except OSError, err:
84     if err.errno != errno.EEXIST:
85       raise
86   try:
87     logtest = open(testfile, "aw")
88     logtest.writelines("log file writeability test\n")
89     logtest.close()
90     os.unlink(testfile)
91     return True
92   except (OSError, IOError):
93     return False
94
95
96 def GetTimeStamp(timestamp=None):
97   """Return ISO8601 timestamp.
98
99   Returns ISO8601 timestamp, optionally expects a time.localtime() tuple
100   in timestamp, but will use the current time if this argument is not
101   supplied.
102   """
103   if timestamp is None:
104     timestamp = time.localtime()
105
106   isotime = time.strftime("%Y-%m-%dT%H:%M:%S", timestamp)
107   return isotime
108
109
110 def PingByTcp(target, port, timeout=10, live_port_needed=False, source=None):
111   """Simple ping implementation using TCP connect(2).
112
113   Try to do a TCP connect(2) from an optional source IP to the
114   specified target IP and the specified target port. If the optional
115   parameter live_port_needed is set to true, requires the remote end
116   to accept the connection. The timeout is specified in seconds and
117   defaults to 10 seconds. If the source optional argument is not
118   passed, the source address selection is left to the kernel,
119   otherwise we try to connect using the passed address (failures to
120   bind other than EADDRNOTAVAIL will be ignored).
121
122   """
123   sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
124
125   success = False
126
127   if source is not None:
128     try:
129       sock.bind((source, 0))
130     except socket.error, (errcode):
131       if errcode == errno.EADDRNOTAVAIL:
132         success = False
133
134   sock.settimeout(timeout)
135
136   try:
137     sock.connect((target, port))
138     sock.close()
139     success = True
140   except socket.timeout:
141     success = False
142   except socket.error, (errcode):
143     success = (not live_port_needed) and (errcode == errno.ECONNREFUSED)
144
145   return success
146
147
148 def GetHosts(hostsfile):
149   """Return list of hosts from hostfile.
150
151   Reads the hostslist file and returns a list of hosts.
152   Expects the hostslist file to contain one hostname per line.
153
154   """
155   try:
156     datafile = open(hostsfile, "r")
157   except IOError, msg:
158     print "Failed to open hosts file %s: %s" % (hostsfile, msg)
159     sys.exit(2)
160
161   hosts = datafile.readlines()
162   datafile.close()
163
164   return hosts
165
166
167 def WriteLog(message, logfile):
168   """Writes message, terminated by newline, to logfile."""
169   try:
170     logfile = open(logfile, "aw")
171   except IOError, msg:
172     print "failed to open log file %s: %s" % (logfile, msg)
173     print "log message was: %s" % message
174     sys.exit(1)  # no being able to log is critical
175   try:
176     timestamp = GetTimeStamp()
177     logfile.writelines("%s %s\n" % (timestamp, message))
178     logfile.close()
179   except IOError, msg:
180     print "failed to write to logfile %s: %s" % (logfile, msg)
181     print "log message was: %s" % message
182     sys.exit(1)  # no being able to log is critical
183
184
185 def GetAgentKeys():
186   """Tries to get a list of ssh keys from an agent."""
187   try:
188     agent = paramiko.Agent()
189     return list(agent.get_keys())
190   except paramiko.SSHException:
191     return []
192
193
194 def SetupSshConnection(host, username, password, use_agent, logfile):
195   """Setup the ssh connection used for all later steps.
196
197   This function sets up the ssh connection that will be used both
198   for upload and remote command execution.
199
200   On success, it will return paramiko.Transport object with an
201   already logged in session. On failure, False will be returned.
202
203   """
204   # check if target is willing to talk to us at all
205   if not PingByTcp(host, 22, live_port_needed=True):
206     WriteLog("ERROR: FAILURE_NOT_REACHABLE", logfile)
207     print "  - ERROR: host not reachable on 22/tcp"
208     return False
209
210   if use_agent:
211     keys = GetAgentKeys()
212   else:
213     keys = []
214   all_kwargs = [{"pkey": k} for k in keys]
215   all_desc = ["key %d" % d for d in range(len(keys))]
216   if password is not None:
217     all_kwargs.append({"password": password})
218     all_desc.append("password")
219
220   # deal with logging out of paramiko.transport
221   handler = None
222
223   for desc, kwargs in zip(all_desc, all_kwargs):
224     try:
225       transport = paramiko.Transport((host, 22))
226
227       # only try to setup the logging handler once
228       if not handler:
229         handler = logging.StreamHandler()
230         handler.setLevel(logging.ERROR)
231         log = logging.getLogger(transport.get_log_channel())
232         log.addHandler(handler)
233
234       transport.connect(username=username, **kwargs) # pylint: disable-msg=W0142
235       WriteLog("ssh connection established using %s" % desc, logfile)
236       # strange ... when establishing the session and the immediately
237       # setting up the channels for sftp & shell from that, it sometimes
238       # fails, but waiting 1 second after session setup makes it always work
239       # time.sleep(1)
240       # FIXME apparently needfull to give sshd some time
241       return transport
242     except (socket.gaierror, socket.error, paramiko.SSHException):
243       continue
244
245   methods = ", ".join(all_desc)
246   WriteLog("ERROR: FAILURE_CONNECTION_SETUP (tried %s) " % methods, logfile)
247   WriteLog("aborted", logfile)
248   print "  - ERROR: connection setup failed (tried %s)" % methods
249
250   return False
251
252
253 def UploadFiles(connection, executable, filelist, logfile):
254   """Uploads the specified files via sftp.
255
256   Uploads the specified files to a random, freshly created directory with
257   a temporary name under /tmp. All uploaded files are chmod 0400 after upload
258   with the exception of executable, with is chmod 500.
259
260   Upon success, returns the absolute path to the remote upload directory,
261   but will return False upon failure.
262   """
263   remote_dir = "%s.%s-%s" % (REMOTE_PATH_BASE,
264                              random.random(), random.random())
265
266   try:
267     sftp = paramiko.SFTPClient.from_transport(connection)
268     sftp.mkdir(remote_dir, mode=0700)
269     for item in filelist:
270       remote_file = "%s/%s" % (remote_dir, item.split("/").pop())
271       WriteLog("uploading %s to remote %s" % (item, remote_file), logfile)
272       sftp.put(item, remote_file)
273       if item == executable:
274         sftp.chmod(remote_file, 0500)
275       else:
276         sftp.chmod(remote_file, 0400)
277     sftp.close()
278   except IOError, err:
279     WriteLog("ERROR: FAILURE_UPLOAD: %s" % err, logfile)
280     return False
281
282   return remote_dir
283
284
285 def CleanupRemoteDir(connection, upload_dir, filelist, logfile):
286   """Cleanes out and removes the remote work directory."""
287   try:
288     sftp = paramiko.SFTPClient.from_transport(connection)
289     for item in filelist:
290       fullpath = "%s/%s" % (upload_dir, item.split("/").pop())
291       WriteLog("removing remote %s" % fullpath, logfile)
292       sftp.remove(fullpath)
293     sftp.rmdir(upload_dir)
294     sftp.close()
295   except IOError, err:
296     WriteLog("ERROR: FAILURE_CLEANUP: %s" % err, logfile)
297     return False
298
299   return True
300
301
302 def RunRemoteCommand(connection, command, logfile):
303   """Execute the command via ssh on the remote host."""
304   session = connection.open_session()
305   session.setblocking(0)
306
307   # the following dance is needed because paramiko changed APIs:
308   # from returning True/False for success to always returning None
309   # and throwing an exception in case of problems.
310   # And I want to support both the old and the new API.
311   result = True  # being optimistic here, I know
312   message = None
313   try:
314     if session.exec_command("%s 2>&1" % command) is False:
315       result = False
316   except paramiko.SSHException, message:
317     result = False
318
319   if not result:
320     WriteLog("ERROR: FAILURE_COMMAND_EXECUTION: %s" % message, logfile)
321     return False
322
323    ### Read when data is available
324   output = ""
325   while select.select([session], [], []):
326     data = session.recv(1024)
327     if not data:
328       break
329     output += data
330     select.select([], [], [], .1)
331
332   WriteLog("SUCCESS: command output follows", logfile)
333   for line in output.split("\n"):
334     WriteLog("output = %s" %line, logfile)
335   WriteLog("command execution completed", logfile)
336   session.close()
337
338   return True
339
340
341 def HostWorker(logdir, username, password, use_agent, hostname,
342                executable, command, filelist):
343   """Per-host worker.
344
345   This function does not return - it's the main code of the childs,
346   which exit at the end of this function. The exit code 0 or 1 will be
347   interpreted by the parent.
348
349   @param logdir: the directory where the logfiles must be created
350   @param username: SSH username
351   @param password: SSH password
352   @param use_agent: whether we should instead use an agent
353   @param hostname: the hostname to connect to
354   @param executable: the executable to upload, if not None
355   @param command: the command to run
356   @param filelist: auxiliary files to upload
357
358   """
359   # in the child/worker process
360   logfile = "%s/%s.log" % (logdir, hostname)
361   print "%s - starting" % hostname
362   result = 0  # optimism, I know
363   try:
364     connection = SetupSshConnection(hostname, username,
365                                     password, use_agent, logfile)
366     if connection is not False:
367       if executable is not None:
368         print "  %s: uploading files" % hostname
369         upload_dir = UploadFiles(connection, executable,
370                                  filelist, logfile)
371         command = "cd %s && ./%s" % (upload_dir,
372                                      executable.split("/").pop())
373       print "  %s: executing remote command" % hostname
374       cmd_result = RunRemoteCommand(connection, command, logfile)
375       if cmd_result is True:
376         print "  %s: remote command execution successful" % hostname
377       else:
378         print ("  %s: remote command execution failed,"
379                " check log for details" % hostname)
380         result = 1
381       if executable is not None:
382         print "  %s: cleaning up remote work dir" % hostname
383         cln_result = CleanupRemoteDir(connection, upload_dir,
384                                       filelist, logfile)
385         if cln_result is False:
386           print ("  %s: remote work dir cleanup failed, check"
387                  " log for details" % hostname)
388           result = 1
389       connection.close()
390     else:
391       print "  %s: connection setup failed, skipping" % hostname
392       result = 1
393   except KeyboardInterrupt:
394     print "  %s: received KeyboardInterrupt, aborting" % hostname
395     WriteLog("ERROR: ABORT_KEYBOARD_INTERRUPT", logfile)
396     result = 1
397   except Exception, err:
398     result = 1
399     trace = traceback.format_exc()
400     msg = "ERROR: UNHANDLED_EXECPTION_ERROR: %s\nTrace: %s" % (err, trace)
401     WriteLog(msg, logfile)
402     print "  %s: %s" % (hostname, msg)
403   # and exit with exit code 0 or 1, so the parent can compute statistics
404   sys.exit(result)
405
406
407 def LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
408                  executable, command, filelist):
409   """Launch the per-host worker.
410
411   Arguments are the same as for HostWorker, except for child_pids,
412   which is a dictionary holding the pid-to-hostname mapping.
413
414   """
415   hostname = hostname.rstrip("\n")
416   pid = os.fork()
417   if pid > 0:
418     # controller just record the pids
419     child_pids[pid] = hostname
420   else:
421     HostWorker(logdir, username, password, use_agent, hostname,
422                executable, command, filelist)
423
424
425 def ParseOptions():
426   """Parses the command line options.
427
428   In case of command line errors, it will show the usage and exit the
429   program.
430
431   @return: the options in a tuple
432
433   """
434   # resolve because original used -h for hostfile, which conflicts
435   # with -h for help
436   parser = optparse.OptionParser(usage="\n%s" % USAGE,
437                                  conflict_handler="resolve")
438
439   parser.add_option("-l", dest="logdir", default=None,
440                     help="directory to write logfiles to")
441   parser.add_option("-x", dest="executable", default=None,
442                     help="executable to run on remote host(s)",)
443   parser.add_option("-f", dest="hostfile", default=None,
444                     help="hostlist file (one host per line)")
445   parser.add_option("-h", dest="hostlist", default=None, metavar="HOSTS",
446                     help="comma-separated list of hosts or single hostname",)
447   parser.add_option("-a", dest="auxfiles", action="append", default=[],
448                     help="optional auxiliary file to upload"
449                     " (can be given multiple times",
450                     metavar="FILE")
451   parser.add_option("-c", dest="command", default=None,
452                     help="shell command to run on remote host(s)")
453   parser.add_option("-b", dest="batch_size", default=15, type="int",
454                     help="batch-size, how many hosts to process"
455                     " in parallel [15]")
456   parser.add_option("-u", dest="username", default="root",
457                     help="username used to connect [root]")
458   parser.add_option("-p", dest="password", default=None,
459                     help="password used to authenticate (when not"
460                     " using an agent)")
461   parser.add_option("-A", dest="use_agent", default=False, action="store_true",
462                     help="instead of password, use keys from an SSH agent")
463
464   opts, args = parser.parse_args()
465
466   if opts.executable and opts.command:
467     parser.error("Options -x and -c conflict with each other")
468   if not (opts.executable or opts.command):
469     parser.error("One of -x and -c must be given")
470   if not opts.logdir:
471     parser.error("Option -l is required")
472   if opts.hostfile and opts.hostlist:
473     parser.error("Options -f and -h conflict with each other")
474   if not (opts.hostfile or opts.hostlist):
475     parser.error("One of -f or -h must be given")
476   if args:
477     parser.error("This program doesn't take any arguments, passed in: %s" %
478                  ", ".join(args))
479
480   return (opts.logdir, opts.executable, opts.hostfile, opts.hostlist,
481           opts.command, opts.use_agent, opts.auxfiles, opts.username,
482           opts.password, opts.batch_size)
483
484
485 def main():
486   """main."""
487   (logdir, executable, hostfile, hostlist,
488    command, use_agent, auxfiles, username,
489    password, batch_size) = ParseOptions()
490
491   ### Unbuffered sys.stdout
492   sys.stdout = os.fdopen(1, "w", 0)
493
494   if LogDirUseable(logdir) is False:
495     print "ERROR: cannot create logfiles in dir %s, aborting" % logdir
496     sys.exit(1)
497
498   if use_agent:
499     pass
500   elif password:
501     try:
502       fh = file(password)
503       pwvalue = fh.readline().strip()
504       fh.close()
505     except IOError, e:
506       print "error: can not read in from password file %s: %s" % (password, e)
507       sys.exit(1)
508     password = pwvalue
509   else:
510     password = getpass.getpass("%s's password for all nodes: " % username)
511
512   if hostfile:
513     hosts = GetHosts(hostfile)
514   else:
515     if "," in hostlist:
516       hostlist = hostlist.rstrip(",")  # commandline robustness
517       hosts = hostlist.split(",")
518     else:
519       hosts = [hostlist]
520
521   successes = failures = 0
522
523   filelist = auxfiles[:]
524   filelist.append(executable)
525
526   # initial batch
527   batch = hosts[:batch_size]
528   hosts = hosts[batch_size:]
529   child_pids = {}
530   for hostname in batch:
531     LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
532                  executable, command, filelist)
533
534   while child_pids:
535     pid, status = os.wait()
536     hostname = child_pids.pop(pid, "<unknown host>")
537     print "  %s: done (in parent)" % hostname
538     if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
539       successes += 1
540     else:
541       failures += 1
542     if hosts:
543       LaunchWorker(child_pids, logdir, username, password, use_agent,
544                    hosts.pop(0), executable, command, filelist)
545
546   print
547   print "All done, %s successful and %s failed hosts" % (successes, failures)
548
549   sys.exit(0)
550
551
552 if __name__ == "__main__":
553   try:
554     main()
555   except KeyboardInterrupt:
556     print "Received KeyboardInterrupt, aborting"
557     sys.exit(1)