Statistics
| Branch: | Tag: | Revision:

root / tools / ganeti-listrunner @ 1817dca9

History | View | Annotate | Download (18.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Run an executable on a list of hosts.
22

    
23
Script to serially run an executable on a list of hosts via ssh
24
with password auth as root. If the provided log dir does not yet
25
exist, it will try to create it.
26

    
27
Implementation:
28
 - the main process spawns up to batch_size children, which:
29
 - connects to the remote host via ssh as root
30
 - uploads the executable with a random name to /tmp via sftp
31
 - chmod 500s it
32
 - via ssh: chdirs into the upload directory and runs the script
33
 - deletes it
34
 - writes status messages and all output to one logfile per host
35
 - the main process gathers then the status of the children and
36
   reports the success/failure ratio
37
 - entire script can be aborted with Ctrl-C
38

    
39
Security considerations:
40
 - the root password for the remote hosts is stored in memory for the
41
   runtime of the script
42
 - the executable to be run on the remote host is handled the following way:
43
   - try to create a random directory with permissions 700 on the
44
     remote host, abort further processing on this host if this fails
45
   - upload the executable to a random filename in that directory
46
   - set executable permissions to 500
47
   - run the executable
48
   - delete the executable and the directory on the remote host
49

    
50
"""
51

    
52
# pylint: disable=C0103
53
# C0103: Invalid name ganeti-listrunner
54

    
55
import errno
56
import optparse
57
import getpass
58
import logging
59
import os
60
import random
61
import select
62
import socket
63
import sys
64
import time
65
import traceback
66

    
67
import paramiko
68

    
69

    
70
# Prefix of the remote upload directory; UploadFiles appends a random
# suffix to it for every run.
REMOTE_PATH_BASE = "/tmp/listrunner"
71

    
72
# Usage text handed to optparse.OptionParser in ParseOptions.
USAGE = (
  "%prog -l logdir {-c command | -x /path/to/file} [-b batch_size]"
  " {-f hostfile|-h hosts} [-u username]"
  " [-p password_file | -A]"
)
75

    
76

    
77
def LogDirUseable(logdir):
  """Ensure the log file directory exists and is writable.

  The directory is created if it is missing. Writability is then verified
  by creating, writing and removing a randomly named scratch file in it.

  @param logdir: directory the per-host logfiles will be written to
  @return: True if a file could be written to and removed from logdir,
      False otherwise
  @raise OSError: if creating the directory fails for any reason other
      than the path already existing

  """
  testfile = "%s/test-%s-%s.deleteme" % (logdir, random.random(),
                                         random.random())
  try:
    os.mkdir(logdir)
  except OSError as err:
    # an already existing path is fine here, the write test below decides
    if err.errno != errno.EEXIST:
      raise

  try:
    # plain append mode; the former "aw" is not a valid mode string
    logtest = open(testfile, "a")
    try:
      logtest.write("log file writeability test\n")
    finally:
      # do not leak the descriptor when the write fails
      logtest.close()
    os.unlink(testfile)
  except (OSError, IOError):
    return False

  return True
94

    
95

    
96
def GetTimeStamp(timestamp=None):
  """Format a point in time as an ISO8601 string.

  @param timestamp: a time tuple as returned by time.localtime(); when
      omitted (None), the current local time is used
  @return: the time formatted as YYYY-MM-DDTHH:MM:SS

  """
  when = timestamp
  if when is None:
    when = time.localtime()
  return time.strftime("%Y-%m-%dT%H:%M:%S", when)
108

    
109

    
110
def PingByTcp(target, port, timeout=10, live_port_needed=False, source=None):
  """Simple ping implementation using TCP connect(2).

  Try to do a TCP connect(2) from an optional source IP to the
  specified target IP and the specified target port. If the optional
  parameter live_port_needed is set to true, requires the remote end
  to accept the connection; otherwise a refused connection also counts
  as a reply. The timeout is specified in seconds and defaults to 10
  seconds. If the source optional argument is not passed, the source
  address selection is left to the kernel, otherwise we try to connect
  using the passed address (failures to bind other than EADDRNOTAVAIL
  will be ignored).

  @param target: IP address (or name) to connect to
  @param port: TCP port to connect to
  @param timeout: connect timeout in seconds
  @param live_port_needed: whether the port must accept the connection
  @param source: optional local address to bind to before connecting
  @return: True if the target answered as requested, False otherwise

  """
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  try:
    if source is not None:
      try:
        sock.bind((source, 0))
      except socket.error as err:
        # compare the errno, not the exception object itself; the
        # requested source address is not usable on this machine
        if err.errno == errno.EADDRNOTAVAIL:
          return False

    sock.settimeout(timeout)

    try:
      sock.connect((target, port))
    except socket.timeout:
      return False
    except socket.error as err:
      # a refusal proves the host is up, which is enough unless the
      # caller insists on a listening port
      return (not live_port_needed) and (err.errno == errno.ECONNREFUSED)

    return True
  finally:
    # release the descriptor on every path, not only after a connect
    sock.close()
146

    
147

    
148
def GetHosts(hostsfile):
  """Return the list of hosts read from a hosts file.

  The file is expected to contain one hostname per line. Surrounding
  whitespace (including the line terminator) is stripped and empty lines
  are skipped, so that a blank line does not turn into an empty hostname.

  If the file cannot be opened, an error is printed and the program exits
  with exit code 2.

  @param hostsfile: path of the hosts file
  @return: list of hostnames, in file order

  """
  try:
    datafile = open(hostsfile, "r")
  except IOError as msg:
    print("Failed to open hosts file %s: %s" % (hostsfile, msg))
    sys.exit(2)

  try:
    stripped = [line.strip() for line in datafile]
  finally:
    datafile.close()

  return [host for host in stripped if host]
165

    
166

    
167
def WriteLog(message, logfile):
  """Append a timestamped message, terminated by newline, to logfile.

  Not being able to log is considered critical: if the logfile cannot be
  opened or written, the problem and the lost message are printed and the
  process exits with exit code 1.

  @param message: the text to log
  @param logfile: path of the logfile to append to

  """
  try:
    # plain append mode; the former "aw" is not a valid mode string
    handle = open(logfile, "a")
  except IOError as msg:
    print("failed to open log file %s: %s" % (logfile, msg))
    print("log message was: %s" % message)
    sys.exit(1)  # not being able to log is critical

  try:
    try:
      handle.write("%s %s\n" % (GetTimeStamp(), message))
    finally:
      # close in every case so a failed write does not leak the handle
      handle.close()
  except IOError as msg:
    # logfile still holds the path here, so the message names the file
    print("failed to write to logfile %s: %s" % (logfile, msg))
    print("log message was: %s" % message)
    sys.exit(1)  # not being able to log is critical
183

    
184

    
185
def GetAgentKeys():
  """Fetch the ssh keys offered by an agent.

  @return: list of the keys reported by paramiko.Agent, or an empty list
      if paramiko raises SSHException while asking the agent

  """
  try:
    keys = list(paramiko.Agent().get_keys())
  except paramiko.SSHException:
    keys = []
  return keys
192

    
193

    
194
def SetupSshConnection(host, username, password, use_agent, logfile):
  """Setup the ssh connection used for all later steps.

  This function sets up the ssh connection that will be used both
  for upload and remote command execution. Authentication is attempted
  with every key from the ssh agent (if use_agent is set) and finally
  with the password (if one is given), each on a fresh transport.

  On success, it will return paramiko.Transport object with an
  already logged in session. On failure, False will be returned.

  @param host: the host to connect to (port 22)
  @param username: SSH username
  @param password: SSH password, or None to skip password authentication
  @param use_agent: whether keys from an ssh agent should be tried
  @param logfile: path of the logfile for status messages
  @return: the connected paramiko.Transport, or False

  """
  # check if target is willing to talk to us at all
  if not PingByTcp(host, 22, live_port_needed=True):
    WriteLog("ERROR: FAILURE_NOT_REACHABLE", logfile)
    print("  - ERROR: host not reachable on 22/tcp")
    return False

  if use_agent:
    keys = GetAgentKeys()
  else:
    keys = []

  # (description, connect arguments) per attempt: agent keys, then password
  attempts = [("key %d" % idx, {"pkey": key}) for idx, key in enumerate(keys)]
  if password is not None:
    attempts.append(("password", {"password": password}))

  # deal with logging out of paramiko.transport
  handler = None

  for desc, kwargs in attempts:
    transport = None
    try:
      transport = paramiko.Transport((host, 22))

      # only try to setup the logging handler once
      if not handler:
        handler = logging.StreamHandler()
        handler.setLevel(logging.ERROR)
        log = logging.getLogger(transport.get_log_channel())
        log.addHandler(handler)

      transport.connect(username=username, **kwargs) # pylint: disable=W0142
    except (socket.gaierror, socket.error, paramiko.SSHException):
      # close the transport of the failed attempt before trying the next
      # method, otherwise its connection stays open until the worker exits
      if transport is not None:
        transport.close()
      continue

    WriteLog("ssh connection established using %s" % desc, logfile)
    # FIXME: setting up the sftp & shell channels immediately after the
    # session sometimes fails; giving sshd about a second (time.sleep(1))
    # after session setup was observed to help
    return transport

  methods = ", ".join([desc for desc, _ in attempts])
  WriteLog("ERROR: FAILURE_CONNECTION_SETUP (tried %s) " % methods, logfile)
  WriteLog("aborted", logfile)
  print("  - ERROR: connection setup failed (tried %s)" % methods)

  return False
251

    
252

    
253
def UploadFiles(connection, executable, filelist, logfile):
  """Uploads the specified files via sftp.

  Uploads the specified files to a freshly created directory (mode 0700)
  whose name is REMOTE_PATH_BASE plus a random suffix. All uploaded files
  are chmod 0400 after upload with the exception of executable, which is
  chmod 0500.

  @param connection: connected paramiko.Transport
  @param executable: the entry of filelist that must be made executable
  @param filelist: local paths of all files to upload
  @param logfile: path of the logfile for status messages
  @return: the absolute path of the remote upload directory on success,
      False upon failure

  """
  remote_dir = "%s.%s-%s" % (REMOTE_PATH_BASE,
                             random.random(), random.random())

  sftp = None
  try:
    sftp = paramiko.SFTPClient.from_transport(connection)
    sftp.mkdir(remote_dir, mode=0o700)
    for item in filelist:
      remote_file = "%s/%s" % (remote_dir, os.path.basename(item))
      WriteLog("uploading %s to remote %s" % (item, remote_file), logfile)
      sftp.put(item, remote_file)
      if item == executable:
        sftp.chmod(remote_file, 0o500)
      else:
        sftp.chmod(remote_file, 0o400)
  except IOError as err:
    WriteLog("ERROR: FAILURE_UPLOAD: %s" % err, logfile)
    return False
  finally:
    # close the sftp channel on the error path as well
    if sftp is not None:
      sftp.close()

  return remote_dir
283

    
284

    
285
def CleanupRemoteDir(connection, upload_dir, filelist, logfile):
  """Cleans out and removes the remote work directory.

  Removes every file of filelist (by basename) from upload_dir and then
  the directory itself. Processing stops at the first failure.

  @param connection: connected paramiko.Transport
  @param upload_dir: remote directory the files were uploaded to
  @param filelist: local paths of the files that were uploaded
  @param logfile: path of the logfile for status messages
  @return: True on success, False if an IOError occurred

  """
  sftp = None
  try:
    sftp = paramiko.SFTPClient.from_transport(connection)
    for item in filelist:
      fullpath = "%s/%s" % (upload_dir, os.path.basename(item))
      WriteLog("removing remote %s" % fullpath, logfile)
      sftp.remove(fullpath)
    sftp.rmdir(upload_dir)
  except IOError as err:
    WriteLog("ERROR: FAILURE_CLEANUP: %s" % err, logfile)
    return False
  finally:
    # close the sftp channel on the error path as well
    if sftp is not None:
      sftp.close()

  return True
300

    
301

    
302
def RunRemoteCommand(connection, command, logfile):
  """Execute the command via ssh on the remote host.

  The command is run with stderr redirected to stdout; everything it
  prints is written line by line to the logfile.

  @param connection: connected paramiko.Transport
  @param command: the shell command to run
  @param logfile: path of the logfile for status messages and output
  @return: True if the command was started and its output was read up to
      the end of the stream, False if starting it or receiving its output
      failed

  """
  # NOTE(review): the exit status of the remote command is not inspected,
  # so a command that runs but fails is still reported as success.
  session = connection.open_session()
  try:
    session.setblocking(0)

    # the following dance is needed because paramiko changed APIs:
    # from returning True/False for success to always returning None
    # and throwing an exception in case of problems.
    # Both the old and the new API are supported.
    message = None
    try:
      started = session.exec_command("%s 2>&1" % command) is not False
    except paramiko.SSHException as err:
      started = False
      message = err

    if not started:
      WriteLog("ERROR: FAILURE_COMMAND_EXECUTION: %s" % message, logfile)
      return False

    ### Read when data is available
    chunks = []
    recv_failed = False
    while True:
      # blocks until the channel is readable
      select.select([session], [], [])
      try:
        data = session.recv(1024)
      except socket.timeout as err:
        WriteLog("FAILED: socket.timeout %s" % err, logfile)
        recv_failed = True
        break
      except socket.error as err:
        WriteLog("FAILED: socket.error %s" % err, logfile)
        recv_failed = True
        break
      if not data:
        # empty read: end of the output stream
        break
      chunks.append(data)
      # short pause (0.1s) before polling for more output
      select.select([], [], [], .1)

    if recv_failed:
      # do not claim success after a receive error, but keep what we got
      WriteLog("ERROR: FAILURE_COMMAND_EXECUTION:"
               " partial command output follows", logfile)
    else:
      WriteLog("SUCCESS: command output follows", logfile)
    for line in "".join(chunks).splitlines():
      WriteLog("output = %s" % line, logfile)
    if recv_failed:
      return False
    WriteLog("command execution completed", logfile)

    return True
  finally:
    # the channel is closed on every path, including early failures
    session.close()
346

    
347

    
348
def HostWorker(logdir, username, password, use_agent, hostname,
               executable, exec_args, command, filelist):
  """Per-host worker.

  This function does not return - it's the main code of the children,
  which exit at the end of this function. The exit code 0 or 1 will be
  interpreted by the parent.

  If an executable is given it is uploaded first and the command becomes
  "cd <upload dir> && ./<executable> [exec_args]". A failed upload is
  counted as failure and nothing is executed on that host.

  @param logdir: the directory where the logfiles must be created
  @param username: SSH username
  @param password: SSH password
  @param use_agent: whether we should instead use an agent
  @param hostname: the hostname to connect to
  @param executable: the executable to upload, if not None
  @param exec_args: Additional arguments for executable
  @param command: the command to run
  @param filelist: auxiliary files to upload

  """
  # in the child/worker process
  logfile = "%s/%s.log" % (logdir, hostname)
  print("%s - starting" % hostname)
  result = 0  # optimism, I know
  try:
    connection = SetupSshConnection(hostname, username,
                                    password, use_agent, logfile)
    if connection is False:
      print("  %s: connection setup failed, skipping" % hostname)
      result = 1
    else:
      try:
        # None: nothing to upload, False: upload failed, else: remote dir
        upload_dir = None
        if executable is not None:
          print("  %s: uploading files" % hostname)
          upload_dir = UploadFiles(connection, executable,
                                   filelist, logfile)
          if upload_dir is False:
            print("  %s: file upload failed, check log for details" %
                  hostname)
            result = 1
          else:
            command = ("cd %s && ./%s" %
                       (upload_dir, os.path.basename(executable)))
            if exec_args:
              command += " %s" % exec_args

        if upload_dir is not False:
          print("  %s: executing remote command" % hostname)
          cmd_result = RunRemoteCommand(connection, command, logfile)
          if cmd_result is True:
            print("  %s: remote command execution successful" % hostname)
          else:
            print("  %s: remote command execution failed,"
                  " check log for details" % hostname)
            result = 1

        if upload_dir:
          print("  %s: cleaning up remote work dir" % hostname)
          cln_result = CleanupRemoteDir(connection, upload_dir,
                                        filelist, logfile)
          if cln_result is False:
            print("  %s: remote work dir cleanup failed, check"
                  " log for details" % hostname)
            result = 1
      finally:
        # also close the connection when one of the steps raised
        connection.close()
  except KeyboardInterrupt:
    print("  %s: received KeyboardInterrupt, aborting" % hostname)
    WriteLog("ERROR: ABORT_KEYBOARD_INTERRUPT", logfile)
    result = 1
  except Exception as err:
    result = 1
    trace = traceback.format_exc()
    msg = "ERROR: UNHANDLED_EXECPTION_ERROR: %s\nTrace: %s" % (err, trace)
    WriteLog(msg, logfile)
    print("  %s: %s" % (hostname, msg))
  # and exit with exit code 0 or 1, so the parent can compute statistics
  sys.exit(result)
415

    
416

    
417
def LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
                 executable, exec_args, command, filelist):
  """Fork off the worker for one host.

  Takes the same arguments as HostWorker plus child_pids, the dictionary
  in which the parent records the pid-to-hostname mapping. A trailing
  newline is stripped from hostname before it is used.

  """
  target = hostname.rstrip("\n")
  child = os.fork()
  if child <= 0:
    # worker process: run the per-host code
    HostWorker(logdir, username, password, use_agent, target,
               executable, exec_args, command, filelist)
  else:
    # controller: only remember which host the child handles
    child_pids[child] = target
433

    
434

    
435
def ParseOptions():
  """Parses the command line options.

  In case of command line errors, it will show the usage and exit the
  program.

  @return: the options in a tuple: (logdir, executable, exec_args,
      hostfile, hostlist, command, use_agent, auxfiles, username,
      password, batch_size); password is the value given to -p, i.e. the
      path of the password file

  """
  # resolve because original used -h for hostfile, which conflicts
  # with -h for help
  parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                 conflict_handler="resolve")

  parser.add_option("-l", dest="logdir", default=None,
                    help="directory to write logfiles to")
  parser.add_option("-x", dest="executable", default=None,
                    help="executable to run on remote host(s)",)
  parser.add_option("-f", dest="hostfile", default=None,
                    help="hostlist file (one host per line)")
  parser.add_option("-h", dest="hostlist", default=None, metavar="HOSTS",
                    help="comma-separated list of hosts or single hostname",)
  parser.add_option("-a", dest="auxfiles", action="append", default=[],
                    help="optional auxiliary file to upload"
                    " (can be given multiple times)",
                    metavar="FILE")
  parser.add_option("-c", dest="command", default=None,
                    help="shell command to run on remote host(s)")
  parser.add_option("-b", dest="batch_size", default=15, type="int",
                    help="batch-size, how many hosts to process"
                    " in parallel [15]")
  parser.add_option("-u", dest="username", default="root",
                    help="username used to connect [root]")
  # the value is a file name: main() reads the password from its first line
  parser.add_option("-p", dest="password", default=None,
                    metavar="PASSWORD_FILE",
                    help="file containing the password used to authenticate"
                    " (when not using an agent)")
  parser.add_option("-A", dest="use_agent", default=False, action="store_true",
                    help="instead of password, use keys from an SSH agent")
  parser.add_option("--args", dest="exec_args", default=None,
                    help="Arguments to be passed to executable (-x)")

  opts, args = parser.parse_args()

  if opts.executable and opts.command:
    parser.error("Options -x and -c conflict with each other")
  if not (opts.executable or opts.command):
    parser.error("One of -x and -c must be given")
  if opts.command and opts.exec_args:
    parser.error("Can't specify arguments when using custom command")
  if not opts.logdir:
    parser.error("Option -l is required")
  if opts.hostfile and opts.hostlist:
    parser.error("Options -f and -h conflict with each other")
  if not (opts.hostfile or opts.hostlist):
    parser.error("One of -f or -h must be given")
  if opts.batch_size < 1:
    # a batch of zero (or fewer) hosts would never launch any worker
    parser.error("Batch size (-b) must be at least 1")
  if args:
    parser.error("This program doesn't take any arguments, passed in: %s" %
                 ", ".join(args))

  return (opts.logdir, opts.executable, opts.exec_args,
          opts.hostfile, opts.hostlist,
          opts.command, opts.use_agent, opts.auxfiles, opts.username,
          opts.password, opts.batch_size)
498

    
499

    
500
def main():
  """Run the command or executable on all requested hosts.

  Parses the command line, checks the log directory, obtains the
  credentials and then keeps up to batch_size worker processes running
  until every host has been handled. Finally prints how many hosts
  succeeded and failed.

  The exit code is 1 if the log directory is unusable or the password
  file cannot be read, and 0 once all hosts were processed, regardless of
  per-host failures.

  """
  (logdir, executable, exec_args, hostfile, hostlist,
   command, use_agent, auxfiles, username,
   password, batch_size) = ParseOptions()

  ### Unbuffered sys.stdout
  sys.stdout = os.fdopen(1, "w", 0)

  if LogDirUseable(logdir) is False:
    print("ERROR: cannot create logfiles in dir %s, aborting" % logdir)
    sys.exit(1)

  if use_agent:
    # agent keys replace password authentication; without this reset the
    # path given to -p would itself be tried as the password
    password = None
  elif password:
    try:
      fh = open(password)
      try:
        pwvalue = fh.readline().strip()
      finally:
        fh.close()
    except IOError as e:
      print("error: can not read in from password file %s: %s" %
            (password, e))
      sys.exit(1)
    password = pwvalue
  else:
    password = getpass.getpass("%s's password for all nodes: " % username)

  if hostfile:
    hosts = GetHosts(hostfile)
  else:
    hosts = hostlist.split(",")
  # drop blank lines and stray commas instead of launching a worker for
  # an empty hostname
  hosts = [name.strip() for name in hosts]
  hosts = [name for name in hosts if name]

  successes = failures = 0

  filelist = auxfiles[:]
  if executable is not None:
    filelist.append(executable)

  # initial batch
  batch = hosts[:batch_size]
  hosts = hosts[batch_size:]
  child_pids = {}
  for hostname in batch:
    LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
                 executable, exec_args, command, filelist)

  # every finished child makes room for the next pending host
  while child_pids:
    pid, status = os.wait()
    hostname = child_pids.pop(pid, "<unknown host>")
    print("  %s: done (in parent)" % hostname)
    if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
      successes += 1
    else:
      failures += 1
    if hosts:
      LaunchWorker(child_pids, logdir, username, password, use_agent,
                   hosts.pop(0), executable, exec_args, command, filelist)

  print("")
  print("All done, %s successful and %s failed hosts" % (successes, failures))

  sys.exit(0)
565

    
566

    
567
# Script entry point; Ctrl-C in the controlling process ends the run with
# exit code 1.
if __name__ == "__main__":
  try:
    main()
  except KeyboardInterrupt:
    print("Received KeyboardInterrupt, aborting")
    sys.exit(1)