Statistics
| Branch: | Tag: | Revision:

root / tools / ganeti-listrunner @ 6bc1c168

History | View | Annotate | Download (18.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Run an executable on a list of hosts.
22

    
23
Script to serially run an executable on a list of hosts via ssh
24
with password auth as root. If the provided log dir does not yet
25
exist, it will try to create it.
26

    
27
Implementation:
28
 - the main process spawns up to batch_size children, which:
29
 - connects to the remote host via ssh as root
30
 - uploads the executable with a random name to /tmp via sftp
31
 - chmod 500s it
32
 - via ssh: chdirs into the upload directory and runs the script
33
 - deletes it
34
 - writes status messages and all output to one logfile per host
35
 - the main process gathers then the status of the children and
36
   reports the success/failure ratio
37
 - entire script can be aborted with Ctrl-C
38

    
39
Security considerations:
40
 - the root password for the remote hosts is stored in memory for the
41
   runtime of the script
42
 - the executable to be run on the remote host is handled the following way:
43
   - try to create a random directory with permissions 700 on the
44
     remote host, abort further processing on this host if this fails
45
   - upload the executable to a random filename in that directory
46
   - set executable permissions to 500
47
   - run the executable
48
   - delete the executable and the directory on the remote host
49

    
50
"""
51

    
52
# pylint: disable-msg=C0103
53
# C0103: Invalid name ganeti-listrunner
54

    
55
import errno
56
import optparse
57
import getpass
58
import logging
59
import os
60
import random
61
import select
62
import socket
63
import sys
64
import time
65
import traceback
66

    
67
import paramiko
68

    
69

    
70
# Prefix of the remote work directory; UploadFiles appends a random suffix
REMOTE_PATH_BASE = "/tmp/listrunner"

# Usage text handed to optparse, which expands "%prog" to the program name
USAGE = " ".join([
  "%prog -l logdir {-c command | -x /path/to/file} [-b batch_size]",
  "{-f hostfile|-h hosts} [-u username]",
  "[-p password_file | -A]",
  ])
75

    
76

    
77
def LogDirUseable(logdir):
  """Ensure the log file directory exists and is writable.

  The directory is created if it is missing; afterwards a throw-away file
  is written into it and removed again.

  @param logdir: path of the directory that will hold the log files
  @return: True if a file could be created, written and deleted inside
      logdir, False otherwise
  @raise OSError: if the directory cannot be created for a reason other
      than it already existing

  """
  try:
    os.mkdir(logdir)
  except OSError as err:
    if err.errno != errno.EEXIST:
      raise

  testfile = "%s/test-%s-%s.deleteme" % (logdir, random.random(),
                                         random.random())
  try:
    # plain append mode; the former "aw" is not a valid mode string and is
    # rejected outright by newer Python versions
    logtest = open(testfile, "a")
    try:
      logtest.write("log file writeability test\n")
    finally:
      # never leak the handle, even if the write fails
      logtest.close()
    os.unlink(testfile)
  except (OSError, IOError):
    return False
  return True
94

    
95

    
96
def GetTimeStamp(timestamp=None):
  """Format a point in time as an ISO8601 string.

  @param timestamp: a time tuple as returned by time.localtime(); when
      omitted (None), the current local time is used
  @return: the time formatted as YYYY-MM-DDTHH:MM:SS

  """
  when = timestamp
  if when is None:
    when = time.localtime()
  return time.strftime("%Y-%m-%dT%H:%M:%S", when)
108

    
109

    
110
def PingByTcp(target, port, timeout=10, live_port_needed=False, source=None):
  """Simple ping implementation using TCP connect(2).

  Try to do a TCP connect(2) from an optional source IP to the
  specified target IP and the specified target port.

  @param target: the address to connect to
  @param port: the TCP port to connect to
  @param timeout: connect timeout in seconds (defaults to 10)
  @param live_port_needed: if True, the remote end must accept the
      connection; if False, a refused connection (ECONNREFUSED) also
      counts as success, as it shows the host itself is answering
  @param source: optional source address to bind to; if not passed, the
      source address selection is left to the kernel. Failing to bind with
      EADDRNOTAVAIL makes the ping fail, other bind failures are ignored
  @return: True if the target is considered reachable, False otherwise

  """
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  # the try/finally guarantees the socket is released on every exit path
  try:
    if source is not None:
      try:
        sock.bind((source, 0))
      except socket.error as err:
        # compare the errno attribute; the exception object itself never
        # equals an errno constant
        if err.errno == errno.EADDRNOTAVAIL:
          return False

    sock.settimeout(timeout)

    try:
      sock.connect((target, port))
    except socket.timeout:
      return False
    except socket.error as err:
      return (not live_port_needed) and (err.errno == errno.ECONNREFUSED)
    return True
  finally:
    sock.close()
146

    
147

    
148
def GetHosts(hostsfile):
  """Return list of hosts from hostfile.

  Reads the hostslist file and returns a list of hosts. Expects the
  hostslist file to contain one hostname per line; surrounding whitespace
  (including the line terminator) is removed and blank lines are skipped,
  so that no worker is ever started for an empty host name.

  If the file cannot be opened, an error is printed and the program exits
  with status 2.

  @param hostsfile: path of the file to read
  @return: list of host names, in file order

  """
  try:
    datafile = open(hostsfile, "r")
  except IOError as msg:
    print("Failed to open hosts file %s: %s" % (hostsfile, msg))
    sys.exit(2)

  try:
    lines = datafile.readlines()
  finally:
    datafile.close()

  stripped = [line.strip() for line in lines]
  return [host for host in stripped if host]
165

    
166

    
167
def WriteLog(message, logfile):
  """Append message, prefixed by a timestamp and ended by newline, to logfile.

  Not being able to log is treated as critical: if the file cannot be
  opened or written, the problem and the lost message are printed and the
  program exits with status 1.

  @param message: the text to log
  @param logfile: path of the log file to append to

  """
  try:
    # keep the path in "logfile" so that the error messages below can still
    # name the file; plain append mode replaces the non-standard "aw"
    handle = open(logfile, "a")
  except IOError as msg:
    print("failed to open log file %s: %s" % (logfile, msg))
    print("log message was: %s" % message)
    sys.exit(1)  # not being able to log is critical
  try:
    try:
      handle.write("%s %s\n" % (GetTimeStamp(), message))
    finally:
      handle.close()
  except IOError as msg:
    print("failed to write to logfile %s: %s" % (logfile, msg))
    print("log message was: %s" % message)
    sys.exit(1)  # not being able to log is critical
183

    
184

    
185
def GetAgentKeys():
  """Fetch the keys offered by an ssh agent.

  @return: list of the keys reported by paramiko.Agent, or an empty list
      if paramiko raises SSHException while querying the agent

  """
  try:
    return list(paramiko.Agent().get_keys())
  except paramiko.SSHException:
    return []
192

    
193

    
194
def SetupSshConnection(host, username, password, use_agent, logfile):
  """Setup the ssh connection used for all later steps.

  This function sets up the ssh connection that will be used both
  for upload and remote command execution. Every available agent key (if
  use_agent is set) is tried in turn, followed by the password (if given).

  @param host: the host to connect to (on port 22)
  @param username: the user to log in as
  @param password: the password to try, or None to skip password auth
  @param use_agent: whether to try the keys offered by an ssh agent
  @param logfile: path of the per-host log file
  @return: on success, a paramiko.Transport object with an already
      logged in session; on failure, False

  """
  # check if target is willing to talk to us at all
  if not PingByTcp(host, 22, live_port_needed=True):
    WriteLog("ERROR: FAILURE_NOT_REACHABLE", logfile)
    print("  - ERROR: host not reachable on 22/tcp")
    return False

  if use_agent:
    keys = GetAgentKeys()
  else:
    keys = []
  # (description, connect keyword arguments) for each method to try
  attempts = [("key %d" % idx, {"pkey": key}) for idx, key in enumerate(keys)]
  if password is not None:
    attempts.append(("password", {"password": password}))

  # deal with logging out of paramiko.transport
  handler = None

  for desc, kwargs in attempts:
    transport = None
    try:
      transport = paramiko.Transport((host, 22))

      # only try to setup the logging handler once
      if handler is None:
        handler = logging.StreamHandler()
        handler.setLevel(logging.ERROR)
        log = logging.getLogger(transport.get_log_channel())
        log.addHandler(handler)

      transport.connect(username=username, **kwargs) # pylint: disable-msg=W0142
      WriteLog("ssh connection established using %s" % desc, logfile)
      # NOTE: the original author observed that opening the sftp & shell
      # channels immediately after session setup sometimes failed, and that
      # sleeping one second here avoided it; the sleep is not enabled
      return transport
    except (socket.gaierror, socket.error, paramiko.SSHException):
      # release the failed attempt before trying the next method, instead
      # of leaving its transport open
      if transport is not None:
        transport.close()
      continue

  methods = ", ".join([desc for (desc, _) in attempts])
  WriteLog("ERROR: FAILURE_CONNECTION_SETUP (tried %s) " % methods, logfile)
  WriteLog("aborted", logfile)
  print("  - ERROR: connection setup failed (tried %s)" % methods)

  return False
251

    
252

    
253
def UploadFiles(connection, executable, filelist, logfile):
  """Uploads the specified files via sftp.

  Uploads the specified files to a random, freshly created directory with
  a temporary name under /tmp. All uploaded files are chmod 0400 after
  upload with the exception of executable, which is chmod 0500.

  @param connection: the logged-in paramiko.Transport to upload through
  @param executable: the entry of filelist that must be made executable
  @param filelist: local paths of all files to upload
  @param logfile: path of the per-host log file
  @return: upon success, the absolute path of the remote upload directory;
      False if an IOError occurred

  """
  remote_dir = "%s.%s-%s" % (REMOTE_PATH_BASE,
                             random.random(), random.random())

  try:
    sftp = paramiko.SFTPClient.from_transport(connection)
    try:
      # the directory is private to the remote user; mkdir fails (and with
      # it the upload) if the name already exists
      sftp.mkdir(remote_dir, mode=0o700)
      for item in filelist:
        remote_file = "%s/%s" % (remote_dir, item.split("/")[-1])
        WriteLog("uploading %s to remote %s" % (item, remote_file), logfile)
        sftp.put(item, remote_file)
        if item == executable:
          sftp.chmod(remote_file, 0o500)
        else:
          sftp.chmod(remote_file, 0o400)
    finally:
      # close the sftp channel on failure as well
      sftp.close()
  except IOError as err:
    WriteLog("ERROR: FAILURE_UPLOAD: %s" % err, logfile)
    return False

  return remote_dir
283

    
284

    
285
def CleanupRemoteDir(connection, upload_dir, filelist, logfile):
  """Cleans out and removes the remote work directory.

  @param connection: the logged-in paramiko.Transport to work through
  @param upload_dir: the remote directory the files were uploaded to
  @param filelist: local paths of the uploaded files; their base names are
      removed from upload_dir
  @param logfile: path of the per-host log file
  @return: True on success, False if an IOError occurred

  """
  try:
    sftp = paramiko.SFTPClient.from_transport(connection)
    try:
      for item in filelist:
        fullpath = "%s/%s" % (upload_dir, item.split("/")[-1])
        WriteLog("removing remote %s" % fullpath, logfile)
        sftp.remove(fullpath)
      sftp.rmdir(upload_dir)
    finally:
      # close the sftp channel on failure as well
      sftp.close()
  except IOError as err:
    WriteLog("ERROR: FAILURE_CLEANUP: %s" % err, logfile)
    return False

  return True
300

    
301

    
302
def RunRemoteCommand(connection, command, logfile):
  """Execute the command via ssh on the remote host.

  The command is run with stderr redirected into stdout and everything it
  prints is written to the log file, one log entry per output line. The
  remote exit status is not evaluated.

  @param connection: the logged-in paramiko.Transport to run through
  @param command: the shell command to execute
  @param logfile: path of the per-host log file
  @return: True if the command could be started, False otherwise

  """
  session = connection.open_session()
  # the try/finally closes the session on the failure path, too
  try:
    session.setblocking(0)

    # the following dance is needed because paramiko changed APIs:
    # from returning True/False for success to always returning None
    # and throwing an exception in case of problems.
    # And I want to support both the old and the new API.
    result = True  # being optimistic here, I know
    message = None
    try:
      if session.exec_command("%s 2>&1" % command) is False:
        result = False
    except paramiko.SSHException as err:
      message = err
      result = False

    if not result:
      WriteLog("ERROR: FAILURE_COMMAND_EXECUTION: %s" % message, logfile)
      return False

    ### Read when data is available
    chunks = []
    while True:
      # block until the session becomes readable; the loop only ends
      # through the break below (end of data or a socket error)
      select.select([session], [], [])
      try:
        data = session.recv(1024)
      except socket.timeout as err:
        data = None
        WriteLog("FAILED: socket.timeout %s" % err, logfile)
      except socket.error as err:
        data = None
        WriteLog("FAILED: socket.error %s" % err, logfile)
      if not data:
        break
      chunks.append(data)
      # short pause (0.1s) before polling again
      select.select([], [], [], .1)

    WriteLog("SUCCESS: command output follows", logfile)
    for line in "".join(chunks).split("\n"):
      WriteLog("output = %s" % line, logfile)
    WriteLog("command execution completed", logfile)

    return True
  finally:
    session.close()
346

    
347

    
348
def HostWorker(logdir, username, password, use_agent, hostname,
               executable, command, filelist):
  """Per-host worker.

  This function does not return - it's the main code of the children,
  which exit at the end of this function. The exit code 0 or 1 will be
  interpreted by the parent.

  If an executable is given it is uploaded first and run from the upload
  directory; if that upload fails, nothing is executed and the host is
  reported as failed.

  @param logdir: the directory where the logfiles must be created
  @param username: SSH username
  @param password: SSH password
  @param use_agent: whether we should instead use an agent
  @param hostname: the hostname to connect to
  @param executable: the executable to upload, if not None
  @param command: the command to run
  @param filelist: auxiliary files to upload

  """
  # in the child/worker process
  logfile = "%s/%s.log" % (logdir, hostname)
  print("%s - starting" % hostname)
  result = 0  # optimism, I know
  try:
    connection = SetupSshConnection(hostname, username,
                                    password, use_agent, logfile)
    if connection is False:
      print("  %s: connection setup failed, skipping" % hostname)
      result = 1
    else:
      try:
        # None: nothing to upload, False: upload failed, else the remote dir
        upload_dir = None
        if executable is not None:
          print("  %s: uploading files" % hostname)
          upload_dir = UploadFiles(connection, executable,
                                   filelist, logfile)
          if upload_dir is False:
            print("  %s: file upload failed, check log for details" %
                  hostname)
            result = 1
          else:
            command = "cd %s && ./%s" % (upload_dir,
                                         executable.split("/").pop())
        if upload_dir is not False:
          print("  %s: executing remote command" % hostname)
          cmd_result = RunRemoteCommand(connection, command, logfile)
          if cmd_result is True:
            print("  %s: remote command execution successful" % hostname)
          else:
            print("  %s: remote command execution failed,"
                  " check log for details" % hostname)
            result = 1
        if upload_dir:
          # only clean up when the remote directory name is known
          print("  %s: cleaning up remote work dir" % hostname)
          cln_result = CleanupRemoteDir(connection, upload_dir,
                                        filelist, logfile)
          if cln_result is False:
            print("  %s: remote work dir cleanup failed, check"
                  " log for details" % hostname)
            result = 1
      finally:
        # also release the connection on errors and Ctrl-C
        connection.close()
  except KeyboardInterrupt:
    print("  %s: received KeyboardInterrupt, aborting" % hostname)
    WriteLog("ERROR: ABORT_KEYBOARD_INTERRUPT", logfile)
    result = 1
  except Exception as err:
    result = 1
    trace = traceback.format_exc()
    msg = "ERROR: UNHANDLED_EXECPTION_ERROR: %s\nTrace: %s" % (err, trace)
    WriteLog(msg, logfile)
    print("  %s: %s" % (hostname, msg))
  # and exit with exit code 0 or 1, so the parent can compute statistics
  sys.exit(result)
412

    
413

    
414
def LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
                 executable, command, filelist):
  """Fork a child process running the per-host worker.

  Takes the same arguments as HostWorker, preceded by child_pids, the
  dictionary in which the parent records the pid-to-hostname mapping of
  its children. Trailing newlines are stripped from hostname first.

  """
  target = hostname.rstrip("\n")
  pid = os.fork()
  if pid > 0:
    # parent: remember which host this child is working on
    child_pids[pid] = target
    return
  # child: do the actual work
  HostWorker(logdir, username, password, use_agent, target,
             executable, command, filelist)
430

    
431

    
432
def ParseOptions():
  """Parses the command line options.

  In case of command line errors, it will show the usage and exit the
  program. Exactly one of -x/-c and exactly one of -f/-h must be given,
  -l is mandatory, the batch size must be at least 1 and no positional
  arguments are accepted.

  @return: the options in a tuple: (logdir, executable, hostfile,
      hostlist, command, use_agent, auxfiles, username, password,
      batch_size)

  """
  # resolve because original used -h for hostfile, which conflicts
  # with -h for help
  parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                 conflict_handler="resolve")

  parser.add_option("-l", dest="logdir", default=None,
                    help="directory to write logfiles to")
  parser.add_option("-x", dest="executable", default=None,
                    help="executable to run on remote host(s)",)
  parser.add_option("-f", dest="hostfile", default=None,
                    help="hostlist file (one host per line)")
  parser.add_option("-h", dest="hostlist", default=None, metavar="HOSTS",
                    help="comma-separated list of hosts or single hostname",)
  parser.add_option("-a", dest="auxfiles", action="append", default=[],
                    help="optional auxiliary file to upload"
                    " (can be given multiple times)",
                    metavar="FILE")
  parser.add_option("-c", dest="command", default=None,
                    help="shell command to run on remote host(s)")
  parser.add_option("-b", dest="batch_size", default=15, type="int",
                    help="batch-size, how many hosts to process"
                    " in parallel [15]")
  parser.add_option("-u", dest="username", default="root",
                    help="username used to connect [root]")
  # the value is a file name: main() reads the password from its first line
  parser.add_option("-p", dest="password", default=None, metavar="FILE",
                    help="file containing the password used to authenticate"
                    " (when not using an agent)")
  parser.add_option("-A", dest="use_agent", default=False, action="store_true",
                    help="instead of password, use keys from an SSH agent")

  opts, args = parser.parse_args()

  if opts.executable and opts.command:
    parser.error("Options -x and -c conflict with each other")
  if not (opts.executable or opts.command):
    parser.error("One of -x and -c must be given")
  if not opts.logdir:
    parser.error("Option -l is required")
  if opts.hostfile and opts.hostlist:
    parser.error("Options -f and -h conflict with each other")
  if not (opts.hostfile or opts.hostlist):
    parser.error("One of -f or -h must be given")
  if opts.batch_size < 1:
    # a batch of zero hosts would never start any worker, and a negative
    # size would slice the host list incorrectly
    parser.error("Batch size (-b) must be at least 1")
  if args:
    parser.error("This program doesn't take any arguments, passed in: %s" %
                 ", ".join(args))

  return (opts.logdir, opts.executable, opts.hostfile, opts.hostlist,
          opts.command, opts.use_agent, opts.auxfiles, opts.username,
          opts.password, opts.batch_size)
490

    
491

    
492
def main():
  """Run the command or executable on all requested hosts.

  Parses the command line, obtains the password (from the file given via
  -p, or interactively, unless an agent is used), then keeps up to
  batch_size worker processes running until every host has been
  processed, and finally prints the number of successful and failed
  hosts. Exits with status 1 if the log directory or the password file is
  unusable, otherwise with status 0.

  """
  (logdir, executable, hostfile, hostlist,
   command, use_agent, auxfiles, username,
   password, batch_size) = ParseOptions()

  ### Unbuffered sys.stdout
  sys.stdout = os.fdopen(1, "w", 0)

  if LogDirUseable(logdir) is False:
    print("ERROR: cannot create logfiles in dir %s, aborting" % logdir)
    sys.exit(1)

  if use_agent:
    pass
  elif password:
    # "password" holds the name of the password file at this point
    try:
      fh = open(password)
      try:
        pwvalue = fh.readline().strip()
      finally:
        fh.close()
    except IOError as e:
      print("error: can not read in from password file %s: %s" %
            (password, e))
      sys.exit(1)
    password = pwvalue
  else:
    password = getpass.getpass("%s's password for all nodes: " % username)

  if hostfile:
    hosts = GetHosts(hostfile)
  else:
    hosts = hostlist.split(",")
  # commandline/hostfile robustness: never start a worker for an empty host
  # name (stray commas, blank lines)
  hosts = [host.strip() for host in hosts]
  hosts = [host for host in hosts if host]

  successes = failures = 0

  filelist = auxfiles[:]
  filelist.append(executable)

  # initial batch
  batch = hosts[:batch_size]
  hosts = hosts[batch_size:]
  child_pids = {}
  for hostname in batch:
    LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
                 executable, command, filelist)

  # whenever a child finishes, account for it and start the next host
  while child_pids:
    pid, status = os.wait()
    hostname = child_pids.pop(pid, "<unknown host>")
    print("  %s: done (in parent)" % hostname)
    if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
      successes += 1
    else:
      failures += 1
    if hosts:
      LaunchWorker(child_pids, logdir, username, password, use_agent,
                   hosts.pop(0), executable, command, filelist)

  print("")
  print("All done, %s successful and %s failed hosts" % (successes, failures))

  sys.exit(0)
557

    
558

    
559
if __name__ == "__main__":
  # turn Ctrl-C into a short message and exit status 1 instead of a
  # traceback
  try:
    main()
  except KeyboardInterrupt:
    print("Received KeyboardInterrupt, aborting")
    sys.exit(1)