Revision da7e44ee

b/Makefile.am
324 324
	qa/qa_tags.py \
325 325
	qa/qa_utils.py
326 326

  
327
dist_sbin_SCRIPTS =
327
dist_sbin_SCRIPTS = \
328
	tools/ganeti-listrunner
328 329

  
329 330
nodist_sbin_SCRIPTS = \
330 331
	$(PYTHON_BOOTSTRAP) \
......
397 398
	man/ganeti.7 \
398 399
	man/ganeti-cleaner.8 \
399 400
	man/ganeti-confd.8 \
401
	man/ganeti-listrunner.8 \
400 402
	man/ganeti-masterd.8 \
401 403
	man/ganeti-noded.8 \
402 404
	man/ganeti-os-interface.7 \
b/man/ganeti-listrunner.rst
1
ganeti-listrunner(8) Ganeti | Version @GANETI_VERSION@
2
======================================================
3

  
4
NAME
5
----
6

  
7
ganeti-listrunner - Run commands in parallel over multiple machines
8

  
9

  
10
SYNOPSIS
11
--------
12

  
13
**ganeti-listrunner** ``-l`` *logdir*
14
{``-x`` *executable* | ``-c`` *shell-cmd*}
15
{``-f`` *hostfile* | ``-h`` *hostlist*}
16
[``-a`` *aux-file*]
17
[``-b`` *batch-size*]
18
[``-u`` *username*]
19
[``-A``]
20

  
21

  
22
DESCRIPTION
23
-----------
24

  
25
**ganeti-listrunner** is a tool to run commands in parallel over multiple
26
machines. It differs from ``dsh`` or other tools in that it asks for the
27
password once (if not using ``ssh-agent``) and then reuses the password to
28
connect to all machines, thus being easily usable even when public key
29
authentication or Kerberos authentication is not available.
30

  
31
It can run either a command or a script (which gets uploaded first and deleted
32
after execution) on a list of hosts provided either via a file (one host per
33
line) or as a comma-separated list on the commandline. The output (stdout and
34
stderr are merged) of the remote execution is written to a logfile. One logfile
35
per host is written.
36

  
37

  
38
OPTIONS
39
-------
40

  
41
The options that can be passed to the program are as follows:
42

  
43
``-l`` *logdir*
44
  The directory under which the logfiles should be written.
45

  
46
``-x`` *executable*
47
  The executable to copy and run on the target hosts.
48

  
49
``-c`` *shell-cmd*
50
  The shell command to run on the remote hosts.
51

  
52
``-f`` *hostfile*
53
  The file with the target hosts, one hostname per line.
54

  
55
``-h`` *hostlist*
56
  Comma-separated list of target hosts.
57

  
58
``-a`` *aux-file*
59
  A file to copy to the target hosts. Can be given multiple times, in which case
60
  all files will be copied to the temporary directory. The executable or the
61
  shell command will be run from the (temporary) directory where these files
62
  have been copied.
63

  
64
``-b`` *batch-size*
65
  The host list will be split into batches of batch-size which will be processed
66
  in parallel. The default is 15, and should be increased if faster processing
67
  is needed.
68

  
69
``-u`` *username*
70
  Username to connect as instead of the default root username.
71

  
72
``-A``
73
  Use an existing ssh-agent instead of password authentication.
74

  
75

  
76
EXIT STATUS
77
-----------
78

  
79
The exit status of the command will be zero, unless it was aborted in some way
80
(e.g. ^C).
81

  
82

  
83
EXAMPLE
84
-------
85

  
86
Run a command on a list of hosts::
87

  
88
  ganeti-listrunner -l logdir -c "uname -a" -h host1,host2,host3
89

  
90
Upload a script, some auxiliary files and run the script::
91

  
92
  ganeti-listrunner -l logdir -x runme.sh -a seed.dat -a golden.dat -h host1,host2,host3
