Revision 1e915b86

b/daemons/import-export
291 291
  status_file.Update(force_update)
292 292

  
293 293

  
294
def GetBashCommand(cmd):
295
  """Prepares a command to be run in Bash.
294
class CommandBuilder(object):
295
  def __init__(self, mode, opts, socat_stderr_fd):
296
    """Initializes this class.
296 297

  
297
  """
298
  return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
298
    @param mode: Daemon mode (import or export)
299
    @param opts: Options object
300
    @type socat_stderr_fd: int
301
    @param socat_stderr_fd: File descriptor socat should write its stderr to
299 302

  
303
    """
304
    self._opts = opts
305
    self._mode = mode
306
    self._socat_stderr_fd = socat_stderr_fd
300 307

  
301
def GetSocatCommand(mode):
302
  """Returns the socat command.
308
  @staticmethod
309
  def GetBashCommand(cmd):
310
    """Prepares a command to be run in Bash.
303 311

  
304
  """
305
  common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
306
    "key=%s" % options.key,
307
    "cert=%s" % options.cert,
308
    "cafile=%s" % options.ca,
309
    ]
310

  
311
  if options.bind is not None:
312
    common_addr_opts.append("bind=%s" % options.bind)
313

  
314
  if mode == constants.IEM_IMPORT:
315
    if options.port is None:
316
      port = 0
317
    else:
318
      port = options.port
312
    """
313
    return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
319 314

  
320
    addr1 = [
321
      "OPENSSL-LISTEN:%s" % port,
322
      "reuseaddr",
315
  def _GetSocatCommand(self):
316
    """Returns the socat command.
323 317

  
324
      # Retry to listen if connection wasn't established successfully, up to
325
      # 100 times a second. Note that this still leaves room for DoS attacks.
326
      "forever",
327
      "intervall=0.01",
328
      ] + common_addr_opts
329
    addr2 = ["stdout"]
318
    """
319
    common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
320
      "key=%s" % self._opts.key,
321
      "cert=%s" % self._opts.cert,
322
      "cafile=%s" % self._opts.ca,
323
      ]
324

  
325
    if self._opts.bind is not None:
326
      common_addr_opts.append("bind=%s" % self._opts.bind)
327

  
328
    if self._mode == constants.IEM_IMPORT:
329
      if self._opts.port is None:
330
        port = 0
331
      else:
332
        port = self._opts.port
330 333

  
331
  elif mode == constants.IEM_EXPORT:
332
    addr1 = ["stdin"]
333
    addr2 = [
334
      "OPENSSL:%s:%s" % (options.host, options.port),
334
      addr1 = [
335
        "OPENSSL-LISTEN:%s" % port,
336
        "reuseaddr",
335 337

  
336
      # How long to wait per connection attempt
337
      "connect-timeout=%s" % options.connect_timeout,
338
        # Retry to listen if connection wasn't established successfully, up to
339
        # 100 times a second. Note that this still leaves room for DoS attacks.
340
        "forever",
341
        "intervall=0.01",
342
        ] + common_addr_opts
343
      addr2 = ["stdout"]
338 344

  
339
      # Retry a few times before giving up to connect (once per second)
340
      "retry=%s" % options.connect_retries,
341
      "intervall=1",
342
      ] + common_addr_opts
345
    elif self._mode == constants.IEM_EXPORT:
346
      addr1 = ["stdin"]
347
      addr2 = [
348
        "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
343 349

  
344
  else:
345
    raise Error("Invalid mode")
350
        # How long to wait per connection attempt
351
        "connect-timeout=%s" % self._opts.connect_timeout,
346 352

  
347
  for i in [addr1, addr2]:
348
    for value in i:
349
      if "," in value:
350
        raise Error("Comma not allowed in socat option value: %r" % value)
353
        # Retry a few times before giving up to connect (once per second)
354
        "retry=%s" % self._opts.connect_retries,
355
        "intervall=1",
356
        ] + common_addr_opts
