cmdlib: remove usage of ENABLE_FILE_STORAGE
[ganeti-local] / daemons / import-export
index af16f31..21d1d34 100755 (executable)
 
 """
 
-# pylint: disable-msg=C0103
+# pylint: disable=C0103
 # C0103: Invalid name import-export
 
 import errno
 import logging
 import optparse
 import os
-import re
 import select
 import signal
-import socket
 import subprocess
 import sys
 import time
-from cStringIO import StringIO
+import math
 
 from ganeti import constants
 from ganeti import cli
 from ganeti import utils
+from ganeti import errors
 from ganeti import serializer
 from ganeti import objects
-from ganeti import locking
+from ganeti import impexpd
+from ganeti import netutils
 
 
-#: Used to recognize point at which socat(1) starts to listen on its socket.
-#: The local address is required for the remote peer to connect (in particular
-#: the port number).
-LISTENING_RE = re.compile(r"^listening on\s+"
-                          r"AF=(?P<family>\d+)\s+"
-                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
-
-#: Used to recognize point at which socat(1) is sending data over the wire
-TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
-                              re.I)
-
-SOCAT_LOG_DEBUG = "D"
-SOCAT_LOG_INFO = "I"
-SOCAT_LOG_NOTICE = "N"
-SOCAT_LOG_WARNING = "W"
-SOCAT_LOG_ERROR = "E"
-SOCAT_LOG_FATAL = "F"
-
-SOCAT_LOG_IGNORE = frozenset([
-  SOCAT_LOG_DEBUG,
-  SOCAT_LOG_INFO,
-  SOCAT_LOG_NOTICE,
-  ])
-
-#: Socat buffer size: at most this many bytes are transferred per step
-SOCAT_BUFSIZE = 1024 * 1024
-
 #: How many lines to keep in the status file
 MAX_RECENT_OUTPUT_LINES = 20
 
 #: Don't update status file more than once every 5 seconds (unless forced)
 MIN_UPDATE_INTERVAL = 5.0
 
-#: Give child process up to 5 seconds to exit after sending a signal
-CHILD_LINGER_TIMEOUT = 5.0
+#: How long to wait for a connection to be established
+DEFAULT_CONNECT_TIMEOUT = 60
 
-# Common options for socat
-SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
-SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
-SOCAT_CONNECT_TIMEOUT = 60
+#: Get dd(1) statistics every few seconds
+DD_STATISTICS_INTERVAL = 5.0
 
+#: Seconds for throughput calculation
+DD_THROUGHPUT_INTERVAL = 60.0
 
-# Global variable for options
-options = None
+#: Number of samples for throughput calculation
+DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) /
+                                      DD_STATISTICS_INTERVAL))
 
 
-class Error(Exception):
-  """Generic exception"""
+# Global variable for options
+options = None
 
 
 def SetupLogging():
@@ -125,22 +99,6 @@ def SetupLogging():
   return child_logger
 
 
-def _VerifyListening(family, address, port):
-  """Verify address given as listening address by socat.
-
-  """
-  # TODO: Implement IPv6 support
-  if family != socket.AF_INET:
-    raise Error("Address family %r not supported" % family)
-
-  try:
-    packed_address = socket.inet_pton(family, address)
-  except socket.error:
-    raise Error("Invalid address %r for family %s" % (address, family))
-
-  return (socket.inet_ntop(family, packed_address), port)
-
-
 class StatusFile:
   """Status file manager.
 
@@ -170,7 +128,7 @@ class StatusFile:
     @param port: TCP/UDP port
 
     """
-    assert isinstance(port, (int, long)) and 0 < port < 2**16
+    assert isinstance(port, (int, long)) and 0 < port < (2 ** 16)
     self._data.listen_port = port
 
   def GetListenPort(self):
@@ -185,6 +143,30 @@ class StatusFile:
     """
     self._data.connected = True
 