93

  
94

  
95
SEE ALSO
96
--------
97

  
98
dsh(1), cssh(1)
b/tools/ganeti-listrunner
1
#!/usr/bin/python
2
#
3

  
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21
"""Run an executable on a list of hosts.
22

  
23
Script to run an executable in parallel on a list of hosts via ssh
24
with password auth as root. If the provided log dir does not yet
25
exist, it will try to create it.
26

  
27
Implementation:
28
 - the main process spawns up to batch_size children, which:
29
 - connects to the remote host via ssh as root
30
 - uploads the executable into a randomly named directory under /tmp via sftp
31
 - chmod 500s it
32
 - via ssh: chdirs into the upload directory and runs the script
33
 - deletes it
34
 - writes status messages and all output to one logfile per host
35
 - the main process gathers then the status of the children and
36
   reports the success/failure ratio
37
 - entire script can be aborted with Ctrl-C
38

  
39
Security considerations:
40
 - the root password for the remote hosts is stored in memory for the
41
   runtime of the script
42
 - the executable to be run on the remote host is handled the following way:
43
   - try to create a random directory with permissions 700 on the
44
     remote host, abort further processing on this host if this fails
45
   - upload the executable under its own name into that directory
46
   - set executable permissions to 500
47
   - run the executable
48
   - delete the executable and the directory on the remote host
49

  
50
"""
51

  
52
# pylint: disable-msg=C0103
53
# C0103: Invalid name ganeti-listrunner
54

  
55
import errno
56
import getopt
57
import getpass
58
import logging
59
import os
60
import random
61
import select
62
import socket
63
import sys
64
import time
65
import traceback
66

  
67
import paramiko
68

  
69

  
70
# Path prefix of the temporary work directory created on each remote host;
# UploadFiles appends two random numbers to it to build the directory name.
REMOTE_PATH_BASE = "/tmp/listrunner"
71

  
72

  
73
def LogDirUseable(logdir):
  """Ensure the log file directory is available and writable.

  The directory is created if it does not exist yet; afterwards a
  randomly named test file is written into it and removed again.

  Args:
    logdir: path of the directory the per-host log files will go to

  Returns:
    True if a file could be created, written and deleted in logdir,
    False otherwise

  Raises:
    OSError: if the directory cannot be created for any reason other
      than already existing

  """
  testfile = "%s/test-%s-%s.deleteme" % (logdir, random.random(),
                                         random.random())
  try:
    os.mkdir(logdir)
  except OSError as err:
    # an already existing path is fine here; whether it is a usable
    # directory is established by the write test below
    if err.errno != errno.EEXIST:
      raise
  try:
    # plain append mode; the former "aw" is not a valid mode string
    logtest = open(testfile, "a")
    try:
      logtest.write("log file writeability test\n")
    finally:
      logtest.close()
    os.unlink(testfile)
  except (OSError, IOError):
    return False
  return True
90

  
91

  
92
def ShowHelp(executable):
  """Print short usage information.

  Args:
    executable: the program name to show in the usage line

  """
  print("usage: %s -l logdir [-c|-x] value [-b batch_size]"
        " [-f hostfile|-h hosts] [-a aux_file] [-u username]"
        " [-p password_file] [-A]" % executable)
  # -p names a file (main reads its first line), not the password itself
  print("""        -l logdir to write logfiles to
        -x executable to run on remote host(s)
        -c shell command to run on remote host(s)
        -f hostlist file (one host per line)
        -a optional auxiliary file to upload (can be given multiple times)
        -b batch-size, how many hosts to process in parallel [15]
        -h comma-separated list of hosts or single hostname
        -u username used to connect [root]
        -p file to read the password from (first line is used)
        -A use the keys of a running ssh-agent instead of a password""")
106

  
107

  
108
def GetTimeStamp(timestamp=None):
  """Format a time tuple as an ISO8601 timestamp.

  Args:
    timestamp: a time tuple as returned by time.localtime(); when
      omitted (None) the current local time is used

  Returns:
    a string of the form YYYY-MM-DDTHH:MM:SS

  """
  if timestamp is not None:
    when = timestamp
  else:
    when = time.localtime()
  return time.strftime("%Y-%m-%dT%H:%M:%S", when)
