Retry connection in import-export daemon
author Michael Hanselmann <hansmi@google.com>
Fri, 16 Apr 2010 12:20:36 +0000 (14:20 +0200)
committer Michael Hanselmann <hansmi@google.com>
Tue, 27 Apr 2010 14:22:18 +0000 (16:22 +0200)
Until now, exactly one attempt was made to establish a connection.
If it failed, the whole import/export for the disk in question
aborted. Retrying will make it more reliable.

Unfortunately the listening part can't be made completely
resilient against DoS attacks without larger and complex changes
to the daemon and using socat's EXEC: address combined with
forking.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: René Nussbaumer <rn@google.com>

daemons/import-export
test/import-export_unittest.bash

index 08674e7..a3b778c 100755 (executable)
@@ -83,10 +83,12 @@ MIN_UPDATE_INTERVAL = 5.0
 #: Give child process up to 5 seconds to exit after sending a signal
 CHILD_LINGER_TIMEOUT = 5.0
 
+#: How long to wait for a connection to be established
+DEFAULT_CONNECT_TIMEOUT = 60
+
 # Common options for socat
 SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
 SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
-SOCAT_CONNECT_TIMEOUT = 60
 
 
 # Global variable for options
@@ -185,6 +187,12 @@ class StatusFile:
     """
     self._data.connected = True
 
+  def GetConnected(self):
+    """Determines whether the daemon is connected.
+
+    """
+    return self._data.connected
+
   def SetExitStatus(self, exit_status, error_message):
     """Sets the exit status and an error message.
 
@@ -237,10 +245,11 @@ def _ProcessSocatOutput(status_file, level, msg):
         status_file.SetListenPort(port)
         return True
 
-    m = TRANSFER_LOOP_RE.match(msg)
-    if m:
-      status_file.SetConnected()
-      return True
+    if not status_file.GetConnected():
+      m = TRANSFER_LOOP_RE.match(msg)
+      if m:
+        status_file.SetConnected()
+        return True
 
   return False
 
@@ -311,6 +320,11 @@ def GetSocatCommand(mode):
     addr1 = [
       "OPENSSL-LISTEN:%s" % port,
       "reuseaddr",
+
+      # Retry to listen if connection wasn't established successfully, up to
+      # 100 times a second. Note that this still leaves room for DoS attacks.
+      "forever",
+      "intervall=0.01",
       ] + common_addr_opts
     addr2 = ["stdout"]
 
@@ -318,7 +332,13 @@ def GetSocatCommand(mode):
     addr1 = ["stdin"]
     addr2 = [
       "OPENSSL:%s:%s" % (options.host, options.port),
-      "connect-timeout=%s" % SOCAT_CONNECT_TIMEOUT,
+
+      # How long to wait per connection attempt
+      "connect-timeout=%s" % options.connect_timeout,
+
+      # Retry a few times before giving up to connect (once per second)
+      "retry=%s" % options.connect_retries,
+      "intervall=1",
       ] + common_addr_opts
 
   else:
@@ -371,8 +391,6 @@ def GetTransportCommand(mode, socat_stderr_fd):
     raise Error("Invalid mode")
 
   # TODO: Use "dd" to measure processed data (allows to give an ETA)
-  # TODO: If a connection to socat is dropped (e.g. due to a wrong
-  # certificate), socat should be restarted
 
   # TODO: Run transport as separate user
   # The transport uses its own shell to simplify running it as a separate user
@@ -400,7 +418,7 @@ def GetCommand(mode, socat_stderr_fd):
 
 
 def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
-                   signal_notify, signal_handler):
+                   signal_notify, signal_handler, mode):
   """Handles the child processes' output.
 
   """
@@ -429,19 +447,38 @@ def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
         utils.SetNonblockFlag(fd, True)
         poller.register(fd, select.POLLIN)
 
-      timeout_calculator = None
+      if options.connect_timeout and mode == constants.IEM_IMPORT:
+        listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
+      else:
+        listen_timeout = None
+
+      exit_timeout = None
+
       while True:
         # Break out of loop if only signal notify FD is left
         if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
           break
 
-        if timeout_calculator:
-          timeout = timeout_calculator.Remaining() * 1000
+        timeout = None
+
+        if listen_timeout and not exit_timeout:
+          if status_file.GetConnected():
+            listen_timeout = None
+          elif listen_timeout.Remaining() < 0:
+            logging.info("Child process didn't establish connection in time")
+            child.Kill(signal.SIGTERM)
+            exit_timeout = \
+              locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
+            # Next block will calculate timeout
+          else:
+            # Not yet connected, check again in a second
+            timeout = 1000
+
+        if exit_timeout:
+          timeout = exit_timeout.Remaining() * 1000
           if timeout < 0:
             logging.info("Child process didn't exit in time")
             break
-        else:
-          timeout = None
 
         for fd, event in utils.RetryOnSignal(poller.poll, timeout):
           if event & (select.POLLIN | event & select.POLLPRI):
@@ -456,13 +493,13 @@ def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
                 # Signal handling
                 if signal_handler.called:
                   signal_handler.Clear()
-                  if timeout_calculator:
+                  if exit_timeout:
                     logging.info("Child process still has about %0.2f seconds"
-                                 " to exit", timeout_calculator.Remaining())
+                                 " to exit", exit_timeout.Remaining())
                   else:
                     logging.info("Giving child process %0.2f seconds to exit",
                                  CHILD_LINGER_TIMEOUT)
-                    timeout_calculator = \
+                    exit_timeout = \
                       locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
             else:
               poller.unregister(fd)
@@ -478,7 +515,7 @@ def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
 
       # If there was a timeout calculator, we were waiting for the child to
       # finish, e.g. due to a signal
-      return not bool(timeout_calculator)
+      return not bool(exit_timeout)
     finally:
       socat_stderr_lines.close()
   finally:
@@ -510,6 +547,13 @@ def ParseOptions():
                     help="Remote hostname")
   parser.add_option("--port", dest="port", action="store", type="int",
                     help="Remote port")
+  parser.add_option("--connect-retries", dest="connect_retries", action="store",
+                    type="int", default=0,
+                    help=("How many times the connection should be retried"
+                          " (export only)"))
+  parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
+                    type="int", default=DEFAULT_CONNECT_TIMEOUT,
+                    help="Timeout for connection to be established (seconds)")
   parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
                     type="string", help="Command prefix")
   parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
@@ -635,8 +679,8 @@ def main():
             utils.RetryOnSignal(os.close, socat_stderr_write_fd)
 
             if ProcessChildIO(child, socat_stderr_read_fd, status_file,
-                              child_logger, signal_wakeup,
-                              signal_handler):
+                              child_logger, signal_wakeup, signal_handler,
+                              mode):
               # The child closed all its file descriptors and there was no
               # signal
               # TODO: Implement timeout instead of waiting indefinitely
index 3b992ed..58ab812 100755 (executable)
@@ -23,7 +23,7 @@ set -o pipefail
 
 export PYTHON=${PYTHON:=python}
 
-impexpd="$PYTHON daemons/import-export"
+impexpd="$PYTHON daemons/import-export --connect-timeout=1 --connect-retries=1"
 
 # Add "-d" for debugging
 #impexpd+=' -d'