+  def GetConnected(self):
+    """Determines whether the daemon is connected.
+
+    @return: Current value of the C{connected} attribute of the status
+      data (the attribute L{SetConnected} sets to C{True})
+
+    """
+    return self._data.connected
+
+  def SetProgress(self, mbytes, throughput, percent, eta):
+    """Sets how much data has been transferred so far.
+
+    The values are only stored on the status data object; this method does
+    not write the status file itself.
+
+    @type mbytes: number
+    @param mbytes: Transferred amount of data in MiB.
+    @type throughput: float
+    @param throughput: MiB/second
+    @type percent: number
+    @param percent: Percent processed
+    @type eta: number
+    @param eta: Expected number of seconds until done
+
+    """
+    self._data.progress_mbytes = mbytes
+    self._data.progress_throughput = throughput
+    self._data.progress_percent = percent
+    self._data.progress_eta = eta
+
   def SetExitStatus(self, exit_status, error_message):
     """Sets the exit status and an error message.
 
@@ -218,264 +200,151 @@ class StatusFile:
 
     self._data.mtime = time.time()
     utils.WriteFile(self._path,
-                    data=serializer.DumpJson(self._data.ToDict(), indent=True),
+                    data=serializer.DumpJson(self._data.ToDict()),
                     mode=0400)
 
 
-def _ProcessSocatOutput(status_file, level, msg):
-  """Interprets socat log output.
-
-  """
-  if level == SOCAT_LOG_NOTICE:
-    if status_file.GetListenPort() is None:
-      # TODO: Maybe implement timeout to not listen forever
-      m = LISTENING_RE.match(msg)
-      if m:
-        (_, port) = _VerifyListening(int(m.group("family")), m.group("address"),
-                                     int(m.group("port")))
-
-        status_file.SetListenPort(port)
-        return True
-
-    m = TRANSFER_LOOP_RE.match(msg)
-    if m:
-      status_file.SetConnected()
-      return True
-
-  return False
-
-
-def ProcessOutput(line, status_file, logger, socat):
-  """Takes care of child process output.
-
-  @param status_file: Status file manager
-  @param logger: Child output logger
-  @type socat: bool
-  @param socat: Whether it's a socat output line
-  @type line: string
-  @param line: Child output line
-
-  """
-  force_update = False
-  forward_line = line
-
-  if socat:
-    level = None
-    parts = line.split(None, 4)
-
-    if len(parts) == 5:
-      (_, _, _, level, msg) = parts
-
-      force_update = _ProcessSocatOutput(status_file, level, msg)
-
-      if options.debug or (level and level not in SOCAT_LOG_IGNORE):
-        forward_line = "socat: %s %s" % (level, msg)
-      else:
-        forward_line = None
-    else:
-      forward_line = "socat: %s" % line
-
-  if forward_line:
-    logger.info(forward_line)
-    status_file.AddRecentOutput(forward_line)
-
-  status_file.Update(force_update)
-
-
-def GetBashCommand(cmd):
-  """Prepares a command to be run in Bash.
-
-  """
-  return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
-
-
-def GetSocatCommand(mode):
-  """Returns the socat command.
+def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
+                   dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
+                   signal_notify, signal_handler, mode):
+  """Handles the child processes' output.
 
   """
-  common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
-    "key=%s" % options.key,
-    "cert=%s" % options.cert,
-    "cafile=%s" % options.ca,
-    ]
-
-  if options.bind is not None:
-    common_addr_opts.append("bind=%s" % options.bind)
-
-  if mode == constants.IEM_IMPORT:
-    if options.port is None:
-      port = 0
-    else:
-      port = options.port
+  assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
+         "Other signals are not handled in this function"
 
-    addr1 = [
-      "OPENSSL-LISTEN:%s" % port,
-      "reuseaddr",
-      ] + common_addr_opts
-    addr2 = ["stdout"]
+  # Buffer size 0 is important, otherwise .read() with a specified length
+  # might buffer data while poll(2) won't mark its file descriptor as
+  # readable again.
+  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
+  dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
+  dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
+  exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)
 
-  elif mode == constants.IEM_EXPORT:
-    addr1 = ["stdin"]
-    addr2 = [
-      "OPENSSL:%s:%s" % (options.host, options.port),
-      "connect-timeout=%s" % SOCAT_CONNECT_TIMEOUT,
-      ] + common_addr_opts
+  tp_samples = DD_THROUGHPUT_SAMPLES
 