120

  
121

  
122
def PingByTcp(target, port, timeout=10, live_port_needed=False, source=None):
  """Simple ping implementation using TCP connect(2).

  Try to do a TCP connect(2) from an optional source IP to the
  specified target IP and the specified target port.

  Args:
    target: the address to connect to
    port: the TCP port to connect to
    timeout: connect timeout in seconds
    live_port_needed: if true, only an accepted connection counts as
      success; otherwise a refused connection (ECONNREFUSED) is also
      taken as proof that the target is alive
    source: optional address to bind to before connecting; if not
      passed, the source address selection is left to the kernel

  Returns:
    True if the target is considered reachable, False otherwise

  """
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  try:
    if source is not None:
      try:
        sock.bind((source, 0))
      except socket.error:
        # NOTE(review): as before, a failed bind (including
        # EADDRNOTAVAIL) does not abort the ping; the connect below is
        # still attempted with whatever source the kernel selects
        pass

    sock.settimeout(timeout)

    try:
      sock.connect((target, port))
    except socket.timeout:
      return False
    except socket.error as err:
      # the error number is the first exception argument; comparing the
      # exception object itself against an errno value never matches
      return (not live_port_needed) and (err.args[0] == errno.ECONNREFUSED)
    return True
  finally:
    # release the descriptor on every path, not only after success
    sock.close()
158

  
159

  
160
def GetHosts(hostsfile):
  """Return list of hosts from hostfile.

  Reads the hostslist file, which is expected to contain one hostname
  per line. Surrounding whitespace (including the line terminator) is
  removed and empty lines are skipped, so that they do not end up as
  bogus host names.

  Args:
    hostsfile: path of the file to read

  Returns:
    list of host names in file order

  If the file cannot be opened, an error is printed and the process
  exits with status 2.

  """
  try:
    datafile = open(hostsfile, "r")
  except IOError as msg:
    print("Failed to open hosts file %s: %s" % (hostsfile, msg))
    sys.exit(2)

  try:
    lines = datafile.readlines()
  finally:
    datafile.close()

  hosts = []
  for line in lines:
    name = line.strip()
    if name:
      hosts.append(name)

  return hosts
177

  
178

  
179
def WriteLog(message, logfile):
  """Writes message, terminated by newline, to logfile.

  The line is prefixed with the current timestamp and appended to the
  file.

  Args:
    message: the text to log
    logfile: path of the log file

  Not being able to log is treated as critical: if the file cannot be
  opened or written, the problem and the lost message are printed and
  the process exits with status 1.

  """
  try:
    # plain append mode; the former "aw" is not a valid mode string
    handle = open(logfile, "a")
  except IOError as msg:
    print("failed to open log file %s: %s" % (logfile, msg))
    print("log message was: %s" % message)
    sys.exit(1)  # not being able to log is critical
  try:
    try:
      timestamp = GetTimeStamp()
      handle.write("%s %s\n" % (timestamp, message))
    finally:
      handle.close()
  except IOError as msg:
    # logfile still holds the path here, so the message names the file
    print("failed to write to logfile %s: %s" % (logfile, msg))
    print("log message was: %s" % message)
    sys.exit(1)  # not being able to log is critical
195

  
196

  
197
def GetAgentKeys():
  """Fetch the keys offered by an ssh agent.

  Returns:
    a list with the keys reported by paramiko.Agent, or an empty list
    if paramiko raises SSHException while querying it

  """
  try:
    agent_keys = paramiko.Agent().get_keys()
    return list(agent_keys)
  except paramiko.SSHException:
    return []
204

  
205

  
206
def SetupSshConnection(host, username, password, keys, logfile):
  """Setup the ssh connection used for all later steps.

  This function sets up the ssh connection that will be used both
  for upload and remote command execution. Every key in keys is tried
  in order, followed by the password (if one is given); the first
  method that authenticates wins.

  Args:
    host: the host to connect to (port 22)
    username: the user to log in as
    password: the password to try, or None to skip password auth
    keys: list of keys to try for public key authentication
    logfile: path of the log file for this host

  Returns:
    On success, a paramiko.Transport object with an already logged in
    session. On failure, False.

  """
  # check if target is willing to talk to us at all
  if not PingByTcp(host, 22, live_port_needed=True):
    WriteLog("ERROR: FAILURE_NOT_REACHABLE", logfile)
    print("  - ERROR: host not reachable on 22/tcp")
    return False

  # (description, connect() keyword arguments) per authentication method
  attempts = [("key %d" % idx, {"pkey": key})
              for idx, key in enumerate(keys)]
  if password is not None:
    attempts.append(("password", {"password": password}))

  # deal with logging out of paramiko.transport
  handler = None

  for desc, kwargs in attempts:
    transport = None
    try:
      transport = paramiko.Transport((host, 22))

      # only try to setup the logging handler once
      if handler is None:
        handler = logging.StreamHandler()
        handler.setLevel(logging.ERROR)
        log = logging.getLogger(transport.get_log_channel())
        log.addHandler(handler)

      transport.connect(username=username, **kwargs) # pylint: disable-msg=W0142
      WriteLog("ssh connection established using %s" % desc, logfile)
      # strange ... when establishing the session and the immediately
      # setting up the channels for sftp & shell from that, it sometimes
      # fails, but waiting 1 second after session setup makes it always work
      # time.sleep(1)
      # FIXME apparently needfull to give sshd some time
      return transport
    except (socket.gaierror, socket.error, paramiko.SSHException):
      # do not leave the connection of a failed attempt open while the
      # remaining methods are tried
      if transport is not None:
        transport.close()
      continue

  methods = ", ".join([desc for desc, _ in attempts])
  WriteLog("ERROR: FAILURE_CONNECTION_SETUP (tried %s) " % methods, logfile)
  WriteLog("aborted", logfile)
  print("  - ERROR: connection setup failed (tried %s)" % methods)

  return False