351 357

  
352
  return [
353
    constants.SOCAT_PATH,
358
    else:
359
      raise Error("Invalid mode '%s'" % self._mode)
354 360

  
355
    # Log to stderr
356
    "-ls",
361
    for i in [addr1, addr2]:
362
      for value in i:
363
        if "," in value:
364
          raise Error("Comma not allowed in socat option value: %r" % value)
357 365

  
358
    # Log level
359
    "-d", "-d",
366
    return [
367
      constants.SOCAT_PATH,
360 368

  
361
    # Buffer size
362
    "-b%s" % SOCAT_BUFSIZE,
369
      # Log to stderr
370
      "-ls",
363 371

  
364
    # Unidirectional mode, the first address is only used for reading, and the
365
    # second address is only used for writing
366
    "-u",
372
      # Log level
373
      "-d", "-d",
367 374

  
368
    ",".join(addr1), ",".join(addr2)
369
    ]
375
      # Buffer size
376
      "-b%s" % SOCAT_BUFSIZE,
370 377

  
378
      # Unidirectional mode, the first address is only used for reading, and the
379
      # second address is only used for writing
380
      "-u",
371 381

  
372
def GetTransportCommand(mode, socat_stderr_fd):
373
  """Returns the command for the transport part of the daemon.
382
      ",".join(addr1), ",".join(addr2)
383
      ]
374 384

  
375
  @param mode: Daemon mode (import or export)
376
  @type socat_stderr_fd: int
377
  @param socat_stderr_fd: File descriptor socat should write its stderr to
385
  def _GetTransportCommand(self):
386
    """Returns the command for the transport part of the daemon.
378 387

  
379
  """
380
  socat_cmd = ("%s 2>&%d" %
381
               (utils.ShellQuoteArgs(GetSocatCommand(mode)),
382
                socat_stderr_fd))
388
    """
389
    socat_cmd = ("%s 2>&%d" %
390
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
391
                  self._socat_stderr_fd))
383 392

  
384
  compr = options.compress
393
    compr = self._opts.compress
385 394

  
386
  assert compr in constants.IEC_ALL
395
    assert compr in constants.IEC_ALL
387 396

  
388
  if mode == constants.IEM_IMPORT:
389
    if compr == constants.IEC_GZIP:
390
      transport_cmd = "%s | gunzip -c" % socat_cmd
391
    else:
392
      transport_cmd = socat_cmd
393
  elif mode == constants.IEM_EXPORT:
394
    if compr == constants.IEC_GZIP:
395
      transport_cmd = "gzip -c | %s" % socat_cmd
397
    if self._mode == constants.IEM_IMPORT:
398
      if compr == constants.IEC_GZIP:
399
        transport_cmd = "%s | gunzip -c" % socat_cmd
400
      else:
401
        transport_cmd = socat_cmd
402
    elif self._mode == constants.IEM_EXPORT:
403
      if compr == constants.IEC_GZIP:
404
        transport_cmd = "gzip -c | %s" % socat_cmd
405
      else:
406
        transport_cmd = socat_cmd
396 407
    else:
397
      transport_cmd = socat_cmd
398
  else:
399
    raise Error("Invalid mode")
408
      raise Error("Invalid mode '%s'" % self._mode)
400 409

  
401
  # TODO: Use "dd" to measure processed data (allows to give an ETA)
410
    # TODO: Use "dd" to measure processed data (allows to give an ETA)
402 411

  
403
  # TODO: Run transport as separate user
404
  # The transport uses its own shell to simplify running it as a separate user
405
  # in the future.
406
  return GetBashCommand(transport_cmd)
412
    # TODO: Run transport as separate user
413
    # The transport uses its own shell to simplify running it as a separate user
414
    # in the future.
415
    return self.GetBashCommand(transport_cmd)
407 416

  
417
  def GetCommand(self):
418
    """Returns the complete child process command.
408 419

  
409
def GetCommand(mode, socat_stderr_fd):
410
  """Returns the complete child process command.
420
    """
421
    transport_cmd = self._GetTransportCommand()
411 422

  
412
  """
413
  buf = StringIO()
423
    buf = StringIO()
414 424

  
415
  if options.cmd_prefix:
416
    buf.write(options.cmd_prefix)
417
    buf.write(" ")
425
    if self._opts.cmd_prefix:
426
      buf.write(self._opts.cmd_prefix)
427
      buf.write(" ")
418 428

  
419
  buf.write(utils.ShellQuoteArgs(GetTransportCommand(mode, socat_stderr_fd)))
429
    buf.write(utils.ShellQuoteArgs(transport_cmd))
420 430

  
421
  if options.cmd_suffix:
422
    buf.write(" ")
423
    buf.write(options.cmd_suffix)
431
    if self._opts.cmd_suffix:
432
      buf.write(" ")
433
      buf.write(self._opts.cmd_suffix)
424 434

  
425
  return GetBashCommand(buf.getvalue())
435
    return self.GetBashCommand(buf.getvalue())
426 436

  
427 437

  
428 438
def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
......
663 673
      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
664 674

  
665 675
      # Get child process command
666
      cmd = GetCommand(mode, socat_stderr_write_fd)
676
      cmd = CommandBuilder(mode, options, socat_stderr_write_fd).GetCommand()
667 677

  
668 678
      logging.debug("Starting command %r", cmd)
669 679

  

Also available in: Unified diff