+  if options.exp_size == constants.IE_CUSTOM_SIZE:
+    exp_size = None
   else:
-    raise Error("Invalid mode")
-
-  for i in [addr1, addr2]:
-    for value in i:
-      if "," in value:
-        raise Error("Comma not allowed in socat option value: %r" % value)
-
-  return [
-    constants.SOCAT_PATH,
-
-    # Log to stderr
-    "-ls",
-
-    # Log level
-    "-d", "-d",
-
-    # Buffer size
-    "-b%s" % SOCAT_BUFSIZE,
-
-    # Unidirectional mode, the first address is only used for reading, and the
-    # second address is only used for writing
-    "-u",
-
-    ",".join(addr1), ",".join(addr2)
-    ]
-
-
-def GetTransportCommand(mode, socat_stderr_fd):
-  """Returns the command for the transport part of the daemon.
-
-  @param mode: Daemon mode (import or export)
-  @type socat_stderr_fd: int
-  @param socat_stderr_fd: File descriptor socat should write its stderr to
-
-  """
-  socat_cmd = ("%s 2>&%d" %
-               (utils.ShellQuoteArgs(GetSocatCommand(mode)),
-                socat_stderr_fd))
-
-  # TODO: Make compression configurable
-
-  if mode == constants.IEM_IMPORT:
-    transport_cmd = "%s | gunzip -c" % socat_cmd
-  elif mode == constants.IEM_EXPORT:
-    transport_cmd = "gzip -c | %s" % socat_cmd
-  else:
-    raise Error("Invalid mode")
-
-  # TODO: Use "dd" to measure processed data (allows to give an ETA)
-  # TODO: If a connection to socat is dropped (e.g. due to a wrong
-  # certificate), socat should be restarted
-
-  # TODO: Run transport as separate user
-  # The transport uses its own shell to simplify running it as a separate user
-  # in the future.
-  return GetBashCommand(transport_cmd)
-
-
-def GetCommand(mode, socat_stderr_fd):
-  """Returns the complete child process command.
-
-  """
-  buf = StringIO()
-
-  if options.cmd_prefix:
-    buf.write(options.cmd_prefix)
-    buf.write(" ")
-
-  buf.write(utils.ShellQuoteArgs(GetTransportCommand(mode, socat_stderr_fd)))
-
-  if options.cmd_suffix:
-    buf.write(" ")
-    buf.write(options.cmd_suffix)
-
-  return GetBashCommand(buf.getvalue())
+    exp_size = options.exp_size
 