259

  
260

  
261
def UploadFiles(connection, executable, filelist, logfile):
  """Copy files via sftp into a fresh directory on the remote host.

  A directory named REMOTE_PATH_BASE plus two random numbers is created
  with mode 0700, and every entry of filelist is uploaded into it under
  its base name. The entry equal to executable is set to mode 0500,
  all other files to mode 0400.

  Args:
    connection: the transport handed to paramiko.SFTPClient
    executable: local path of the file that must end up executable
    filelist: local paths of all files to upload
    logfile: path of the log file for this host

  Returns:
    the path of the remote upload directory, or False if an IOError
    occurred on the way

  """
  target_dir = "%s.%s-%s" % (REMOTE_PATH_BASE,
                             random.random(), random.random())

  try:
    client = paramiko.SFTPClient.from_transport(connection)
    client.mkdir(target_dir, mode=0o700)
    for local_path in filelist:
      remote_path = "%s/%s" % (target_dir, local_path.split("/")[-1])
      WriteLog("uploading %s to remote %s" % (local_path, remote_path),
               logfile)
      client.put(local_path, remote_path)
      # only the script itself may be executed, the rest is read-only
      if local_path == executable:
        mode = 0o500
      else:
        mode = 0o400
      client.chmod(remote_path, mode)
    client.close()
  except IOError as err:
    WriteLog("ERROR: FAILURE_UPLOAD: %s" % err, logfile)
    return False

  return target_dir
291

  
292

  
293
def CleanupRemoteDir(connection, upload_dir, filelist, logfile):
  """Delete the uploaded files and the remote work directory.

  Args:
    connection: the transport handed to paramiko.SFTPClient
    upload_dir: the remote directory the files were uploaded to
    filelist: local paths of the uploaded files; their base names are
      removed from upload_dir
    logfile: path of the log file for this host

  Returns:
    True if everything was removed, False if an IOError occurred

  """
  try:
    client = paramiko.SFTPClient.from_transport(connection)
    for local_path in filelist:
      remote_path = "%s/%s" % (upload_dir, local_path.split("/")[-1])
      WriteLog("removing remote %s" % remote_path, logfile)
      client.remove(remote_path)
    client.rmdir(upload_dir)
    client.close()
    return True
  except IOError as err:
    WriteLog("ERROR: FAILURE_CLEANUP: %s" % err, logfile)
    return False
308

  
309

  
310
def RunRemoteCommand(connection, command, logfile):
  """Execute the command via ssh on the remote host.

  stderr of the command is redirected into stdout, and the collected
  output is written line by line to the log file once the remote side
  has closed the stream.

  Args:
    connection: object providing open_session() (the transport returned
      by SetupSshConnection)
    command: the shell command to run
    logfile: path of the log file for this host

  Returns:
    True if the command could be started and its output was read,
    False if starting it failed. NOTE(review): the exit status of the
    remote command is not inspected here.

  """
  session = connection.open_session()
  try:
    session.setblocking(0)

    # the following dance is needed because paramiko changed APIs:
    # from returning True/False for success to always returning None
    # and throwing an exception in case of problems.
    # And I want to support both the old and the new API.
    started = True  # being optimistic here, I know
    message = None
    try:
      if session.exec_command("%s 2>&1" % command) is False:
        started = False
    except paramiko.SSHException as err:
      started = False
      message = err

    if not started:
      WriteLog("ERROR: FAILURE_COMMAND_EXECUTION: %s" % message, logfile)
      return False

    # Read when data is available; an empty read marks the end of output
    chunks = []
    while True:
      select.select([session], [], [])
      data = session.recv(1024)
      if not data:
        break
      chunks.append(data)
      # short pause (0.1s) between reads
      select.select([], [], [], .1)
    output = "".join(chunks)

    WriteLog("SUCCESS: command output follows", logfile)
    for line in output.split("\n"):
      WriteLog("output = %s" % line, logfile)
    WriteLog("command execution completed", logfile)

    return True
  finally:
    # close the channel on every path, including failures and exceptions
    session.close()
