Statistics
| Branch: | Tag: | Revision:

root / tools / ganeti-listrunner @ 6eedd356

History | View | Annotate | Download (18.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Run an executable on a list of hosts.
22

    
23
Script to serially run an executable on a list of hosts via ssh
24
with password auth as root. If the provided log dir does not yet
25
exist, it will try to create it.
26

    
27
Implementation:
28
 - the main process spawns up to batch_size children, which:
29
 - connects to the remote host via ssh as root
30
 - uploads the executable with a random name to /tmp via sftp
31
 - chmod 500s it
32
 - via ssh: chdirs into the upload directory and runs the script
33
 - deletes it
34
 - writes status messages and all output to one logfile per host
35
 - the main process gathers then the status of the children and
36
   reports the success/failure ratio
37
 - entire script can be aborted with Ctrl-C
38

    
39
Security considerations:
40
 - the root password for the remote hosts is stored in memory for the
41
   runtime of the script
42
 - the executable to be run on the remote host is handled the following way:
43
   - try to create a random directory with permissions 700 on the
44
     remote host, abort furter processing on this host if this failes
45
   - upload the executable with to a random filename in that directory
46
   - set executable permissions to 500
47
   - run the executable
48
   - delete the execuable and the directory on the remote host
49

    
50
"""
51

    
52
# pylint: disable=C0103
53
# C0103: Invalid name ganeti-listrunner
54

    
55
import errno
56
import optparse
57
import getpass
58
import logging
59
import os
60
import random
61
import select
62
import socket
63
import sys
64
import time
65
import traceback
66

    
67
import paramiko
68

    
69

    
70
REMOTE_PATH_BASE = "/tmp/listrunner"
71

    
72
USAGE = ("%prog -l logdir {-c command | -x /path/to/file} [-b batch_size]"
73
         " {-f hostfile|-h hosts} [-u username]"
74
         " [-p password_file | -A]")
75

    
76

    
77
def LogDirUseable(logdir):
78
  """Ensure log file directory is available and usable."""
79
  testfile = "%s/test-%s-%s.deleteme" % (logdir, random.random(),
80
                                         random.random())
81
  try:
82
    os.mkdir(logdir)
83
  except OSError, err:
84
    if err.errno != errno.EEXIST:
85
      raise
86
  try:
87
    logtest = open(testfile, "aw")
88
    logtest.writelines("log file writeability test\n")
89
    logtest.close()
90
    os.unlink(testfile)
91
    return True
92
  except (OSError, IOError):
93
    return False
94

    
95

    
96
def GetTimeStamp(timestamp=None):
97
  """Return ISO8601 timestamp.
98

    
99
  Returns ISO8601 timestamp, optionally expects a time.localtime() tuple
100
  in timestamp, but will use the current time if this argument is not
101
  supplied.
102
  """
103
  if timestamp is None:
104
    timestamp = time.localtime()
105

    
106
  isotime = time.strftime("%Y-%m-%dT%H:%M:%S", timestamp)
107
  return isotime
108

    
109

    
110
def PingByTcp(target, port, timeout=10, live_port_needed=False, source=None):
111
  """Simple ping implementation using TCP connect(2).
112

    
113
  Try to do a TCP connect(2) from an optional source IP to the
114
  specified target IP and the specified target port. If the optional
115
  parameter live_port_needed is set to true, requires the remote end
116
  to accept the connection. The timeout is specified in seconds and
117
  defaults to 10 seconds. If the source optional argument is not
118
  passed, the source address selection is left to the kernel,
119
  otherwise we try to connect using the passed address (failures to
120
  bind other than EADDRNOTAVAIL will be ignored).
121

    
122
  """
123
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
124

    
125
  success = False
126

    
127
  if source is not None:
128
    try:
129
      sock.bind((source, 0))
130
    except socket.error, (errcode):
131
      if errcode == errno.EADDRNOTAVAIL:
132
        success = False
133

    
134
  sock.settimeout(timeout)
135

    
136
  try:
137
    sock.connect((target, port))
138
    sock.close()
139
    success = True
140
  except socket.timeout:
141
    success = False
142
  except socket.error, (errcode):
143
    success = (not live_port_needed) and (errcode == errno.ECONNREFUSED)
144

    
145
  return success
146

    
147

    
148
def GetHosts(hostsfile):
149
  """Return list of hosts from hostfile.
150

    
151
  Reads the hostslist file and returns a list of hosts.
152
  Expects the hostslist file to contain one hostname per line.
153

    
154
  """
155
  try:
156
    datafile = open(hostsfile, "r")
157
  except IOError, msg:
158
    print "Failed to open hosts file %s: %s" % (hostsfile, msg)
159
    sys.exit(2)
160

    
161
  hosts = datafile.readlines()
162
  datafile.close()
163

    
164
  return hosts
165

    
166

    
167
def WriteLog(message, logfile):
168
  """Writes message, terminated by newline, to logfile."""
169
  try:
170
    logfile = open(logfile, "aw")
171
  except IOError, msg:
172
    print "failed to open log file %s: %s" % (logfile, msg)
173
    print "log message was: %s" % message
174
    sys.exit(1)  # no being able to log is critical
175
  try:
176
    timestamp = GetTimeStamp()
177
    logfile.writelines("%s %s\n" % (timestamp, message))
178
    logfile.close()
179
  except IOError, msg:
180
    print "failed to write to logfile %s: %s" % (logfile, msg)
181
    print "log message was: %s" % message
182
    sys.exit(1)  # no being able to log is critical
183

    
184

    
185
def GetAgentKeys():
186
  """Tries to get a list of ssh keys from an agent."""
187
  try:
188
    agent = paramiko.Agent()
189
    return list(agent.get_keys())
190
  except paramiko.SSHException:
191
    return []
192

    
193

    
194
def SetupSshConnection(host, username, password, use_agent, logfile):
195
  """Setup the ssh connection used for all later steps.
196

    
197
  This function sets up the ssh connection that will be used both
198
  for upload and remote command execution.
199

    
200
  On success, it will return paramiko.Transport object with an
201
  already logged in session. On failure, False will be returned.
202

    
203
  """
204
  # check if target is willing to talk to us at all
205
  if not PingByTcp(host, 22, live_port_needed=True):
206
    WriteLog("ERROR: FAILURE_NOT_REACHABLE", logfile)
207
    print "  - ERROR: host not reachable on 22/tcp"
208
    return False
209

    
210
  if use_agent:
211
    keys = GetAgentKeys()
212
  else:
213
    keys = []
214
  all_kwargs = [{"pkey": k} for k in keys]
215
  all_desc = ["key %d" % d for d in range(len(keys))]
216
  if password is not None:
217
    all_kwargs.append({"password": password})
218
    all_desc.append("password")
219

    
220
  # deal with logging out of paramiko.transport
221
  handler = None
222

    
223
  for desc, kwargs in zip(all_desc, all_kwargs):
224
    try:
225
      transport = paramiko.Transport((host, 22))
226

    
227
      # only try to setup the logging handler once
228
      if not handler:
229
        handler = logging.StreamHandler()
230
        handler.setLevel(logging.ERROR)
231
        log = logging.getLogger(transport.get_log_channel())
232
        log.addHandler(handler)
233

    
234
      transport.connect(username=username, **kwargs) # pylint: disable=W0142
235
      WriteLog("ssh connection established using %s" % desc, logfile)
236
      # strange ... when establishing the session and the immediately
237
      # setting up the channels for sftp & shell from that, it sometimes
238
      # fails, but waiting 1 second after session setup makes it always work
239
      # time.sleep(1)
240
      # FIXME apparently needfull to give sshd some time
241
      return transport
242
    except (socket.gaierror, socket.error, paramiko.SSHException):
243
      continue
244

    
245
  methods = ", ".join(all_desc)
246
  WriteLog("ERROR: FAILURE_CONNECTION_SETUP (tried %s) " % methods, logfile)
247
  WriteLog("aborted", logfile)
248
  print "  - ERROR: connection setup failed (tried %s)" % methods
249

    
250
  return False
251

    
252

    
253
def UploadFiles(connection, executable, filelist, logfile):
254
  """Uploads the specified files via sftp.
255

    
256
  Uploads the specified files to a random, freshly created directory with
257
  a temporary name under /tmp. All uploaded files are chmod 0400 after upload
258
  with the exception of executable, with is chmod 500.
259

    
260
  Upon success, returns the absolute path to the remote upload directory,
261
  but will return False upon failure.
262
  """
263
  remote_dir = "%s.%s-%s" % (REMOTE_PATH_BASE,
264
                             random.random(), random.random())
265

    
266
  try:
267
    sftp = paramiko.SFTPClient.from_transport(connection)
268
    sftp.mkdir(remote_dir, mode=0700)
269
    for item in filelist:
270
      remote_file = "%s/%s" % (remote_dir, os.path.basename(item))
271
      WriteLog("uploading %s to remote %s" % (item, remote_file), logfile)
272
      sftp.put(item, remote_file)
273
      if item == executable:
274
        sftp.chmod(remote_file, 0500)
275
      else:
276
        sftp.chmod(remote_file, 0400)
277
    sftp.close()
278
  except IOError, err:
279
    WriteLog("ERROR: FAILURE_UPLOAD: %s" % err, logfile)
280
    return False
281

    
282
  return remote_dir
283

    
284

    
285
def CleanupRemoteDir(connection, upload_dir, filelist, logfile):
286
  """Cleanes out and removes the remote work directory."""
287
  try:
288
    sftp = paramiko.SFTPClient.from_transport(connection)
289
    for item in filelist:
290
      fullpath = "%s/%s" % (upload_dir, os.path.basename(item))
291
      WriteLog("removing remote %s" % fullpath, logfile)
292
      sftp.remove(fullpath)
293
    sftp.rmdir(upload_dir)
294
    sftp.close()
295
  except IOError, err:
296
    WriteLog("ERROR: FAILURE_CLEANUP: %s" % err, logfile)
297
    return False
298

    
299
  return True
300

    
301

    
302
def RunRemoteCommand(connection, command, logfile):
303
  """Execute the command via ssh on the remote host."""
304
  session = connection.open_session()
305
  session.setblocking(0)
306

    
307
  # the following dance is needed because paramiko changed APIs:
308
  # from returning True/False for success to always returning None
309
  # and throwing an exception in case of problems.
310
  # And I want to support both the old and the new API.
311
  result = True  # being optimistic here, I know
312
  message = None
313
  try:
314
    if session.exec_command("%s 2>&1" % command) is False:
315
      result = False
316
  except paramiko.SSHException, message:
317
    result = False
318

    
319
  if not result:
320
    WriteLog("ERROR: FAILURE_COMMAND_EXECUTION: %s" % message, logfile)
321
    return False
322

    
323
   ### Read when data is available
324
  output = ""
325
  while select.select([session], [], []):
326
    try:
327
      data = session.recv(1024)
328
    except socket.timeout, err:
329
      data = None
330
      WriteLog("FAILED: socket.timeout %s" % err, logfile)
331
    except socket.error, err:
332
      data = None
333
      WriteLog("FAILED: socket.error %s" % err, logfile)
334
    if not data:
335
      break
336
    output += data
337
    select.select([], [], [], .1)
338

    
339
  WriteLog("SUCCESS: command output follows", logfile)
340
  for line in output.splitlines():
341
    WriteLog("output = %s" % line, logfile)
342
  WriteLog("command execution completed", logfile)
343
  session.close()
344

    
345
  return True
346

    
347

    
348
def HostWorker(logdir, username, password, use_agent, hostname,
349
               executable, exec_args, command, filelist):
350
  """Per-host worker.
351

    
352
  This function does not return - it's the main code of the childs,
353
  which exit at the end of this function. The exit code 0 or 1 will be
354
  interpreted by the parent.
355

    
356
  @param logdir: the directory where the logfiles must be created
357
  @param username: SSH username
358
  @param password: SSH password
359
  @param use_agent: whether we should instead use an agent
360
  @param hostname: the hostname to connect to
361
  @param executable: the executable to upload, if not None
362
  @param exec_args: Additional arguments for executable
363
  @param command: the command to run
364
  @param filelist: auxiliary files to upload
365

    
366
  """
367
  # in the child/worker process
368
  logfile = "%s/%s.log" % (logdir, hostname)
369
  print "%s - starting" % hostname
370
  result = 0  # optimism, I know
371
  try:
372
    connection = SetupSshConnection(hostname, username,
373
                                    password, use_agent, logfile)
374
    if connection is not False:
375
      if executable is not None:
376
        print "  %s: uploading files" % hostname
377
        upload_dir = UploadFiles(connection, executable,
378
                                 filelist, logfile)
379
        command = ("cd %s && ./%s %s" %
380
                   (upload_dir, os.path.basename(executable), exec_args))
381
      print "  %s: executing remote command" % hostname
382
      cmd_result = RunRemoteCommand(connection, command, logfile)
383
      if cmd_result is True:
384
        print "  %s: remote command execution successful" % hostname
385
      else:
386
        print ("  %s: remote command execution failed,"
387
               " check log for details" % hostname)
388
        result = 1
389
      if executable is not None:
390
        print "  %s: cleaning up remote work dir" % hostname
391
        cln_result = CleanupRemoteDir(connection, upload_dir,
392
                                      filelist, logfile)
393
        if cln_result is False:
394
          print ("  %s: remote work dir cleanup failed, check"
395
                 " log for details" % hostname)
396
          result = 1
397
      connection.close()
398
    else:
399
      print "  %s: connection setup failed, skipping" % hostname
400
      result = 1
401
  except KeyboardInterrupt:
402
    print "  %s: received KeyboardInterrupt, aborting" % hostname
403
    WriteLog("ERROR: ABORT_KEYBOARD_INTERRUPT", logfile)
404
    result = 1
405
  except Exception, err:
406
    result = 1
407
    trace = traceback.format_exc()
408
    msg = "ERROR: UNHANDLED_EXECPTION_ERROR: %s\nTrace: %s" % (err, trace)
409
    WriteLog(msg, logfile)
410
    print "  %s: %s" % (hostname, msg)
411
  # and exit with exit code 0 or 1, so the parent can compute statistics
412
  sys.exit(result)
413

    
414

    
415
def LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
416
                 executable, exec_args, command, filelist):
417
  """Launch the per-host worker.
418

    
419
  Arguments are the same as for HostWorker, except for child_pids,
420
  which is a dictionary holding the pid-to-hostname mapping.
421

    
422
  """
423
  hostname = hostname.rstrip("\n")
424
  pid = os.fork()
425
  if pid > 0:
426
    # controller just record the pids
427
    child_pids[pid] = hostname
428
  else:
429
    HostWorker(logdir, username, password, use_agent, hostname,
430
               executable, exec_args, command, filelist)
431

    
432

    
433
def ParseOptions():
434
  """Parses the command line options.
435

    
436
  In case of command line errors, it will show the usage and exit the
437
  program.
438

    
439
  @return: the options in a tuple
440

    
441
  """
442
  # resolve because original used -h for hostfile, which conflicts
443
  # with -h for help
444
  parser = optparse.OptionParser(usage="\n%s" % USAGE,
445
                                 conflict_handler="resolve")
446

    
447
  parser.add_option("-l", dest="logdir", default=None,
448
                    help="directory to write logfiles to")
449
  parser.add_option("-x", dest="executable", default=None,
450
                    help="executable to run on remote host(s)",)
451
  parser.add_option("-f", dest="hostfile", default=None,
452
                    help="hostlist file (one host per line)")
453
  parser.add_option("-h", dest="hostlist", default=None, metavar="HOSTS",
454
                    help="comma-separated list of hosts or single hostname",)
455
  parser.add_option("-a", dest="auxfiles", action="append", default=[],
456
                    help="optional auxiliary file to upload"
457
                    " (can be given multiple times)",
458
                    metavar="FILE")
459
  parser.add_option("-c", dest="command", default=None,
460
                    help="shell command to run on remote host(s)")
461
  parser.add_option("-b", dest="batch_size", default=15, type="int",
462
                    help="batch-size, how many hosts to process"
463
                    " in parallel [15]")
464
  parser.add_option("-u", dest="username", default="root",
465
                    help="username used to connect [root]")
466
  parser.add_option("-p", dest="password", default=None,
467
                    help="password used to authenticate (when not"
468
                    " using an agent)")
469
  parser.add_option("-A", dest="use_agent", default=False, action="store_true",
470
                    help="instead of password, use keys from an SSH agent")
471
  parser.add_option("--args", dest="exec_args", default=None,
472
                    help="Arguments to be passed to executable (-x)")
473

    
474
  opts, args = parser.parse_args()
475

    
476
  if opts.executable and opts.command:
477
    parser.error("Options -x and -c conflict with each other")
478
  if not (opts.executable or opts.command):
479
    parser.error("One of -x and -c must be given")
480
  if opts.command and opts.exec_args:
481
    parser.error("Can't specify arguments when using custom command")
482
  if not opts.logdir:
483
    parser.error("Option -l is required")
484
  if opts.hostfile and opts.hostlist:
485
    parser.error("Options -f and -h conflict with each other")
486
  if not (opts.hostfile or opts.hostlist):
487
    parser.error("One of -f or -h must be given")
488
  if args:
489
    parser.error("This program doesn't take any arguments, passed in: %s" %
490
                 ", ".join(args))
491

    
492
  return (opts.logdir, opts.executable, opts.exec_args,
493
          opts.hostfile, opts.hostlist,
494
          opts.command, opts.use_agent, opts.auxfiles, opts.username,
495
          opts.password, opts.batch_size)
496

    
497

    
498
def main():
499
  """main."""
500
  (logdir, executable, exec_args, hostfile, hostlist,
501
   command, use_agent, auxfiles, username,
502
   password, batch_size) = ParseOptions()
503

    
504
  ### Unbuffered sys.stdout
505
  sys.stdout = os.fdopen(1, "w", 0)
506

    
507
  if LogDirUseable(logdir) is False:
508
    print "ERROR: cannot create logfiles in dir %s, aborting" % logdir
509
    sys.exit(1)
510

    
511
  if use_agent:
512
    pass
513
  elif password:
514
    try:
515
      fh = file(password)
516
      pwvalue = fh.readline().strip()
517
      fh.close()
518
    except IOError, e:
519
      print "error: can not read in from password file %s: %s" % (password, e)
520
      sys.exit(1)
521
    password = pwvalue
522
  else:
523
    password = getpass.getpass("%s's password for all nodes: " % username)
524

    
525
  if hostfile:
526
    hosts = GetHosts(hostfile)
527
  else:
528
    if "," in hostlist:
529
      hostlist = hostlist.rstrip(",")  # commandline robustness
530
      hosts = hostlist.split(",")
531
    else:
532
      hosts = [hostlist]
533

    
534
  successes = failures = 0
535

    
536
  filelist = auxfiles[:]
537
  filelist.append(executable)
538

    
539
  # initial batch
540
  batch = hosts[:batch_size]
541
  hosts = hosts[batch_size:]
542
  child_pids = {}
543
  for hostname in batch:
544
    LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
545
                 executable, exec_args, command, filelist)
546

    
547
  while child_pids:
548
    pid, status = os.wait()
549
    hostname = child_pids.pop(pid, "<unknown host>")
550
    print "  %s: done (in parent)" % hostname
551
    if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
552
      successes += 1
553
    else:
554
      failures += 1
555
    if hosts:
556
      LaunchWorker(child_pids, logdir, username, password, use_agent,
557
                   hosts.pop(0), executable, exec_args, command, filelist)
558

    
559
  print
560
  print "All done, %s successful and %s failed hosts" % (successes, failures)
561

    
562
  sys.exit(0)
563

    
564

    
565
if __name__ == "__main__":
566
  try:
567
    main()
568
  except KeyboardInterrupt:
569
    print "Received KeyboardInterrupt, aborting"
570
    sys.exit(1)