Statistics
| Branch: | Tag: | Revision:

root / tools / ganeti-listrunner @ 2c094917

History | View | Annotate | Download (18 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Run an executable on a list of hosts.
22

    
23
Script to serially run an executable on a list of hosts via ssh
24
with password auth as root. If the provided log dir does not yet
25
exist, it will try to create it.
26

    
27
Implementation:
28
 - the main process spawns up to batch_size children, which:
29
 - connects to the remote host via ssh as root
30
 - uploads the executable with a random name to /tmp via sftp
31
 - chmod 500s it
32
 - via ssh: chdirs into the upload directory and runs the script
33
 - deletes it
34
 - writes status messages and all output to one logfile per host
35
 - the main process then gathers the status of the children and
36
   reports the success/failure ratio
37
 - entire script can be aborted with Ctrl-C
38

    
39
Security considerations:
40
 - the root password for the remote hosts is stored in memory for the
41
   runtime of the script
42
 - the executable to be run on the remote host is handled the following way:
43
   - try to create a random directory with permissions 700 on the
44
     remote host, abort further processing on this host if this fails
45
   - upload the executable to a random filename in that directory
46
   - set executable permissions to 500
47
   - run the executable
48
   - delete the executable and the directory on the remote host
49

    
50
"""
51

    
52
# pylint: disable-msg=C0103
53
# C0103: Invalid name ganeti-listrunner
54

    
55
import errno
56
import optparse
57
import getpass
58
import logging
59
import os
60
import random
61
import select
62
import socket
63
import sys
64
import time
65
import traceback
66

    
67
import paramiko
68

    
69

    
70
# Prefix of the remote upload directory; UploadFiles appends a random suffix.
REMOTE_PATH_BASE = "/tmp/listrunner"

# Usage line handed to optparse, which expands "%prog" to the program name.
USAGE = " ".join([
  "%prog -l logdir",
  "{-c command | -x /path/to/file}",
  "[-b batch_size]",
  "{-f hostfile|-h hosts}",
  "[-u username]",
  "[-p password_file | -A]",
  ])
75

    
76

    
77
def LogDirUseable(logdir):
  """Ensure log file directory is available and usable.

  Creates C{logdir} if it does not exist yet and then verifies that a
  file can be created, written to and removed inside it.

  @param logdir: path of the directory the per-host logfiles go to
  @rtype: boolean
  @return: True if a test file could be written and deleted again,
      False otherwise
  @raise OSError: if the directory cannot be created for a reason other
      than it already existing

  """
  testfile = "%s/test-%s-%s.deleteme" % (logdir, random.random(),
                                         random.random())
  try:
    os.mkdir(logdir)
  except OSError as err:
    # an already existing directory is fine, everything else is fatal
    if err.errno != errno.EEXIST:
      raise

  try:
    # "a" is the portable spelling; "aw" is not a valid mode string
    logtest = open(testfile, "a")
    try:
      logtest.write("log file writeability test\n")
    finally:
      # do not leak the descriptor if the write fails
      logtest.close()
    os.unlink(testfile)
  except (OSError, IOError):
    return False

  return True
94

    
95

    
96
def GetTimeStamp(timestamp=None):
  """Format a time tuple as an ISO8601 string.

  @param timestamp: a time tuple as returned by time.localtime(); the
      current local time is used when this argument is omitted
  @return: the timestamp formatted as YYYY-MM-DDTHH:MM:SS

  """
  when = time.localtime() if timestamp is None else timestamp
  return time.strftime("%Y-%m-%dT%H:%M:%S", when)
108

    
109

    
110
def PingByTcp(target, port, timeout=10, live_port_needed=False, source=None):
  """Simple ping implementation using TCP connect(2).

  Try to do a TCP connect(2) from an optional source IP to the
  specified target IP and the specified target port. If the optional
  parameter live_port_needed is set to true, requires the remote end
  to accept the connection; otherwise a refused connection also counts
  as success. If the source optional argument is not passed, the source
  address selection is left to the kernel, otherwise we try to connect
  using the passed address (failures to bind other than EADDRNOTAVAIL
  will be ignored, EADDRNOTAVAIL makes the ping fail).

  @param target: the address to connect to
  @param port: the TCP port to connect to
  @param timeout: connect timeout in seconds, defaults to 10
  @param live_port_needed: whether the port must accept the connection
  @param source: optional local address to bind to before connecting
  @rtype: boolean
  @return: whether the target was reached

  """
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  try:
    if source is not None:
      try:
        sock.bind((source, 0))
      except socket.error as err:
        # the error number lives on the exception; comparing the
        # exception object itself against an errno constant never matches
        if err.errno == errno.EADDRNOTAVAIL:
          return False

    sock.settimeout(timeout)

    try:
      sock.connect((target, port))
    except socket.timeout:
      return False
    except socket.error as err:
      # unless a live port is required, a refused connection is good enough
      return (not live_port_needed) and (err.errno == errno.ECONNREFUSED)

    return True
  finally:
    # release the descriptor on every path, not only after a connect
    sock.close()
146

    
147

    
148
def GetHosts(hostsfile):
  """Return list of hosts from hostfile.

  Reads the hostslist file and returns a list of hosts. Expects the
  hostslist file to contain one hostname per line. Surrounding
  whitespace (including the line terminator) is stripped and blank
  lines are skipped, so that they do not end up as empty hostnames.

  If the file cannot be opened, an error is printed and the program
  exits with status 2.

  @param hostsfile: path of the file to read
  @return: list of hostnames, in file order

  """
  try:
    datafile = open(hostsfile, "r")
  except IOError as msg:
    print("Failed to open hosts file %s: %s" % (hostsfile, msg))
    sys.exit(2)

  try:
    lines = datafile.readlines()
  finally:
    datafile.close()

  stripped = [line.strip() for line in lines]
  return [host for host in stripped if host]
165

    
166

    
167
def WriteLog(message, logfile):
  """Writes message, terminated by newline, to logfile.

  The line is prefixed with the current timestamp as returned by
  GetTimeStamp. The logfile is opened in append mode and closed again
  on every call. If the file cannot be opened or written, the problem
  and the lost message are printed and the program exits with status 1.

  @param message: the text to log
  @param logfile: path of the logfile to append to

  """
  try:
    # "a" is the portable spelling; "aw" is not a valid mode string
    handle = open(logfile, "a")
  except IOError as msg:
    print("failed to open log file %s: %s" % (logfile, msg))
    print("log message was: %s" % message)
    sys.exit(1)  # no being able to log is critical
  try:
    try:
      handle.write("%s %s\n" % (GetTimeStamp(), message))
    finally:
      # keep the path in "logfile" so the error below names the file,
      # and never leave the descriptor open
      handle.close()
  except IOError as msg:
    print("failed to write to logfile %s: %s" % (logfile, msg))
    print("log message was: %s" % message)
    sys.exit(1)  # no being able to log is critical
183

    
184

    
185
def GetAgentKeys():
  """Fetch the keys offered by an ssh agent.

  @return: list of the keys reported by paramiko.Agent, or an empty
      list if querying the agent raised paramiko.SSHException

  """
  try:
    keys = paramiko.Agent().get_keys()
  except paramiko.SSHException:
    return []
  return list(keys)
192

    
193

    
194
def SetupSshConnection(host, username, password, use_agent, logfile):
  """Setup the ssh connection used for all later steps.

  This function sets up the ssh connection that will be used both
  for upload and remote command execution. The host is first probed on
  22/tcp; then every key from the ssh agent (if use_agent is set) and
  finally the password (if not None) are tried in turn until one of
  them authenticates.

  @param host: the host to connect to
  @param username: the user to log in as
  @param password: the password to try, or None
  @param use_agent: whether keys from an ssh agent should be tried
  @param logfile: path of the logfile status messages are written to
  @return: a paramiko.Transport object with an already logged in
      session on success, False on failure

  """
  # check if target is willing to talk to us at all
  if not PingByTcp(host, 22, live_port_needed=True):
    WriteLog("ERROR: FAILURE_NOT_REACHABLE", logfile)
    print("  - ERROR: host not reachable on 22/tcp")
    return False

  # authentication methods in the order they are tried
  attempts = []
  if use_agent:
    for idx, key in enumerate(GetAgentKeys()):
      attempts.append(("key %d" % idx, {"pkey": key}))
  if password is not None:
    attempts.append(("password", {"password": password}))

  # deal with logging out of paramiko.transport
  handler = None

  for desc, kwargs in attempts:
    transport = None
    try:
      transport = paramiko.Transport((host, 22))

      # only try to setup the logging handler once
      if not handler:
        handler = logging.StreamHandler()
        handler.setLevel(logging.ERROR)
        log = logging.getLogger(transport.get_log_channel())
        log.addHandler(handler)

      transport.connect(username=username, **kwargs) # pylint: disable-msg=W0142
      WriteLog("ssh connection established using %s" % desc, logfile)
      # strange ... when establishing the session and the immediately
      # setting up the channels for sftp & shell from that, it sometimes
      # fails, but waiting 1 second after session setup makes it always work
      # time.sleep(1)
      # FIXME apparently needfull to give sshd some time
      return transport
    except (socket.gaierror, socket.error, paramiko.SSHException):
      # do not leave the transport of a failed attempt behind before
      # moving on to the next authentication method
      if transport is not None:
        transport.close()
      continue

  methods = ", ".join([desc for (desc, _) in attempts])
  WriteLog("ERROR: FAILURE_CONNECTION_SETUP (tried %s) " % methods, logfile)
  WriteLog("aborted", logfile)
  print("  - ERROR: connection setup failed (tried %s)" % methods)

  return False
251

    
252

    
253
def UploadFiles(connection, executable, filelist, logfile):
  """Uploads the specified files via sftp.

  Uploads the specified files to a random, freshly created directory with
  a temporary name under /tmp. All uploaded files are chmod 0400 after upload
  with the exception of executable, which is chmod 500.

  @param connection: the transport to open the sftp channel on
  @param executable: the entry of filelist that must be made executable
  @param filelist: local paths of all files to upload
  @param logfile: path of the logfile status messages are written to
  @return: the absolute path to the remote upload directory upon
      success, False upon failure (IOError during the transfer)

  """
  remote_dir = "%s.%s-%s" % (REMOTE_PATH_BASE,
                             random.random(), random.random())

  try:
    sftp = paramiko.SFTPClient.from_transport(connection)
    try:
      sftp.mkdir(remote_dir, mode=0o700)
      for item in filelist:
        remote_file = "%s/%s" % (remote_dir, os.path.basename(item))
        WriteLog("uploading %s to remote %s" % (item, remote_file), logfile)
        sftp.put(item, remote_file)
        if item == executable:
          sftp.chmod(remote_file, 0o500)
        else:
          sftp.chmod(remote_file, 0o400)
    finally:
      # close the sftp channel on the error path as well
      sftp.close()
  except IOError as err:
    WriteLog("ERROR: FAILURE_UPLOAD: %s" % err, logfile)
    return False

  return remote_dir
283

    
284

    
285
def CleanupRemoteDir(connection, upload_dir, filelist, logfile):
  """Cleans out and removes the remote work directory.

  Removes, by basename, every file of filelist from upload_dir and then
  removes upload_dir itself.

  @param connection: the transport to open the sftp channel on
  @param upload_dir: the remote directory to remove
  @param filelist: local paths of the files that were uploaded
  @param logfile: path of the logfile status messages are written to
  @rtype: boolean
  @return: True on success, False if an IOError occurred

  """
  try:
    sftp = paramiko.SFTPClient.from_transport(connection)
    try:
      for item in filelist:
        fullpath = "%s/%s" % (upload_dir, os.path.basename(item))
        WriteLog("removing remote %s" % fullpath, logfile)
        sftp.remove(fullpath)
      sftp.rmdir(upload_dir)
    finally:
      # close the sftp channel on the error path as well
      sftp.close()
  except IOError as err:
    WriteLog("ERROR: FAILURE_CLEANUP: %s" % err, logfile)
    return False

  return True
300

    
301

    
302
def RunRemoteCommand(connection, command, logfile):
  """Execute the command via ssh on the remote host.

  The command is run with stderr redirected to stdout; all output is
  collected and written to the logfile, one "output = ..." entry per
  line.

  @param connection: the transport to open the session on
  @param command: the shell command to run
  @param logfile: path of the logfile output and status are written to
  @rtype: boolean
  @return: False if the command could not be started, True otherwise

  """
  session = connection.open_session()
  try:
    session.setblocking(0)

    # the following dance is needed because paramiko changed APIs:
    # from returning True/False for success to always returning None
    # and throwing an exception in case of problems.
    # And I want to support both the old and the new API.
    result = True  # being optimistic here, I know
    message = None
    try:
      if session.exec_command("%s 2>&1" % command) is False:
        result = False
    except paramiko.SSHException as err:
      message = err
      result = False

    if not result:
      WriteLog("ERROR: FAILURE_COMMAND_EXECUTION: %s" % message, logfile)
      return False

    ### Read when data is available
    chunks = []
    while True:
      # block until the session becomes readable
      select.select([session], [], [])
      try:
        data = session.recv(1024)
      except socket.timeout as err:
        data = None
        WriteLog("FAILED: socket.timeout %s" % err, logfile)
      except socket.error as err:
        data = None
        WriteLog("FAILED: socket.error %s" % err, logfile)
      if not data:
        break
      chunks.append(data)
      # short pause before polling again
      select.select([], [], [], .1)

    # NOTE(review): a recv error above only ends the loop, the call is
    # still reported as successful; the exit status of the remote command
    # is not looked at either -- confirm this is intended
    WriteLog("SUCCESS: command output follows", logfile)
    for line in "".join(chunks).splitlines():
      WriteLog("output = %s" % line, logfile)
    WriteLog("command execution completed", logfile)
  finally:
    # also closes the session when the command could not be started
    session.close()

  return True
346

    
347

    
348
def HostWorker(logdir, username, password, use_agent, hostname,
               executable, command, filelist):
  """Per-host worker.

  This function does not return - it's the main code of the childs,
  which exit at the end of this function. The exit code 0 or 1 will be
  interpreted by the parent.

  If an executable is given it is uploaded together with the auxiliary
  files, run from the upload directory and removed again afterwards;
  otherwise the given command is run as is. A failed upload marks the
  host as failed and skips both the execution and the cleanup.

  @param logdir: the directory where the logfiles must be created
  @param username: SSH username
  @param password: SSH password
  @param use_agent: whether we should instead use an agent
  @param hostname: the hostname to connect to
  @param executable: the executable to upload, if not None
  @param command: the command to run
  @param filelist: auxiliary files to upload

  """
  # in the child/worker process
  logfile = "%s/%s.log" % (logdir, hostname)
  print("%s - starting" % hostname)
  result = 0  # optimism, I know
  try:
    connection = SetupSshConnection(hostname, username,
                                    password, use_agent, logfile)
    if connection is False:
      print("  %s: connection setup failed, skipping" % hostname)
      result = 1
    else:
      upload_dir = None
      if executable is not None:
        print("  %s: uploading files" % hostname)
        upload_dir = UploadFiles(connection, executable,
                                 filelist, logfile)

      if upload_dir is False:
        # without an upload directory there is nothing sensible to run
        # or to clean up; "cd False && ..." must never reach the host
        print("  %s: file upload failed, check log for details" % hostname)
        result = 1
      else:
        if upload_dir is not None:
          command = "cd %s && ./%s" % (upload_dir,
                                       os.path.basename(executable))
        print("  %s: executing remote command" % hostname)
        cmd_result = RunRemoteCommand(connection, command, logfile)
        if cmd_result is True:
          print("  %s: remote command execution successful" % hostname)
        else:
          print("  %s: remote command execution failed,"
                " check log for details" % hostname)
          result = 1
        if upload_dir is not None:
          print("  %s: cleaning up remote work dir" % hostname)
          cln_result = CleanupRemoteDir(connection, upload_dir,
                                        filelist, logfile)
          if cln_result is False:
            print("  %s: remote work dir cleanup failed, check"
                  " log for details" % hostname)
            result = 1
      connection.close()
  except KeyboardInterrupt:
    print("  %s: received KeyboardInterrupt, aborting" % hostname)
    WriteLog("ERROR: ABORT_KEYBOARD_INTERRUPT", logfile)
    result = 1
  except Exception as err:
    result = 1
    trace = traceback.format_exc()
    msg = "ERROR: UNHANDLED_EXECPTION_ERROR: %s\nTrace: %s" % (err, trace)
    WriteLog(msg, logfile)
    print("  %s: %s" % (hostname, msg))
  # and exit with exit code 0 or 1, so the parent can compute statistics
  sys.exit(result)
411

    
412

    
413
def LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
                 executable, command, filelist):
  """Fork a child that runs HostWorker for one host.

  Takes the same arguments as HostWorker plus child_pids, the
  dictionary mapping the pids of the forked children to their
  hostnames; the parent adds the new child to it. Trailing newlines
  are removed from hostname before it is used.

  """
  target = hostname.rstrip("\n")
  child = os.fork()
  if child <= 0:
    # worker side: HostWorker exits the process when it is done
    HostWorker(logdir, username, password, use_agent, target,
               executable, command, filelist)
  else:
    # controller side: remember which host this pid works on
    child_pids[child] = target
429

    
430

    
431
def ParseOptions():
  """Parses the command line options.

  In case of command line errors, it will show the usage and exit the
  program. Exactly one of -x/-c and exactly one of -f/-h must be given,
  -l is mandatory, the batch size must be at least one and positional
  arguments are rejected.

  @return: the options in a tuple: (logdir, executable, hostfile,
      hostlist, command, use_agent, auxfiles, username, password,
      batch_size)

  """
  # resolve because original used -h for hostfile, which conflicts
  # with -h for help
  parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                 conflict_handler="resolve")

  parser.add_option("-l", dest="logdir", default=None,
                    help="directory to write logfiles to")
  parser.add_option("-x", dest="executable", default=None,
                    help="executable to run on remote host(s)",)
  parser.add_option("-f", dest="hostfile", default=None,
                    help="hostlist file (one host per line)")
  parser.add_option("-h", dest="hostlist", default=None, metavar="HOSTS",
                    help="comma-separated list of hosts or single hostname",)
  parser.add_option("-a", dest="auxfiles", action="append", default=[],
                    help="optional auxiliary file to upload"
                    " (can be given multiple times)",
                    metavar="FILE")
  parser.add_option("-c", dest="command", default=None,
                    help="shell command to run on remote host(s)")
  parser.add_option("-b", dest="batch_size", default=15, type="int",
                    help="batch-size, how many hosts to process"
                    " in parallel [15]")
  parser.add_option("-u", dest="username", default="root",
                    help="username used to connect [root]")
  parser.add_option("-p", dest="password", default=None,
                    help="password used to authenticate (when not"
                    " using an agent)")
  parser.add_option("-A", dest="use_agent", default=False, action="store_true",
                    help="instead of password, use keys from an SSH agent")

  opts, args = parser.parse_args()

  if opts.executable and opts.command:
    parser.error("Options -x and -c conflict with each other")
  if not (opts.executable or opts.command):
    parser.error("One of -x and -c must be given")
  if not opts.logdir:
    parser.error("Option -l is required")
  if opts.hostfile and opts.hostlist:
    parser.error("Options -f and -h conflict with each other")
  if not (opts.hostfile or opts.hostlist):
    parser.error("One of -f or -h must be given")
  if opts.batch_size < 1:
    # a batch of zero hosts would silently do nothing and report success
    parser.error("Option -b requires a batch size of at least 1")
  if args:
    parser.error("This program doesn't take any arguments, passed in: %s" %
                 ", ".join(args))

  return (opts.logdir, opts.executable, opts.hostfile, opts.hostlist,
          opts.command, opts.use_agent, opts.auxfiles, opts.username,
          opts.password, opts.batch_size)
489

    
490

    
491
def main():
  """Run the command on all hosts and print a summary.

  Parses the command line, checks the log directory, obtains the
  password (from the password file or interactively, unless an agent
  is used), builds the list of hosts and then keeps up to batch_size
  worker children running until all hosts have been processed.

  """
  (logdir, executable, hostfile, hostlist,
   command, use_agent, auxfiles, username,
   password, batch_size) = ParseOptions()

  ### Unbuffered sys.stdout
  sys.stdout = os.fdopen(1, "w", 0)

  if LogDirUseable(logdir) is False:
    print("ERROR: cannot create logfiles in dir %s, aborting" % logdir)
    sys.exit(1)

  if use_agent:
    # at this point "password" is at most the name of a password file;
    # it must not be handed on and tried as the password itself
    password = None
  elif password:
    try:
      fh = open(password)
      try:
        pwvalue = fh.readline().strip()
      finally:
        fh.close()
    except IOError as e:
      print("error: can not read in from password file %s: %s" %
            (password, e))
      sys.exit(1)
    password = pwvalue
  else:
    password = getpass.getpass("%s's password for all nodes: " % username)

  if hostfile:
    hosts = GetHosts(hostfile)
  else:
    # commandline robustness: tolerate stray commas and blanks
    hosts = [name.strip() for name in hostlist.split(",")]
    hosts = [name for name in hosts if name]

  successes = failures = 0

  filelist = auxfiles[:]
  filelist.append(executable)

  # initial batch
  batch = hosts[:batch_size]
  hosts = hosts[batch_size:]
  child_pids = {}
  for hostname in batch:
    LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
                 executable, command, filelist)

  # every finished child frees a slot for the next pending host
  while child_pids:
    pid, status = os.wait()
    hostname = child_pids.pop(pid, "<unknown host>")
    print("  %s: done (in parent)" % hostname)
    if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
      successes += 1
    else:
      failures += 1
    if hosts:
      LaunchWorker(child_pids, logdir, username, password, use_agent,
                   hosts.pop(0), executable, command, filelist)

  print("")
  print("All done, %s successful and %s failed hosts" % (successes, failures))

  # the exit status is 0 even if hosts failed; see the summary line above
  sys.exit(0)
556

    
557

    
558
if __name__ == "__main__":
  try:
    main()
  except KeyboardInterrupt:
    # Ctrl-C in the controlling process: say so and exit non-zero
    print("Received KeyboardInterrupt, aborting")
    sys.exit(1)
    sys.exit(1)