347

  
348

  
349
def HostWorker(logdir, username, password, keys, hostname,
               executable, command, filelist):
  """Per-host worker.

  This function does not return - it's the main code of the children,
  which exit at the end of this function. The exit code 0 or 1 will be
  interpreted by the parent.

  Args:
    logdir: the directory where the logfiles must be created
    username: SSH username
    password: SSH password
    keys: SSH keys
    hostname: the hostname to connect to
    executable: the executable to upload, if not None
    command: the command to run
    filelist: auxiliary files to upload

  """
  # in the child/worker process
  logfile = "%s/%s.log" % (logdir, hostname)
  print("%s - starting" % hostname)
  result = 0  # optimism, I know
  try:
    connection = SetupSshConnection(hostname, username,
                                    password, keys, logfile)
    if connection is not False:
      upload_dir = None
      if executable is not None:
        print("  %s: uploading files" % hostname)
        upload_dir = UploadFiles(connection, executable,
                                 filelist, logfile)
        if upload_dir is False:
          # without the upload there is nothing to run and no known
          # directory to clean up, so this host counts as failed
          print("  %s: file upload failed, check log for details" %
                hostname)
          result = 1
        else:
          command = "cd %s && ./%s" % (upload_dir,
                                       executable.split("/").pop())
      if upload_dir is not False:
        print("  %s: executing remote command" % hostname)
        cmd_result = RunRemoteCommand(connection, command, logfile)
        if cmd_result is True:
          print("  %s: remote command execution successful" % hostname)
        else:
          print("  %s: remote command execution failed,"
                " check log for details" % hostname)
          result = 1
        if executable is not None:
          print("  %s: cleaning up remote work dir" % hostname)
          cln_result = CleanupRemoteDir(connection, upload_dir,
                                        filelist, logfile)
          if cln_result is False:
            print("  %s: remote work dir cleanup failed, check"
                  " log for details" % hostname)
            result = 1
      connection.close()
    else:
      print("  %s: connection setup failed, skipping" % hostname)
      result = 1
  except KeyboardInterrupt:
    print("  %s: received KeyboardInterrupt, aborting" % hostname)
    WriteLog("ERROR: ABORT_KEYBOARD_INTERRUPT", logfile)
    result = 1
  except Exception as err:
    result = 1
    trace = traceback.format_exc()
    msg = "ERROR: UNHANDLED_EXECPTION_ERROR: %s\nTrace: %s" % (err, trace)
    WriteLog(msg, logfile)
    print("  %s: %s" % (hostname, msg))
  # and exit with exit code 0 or 1, so the parent can compute statistics
  sys.exit(result)
414

  
415

  
416
def LaunchWorker(child_pids, logdir, username, password, keys, hostname,
                 executable, command, filelist):
  """Fork a child process that handles a single host.

  The parent records the child in child_pids and returns; the child
  runs HostWorker with the remaining arguments.

  Args:
    child_pids: dictionary holding the pid-to-hostname mapping, updated
      in the parent
    hostname: the host to process; trailing newlines are removed

  All other arguments are passed through to HostWorker unchanged.

  """
  target = hostname.rstrip("\n")
  child = os.fork()
  if child > 0:
    # parent: only remember which host this pid works on
    child_pids[child] = target
    return
  HostWorker(logdir, username, password, keys, target,
             executable, command, filelist)
