Statistics
| Branch: | Tag: | Revision:

root / tools / ganeti-listrunner @ 18397489

History | View | Annotate | Download (18.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Run an executable on a list of hosts.
22

    
23
Script to serially run an executable on a list of hosts via ssh
24
with password auth as root. If the provided log dir does not yet
25
exist, it will try to create it.
26

    
27
Implementation:
28
 - the main process spawns up to batch_size children, which:
29
 - connects to the remote host via ssh as root
30
 - uploads the executable with a random name to /tmp via sftp
31
 - chmod 500s it
32
 - via ssh: chdirs into the upload directory and runs the script
33
 - deletes it
34
 - writes status messages and all output to one logfile per host
35
 - the main process gathers then the status of the children and
36
   reports the success/failure ratio
37
 - entire script can be aborted with Ctrl-C
38

    
39
Security considerations:
40
 - the root password for the remote hosts is stored in memory for the
41
   runtime of the script
42
 - the executable to be run on the remote host is handled the following way:
43
   - try to create a random directory with permissions 700 on the
44
     remote host, abort furter processing on this host if this failes
45
   - upload the executable with to a random filename in that directory
46
   - set executable permissions to 500
47
   - run the executable
48
   - delete the execuable and the directory on the remote host
49

    
50
"""
51

    
52
# pylint: disable=C0103
53
# C0103: Invalid name ganeti-listrunner
54

    
55
import errno
56
import optparse
57
import getpass
58
import logging
59
import os
60
import random
61
import select
62
import socket
63
import sys
64
import time
65
import traceback
66

    
67
try:
68
  import paramiko
69
except ImportError:
70
  print >> sys.stderr, \
71
    ("The \"paramiko\" module could not be imported. Install it from your"
72
     " distribution's repository. The package is usually named"
73
     " \"python-paramiko\".")
74
  sys.exit(1)
75

    
76

    
77
REMOTE_PATH_BASE = "/tmp/listrunner"
78

    
79
USAGE = ("%prog -l logdir {-c command | -x /path/to/file} [-b batch_size]"
80
         " {-f hostfile|-h hosts} [-u username]"
81
         " [-p password_file | -A]")
82

    
83

    
84
def LogDirUseable(logdir):
85
  """Ensure log file directory is available and usable."""
86
  testfile = "%s/test-%s-%s.deleteme" % (logdir, random.random(),
87
                                         random.random())
88
  try:
89
    os.mkdir(logdir)
90
  except OSError, err:
91
    if err.errno != errno.EEXIST:
92
      raise
93
  try:
94
    logtest = open(testfile, "aw")
95
    logtest.writelines("log file writeability test\n")
96
    logtest.close()
97
    os.unlink(testfile)
98
    return True
99
  except (OSError, IOError):
100
    return False
101

    
102

    
103
def GetTimeStamp(timestamp=None):
104
  """Return ISO8601 timestamp.
105

    
106
  Returns ISO8601 timestamp, optionally expects a time.localtime() tuple
107
  in timestamp, but will use the current time if this argument is not
108
  supplied.
109
  """
110
  if timestamp is None:
111
    timestamp = time.localtime()
112

    
113
  isotime = time.strftime("%Y-%m-%dT%H:%M:%S", timestamp)
114
  return isotime
115

    
116

    
117
def PingByTcp(target, port, timeout=10, live_port_needed=False, source=None):
118
  """Simple ping implementation using TCP connect(2).
119

    
120
  Try to do a TCP connect(2) from an optional source IP to the
121
  specified target IP and the specified target port. If the optional
122
  parameter live_port_needed is set to true, requires the remote end
123
  to accept the connection. The timeout is specified in seconds and
124
  defaults to 10 seconds. If the source optional argument is not
125
  passed, the source address selection is left to the kernel,
126
  otherwise we try to connect using the passed address (failures to
127
  bind other than EADDRNOTAVAIL will be ignored).
128

    
129
  """
130
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
131

    
132
  success = False
133

    
134
  if source is not None:
135
    try:
136
      sock.bind((source, 0))
137
    except socket.error, (errcode):
138
      if errcode == errno.EADDRNOTAVAIL:
139
        success = False
140

    
141
  sock.settimeout(timeout)
142

    
143
  try:
144
    sock.connect((target, port))
145
    sock.close()
146
    success = True
147
  except socket.timeout:
148
    success = False
149
  except socket.error, (errcode):
150
    success = (not live_port_needed) and (errcode == errno.ECONNREFUSED)
151

    
152
  return success
153

    
154

    
155
def GetHosts(hostsfile):
156
  """Return list of hosts from hostfile.
157

    
158
  Reads the hostslist file and returns a list of hosts.
159
  Expects the hostslist file to contain one hostname per line.
160

    
161
  """
162
  try:
163
    datafile = open(hostsfile, "r")
164
  except IOError, msg:
165
    print "Failed to open hosts file %s: %s" % (hostsfile, msg)
166
    sys.exit(2)
167

    
168
  hosts = datafile.readlines()
169
  datafile.close()
170

    
171
  return hosts
172

    
173

    
174
def WriteLog(message, logfile):
175
  """Writes message, terminated by newline, to logfile."""
176
  try:
177
    logfile = open(logfile, "aw")
178
  except IOError, msg:
179
    print "failed to open log file %s: %s" % (logfile, msg)
180
    print "log message was: %s" % message
181
    sys.exit(1)  # no being able to log is critical
182
  try:
183
    timestamp = GetTimeStamp()
184
    logfile.writelines("%s %s\n" % (timestamp, message))
185
    logfile.close()
186
  except IOError, msg:
187
    print "failed to write to logfile %s: %s" % (logfile, msg)
188
    print "log message was: %s" % message
189
    sys.exit(1)  # no being able to log is critical
190

    
191

    
192
def GetAgentKeys():
193
  """Tries to get a list of ssh keys from an agent."""
194
  try:
195
    agent = paramiko.Agent()
196
    return list(agent.get_keys())
197
  except paramiko.SSHException:
198
    return []
199

    
200

    
201
def SetupSshConnection(host, username, password, use_agent, logfile):
202
  """Setup the ssh connection used for all later steps.
203

    
204
  This function sets up the ssh connection that will be used both
205
  for upload and remote command execution.
206

    
207
  On success, it will return paramiko.Transport object with an
208
  already logged in session. On failure, False will be returned.
209

    
210
  """
211
  # check if target is willing to talk to us at all
212
  if not PingByTcp(host, 22, live_port_needed=True):
213
    WriteLog("ERROR: FAILURE_NOT_REACHABLE", logfile)
214
    print "  - ERROR: host not reachable on 22/tcp"
215
    return False
216

    
217
  if use_agent:
218
    keys = GetAgentKeys()
219
  else:
220
    keys = []
221
  all_kwargs = [{"pkey": k} for k in keys]
222
  all_desc = ["key %d" % d for d in range(len(keys))]
223
  if password is not None:
224
    all_kwargs.append({"password": password})
225
    all_desc.append("password")
226

    
227
  # deal with logging out of paramiko.transport
228
  handler = None
229

    
230
  for desc, kwargs in zip(all_desc, all_kwargs):
231
    try:
232
      transport = paramiko.Transport((host, 22))
233

    
234
      # only try to setup the logging handler once
235
      if not handler:
236
        handler = logging.StreamHandler()
237
        handler.setLevel(logging.ERROR)
238
        log = logging.getLogger(transport.get_log_channel())
239
        log.addHandler(handler)
240

    
241
      transport.connect(username=username, **kwargs) # pylint: disable=W0142
242
      WriteLog("ssh connection established using %s" % desc, logfile)
243
      # strange ... when establishing the session and the immediately
244
      # setting up the channels for sftp & shell from that, it sometimes
245
      # fails, but waiting 1 second after session setup makes it always work
246
      # time.sleep(1)
247
      # FIXME apparently needfull to give sshd some time
248
      return transport
249
    except (socket.gaierror, socket.error, paramiko.SSHException):
250
      continue
251

    
252
  methods = ", ".join(all_desc)
253
  WriteLog("ERROR: FAILURE_CONNECTION_SETUP (tried %s) " % methods, logfile)
254
  WriteLog("aborted", logfile)
255
  print "  - ERROR: connection setup failed (tried %s)" % methods
256

    
257
  return False
258

    
259

    
260
def UploadFiles(connection, executable, filelist, logfile):
261
  """Uploads the specified files via sftp.
262

    
263
  Uploads the specified files to a random, freshly created directory with
264
  a temporary name under /tmp. All uploaded files are chmod 0400 after upload
265
  with the exception of executable, with is chmod 500.
266

    
267
  Upon success, returns the absolute path to the remote upload directory,
268
  but will return False upon failure.
269
  """
270
  remote_dir = "%s.%s-%s" % (REMOTE_PATH_BASE,
271
                             random.random(), random.random())
272

    
273
  try:
274
    sftp = paramiko.SFTPClient.from_transport(connection)
275
    sftp.mkdir(remote_dir, mode=0700)
276
    for item in filelist:
277
      remote_file = "%s/%s" % (remote_dir, os.path.basename(item))
278
      WriteLog("uploading %s to remote %s" % (item, remote_file), logfile)
279
      sftp.put(item, remote_file)
280
      if item == executable:
281
        sftp.chmod(remote_file, 0500)
282
      else:
283
        sftp.chmod(remote_file, 0400)
284
    sftp.close()
285
  except IOError, err:
286
    WriteLog("ERROR: FAILURE_UPLOAD: %s" % err, logfile)
287
    return False
288

    
289
  return remote_dir
290

    
291

    
292
def CleanupRemoteDir(connection, upload_dir, filelist, logfile):
293
  """Cleanes out and removes the remote work directory."""
294
  try:
295
    sftp = paramiko.SFTPClient.from_transport(connection)
296
    for item in filelist:
297
      fullpath = "%s/%s" % (upload_dir, os.path.basename(item))
298
      WriteLog("removing remote %s" % fullpath, logfile)
299
      sftp.remove(fullpath)
300
    sftp.rmdir(upload_dir)
301
    sftp.close()
302
  except IOError, err:
303
    WriteLog("ERROR: FAILURE_CLEANUP: %s" % err, logfile)
304
    return False
305

    
306
  return True
307

    
308

    
309
def RunRemoteCommand(connection, command, logfile):
310
  """Execute the command via ssh on the remote host."""
311
  session = connection.open_session()
312
  session.setblocking(0)
313

    
314
  # the following dance is needed because paramiko changed APIs:
315
  # from returning True/False for success to always returning None
316
  # and throwing an exception in case of problems.
317
  # And I want to support both the old and the new API.
318
  result = True  # being optimistic here, I know
319
  message = None
320
  try:
321
    if session.exec_command("%s 2>&1" % command) is False:
322
      result = False
323
  except paramiko.SSHException, message:
324
    result = False
325

    
326
  if not result:
327
    WriteLog("ERROR: FAILURE_COMMAND_EXECUTION: %s" % message, logfile)
328
    return False
329

    
330
   ### Read when data is available
331
  output = ""
332
  while select.select([session], [], []):
333
    try:
334
      data = session.recv(1024)
335
    except socket.timeout, err:
336
      data = None
337
      WriteLog("FAILED: socket.timeout %s" % err, logfile)
338
    except socket.error, err:
339
      data = None
340
      WriteLog("FAILED: socket.error %s" % err, logfile)
341
    if not data:
342
      break
343
    output += data
344
    select.select([], [], [], .1)
345

    
346
  WriteLog("SUCCESS: command output follows", logfile)
347
  for line in output.splitlines():
348
    WriteLog("output = %s" % line, logfile)
349
  WriteLog("command execution completed", logfile)
350
  session.close()
351

    
352
  return True
353

    
354

    
355
def HostWorker(logdir, username, password, use_agent, hostname,
356
               executable, exec_args, command, filelist):
357
  """Per-host worker.
358

    
359
  This function does not return - it's the main code of the childs,
360
  which exit at the end of this function. The exit code 0 or 1 will be
361
  interpreted by the parent.
362

    
363
  @param logdir: the directory where the logfiles must be created
364
  @param username: SSH username
365
  @param password: SSH password
366
  @param use_agent: whether we should instead use an agent
367
  @param hostname: the hostname to connect to
368
  @param executable: the executable to upload, if not None
369
  @param exec_args: Additional arguments for executable
370
  @param command: the command to run
371
  @param filelist: auxiliary files to upload
372

    
373
  """
374
  # in the child/worker process
375
  logfile = "%s/%s.log" % (logdir, hostname)
376
  print "%s - starting" % hostname
377
  result = 0  # optimism, I know
378
  try:
379
    connection = SetupSshConnection(hostname, username,
380
                                    password, use_agent, logfile)
381
    if connection is not False:
382
      if executable is not None:
383
        print "  %s: uploading files" % hostname
384
        upload_dir = UploadFiles(connection, executable,
385
                                 filelist, logfile)
386
        command = ("cd %s && ./%s" %
387
                   (upload_dir, os.path.basename(executable)))
388
        if exec_args:
389
          command += " %s" % exec_args
390
      print "  %s: executing remote command" % hostname
391
      cmd_result = RunRemoteCommand(connection, command, logfile)
392
      if cmd_result is True:
393
        print "  %s: remote command execution successful" % hostname
394
      else:
395
        print ("  %s: remote command execution failed,"
396
               " check log for details" % hostname)
397
        result = 1
398
      if executable is not None:
399
        print "  %s: cleaning up remote work dir" % hostname
400
        cln_result = CleanupRemoteDir(connection, upload_dir,
401
                                      filelist, logfile)
402
        if cln_result is False:
403
          print ("  %s: remote work dir cleanup failed, check"
404
                 " log for details" % hostname)
405
          result = 1
406
      connection.close()
407
    else:
408
      print "  %s: connection setup failed, skipping" % hostname
409
      result = 1
410
  except KeyboardInterrupt:
411
    print "  %s: received KeyboardInterrupt, aborting" % hostname
412
    WriteLog("ERROR: ABORT_KEYBOARD_INTERRUPT", logfile)
413
    result = 1
414
  except Exception, err:
415
    result = 1
416
    trace = traceback.format_exc()
417
    msg = "ERROR: UNHANDLED_EXECPTION_ERROR: %s\nTrace: %s" % (err, trace)
418
    WriteLog(msg, logfile)
419
    print "  %s: %s" % (hostname, msg)
420
  # and exit with exit code 0 or 1, so the parent can compute statistics
421
  sys.exit(result)
422

    
423

    
424
def LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
425
                 executable, exec_args, command, filelist):
426
  """Launch the per-host worker.
427

    
428
  Arguments are the same as for HostWorker, except for child_pids,
429
  which is a dictionary holding the pid-to-hostname mapping.
430

    
431
  """
432
  hostname = hostname.rstrip("\n")
433
  pid = os.fork()
434
  if pid > 0:
435
    # controller just record the pids
436
    child_pids[pid] = hostname
437
  else:
438
    HostWorker(logdir, username, password, use_agent, hostname,
439
               executable, exec_args, command, filelist)
440

    
441

    
442
def ParseOptions():
443
  """Parses the command line options.
444

    
445
  In case of command line errors, it will show the usage and exit the
446
  program.
447

    
448
  @return: the options in a tuple
449

    
450
  """
451
  # resolve because original used -h for hostfile, which conflicts
452
  # with -h for help
453
  parser = optparse.OptionParser(usage="\n%s" % USAGE,
454
                                 conflict_handler="resolve")
455

    
456
  parser.add_option("-l", dest="logdir", default=None,
457
                    help="directory to write logfiles to")
458
  parser.add_option("-x", dest="executable", default=None,
459
                    help="executable to run on remote host(s)",)
460
  parser.add_option("-f", dest="hostfile", default=None,
461
                    help="hostlist file (one host per line)")
462
  parser.add_option("-h", dest="hostlist", default=None, metavar="HOSTS",
463
                    help="comma-separated list of hosts or single hostname",)
464
  parser.add_option("-a", dest="auxfiles", action="append", default=[],
465
                    help="optional auxiliary file to upload"
466
                    " (can be given multiple times)",
467
                    metavar="FILE")
468
  parser.add_option("-c", dest="command", default=None,
469
                    help="shell command to run on remote host(s)")
470
  parser.add_option("-b", dest="batch_size", default=15, type="int",
471
                    help="batch-size, how many hosts to process"
472
                    " in parallel [15]")
473
  parser.add_option("-u", dest="username", default="root",
474
                    help="username used to connect [root]")
475
  parser.add_option("-p", dest="password", default=None,
476
                    help="password used to authenticate (when not"
477
                    " using an agent)")
478
  parser.add_option("-A", dest="use_agent", default=False, action="store_true",
479
                    help="instead of password, use keys from an SSH agent")
480
  parser.add_option("--args", dest="exec_args", default=None,
481
                    help="Arguments to be passed to executable (-x)")
482

    
483
  opts, args = parser.parse_args()
484

    
485
  if opts.executable and opts.command:
486
    parser.error("Options -x and -c conflict with each other")
487
  if not (opts.executable or opts.command):
488
    parser.error("One of -x and -c must be given")
489
  if opts.command and opts.exec_args:
490
    parser.error("Can't specify arguments when using custom command")
491
  if not opts.logdir:
492
    parser.error("Option -l is required")
493
  if opts.hostfile and opts.hostlist:
494
    parser.error("Options -f and -h conflict with each other")
495
  if not (opts.hostfile or opts.hostlist):
496
    parser.error("One of -f or -h must be given")
497
  if args:
498
    parser.error("This program doesn't take any arguments, passed in: %s" %
499
                 ", ".join(args))
500

    
501
  return (opts.logdir, opts.executable, opts.exec_args,
502
          opts.hostfile, opts.hostlist,
503
          opts.command, opts.use_agent, opts.auxfiles, opts.username,
504
          opts.password, opts.batch_size)
505

    
506

    
507
def main():
508
  """main."""
509
  (logdir, executable, exec_args, hostfile, hostlist,
510
   command, use_agent, auxfiles, username,
511
   password, batch_size) = ParseOptions()
512

    
513
  ### Unbuffered sys.stdout
514
  sys.stdout = os.fdopen(1, "w", 0)
515

    
516
  if LogDirUseable(logdir) is False:
517
    print "ERROR: cannot create logfiles in dir %s, aborting" % logdir
518
    sys.exit(1)
519

    
520
  if use_agent:
521
    pass
522
  elif password:
523
    try:
524
      fh = file(password)
525
      pwvalue = fh.readline().strip()
526
      fh.close()
527
    except IOError, e:
528
      print "error: can not read in from password file %s: %s" % (password, e)
529
      sys.exit(1)
530
    password = pwvalue
531
  else:
532
    password = getpass.getpass("%s's password for all nodes: " % username)
533

    
534
  if hostfile:
535
    hosts = GetHosts(hostfile)
536
  else:
537
    if "," in hostlist:
538
      hostlist = hostlist.rstrip(",")  # commandline robustness
539
      hosts = hostlist.split(",")
540
    else:
541
      hosts = [hostlist]
542

    
543
  successes = failures = 0
544

    
545
  filelist = auxfiles[:]
546
  filelist.append(executable)
547

    
548
  # initial batch
549
  batch = hosts[:batch_size]
550
  hosts = hosts[batch_size:]
551
  child_pids = {}
552
  for hostname in batch:
553
    LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
554
                 executable, exec_args, command, filelist)
555

    
556
  while child_pids:
557
    pid, status = os.wait()
558
    hostname = child_pids.pop(pid, "<unknown host>")
559
    print "  %s: done (in parent)" % hostname
560
    if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
561
      successes += 1
562
    else:
563
      failures += 1
564
    if hosts:
565
      LaunchWorker(child_pids, logdir, username, password, use_agent,
566
                   hosts.pop(0), executable, exec_args, command, filelist)
567

    
568
  print
569
  print "All done, %s successful and %s failed hosts" % (successes, failures)
570

    
571
  sys.exit(0)
572

    
573

    
574
if __name__ == "__main__":
575
  try:
576
    main()
577
  except KeyboardInterrupt:
578
    print "Received KeyboardInterrupt, aborting"
579
    sys.exit(1)