+  child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
+                                           child_logger, tp_samples,
+                                           exp_size)
+  try:
+    fdmap = {
+      child.stderr.fileno():
+        (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
+      socat_stderr_read.fileno():
+        (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
+      dd_pid_read.fileno():
+        (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
+      dd_stderr_read.fileno():
+        (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
+      exp_size_read.fileno():
+        (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
+      signal_notify.fileno(): (signal_notify, None),
+      }
+
+    poller = select.poll()
+    for fd in fdmap:
+      utils.SetNonblockFlag(fd, True)
+      poller.register(fd, select.POLLIN)
+
+    if options.connect_timeout and mode == constants.IEM_IMPORT:
+      listen_timeout = utils.RunningTimeout(options.connect_timeout, True)
+    else:
+      listen_timeout = None
+
+    exit_timeout = None
+    dd_stats_timeout = None
+
+    while True:
+      # Break out of loop if only signal notify FD is left
+      if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
+        break
+
+      timeout = None
+
+      if listen_timeout and not exit_timeout:
+        assert mode == constants.IEM_IMPORT and options.connect_timeout
+        if status_file.GetConnected():
+          listen_timeout = None
+        elif listen_timeout.Remaining() < 0:
+          errmsg = ("Child process didn't establish connection in time"
+                    " (%0.0fs), sending SIGTERM" % options.connect_timeout)
+          logging.error(errmsg)
+          status_file.AddRecentOutput(errmsg)
+          status_file.Update(True)
+
+          child.Kill(signal.SIGTERM)
+          exit_timeout = \
+            utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
+          # Next block will calculate timeout
+        else:
+          # Not yet connected, check again in a second
+          timeout = 1000
 
-def ProcessChildIO(child, socat_stderr_read, status_file, child_logger,
-                   signal_notify, signal_handler):
-  """Handles the child processes' output.
+      if exit_timeout:
+        timeout = exit_timeout.Remaining() * 1000
+        if timeout < 0:
+          logging.info("Child process didn't exit in time")
+          break
 
-  """
-  poller = select.poll()
+      if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
+        notify_status = child_io_proc.NotifyDd()
+        if notify_status:
+          # Schedule next notification
+          dd_stats_timeout = utils.RunningTimeout(DD_STATISTICS_INTERVAL, True)
+        else:
+          # Try again soon (dd isn't ready yet)
+          dd_stats_timeout = utils.RunningTimeout(1.0, True)
 
-  script_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
-                                           child_logger, False)
-  try:
-    socat_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
-                                            child_logger, True)
-    try:
-      fdmap = {
-        child.stderr.fileno(): (child.stderr, script_stderr_lines),
-        socat_stderr_read.fileno(): (socat_stderr_read, socat_stderr_lines),
-        signal_notify.fileno(): (signal_notify, None),
-        }
-
-      for fd in fdmap:
-        utils.SetNonblockFlag(fd, True)
-        poller.register(fd, select.POLLIN)
-
-      timeout_calculator = None
-      while True:
-        # Break out of loop if only signal notify FD is left
-        if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
-          break
+      if dd_stats_timeout:
+        dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)
 
-        if timeout_calculator:
-          timeout = timeout_calculator.Remaining() * 1000
-          if timeout < 0:
-            logging.info("Child process didn't exit in time")
-            break
+        if timeout is None:
+          timeout = dd_timeout
         else:
-          timeout = None
-
-        for fd, event in utils.RetryOnSignal(poller.poll, timeout):
-          if event & (select.POLLIN | event & select.POLLPRI):
-            (from_, to) = fdmap[fd]
-
-            # Read up to 1 KB of data
-            data = from_.read(1024)
-            if data:
-              if to:
-                to.write(data)
-              elif fd == signal_notify.fileno():
-                # Signal handling
-                if signal_handler.called:
-                  signal_handler.Clear()
-                  if timeout_calculator:
-                    logging.info("Child process still has about %0.2f seconds"
-                                 " to exit", timeout_calculator.Remaining())
-                  else:
-                    logging.info("Giving child process %0.2f seconds to exit",
-                                 CHILD_LINGER_TIMEOUT)
-                    timeout_calculator = \
-                      locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
-            else:
-              poller.unregister(fd)
-              del fdmap[fd]
-
-          elif event & (select.POLLNVAL | select.POLLHUP |
-                        select.POLLERR):
+          timeout = min(timeout, dd_timeout)
+
+      for fd, event in utils.RetryOnSignal(poller.poll, timeout):
+        if event & (select.POLLIN | event & select.POLLPRI):
+          (from_, to) = fdmap[fd]
+
+          # Read up to 1 KB of data
+          data = from_.read(1024)
+          if data:
+            if to:
+              to.write(data)
+            elif fd == signal_notify.fileno():
+              # Signal handling
+              if signal_handler.called:
+                signal_handler.Clear()
+                if exit_timeout:
+                  logging.info("Child process still has about %0.2f seconds"
+                               " to exit", exit_timeout.Remaining())
+                else:
+                  logging.info("Giving child process %0.2f seconds to exit",
+                               constants.CHILD_LINGER_TIMEOUT)
+                  exit_timeout = \
+                    utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
+          else:
             poller.unregister(fd)
             del fdmap[fd]
 
-        script_stderr_lines.flush()
-        socat_stderr_lines.flush()
+        elif event & (select.POLLNVAL | select.POLLHUP |
+                      select.POLLERR):
+          poller.unregister(fd)
+          del fdmap[fd]
+
+      child_io_proc.FlushAll()
 
-      # If there was a timeout calculator, we were waiting for the child to
-      # finish, e.g. due to a signal
-      return not bool(timeout_calculator)
-    finally:
-      socat_stderr_lines.close()
+    # If an exit timeout was started, we were waiting for the child to
+    # finish, e.g. due to a signal or an expired connection timeout
+    return not bool(exit_timeout)
   finally:
-    script_stderr_lines.close()
+    child_io_proc.CloseAll()
 
 
 def ParseOptions():
@@ -484,7 +353,7 @@ def ParseOptions():
   @return: Arguments to program
 
   """
-  global options # pylint: disable-msg=W0603
+  global options # pylint: disable=W0603
 
   parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
                                         (constants.IEM_IMPORT,
@@ -499,10 +368,30 @@ def ParseOptions():
                     help="X509 CA file")
   parser.add_option("--bind", dest="bind", action="store", type="string",
                     help="Bind address")
+  parser.add_option("--ipv4", dest="ipv4", action="store_true",
+                    help="Use IPv4 only")
+  parser.add_option("--ipv6", dest="ipv6", action="store_true",
+                    help="Use IPv6 only")
   parser.add_option("--host", dest="host", action="store", type="string",
                     help="Remote hostname")
   parser.add_option("--port", dest="port", action="store", type="int",
                     help="Remote port")
+  parser.add_option("--connect-retries", dest="connect_retries", action="store",
+                    type="int", default=0,
+                    help=("How many times the connection should be retried"
+                          " (export only)"))
+  parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
+                    type="int", default=DEFAULT_CONNECT_TIMEOUT,
+                    help="Timeout for connection to be established (seconds)")
+  parser.add_option("--compress", dest="compress", action="store",
+                    type="choice", help="Compression method",
+                    metavar="[%s]" % "|".join(constants.IEC_ALL),
+                    choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
+  parser.add_option("--expected-size", dest="exp_size", action="store",
+                    type="string", default=None,
+                    help="Expected import/export size (MiB)")
+  parser.add_option("--magic", dest="magic", action="store",
+                    type="string", default=None, help="Magic string")
   parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
                     type="string", help="Command prefix")
   parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
@@ -521,9 +410,94 @@ def ParseOptions():
     # Won't return
     parser.error("Invalid mode: %s" % mode)
 
+  # Normalize and check parameters
+  if options.host is not None and not netutils.IPAddress.IsValid(options.host):
+    try:
+      options.host = netutils.Hostname.GetNormalizedName(options.host)
+    except errors.OpPrereqError, err:
+      parser.error("Invalid hostname '%s': %s" % (options.host, err))
+
+  if options.port is not None:
+    options.port = utils.ValidateServiceName(options.port)
+
+  if (options.exp_size is not None and
+      options.exp_size != constants.IE_CUSTOM_SIZE):
+    try:
+      options.exp_size = int(options.exp_size)
+    except (ValueError, TypeError), err:
+      # Won't return
+      parser.error("Invalid value for --expected-size: %s (%s)" %
+                   (options.exp_size, err))
+
+  if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)):
+    parser.error("Magic must match regular expression %s" %
+                 constants.IE_MAGIC_RE.pattern)
+
+  if options.ipv4 and options.ipv6:
+    parser.error("Can only use one of --ipv4 and --ipv6")
+
   return (status_file_path, mode)
 
 
+class ChildProcess(subprocess.Popen):
+  """Child process started in its own process group.
+
+  The child moves itself to a new process group before executing the
+  command, which allows signals to be sent to the child and all of its
+  descendants at once via C{os.killpg}.
+
+  """
+  def __init__(self, env, cmd, noclose_fds):
+    """Initializes this class.
+
+    The process is started immediately; its stderr is connected to a pipe
+    (C{self.stderr}), while stdout and stdin are passed as C{None}.
+
+    @param env: Environment for the child process (passed to
+      C{subprocess.Popen} as C{env})
+    @param cmd: Command to run; executed without a shell
+    @param noclose_fds: File descriptors which must be kept open in the
+      child process (passed to L{utils.CloseFDs})
+
+    """
+    self._noclose_fds = noclose_fds
+
+    # Not using close_fds because doing so would also close the file
+    # descriptors listed in noclose_fds, which the child still needs. They
+    # are closed selectively in the preexec function instead.
+    subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
+                              stderr=subprocess.PIPE, stdout=None, stdin=None,
+                              preexec_fn=self._ChildPreexec)
+    self._SetProcessGroup()
+
+  def _ChildPreexec(self):
+    """Called before child executable is execve'd.
+
+    Passed to C{subprocess.Popen} as C{preexec_fn}.
+
+    """
+    # Move to separate process group. By sending a signal to its process group
+    # we can kill the child process and all grandchildren.
+    os.setpgid(0, 0)
+
+    # Close almost all file descriptors
+    utils.CloseFDs(noclose_fds=self._noclose_fds)
+
+  def _SetProcessGroup(self):
+    """Sets the child's process group.
+
+    Called in the parent process right after the child was started.
+
+    @raise EnvironmentError: If C{os.setpgid} fails with an error other than
+      C{EPERM} or C{EACCES}
+
+    """
+    assert self.pid, "Can't be called in child process"
+
+    # Avoid race condition by setting child's process group (as good as
+    # possible in Python) before sending signals to child. For an
+    # explanation, see preexec function for child.
+    try:
+      os.setpgid(self.pid, self.pid)
+    except EnvironmentError, err:
+      # If the child process was faster we receive EPERM or EACCES
+      if err.errno not in (errno.EPERM, errno.EACCES):
+        raise
+
+  def Kill(self, signum):
+    """Sends signal to child process.
+
+    The signal is sent to the child's whole process group using
+    C{os.killpg}, wrapped in L{utils.IgnoreProcessNotFound}.
+
+    @param signum: Signal number to send
+
+    """
+    logging.info("Sending signal %s to child process", signum)
+    utils.IgnoreProcessNotFound(os.killpg, self.pid, signum)
+
+  def ForceQuit(self):
+    """Ensure child process is no longer running.
+
+    If the child has not exited yet, C{SIGKILL} is sent to its process group
+    and the child is waited for.
+
+    """
+    # Final check if child process is still alive
+    if utils.RetryOnSignal(self.poll) is None:
+      logging.error("Child process still alive, sending SIGKILL")
+      self.Kill(signal.SIGKILL)
+      utils.RetryOnSignal(self.wait)
+
+
 def main():
   """Main function.
 
@@ -540,87 +514,71 @@ def main():
       # Pipe to receive socat's stderr output
       (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
 
-      # Pipe to notify on signals
-      (signal_notify_read_fd, signal_notify_write_fd) = os.pipe()
+      # Pipe to receive dd's stderr output
+      (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()
 
-      # Configure signal module's notifier
-      try:
-        # This is only supported in Python 2.5 and above (some distributions
-        # backported it to Python 2.4)
-        set_wakeup_fd_fn = signal.set_wakeup_fd
-      except AttributeError:
-        pass
-      else:
-        set_wakeup_fd_fn(signal_notify_write_fd)
+      # Pipe to receive dd's PID
+      (dd_pid_read_fd, dd_pid_write_fd) = os.pipe()
 
-      # Buffer size 0 is important, otherwise .read() with a specified length
-      # might buffer data while poll(2) won't mark its file descriptor as
-      # readable again.
-      socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
-      signal_notify_read = os.fdopen(signal_notify_read_fd, "r", 0)
+      # Pipe to receive size predicted by export script
+      (exp_size_read_fd, exp_size_write_fd) = os.pipe()
 
       # Get child process command
-      cmd = GetCommand(mode, socat_stderr_write_fd)
+      cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
+                                           dd_stderr_write_fd, dd_pid_write_fd)
+      cmd = cmd_builder.GetCommand()
 
-      logging.debug("Starting command %r", cmd)
+      # Prepare command environment
+      cmd_env = os.environ.copy()
 
-      def _ChildPreexec():
-        # Move child to separate process group. By sending a signal to its
-        # process group we can kill the child process and all its own
-        # child-processes.
-        os.setpgid(0, 0)
+      if options.exp_size == constants.IE_CUSTOM_SIZE:
+        cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)
 
-        # Close almost all file descriptors
-        utils.CloseFDs(noclose_fds=[socat_stderr_write_fd])
+      logging.debug("Starting command %r", cmd)
 
-      # Not using close_fds because doing so would also close the socat stderr
-      # pipe, which we still need.
-      child = subprocess.Popen(cmd, shell=False, close_fds=False,
-                               stderr=subprocess.PIPE, stdout=None, stdin=None,
-                               preexec_fn=_ChildPreexec)
+      # Start child process
+      child = ChildProcess(cmd_env, cmd,
+                           [socat_stderr_write_fd, dd_stderr_write_fd,
+                            dd_pid_write_fd, exp_size_write_fd])
       try:
-        # Avoid race condition by setting child's process group (as good as
-        # possible in Python) before sending signals to child. For an
-        # explanation, see preexec function for child.
-        try:
-          os.setpgid(child.pid, child.pid)
-        except EnvironmentError, err:
-          # If the child process was faster we receive EPERM or EACCES
-          if err.errno not in (errno.EPERM, errno.EACCES):
-            raise
 
-        # Forward signals to child process
         def _ForwardSignal(signum, _):
-          # Wake up poll(2)
-          os.write(signal_notify_write_fd, "\0")
-
-          # Send signal to child
-          os.killpg(child.pid, signum)
-
-        # TODO: There is a race condition between starting the child and
-        # handling the signals here. While there might be a way to work around
-        # it by registering the handlers before starting the child and
-        # deferring sent signals until the child is available, doing so can be
-        # complicated.
-        signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
-                                             handler_fn=_ForwardSignal)
+          """Forwards signals to child process.
+
+          """
+          child.Kill(signum)
+
+        signal_wakeup = utils.SignalWakeupFd()
         try:
-          # Close child's side
-          utils.RetryOnSignal(os.close, socat_stderr_write_fd)
-
-          if ProcessChildIO(child, socat_stderr_read, status_file, child_logger,
-                            signal_notify_read, signal_handler):
-            # The child closed all its file descriptors and there was no signal
-            # TODO: Implement timeout instead of waiting indefinitely
-            utils.RetryOnSignal(child.wait)
+          # TODO: There is a race condition between starting the child and
+          # handling the signals here. While there might be a way to work around
+          # it by registering the handlers before starting the child and
+          # deferring sent signals until the child is available, doing so can be
+          # complicated.
+          signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
+                                               handler_fn=_ForwardSignal,
+                                               wakeup=signal_wakeup)
+          try:
+            # Close child's side
+            utils.RetryOnSignal(os.close, socat_stderr_write_fd)
+            utils.RetryOnSignal(os.close, dd_stderr_write_fd)
+            utils.RetryOnSignal(os.close, dd_pid_write_fd)
+            utils.RetryOnSignal(os.close, exp_size_write_fd)
+
+            if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
+                              dd_pid_read_fd, exp_size_read_fd,
+                              status_file, child_logger,
+                              signal_wakeup, signal_handler, mode):
+              # The child closed all its file descriptors and there was no
+              # signal
+              # TODO: Implement timeout instead of waiting indefinitely
+              utils.RetryOnSignal(child.wait)
+          finally:
+            signal_handler.Reset()
         finally:
-          signal_handler.Reset()
+          signal_wakeup.Reset()
       finally:
-        # Final check if child process is still alive
-        if utils.RetryOnSignal(child.poll) is None:
-          logging.error("Child process still alive, sending SIGKILL")
-          os.killpg(child.pid, signal.SIGKILL)
-          utils.RetryOnSignal(child.wait)
+        child.ForceQuit()
 
       if child.returncode == 0:
         errmsg = None
@@ -630,7 +588,7 @@ def main():
         errmsg = "Exited with status %s" % (child.returncode, )
 
       status_file.SetExitStatus(child.returncode, errmsg)
-    except Exception, err: # pylint: disable-msg=W0703
+    except Exception, err: # pylint: disable=W0703
       logging.exception("Unhandled error occurred")
       status_file.SetExitStatus(constants.EXIT_FAILURE,
                                 "Unhandled error occurred: %s" % (err, ))