432

  
433

  
434
def main():
  """Parse the command line and run the workers.

  Validates the options, determines the authentication data (ssh-agent
  keys, a password read from the file given with -p, or a password
  asked for interactively), builds the host list and then keeps up to
  batch_size worker processes running until all hosts are processed.

  Exit codes: 2 for unknown options, 3 for missing, conflicting or
  invalid arguments, 1 if the log directory or password file cannot be
  used, 0 after all hosts were processed (independent of how many of
  them failed).

  """
  try:
    optlist, _ = getopt.getopt(sys.argv[1:], "l:x:h:f:a:c:b:u:p:A")
  except getopt.GetoptError as err:
    print(str(err))
    ShowHelp(sys.argv[0])
    sys.exit(2)

  logdir = executable = hostfile = hostlist = command = None
  use_agent = False
  auxfiles = []
  username = "root"
  password_file = None
  batch_arg = None
  for opt, value in optlist:
    if opt == "-l":
      logdir = value
    elif opt == "-x":
      executable = value
    elif opt == "-f":
      hostfile = value
    elif opt == "-h":
      hostlist = value
    elif opt == "-a":
      auxfiles.append(value)
    elif opt == "-c":
      command = value
    elif opt == "-b":
      batch_arg = value
    elif opt == "-u":
      username = value
    elif opt == "-p":
      password_file = value
    elif opt == "-A":
      use_agent = True

  if not (logdir and (executable or command) and (hostfile or hostlist)):
    print("error: missing required commandline argument(s)")
    ShowHelp(sys.argv[0])
    sys.exit(3)

  if executable and command:
    print("error: can run either a command or an executable, not both")
    ShowHelp(sys.argv[0])
    sys.exit(3)

  if hostlist and hostfile:
    print("error: specify either -f or -h arguments, not both")
    ShowHelp(sys.argv[0])
    sys.exit(3)

  # a batch size below one would never start any worker
  batch_size = 15
  if batch_arg is not None:
    try:
      batch_size = int(batch_arg)
    except ValueError:
      batch_size = 0
    if batch_size < 1:
      print("error: batch size must be a positive integer, not '%s'" %
            batch_arg)
      ShowHelp(sys.argv[0])
      sys.exit(3)

  ### Unbuffered sys.stdout
  sys.stdout = os.fdopen(1, "w", 0)

  if LogDirUseable(logdir) is False:
    print("ERROR: cannot create logfiles in dir %s, aborting" % logdir)
    sys.exit(1)

  keys = []
  password = None
  if use_agent:
    keys = GetAgentKeys()
  if password_file:
    # the password is the first line of the file; never pass the file
    # name itself on as the password
    try:
      fh = open(password_file)
      try:
        password = fh.readline().strip()
      finally:
        fh.close()
    except IOError as err:
      print("error: can not read in from password file %s: %s" %
            (password_file, err))
      sys.exit(1)
  elif not use_agent:
    password = getpass.getpass("%s's password for all nodes: " % username)

  if hostfile:
    raw_hosts = GetHosts(hostfile)
  else:
    raw_hosts = hostlist.split(",")
  # drop line terminators, stray whitespace and empty entries
  # (blank lines in the file, doubled or trailing commas)
  hosts = [name.strip() for name in raw_hosts if name.strip()]

  successes = failures = 0

  filelist = auxfiles[:]
  if executable is not None:
    filelist.append(executable)

  # initial batch
  batch = hosts[:batch_size]
  hosts = hosts[batch_size:]
  child_pids = {}
  for hostname in batch:
    LaunchWorker(child_pids, logdir, username, password, keys, hostname,
                 executable, command, filelist)

  # every finished child makes room for the next waiting host
  while child_pids:
    pid, status = os.wait()
    hostname = child_pids.pop(pid, "<unknown host>")
    print("  %s: done (in parent)" % hostname)
    if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
      successes += 1
    else:
      failures += 1
    if hosts:
      LaunchWorker(child_pids, logdir, username, password, keys,
                   hosts.pop(0), executable, command, filelist)

  print("")
  print("All done, %s successful and %s failed hosts" % (successes, failures))

  sys.exit(0)
546

  
547

  
548
if __name__ == "__main__":
  # script entry point: an interrupt that reaches this level ends the
  # run with exit status 1
  try:
    main()
  except KeyboardInterrupt:
    print("Received KeyboardInterrupt, aborting")
    sys.exit(1)

Also available in: Unified diff