Disable the cluster-merge tool for the moment
[ganeti-local] / tools / ganeti-listrunner
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Run an executable on a list of hosts.
22
23 Script to serially run an executable on a list of hosts via ssh
24 with password auth as root. If the provided log dir does not yet
25 exist, it will try to create it.
26
27 Implementation:
28  - the main process spawns up to batch_size children, which:
29  - connects to the remote host via ssh as root
30  - uploads the executable with a random name to /tmp via sftp
31  - chmod 500s it
32  - via ssh: chdirs into the upload directory and runs the script
33  - deletes it
34  - writes status messages and all output to one logfile per host
35  - the main process gathers then the status of the children and
36    reports the success/failure ratio
37  - entire script can be aborted with Ctrl-C
38
39 Security considerations:
40  - the root password for the remote hosts is stored in memory for the
41    runtime of the script
42  - the executable to be run on the remote host is handled the following way:
43    - try to create a random directory with permissions 700 on the
44      remote host, abort furter processing on this host if this failes
45    - upload the executable with to a random filename in that directory
46    - set executable permissions to 500
47    - run the executable
48    - delete the execuable and the directory on the remote host
49
50 """
51
52 # pylint: disable-msg=C0103
53 # C0103: Invalid name ganeti-listrunner
54
55 import errno
56 import getopt
57 import getpass
58 import logging
59 import os
60 import random
61 import select
62 import socket
63 import sys
64 import time
65 import traceback
66
67 import paramiko
68
69
70 REMOTE_PATH_BASE = "/tmp/listrunner"
71
72
73 def LogDirUseable(logdir):
74   """Ensure log file directory is available and usable."""
75   testfile = "%s/test-%s-%s.deleteme" % (logdir, random.random(),
76                                          random.random())
77   try:
78     os.mkdir(logdir)
79   except OSError, err:
80     if err.errno != errno.EEXIST:
81       raise
82   try:
83     logtest = open(testfile, "aw")
84     logtest.writelines("log file writeability test\n")
85     logtest.close()
86     os.unlink(testfile)
87     return True
88   except (OSError, IOError):
89     return False
90
91
92 def ShowHelp(executable):
93   """Print short usage information."""
94   print ("usage: %s -l logdir [-c|-x] value [-b batch_size]"
95          " [-f hostfile|-h hosts] [-u username]"
96          " [-p password_file]" % executable)
97   print """        -l logdir to write logfiles to
98         -x executable to run on remote host(s)
99         -c shell command to run on remote host(s)
100         -f hostlist file (one host per line)
101         -a optional auxiliary file to upload (can be given multiple times)
102         -b batch-size, how many hosts to process in parallel [15]
103         -h comma-separated list of hosts or single hostname
104         -u username used to connect [root]
105         -p password used to authenticate"""
106
107
108 def GetTimeStamp(timestamp=None):
109   """Return ISO8601 timestamp.
110
111   Returns ISO8601 timestamp, optionally expects a time.localtime() tuple
112   in timestamp, but will use the current time if this argument is not
113   supplied.
114   """
115   if timestamp is None:
116     timestamp = time.localtime()
117
118   isotime = time.strftime("%Y-%m-%dT%H:%M:%S", timestamp)
119   return isotime
120
121
122 def PingByTcp(target, port, timeout=10, live_port_needed=False, source=None):
123   """Simple ping implementation using TCP connect(2).
124
125   Try to do a TCP connect(2) from an optional source IP to the
126   specified target IP and the specified target port. If the optional
127   parameter live_port_needed is set to true, requires the remote end
128   to accept the connection. The timeout is specified in seconds and
129   defaults to 10 seconds. If the source optional argument is not
130   passed, the source address selection is left to the kernel,
131   otherwise we try to connect using the passed address (failures to
132   bind other than EADDRNOTAVAIL will be ignored).
133
134   """
135   sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
136
137   success = False
138
139   if source is not None:
140     try:
141       sock.bind((source, 0))
142     except socket.error, (errcode):
143       if errcode == errno.EADDRNOTAVAIL:
144         success = False
145
146   sock.settimeout(timeout)
147
148   try:
149     sock.connect((target, port))
150     sock.close()
151     success = True
152   except socket.timeout:
153     success = False
154   except socket.error, (errcode):
155     success = (not live_port_needed) and (errcode == errno.ECONNREFUSED)
156
157   return success
158
159
160 def GetHosts(hostsfile):
161   """Return list of hosts from hostfile.
162
163   Reads the hostslist file and returns a list of hosts.
164   Expects the hostslist file to contain one hostname per line.
165
166   """
167   try:
168     datafile = open(hostsfile, "r")
169   except IOError, msg:
170     print "Failed to open hosts file %s: %s" % (hostsfile, msg)
171     sys.exit(2)
172
173   hosts = datafile.readlines()
174   datafile.close()
175
176   return hosts
177
178
179 def WriteLog(message, logfile):
180   """Writes message, terminated by newline, to logfile."""
181   try:
182     logfile = open(logfile, "aw")
183   except IOError, msg:
184     print "failed to open log file %s: %s" % (logfile, msg)
185     print "log message was: %s" % message
186     sys.exit(1)  # no being able to log is critical
187   try:
188     timestamp = GetTimeStamp()
189     logfile.writelines("%s %s\n" % (timestamp, message))
190     logfile.close()
191   except IOError, msg:
192     print "failed to write to logfile %s: %s" % (logfile, msg)
193     print "log message was: %s" % message
194     sys.exit(1)  # no being able to log is critical
195
196
197 def GetAgentKeys():
198   """Tries to get a list of ssh keys from an agent."""
199   try:
200     agent = paramiko.Agent()
201     return list(agent.get_keys())
202   except paramiko.SSHException:
203     return []
204
205
206 def SetupSshConnection(host, username, password, keys, logfile):
207   """Setup the ssh connection used for all later steps.
208
209   This function sets up the ssh connection that will be used both
210   for upload and remote command execution.
211
212   On success, it will return paramiko.Transport object with an
213   already logged in session. On failure, False will be returned.
214
215   """
216   # check if target is willing to talk to us at all
217   if not PingByTcp(host, 22, live_port_needed=True):
218     WriteLog("ERROR: FAILURE_NOT_REACHABLE", logfile)
219     print "  - ERROR: host not reachable on 22/tcp"
220     return False
221
222   all_kwargs = [{"pkey": k} for k in keys]
223   all_desc = ["key %d" % d for d in range(len(keys))]
224   if password is not None:
225     all_kwargs.append({"password": password})
226     all_desc.append("password")
227
228   # deal with logging out of paramiko.transport
229   handler = None
230
231   for desc, kwargs in zip(all_desc, all_kwargs):
232     try:
233       transport = paramiko.Transport((host, 22))
234
235       # only try to setup the logging handler once
236       if not handler:
237         handler = logging.StreamHandler()
238         handler.setLevel(logging.ERROR)
239         log = logging.getLogger(transport.get_log_channel())
240         log.addHandler(handler)
241
242       transport.connect(username=username, **kwargs) # pylint: disable-msg=W0142
243       WriteLog("ssh connection established using %s" % desc, logfile)
244       # strange ... when establishing the session and the immediately
245       # setting up the channels for sftp & shell from that, it sometimes
246       # fails, but waiting 1 second after session setup makes it always work
247       # time.sleep(1)
248       # FIXME apparently needfull to give sshd some time
249       return transport
250     except (socket.gaierror, socket.error, paramiko.SSHException):
251       continue
252
253   methods = ", ".join(all_desc)
254   WriteLog("ERROR: FAILURE_CONNECTION_SETUP (tried %s) " % methods, logfile)
255   WriteLog("aborted", logfile)
256   print "  - ERROR: connection setup failed (tried %s)" % methods
257
258   return False
259
260
261 def UploadFiles(connection, executable, filelist, logfile):
262   """Uploads the specified files via sftp.
263
264   Uploads the specified files to a random, freshly created directory with
265   a temporary name under /tmp. All uploaded files are chmod 0400 after upload
266   with the exception of executable, with is chmod 500.
267
268   Upon success, returns the absolute path to the remote upload directory,
269   but will return False upon failure.
270   """
271   remote_dir = "%s.%s-%s" % (REMOTE_PATH_BASE,
272                              random.random(), random.random())
273
274   try:
275     sftp = paramiko.SFTPClient.from_transport(connection)
276     sftp.mkdir(remote_dir, mode=0700)
277     for item in filelist:
278       remote_file = "%s/%s" % (remote_dir, item.split("/").pop())
279       WriteLog("uploading %s to remote %s" % (item, remote_file), logfile)
280       sftp.put(item, remote_file)
281       if item == executable:
282         sftp.chmod(remote_file, 0500)
283       else:
284         sftp.chmod(remote_file, 0400)
285     sftp.close()
286   except IOError, err:
287     WriteLog("ERROR: FAILURE_UPLOAD: %s" % err, logfile)
288     return False
289
290   return remote_dir
291
292
293 def CleanupRemoteDir(connection, upload_dir, filelist, logfile):
294   """Cleanes out and removes the remote work directory."""
295   try:
296     sftp = paramiko.SFTPClient.from_transport(connection)
297     for item in filelist:
298       fullpath = "%s/%s" % (upload_dir, item.split("/").pop())
299       WriteLog("removing remote %s" % fullpath, logfile)
300       sftp.remove(fullpath)
301     sftp.rmdir(upload_dir)
302     sftp.close()
303   except IOError, err:
304     WriteLog("ERROR: FAILURE_CLEANUP: %s" % err, logfile)
305     return False
306
307   return True
308
309
310 def RunRemoteCommand(connection, command, logfile):
311   """Execute the command via ssh on the remote host."""
312   session = connection.open_session()
313   session.setblocking(0)
314
315   # the following dance is needed because paramiko changed APIs:
316   # from returning True/False for success to always returning None
317   # and throwing an exception in case of problems.
318   # And I want to support both the old and the new API.
319   result = True  # being optimistic here, I know
320   message = None
321   try:
322     if session.exec_command("%s 2>&1" % command) is False:
323       result = False
324   except paramiko.SSHException, message:
325     result = False
326
327   if not result:
328     WriteLog("ERROR: FAILURE_COMMAND_EXECUTION: %s" % message, logfile)
329     return False
330
331    ### Read when data is available
332   output = ""
333   while select.select([session], [], []):
334     data = session.recv(1024)
335     if not data:
336       break
337     output += data
338     select.select([], [], [], .1)
339
340   WriteLog("SUCCESS: command output follows", logfile)
341   for line in output.split("\n"):
342     WriteLog("output = %s" %line, logfile)
343   WriteLog("command execution completed", logfile)
344   session.close()
345
346   return True
347
348
349 def HostWorker(logdir, username, password, keys, hostname,
350                executable, command, filelist):
351   """Per-host worker.
352
353   This function does not return - it's the main code of the childs,
354   which exit at the end of this function. The exit code 0 or 1 will be
355   interpreted by the parent.
356
357   @param logdir: the directory where the logfiles must be created
358   @param username: SSH username
359   @param password: SSH password
360   @param keys: SSH keys
361   @param hostname: the hostname to connect to
362   @param executable: the executable to upload, if not None
363   @param command: the command to run
364   @param filelist: auxiliary files to upload
365
366   """
367   # in the child/worker process
368   logfile = "%s/%s.log" % (logdir, hostname)
369   print "%s - starting" % hostname
370   result = 0  # optimism, I know
371   try:
372     connection = SetupSshConnection(hostname, username,
373                                     password, keys, logfile)
374     if connection is not False:
375       if executable is not None:
376         print "  %s: uploading files" % hostname
377         upload_dir = UploadFiles(connection, executable,
378                                  filelist, logfile)
379         command = "cd %s && ./%s" % (upload_dir,
380                                      executable.split("/").pop())
381       print "  %s: executing remote command" % hostname
382       cmd_result = RunRemoteCommand(connection, command, logfile)
383       if cmd_result is True:
384         print "  %s: remote command execution successful" % hostname
385       else:
386         print ("  %s: remote command execution failed,"
387                " check log for details" % hostname)
388         result = 1
389       if executable is not None:
390         print "  %s: cleaning up remote work dir" % hostname
391         cln_result = CleanupRemoteDir(connection, upload_dir,
392                                       filelist, logfile)
393         if cln_result is False:
394           print ("  %s: remote work dir cleanup failed, check"
395                  " log for details" % hostname)
396           result = 1
397       connection.close()
398     else:
399       print "  %s: connection setup failed, skipping" % hostname
400       result = 1
401   except KeyboardInterrupt:
402     print "  %s: received KeyboardInterrupt, aborting" % hostname
403     WriteLog("ERROR: ABORT_KEYBOARD_INTERRUPT", logfile)
404     result = 1
405   except Exception, err:
406     result = 1
407     trace = traceback.format_exc()
408     msg = "ERROR: UNHANDLED_EXECPTION_ERROR: %s\nTrace: %s" % (err, trace)
409     WriteLog(msg, logfile)
410     print "  %s: %s" % (hostname, msg)
411   # and exit with exit code 0 or 1, so the parent can compute statistics
412   sys.exit(result)
413
414
415 def LaunchWorker(child_pids, logdir, username, password, keys, hostname,
416                  executable, command, filelist):
417   """Launch the per-host worker.
418
419   Arguments are the same as for HostWorker, except for child_pids,
420   which is a dictionary holding the pid-to-hostname mapping.
421
422   """
423   hostname = hostname.rstrip("\n")
424   pid = os.fork()
425   if pid > 0:
426     # controller just record the pids
427     child_pids[pid] = hostname
428   else:
429     HostWorker(logdir, username, password, keys, hostname,
430                executable, command, filelist)
431
432
433 def main():
434   """main."""
435   try:
436     optlist, _ = getopt.getopt(sys.argv[1:], "l:x:h:f:a:c:b:u:p:A")
437   except getopt.GetoptError, err:
438     print str(err)
439     ShowHelp(sys.argv[0])
440     sys.exit(2)
441
442   logdir = executable = hostfile = hostlist = command = None
443   use_agent = False
444   auxfiles = []
445   username = "root"
446   password = None
447   batch_size = 15
448   for option in optlist:
449     if option[0] == "-l":
450       logdir = option[1]
451     if option[0] == "-x":
452       executable = option[1]
453     if option[0] == "-f":
454       hostfile = option[1]
455     if option[0] == "-h":
456       hostlist = option[1]
457     if option[0] == "-a":
458       auxfiles.append(option[1])
459     if option[0] == "-c":
460       command = option[1]
461     if option[0] == "-b":
462       batch_size = int(option[1])
463     if option[0] == "-u":
464       username = option[1]
465     if option[0] == "-p":
466       password = option[1]
467     if option[0] == "-A":
468       use_agent = True
469
470   if not (logdir and (executable or command) and (hostfile or hostlist)):
471     print "error: missing required commandline argument(s)"
472     ShowHelp(sys.argv[0])
473     sys.exit(3)
474
475   if executable and command:
476     print "error: can run either a command or an executable, not both"
477     ShowHelp(sys.argv[0])
478     sys.exit(3)
479
480   if hostlist and hostfile:
481     print "error: specify either -f or -h arguments, not both"
482     ShowHelp(sys.argv[0])
483     sys.exit(3)
484
485   ### Unbuffered sys.stdout
486   sys.stdout = os.fdopen(1, "w", 0)
487
488   if LogDirUseable(logdir) is False:
489     print "ERROR: cannot create logfiles in dir %s, aborting" % logdir
490     sys.exit(1)
491
492   keys = []
493   if use_agent:
494     keys = GetAgentKeys()
495   elif password:
496     try:
497       fh = file(password)
498       pwvalue = fh.readline().strip()
499       fh.close()
500     except IOError, e:
501       print "error: can not read in from password file %s: %s" % (password, e)
502       sys.exit(1)
503     password = pwvalue
504   else:
505     password = getpass.getpass("%s's password for all nodes: " % username)
506
507   if hostfile:
508     hosts = GetHosts(hostfile)
509   else:
510     if "," in hostlist:
511       hostlist = hostlist.rstrip(",")  # commandline robustness
512       hosts = hostlist.split(",")
513     else:
514       hosts = [hostlist]
515
516   successes = failures = 0
517
518   filelist = auxfiles[:]
519   filelist.append(executable)
520
521   # initial batch
522   batch = hosts[:batch_size]
523   hosts = hosts[batch_size:]
524   child_pids = {}
525   for hostname in batch:
526     LaunchWorker(child_pids, logdir, username, password, keys, hostname,
527                  executable, command, filelist)
528
529   while child_pids:
530     pid, status = os.wait()
531     hostname = child_pids.pop(pid, "<unknown host>")
532     print "  %s: done (in parent)" % hostname
533     if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
534       successes += 1
535     else:
536       failures += 1
537     if hosts:
538       LaunchWorker(child_pids, logdir, username, password, keys,
539                    hosts.pop(0), executable, command, filelist)
540
541   print
542   print "All done, %s successful and %s failed hosts" % (successes, failures)
543
544   sys.exit(0)
545
546
547 if __name__ == "__main__":
548   try:
549     main()
550   except KeyboardInterrupt:
551     print "Received KeyboardInterrupt, aborting"
552     sys.exit(1)