Statistics
| Branch: | Tag: | Revision:

root / tools / ganeti-listrunner @ b74c0684

History | View | Annotate | Download (17.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Run an executable on a list of hosts.
22

    
23
Script to serially run an executable on a list of hosts via ssh
24
with password auth as root. If the provided log dir does not yet
25
exist, it will try to create it.
26

    
27
Implementation:
28
 - the main process spawns up to batch_size children, which:
29
 - connects to the remote host via ssh as root
30
 - uploads the executable with a random name to /tmp via sftp
31
 - chmod 500s it
32
 - via ssh: chdirs into the upload directory and runs the script
33
 - deletes it
34
 - writes status messages and all output to one logfile per host
35
 - the main process gathers then the status of the children and
36
   reports the success/failure ratio
37
 - entire script can be aborted with Ctrl-C
38

    
39
Security considerations:
40
 - the root password for the remote hosts is stored in memory for the
41
   runtime of the script
42
 - the executable to be run on the remote host is handled the following way:
43
   - try to create a random directory with permissions 700 on the
44
     remote host, abort furter processing on this host if this failes
45
   - upload the executable with to a random filename in that directory
46
   - set executable permissions to 500
47
   - run the executable
48
   - delete the execuable and the directory on the remote host
49

    
50
"""
51

    
52
# pylint: disable-msg=C0103
53
# C0103: Invalid name ganeti-listrunner
54

    
55
import errno
56
import optparse
57
import getpass
58
import logging
59
import os
60
import random
61
import select
62
import socket
63
import sys
64
import time
65
import traceback
66

    
67
import paramiko
68

    
69

    
70
REMOTE_PATH_BASE = "/tmp/listrunner"
71

    
72
USAGE = ("%prog -l logdir {-c command | -x /path/to/file} [-b batch_size]"
73
         " {-f hostfile|-h hosts} [-u username]"
74
         " [-p password_file | -A]")
75

    
76

    
77
def LogDirUseable(logdir):
78
  """Ensure log file directory is available and usable."""
79
  testfile = "%s/test-%s-%s.deleteme" % (logdir, random.random(),
80
                                         random.random())
81
  try:
82
    os.mkdir(logdir)
83
  except OSError, err:
84
    if err.errno != errno.EEXIST:
85
      raise
86
  try:
87
    logtest = open(testfile, "aw")
88
    logtest.writelines("log file writeability test\n")
89
    logtest.close()
90
    os.unlink(testfile)
91
    return True
92
  except (OSError, IOError):
93
    return False
94

    
95

    
96
def GetTimeStamp(timestamp=None):
97
  """Return ISO8601 timestamp.
98

    
99
  Returns ISO8601 timestamp, optionally expects a time.localtime() tuple
100
  in timestamp, but will use the current time if this argument is not
101
  supplied.
102
  """
103
  if timestamp is None:
104
    timestamp = time.localtime()
105

    
106
  isotime = time.strftime("%Y-%m-%dT%H:%M:%S", timestamp)
107
  return isotime
108

    
109

    
110
def PingByTcp(target, port, timeout=10, live_port_needed=False, source=None):
111
  """Simple ping implementation using TCP connect(2).
112

    
113
  Try to do a TCP connect(2) from an optional source IP to the
114
  specified target IP and the specified target port. If the optional
115
  parameter live_port_needed is set to true, requires the remote end
116
  to accept the connection. The timeout is specified in seconds and
117
  defaults to 10 seconds. If the source optional argument is not
118
  passed, the source address selection is left to the kernel,
119
  otherwise we try to connect using the passed address (failures to
120
  bind other than EADDRNOTAVAIL will be ignored).
121

    
122
  """
123
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
124

    
125
  success = False
126

    
127
  if source is not None:
128
    try:
129
      sock.bind((source, 0))
130
    except socket.error, (errcode):
131
      if errcode == errno.EADDRNOTAVAIL:
132
        success = False
133

    
134
  sock.settimeout(timeout)
135

    
136
  try:
137
    sock.connect((target, port))
138
    sock.close()
139
    success = True
140
  except socket.timeout:
141
    success = False
142
  except socket.error, (errcode):
143
    success = (not live_port_needed) and (errcode == errno.ECONNREFUSED)
144

    
145
  return success
146

    
147

    
148
def GetHosts(hostsfile):
149
  """Return list of hosts from hostfile.
150

    
151
  Reads the hostslist file and returns a list of hosts.
152
  Expects the hostslist file to contain one hostname per line.
153

    
154
  """
155
  try:
156
    datafile = open(hostsfile, "r")
157
  except IOError, msg:
158
    print "Failed to open hosts file %s: %s" % (hostsfile, msg)
159
    sys.exit(2)
160

    
161
  hosts = datafile.readlines()
162
  datafile.close()
163

    
164
  return hosts
165

    
166

    
167
def WriteLog(message, logfile):
168
  """Writes message, terminated by newline, to logfile."""
169
  try:
170
    logfile = open(logfile, "aw")
171
  except IOError, msg:
172
    print "failed to open log file %s: %s" % (logfile, msg)
173
    print "log message was: %s" % message
174
    sys.exit(1)  # no being able to log is critical
175
  try:
176
    timestamp = GetTimeStamp()
177
    logfile.writelines("%s %s\n" % (timestamp, message))
178
    logfile.close()
179
  except IOError, msg:
180
    print "failed to write to logfile %s: %s" % (logfile, msg)
181
    print "log message was: %s" % message
182
    sys.exit(1)  # no being able to log is critical
183

    
184

    
185
def GetAgentKeys():
186
  """Tries to get a list of ssh keys from an agent."""
187
  try:
188
    agent = paramiko.Agent()
189
    return list(agent.get_keys())
190
  except paramiko.SSHException:
191
    return []
192

    
193

    
194
def SetupSshConnection(host, username, password, use_agent, logfile):
195
  """Setup the ssh connection used for all later steps.
196

    
197
  This function sets up the ssh connection that will be used both
198
  for upload and remote command execution.
199

    
200
  On success, it will return paramiko.Transport object with an
201
  already logged in session. On failure, False will be returned.
202

    
203
  """
204
  # check if target is willing to talk to us at all
205
  if not PingByTcp(host, 22, live_port_needed=True):
206
    WriteLog("ERROR: FAILURE_NOT_REACHABLE", logfile)
207
    print "  - ERROR: host not reachable on 22/tcp"
208
    return False
209

    
210
  if use_agent:
211
    keys = GetAgentKeys()
212
  else:
213
    keys = []
214
  all_kwargs = [{"pkey": k} for k in keys]
215
  all_desc = ["key %d" % d for d in range(len(keys))]
216
  if password is not None:
217
    all_kwargs.append({"password": password})
218
    all_desc.append("password")
219

    
220
  # deal with logging out of paramiko.transport
221
  handler = None
222

    
223
  for desc, kwargs in zip(all_desc, all_kwargs):
224
    try:
225
      transport = paramiko.Transport((host, 22))
226

    
227
      # only try to setup the logging handler once
228
      if not handler:
229
        handler = logging.StreamHandler()
230
        handler.setLevel(logging.ERROR)
231
        log = logging.getLogger(transport.get_log_channel())
232
        log.addHandler(handler)
233

    
234
      transport.connect(username=username, **kwargs) # pylint: disable-msg=W0142
235
      WriteLog("ssh connection established using %s" % desc, logfile)
236
      # strange ... when establishing the session and the immediately
237
      # setting up the channels for sftp & shell from that, it sometimes
238
      # fails, but waiting 1 second after session setup makes it always work
239
      # time.sleep(1)
240
      # FIXME apparently needfull to give sshd some time
241
      return transport
242
    except (socket.gaierror, socket.error, paramiko.SSHException):
243
      continue
244

    
245
  methods = ", ".join(all_desc)
246
  WriteLog("ERROR: FAILURE_CONNECTION_SETUP (tried %s) " % methods, logfile)
247
  WriteLog("aborted", logfile)
248
  print "  - ERROR: connection setup failed (tried %s)" % methods
249

    
250
  return False
251

    
252

    
253
def UploadFiles(connection, executable, filelist, logfile):
254
  """Uploads the specified files via sftp.
255

    
256
  Uploads the specified files to a random, freshly created directory with
257
  a temporary name under /tmp. All uploaded files are chmod 0400 after upload
258
  with the exception of executable, with is chmod 500.
259

    
260
  Upon success, returns the absolute path to the remote upload directory,
261
  but will return False upon failure.
262
  """
263
  remote_dir = "%s.%s-%s" % (REMOTE_PATH_BASE,
264
                             random.random(), random.random())
265

    
266
  try:
267
    sftp = paramiko.SFTPClient.from_transport(connection)
268
    sftp.mkdir(remote_dir, mode=0700)
269
    for item in filelist:
270
      remote_file = "%s/%s" % (remote_dir, item.split("/").pop())
271
      WriteLog("uploading %s to remote %s" % (item, remote_file), logfile)
272
      sftp.put(item, remote_file)
273
      if item == executable:
274
        sftp.chmod(remote_file, 0500)
275
      else:
276
        sftp.chmod(remote_file, 0400)
277
    sftp.close()
278
  except IOError, err:
279
    WriteLog("ERROR: FAILURE_UPLOAD: %s" % err, logfile)
280
    return False
281

    
282
  return remote_dir
283

    
284

    
285
def CleanupRemoteDir(connection, upload_dir, filelist, logfile):
286
  """Cleanes out and removes the remote work directory."""
287
  try:
288
    sftp = paramiko.SFTPClient.from_transport(connection)
289
    for item in filelist:
290
      fullpath = "%s/%s" % (upload_dir, item.split("/").pop())
291
      WriteLog("removing remote %s" % fullpath, logfile)
292
      sftp.remove(fullpath)
293
    sftp.rmdir(upload_dir)
294
    sftp.close()
295
  except IOError, err:
296
    WriteLog("ERROR: FAILURE_CLEANUP: %s" % err, logfile)
297
    return False
298

    
299
  return True
300

    
301

    
302
def RunRemoteCommand(connection, command, logfile):
303
  """Execute the command via ssh on the remote host."""
304
  session = connection.open_session()
305
  session.setblocking(0)
306

    
307
  # the following dance is needed because paramiko changed APIs:
308
  # from returning True/False for success to always returning None
309
  # and throwing an exception in case of problems.
310
  # And I want to support both the old and the new API.
311
  result = True  # being optimistic here, I know
312
  message = None
313
  try:
314
    if session.exec_command("%s 2>&1" % command) is False:
315
      result = False
316
  except paramiko.SSHException, message:
317
    result = False
318

    
319
  if not result:
320
    WriteLog("ERROR: FAILURE_COMMAND_EXECUTION: %s" % message, logfile)
321
    return False
322

    
323
   ### Read when data is available
324
  output = ""
325
  while select.select([session], [], []):
326
    data = session.recv(1024)
327
    if not data:
328
      break
329
    output += data
330
    select.select([], [], [], .1)
331

    
332
  WriteLog("SUCCESS: command output follows", logfile)
333
  for line in output.split("\n"):
334
    WriteLog("output = %s" %line, logfile)
335
  WriteLog("command execution completed", logfile)
336
  session.close()
337

    
338
  return True
339

    
340

    
341
def HostWorker(logdir, username, password, use_agent, hostname,
342
               executable, command, filelist):
343
  """Per-host worker.
344

    
345
  This function does not return - it's the main code of the childs,
346
  which exit at the end of this function. The exit code 0 or 1 will be
347
  interpreted by the parent.
348

    
349
  @param logdir: the directory where the logfiles must be created
350
  @param username: SSH username
351
  @param password: SSH password
352
  @param use_agent: whether we should instead use an agent
353
  @param hostname: the hostname to connect to
354
  @param executable: the executable to upload, if not None
355
  @param command: the command to run
356
  @param filelist: auxiliary files to upload
357

    
358
  """
359
  # in the child/worker process
360
  logfile = "%s/%s.log" % (logdir, hostname)
361
  print "%s - starting" % hostname
362
  result = 0  # optimism, I know
363
  try:
364
    connection = SetupSshConnection(hostname, username,
365
                                    password, use_agent, logfile)
366
    if connection is not False:
367
      if executable is not None:
368
        print "  %s: uploading files" % hostname
369
        upload_dir = UploadFiles(connection, executable,
370
                                 filelist, logfile)
371
        command = "cd %s && ./%s" % (upload_dir,
372
                                     executable.split("/").pop())
373
      print "  %s: executing remote command" % hostname
374
      cmd_result = RunRemoteCommand(connection, command, logfile)
375
      if cmd_result is True:
376
        print "  %s: remote command execution successful" % hostname
377
      else:
378
        print ("  %s: remote command execution failed,"
379
               " check log for details" % hostname)
380
        result = 1
381
      if executable is not None:
382
        print "  %s: cleaning up remote work dir" % hostname
383
        cln_result = CleanupRemoteDir(connection, upload_dir,
384
                                      filelist, logfile)
385
        if cln_result is False:
386
          print ("  %s: remote work dir cleanup failed, check"
387
                 " log for details" % hostname)
388
          result = 1
389
      connection.close()
390
    else:
391
      print "  %s: connection setup failed, skipping" % hostname
392
      result = 1
393
  except KeyboardInterrupt:
394
    print "  %s: received KeyboardInterrupt, aborting" % hostname
395
    WriteLog("ERROR: ABORT_KEYBOARD_INTERRUPT", logfile)
396
    result = 1
397
  except Exception, err:
398
    result = 1
399
    trace = traceback.format_exc()
400
    msg = "ERROR: UNHANDLED_EXECPTION_ERROR: %s\nTrace: %s" % (err, trace)
401
    WriteLog(msg, logfile)
402
    print "  %s: %s" % (hostname, msg)
403
  # and exit with exit code 0 or 1, so the parent can compute statistics
404
  sys.exit(result)
405

    
406

    
407
def LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
408
                 executable, command, filelist):
409
  """Launch the per-host worker.
410

    
411
  Arguments are the same as for HostWorker, except for child_pids,
412
  which is a dictionary holding the pid-to-hostname mapping.
413

    
414
  """
415
  hostname = hostname.rstrip("\n")
416
  pid = os.fork()
417
  if pid > 0:
418
    # controller just record the pids
419
    child_pids[pid] = hostname
420
  else:
421
    HostWorker(logdir, username, password, use_agent, hostname,
422
               executable, command, filelist)
423

    
424

    
425
def ParseOptions():
426
  """Parses the command line options.
427

    
428
  In case of command line errors, it will show the usage and exit the
429
  program.
430

    
431
  @return: the options in a tuple
432

    
433
  """
434
  # resolve because original used -h for hostfile, which conflicts
435
  # with -h for help
436
  parser = optparse.OptionParser(usage="\n%s" % USAGE,
437
                                 conflict_handler="resolve")
438

    
439
  parser.add_option("-l", dest="logdir", default=None,
440
                    help="directory to write logfiles to")
441
  parser.add_option("-x", dest="executable", default=None,
442
                    help="executable to run on remote host(s)",)
443
  parser.add_option("-f", dest="hostfile", default=None,
444
                    help="hostlist file (one host per line)")
445
  parser.add_option("-h", dest="hostlist", default=None, metavar="HOSTS",
446
                    help="comma-separated list of hosts or single hostname",)
447
  parser.add_option("-a", dest="auxfiles", action="append", default=[],
448
                    help="optional auxiliary file to upload"
449
                    " (can be given multiple times",
450
                    metavar="FILE")
451
  parser.add_option("-c", dest="command", default=None,
452
                    help="shell command to run on remote host(s)")
453
  parser.add_option("-b", dest="batch_size", default=15, type="int",
454
                    help="batch-size, how many hosts to process"
455
                    " in parallel [15]")
456
  parser.add_option("-u", dest="username", default="root",
457
                    help="username used to connect [root]")
458
  parser.add_option("-p", dest="password", default=None,
459
                    help="password used to authenticate (when not"
460
                    " using an agent)")
461
  parser.add_option("-A", dest="use_agent", default=False, action="store_true",
462
                    help="instead of password, use keys from an SSH agent")
463

    
464
  opts, args = parser.parse_args()
465

    
466
  if opts.executable and opts.command:
467
    parser.error("Options -x and -c conflict with each other")
468
  if not (opts.executable or opts.command):
469
    parser.error("One of -x and -c must be given")
470
  if not opts.logdir:
471
    parser.error("Option -l is required")
472
  if opts.hostfile and opts.hostlist:
473
    parser.error("Options -f and -h conflict with each other")
474
  if not (opts.hostfile or opts.hostlist):
475
    parser.error("One of -f or -h must be given")
476
  if args:
477
    parser.error("This program doesn't take any arguments, passed in: %s" %
478
                 ", ".join(args))
479

    
480
  return (opts.logdir, opts.executable, opts.hostfile, opts.hostlist,
481
          opts.command, opts.use_agent, opts.auxfiles, opts.username,
482
          opts.password, opts.batch_size)
483

    
484

    
485
def main():
486
  """main."""
487
  (logdir, executable, hostfile, hostlist,
488
   command, use_agent, auxfiles, username,
489
   password, batch_size) = ParseOptions()
490

    
491
  ### Unbuffered sys.stdout
492
  sys.stdout = os.fdopen(1, "w", 0)
493

    
494
  if LogDirUseable(logdir) is False:
495
    print "ERROR: cannot create logfiles in dir %s, aborting" % logdir
496
    sys.exit(1)
497

    
498
  if use_agent:
499
    pass
500
  elif password:
501
    try:
502
      fh = file(password)
503
      pwvalue = fh.readline().strip()
504
      fh.close()
505
    except IOError, e:
506
      print "error: can not read in from password file %s: %s" % (password, e)
507
      sys.exit(1)
508
    password = pwvalue
509
  else:
510
    password = getpass.getpass("%s's password for all nodes: " % username)
511

    
512
  if hostfile:
513
    hosts = GetHosts(hostfile)
514
  else:
515
    if "," in hostlist:
516
      hostlist = hostlist.rstrip(",")  # commandline robustness
517
      hosts = hostlist.split(",")
518
    else:
519
      hosts = [hostlist]
520

    
521
  successes = failures = 0
522

    
523
  filelist = auxfiles[:]
524
  filelist.append(executable)
525

    
526
  # initial batch
527
  batch = hosts[:batch_size]
528
  hosts = hosts[batch_size:]
529
  child_pids = {}
530
  for hostname in batch:
531
    LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
532
                 executable, command, filelist)
533

    
534
  while child_pids:
535
    pid, status = os.wait()
536
    hostname = child_pids.pop(pid, "<unknown host>")
537
    print "  %s: done (in parent)" % hostname
538
    if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
539
      successes += 1
540
    else:
541
      failures += 1
542
    if hosts:
543
      LaunchWorker(child_pids, logdir, username, password, use_agent,
544
                   hosts.pop(0), executable, command, filelist)
545

    
546
  print
547
  print "All done, %s successful and %s failed hosts" % (successes, failures)
548

    
549
  sys.exit(0)
550

    
551

    
552
if __name__ == "__main__":
553
  try:
554
    main()
555
  except KeyboardInterrupt:
556
    print "Received KeyboardInterrupt, aborting"
557
    sys.exit(1)