Revision 043f2292 daemons/import-export

b/daemons/import-export
83 83
#: Give child process up to 5 seconds to exit after sending a signal
84 84
CHILD_LINGER_TIMEOUT = 5.0
85 85

  
86
#: How long to wait for a connection to be established
87
DEFAULT_CONNECT_TIMEOUT = 60
88

  
86 89
# Common options for socat
87 90
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
88 91
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
89
SOCAT_CONNECT_TIMEOUT = 60
90 92

  
91 93

  
92 94
# Global variable for options
......
185 187
    """
186 188
    self._data.connected = True
187 189

  
190
  def GetConnected(self):
191
    """Determines whether the daemon is connected.
192

  
193
    """
194
    return self._data.connected
195

  
188 196
  def SetExitStatus(self, exit_status, error_message):
189 197
    """Sets the exit status and an error message.
190 198

  
......
237 245
        status_file.SetListenPort(port)
238 246
        return True
239 247

  
240
    m = TRANSFER_LOOP_RE.match(msg)
241
    if m:
242
      status_file.SetConnected()
243
      return True
248
    if not status_file.GetConnected():
249
      m = TRANSFER_LOOP_RE.match(msg)
250
      if m:
251
        status_file.SetConnected()
252
        return True
244 253

  
245 254
  return False
246 255

  
......
311 320
    addr1 = [
312 321
      "OPENSSL-LISTEN:%s" % port,
313 322
      "reuseaddr",
323

  
324
      # Retry to listen if connection wasn't established successfully, up to
325
      # 100 times a second. Note that this still leaves room for DoS attacks.
326
      "forever",
327
      "intervall=0.01",
314 328
      ] + common_addr_opts
315 329
    addr2 = ["stdout"]
316 330

  
......
318 332
    addr1 = ["stdin"]
319 333
    addr2 = [
320 334
      "OPENSSL:%s:%s" % (options.host, options.port),
321
      "connect-timeout=%s" % SOCAT_CONNECT_TIMEOUT,
335

  
336
      # How long to wait per connection attempt
337
      "connect-timeout=%s" % options.connect_timeout,
338

  
339
      # Retry a few times before giving up to connect (once per second)
340
      "retry=%s" % options.connect_retries,
341
      "intervall=1",
322 342
      ] + common_addr_opts
323 343

  
324 344
  else:
......
371 391
    raise Error("Invalid mode")
372 392

  
373 393
  # TODO: Use "dd" to measure processed data (allows to give an ETA)
374
  # TODO: If a connection to socat is dropped (e.g. due to a wrong
375
  # certificate), socat should be restarted
376 394

  
377 395
  # TODO: Run transport as separate user
378 396
  # The transport uses its own shell to simplify running it as a separate user
......
400 418

  
401 419

  
402 420
def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
403
                   signal_notify, signal_handler):
421
                   signal_notify, signal_handler, mode):
404 422
  """Handles the child processes' output.
405 423

  
406 424
  """
......
429 447
        utils.SetNonblockFlag(fd, True)
430 448
        poller.register(fd, select.POLLIN)
431 449

  
432
      timeout_calculator = None
450
      if options.connect_timeout and mode == constants.IEM_IMPORT:
451
        listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
452
      else:
453
        listen_timeout = None
454

  
455
      exit_timeout = None
456

  
433 457
      while True:
434 458
        # Break out of loop if only signal notify FD is left
435 459
        if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
436 460
          break
437 461

  
438
        if timeout_calculator:
439
          timeout = timeout_calculator.Remaining() * 1000
462
        timeout = None
463

  
464
        if listen_timeout and not exit_timeout:
465
          if status_file.GetConnected():
466
            listen_timeout = None
467
          elif listen_timeout.Remaining() < 0:
468
            logging.info("Child process didn't establish connection in time")
469
            child.Kill(signal.SIGTERM)
470
            exit_timeout = \
471
              locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
472
            # Next block will calculate timeout
473
          else:
474
            # Not yet connected, check again in a second
475
            timeout = 1000
476

  
477
        if exit_timeout:
478
          timeout = exit_timeout.Remaining() * 1000
440 479
          if timeout < 0:
441 480
            logging.info("Child process didn't exit in time")
442 481
            break
443
        else:
444
          timeout = None
445 482

  
446 483
        for fd, event in utils.RetryOnSignal(poller.poll, timeout):
447 484
          if event & (select.POLLIN | event & select.POLLPRI):
......
456 493
                # Signal handling
457 494
                if signal_handler.called:
458 495
                  signal_handler.Clear()
459
                  if timeout_calculator:
496
                  if exit_timeout:
460 497
                    logging.info("Child process still has about %0.2f seconds"
461
                                 " to exit", timeout_calculator.Remaining())
498
                                 " to exit", exit_timeout.Remaining())
462 499
                  else:
463 500
                    logging.info("Giving child process %0.2f seconds to exit",
464 501
                                 CHILD_LINGER_TIMEOUT)
465
                    timeout_calculator = \
502
                    exit_timeout = \
466 503
                      locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
467 504
            else:
468 505
              poller.unregister(fd)
......
478 515

  
479 516
      # If there was a timeout calculator, we were waiting for the child to
480 517
      # finish, e.g. due to a signal
481
      return not bool(timeout_calculator)
518
      return not bool(exit_timeout)
482 519
    finally:
483 520
      socat_stderr_lines.close()
484 521
  finally:
......
510 547
                    help="Remote hostname")
511 548
  parser.add_option("--port", dest="port", action="store", type="int",
512 549
                    help="Remote port")
550
  parser.add_option("--connect-retries", dest="connect_retries", action="store",
551
                    type="int", default=0,
552
                    help=("How many times the connection should be retried"
553
                          " (export only)"))
554
  parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
555
                    type="int", default=DEFAULT_CONNECT_TIMEOUT,
556
                    help="Timeout for connection to be established (seconds)")
513 557
  parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
514 558
                    type="string", help="Command prefix")
515 559
  parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
......
635 679
            utils.RetryOnSignal(os.close, socat_stderr_write_fd)
636 680

  
637 681
            if ProcessChildIO(child, socat_stderr_read_fd, status_file,
638
                              child_logger, signal_wakeup,
639
                              signal_handler):
682
                              child_logger, signal_wakeup, signal_handler,
683
                              mode):
640 684
              # The child closed all its file descriptors and there was no
641 685
              # signal
642 686
              # TODO: Implement timeout instead of waiting indefinitely

Also available in: Unified diff