cmdlib: remove usage of ENABLE_FILE_STORAGE
[ganeti-local] / daemons / import-export
index 29386d1..21d1d34 100755 (executable)
 
 """
 
-# pylint: disable-msg=C0103
+# pylint: disable=C0103
 # C0103: Invalid name import-export
 
 import errno
 import logging
 import optparse
 import os
-import re
 import select
 import signal
-import socket
 import subprocess
 import sys
 import time
+import math
 
 from ganeti import constants
 from ganeti import cli
 from ganeti import utils
+from ganeti import errors
 from ganeti import serializer
 from ganeti import objects
-from ganeti import locking
 from ganeti import impexpd
+from ganeti import netutils
 
 
-#: Used to recognize point at which socat(1) starts to listen on its socket.
-#: The local address is required for the remote peer to connect (in particular
-#: the port number).
-LISTENING_RE = re.compile(r"^listening on\s+"
-                          r"AF=(?P<family>\d+)\s+"
-                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
-
-#: Used to recognize point at which socat(1) is sending data over the wire
-TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
-                              re.I)
-
-SOCAT_LOG_DEBUG = "D"
-SOCAT_LOG_INFO = "I"
-SOCAT_LOG_NOTICE = "N"
-SOCAT_LOG_WARNING = "W"
-SOCAT_LOG_ERROR = "E"
-SOCAT_LOG_FATAL = "F"
-
-SOCAT_LOG_IGNORE = frozenset([
-  SOCAT_LOG_DEBUG,
-  SOCAT_LOG_INFO,
-  SOCAT_LOG_NOTICE,
-  ])
-
 #: How many lines to keep in the status file
 MAX_RECENT_OUTPUT_LINES = 20
 
 #: Don't update status file more than once every 5 seconds (unless forced)
 MIN_UPDATE_INTERVAL = 5.0
 
-#: Give child process up to 5 seconds to exit after sending a signal
-CHILD_LINGER_TIMEOUT = 5.0
-
 #: How long to wait for a connection to be established
 DEFAULT_CONNECT_TIMEOUT = 60
 
+#: Interval, in seconds, between requests for dd(1) transfer statistics
+DD_STATISTICS_INTERVAL = 5.0
 
-# Global variable for options
-options = None
+#: Length, in seconds, of the time window used to calculate throughput
+DD_THROUGHPUT_INTERVAL = 60.0
+
+#: Number of samples for throughput calculation
+DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) /
+                                      DD_STATISTICS_INTERVAL))
 
 
-class Error(Exception):
-  """Generic exception"""
+# Global variable for options
+options = None
 
 
 def SetupLogging():
@@ -120,22 +99,6 @@ def SetupLogging():
   return child_logger
 
 
-def _VerifyListening(family, address, port):
-  """Verify address given as listening address by socat.
-
-  """
-  # TODO: Implement IPv6 support
-  if family != socket.AF_INET:
-    raise Error("Address family %r not supported" % family)
-
-  try:
-    packed_address = socket.inet_pton(family, address)
-  except socket.error:
-    raise Error("Invalid address %r for family %s" % (address, family))
-
-  return (socket.inet_ntop(family, packed_address), port)
-
-
 class StatusFile:
   """Status file manager.
 
@@ -165,7 +128,7 @@ class StatusFile:
     @param port: TCP/UDP port
 
     """
-    assert isinstance(port, (int, long)) and 0 < port < 2**16
+    assert isinstance(port, (int, long)) and 0 < port < (2 ** 16)
     self._data.listen_port = port
 
   def GetListenPort(self):
@@ -186,6 +149,24 @@ class StatusFile:
     """
     return self._data.connected
 
+  def SetProgress(self, mbytes, throughput, percent, eta):
+    """Stores the current transfer progress in the status data.
+
+    @type mbytes: number
+    @param mbytes: Amount of data transferred so far, in MiB
+    @type throughput: float
+    @param throughput: Transfer rate in MiB/second
+    @type percent: number
+    @param percent: Percent processed
+    @type eta: number
+    @param eta: Expected number of seconds until done
+
+    """
+    self._data.progress_mbytes = mbytes
+    self._data.progress_throughput = throughput
+    self._data.progress_percent = percent
+    self._data.progress_eta = eta
+
   def SetExitStatus(self, exit_status, error_message):
     """Sets the exit status and an error message.
 
@@ -219,72 +200,12 @@ class StatusFile:
 
     self._data.mtime = time.time()
     utils.WriteFile(self._path,
-                    data=serializer.DumpJson(self._data.ToDict(), indent=True),
+                    data=serializer.DumpJson(self._data.ToDict()),
                     mode=0400)
 
 
-def _ProcessSocatOutput(status_file, level, msg):
-  """Interprets socat log output.
-
-  """
-  if level == SOCAT_LOG_NOTICE:
-    if status_file.GetListenPort() is None:
-      # TODO: Maybe implement timeout to not listen forever
-      m = LISTENING_RE.match(msg)
-      if m:
-        (_, port) = _VerifyListening(int(m.group("family")), m.group("address"),
-                                     int(m.group("port")))
-
-        status_file.SetListenPort(port)
-        return True
-
-    if not status_file.GetConnected():
-      m = TRANSFER_LOOP_RE.match(msg)
-      if m:
-        status_file.SetConnected()
-        return True
-
-  return False
-
-
-def ProcessOutput(line, status_file, logger, socat):
-  """Takes care of child process output.
-
-  @param status_file: Status file manager
-  @param logger: Child output logger
-  @type socat: bool
-  @param socat: Whether it's a socat output line
-  @type line: string
-  @param line: Child output line
-
-  """
-  force_update = False
-  forward_line = line
-
-  if socat:
-    level = None
-    parts = line.split(None, 4)
-
-    if len(parts) == 5:
-      (_, _, _, level, msg) = parts
-
-      force_update = _ProcessSocatOutput(status_file, level, msg)
-
-      if options.debug or (level and level not in SOCAT_LOG_IGNORE):
-        forward_line = "socat: %s %s" % (level, msg)
-      else:
-        forward_line = None
-    else:
-      forward_line = "socat: %s" % line
-
-  if forward_line:
-    logger.info(forward_line)
-    status_file.AddRecentOutput(forward_line)
-
-  status_file.Update(force_update)
-
-
-def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
+def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
+                   dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
                    signal_notify, signal_handler, mode):
   """Handles the child processes' output.
 
@@ -296,97 +217,134 @@ def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
   # might buffer data while poll(2) won't mark its file descriptor as
   # readable again.
   socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
+  dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
+  dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
+  exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)
 
-  script_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
-                                           child_logger, False)
-  try:
-    socat_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
-                                            child_logger, True)
-    try:
-      fdmap = {
-        child.stderr.fileno(): (child.stderr, script_stderr_lines),
-        socat_stderr_read.fileno(): (socat_stderr_read, socat_stderr_lines),
-        signal_notify.fileno(): (signal_notify, None),
-        }
-
-      poller = select.poll()
-      for fd in fdmap:
-        utils.SetNonblockFlag(fd, True)
-        poller.register(fd, select.POLLIN)
-
-      if options.connect_timeout and mode == constants.IEM_IMPORT:
-        listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
-      else:
-        listen_timeout = None
+  tp_samples = DD_THROUGHPUT_SAMPLES
 
-      exit_timeout = None
+  if options.exp_size == constants.IE_CUSTOM_SIZE:
+    exp_size = None
+  else:
+    exp_size = options.exp_size
 
-      while True:
-        # Break out of loop if only signal notify FD is left
-        if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
+  child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
+                                           child_logger, tp_samples,
+                                           exp_size)
+  try:
+    fdmap = {
+      child.stderr.fileno():
+        (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
+      socat_stderr_read.fileno():
+        (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
+      dd_pid_read.fileno():
+        (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
+      dd_stderr_read.fileno():
+        (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
+      exp_size_read.fileno():
+        (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
+      signal_notify.fileno(): (signal_notify, None),
+      }
+
+    poller = select.poll()
+    for fd in fdmap:
+      utils.SetNonblockFlag(fd, True)
+      poller.register(fd, select.POLLIN)
+
+    if options.connect_timeout and mode == constants.IEM_IMPORT:
+      listen_timeout = utils.RunningTimeout(options.connect_timeout, True)
+    else:
+      listen_timeout = None
+
+    exit_timeout = None
+    dd_stats_timeout = None
+
+    while True:
+      # Break out of loop if only signal notify FD is left
+      if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
+        break
+
+      timeout = None
+
+      if listen_timeout and not exit_timeout:
+        assert mode == constants.IEM_IMPORT and options.connect_timeout
+        if status_file.GetConnected():
+          listen_timeout = None
+        elif listen_timeout.Remaining() < 0:
+          errmsg = ("Child process didn't establish connection in time"
+                    " (%0.0fs), sending SIGTERM" % options.connect_timeout)
+          logging.error(errmsg)
+          status_file.AddRecentOutput(errmsg)
+          status_file.Update(True)
+
+          child.Kill(signal.SIGTERM)
+          exit_timeout = \
+            utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
+          # Next block will calculate timeout
+        else:
+          # Not yet connected, check again in a second
+          timeout = 1000
+
+      if exit_timeout:
+        timeout = exit_timeout.Remaining() * 1000
+        if timeout < 0:
+          logging.info("Child process didn't exit in time")
           break
 
-        timeout = None
-
-        if listen_timeout and not exit_timeout:
-          if status_file.GetConnected():
-            listen_timeout = None
-          elif listen_timeout.Remaining() < 0:
-            logging.info("Child process didn't establish connection in time")
-            child.Kill(signal.SIGTERM)
-            exit_timeout = \
-              locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
-            # Next block will calculate timeout
+      if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
+        notify_status = child_io_proc.NotifyDd()
+        if notify_status:
+          # Schedule next notification
+          dd_stats_timeout = utils.RunningTimeout(DD_STATISTICS_INTERVAL, True)
+        else:
+          # Try again soon (dd isn't ready yet)
+          dd_stats_timeout = utils.RunningTimeout(1.0, True)
+
+      if dd_stats_timeout:
+        dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)
+
+        if timeout is None:
+          timeout = dd_timeout
+        else:
+          timeout = min(timeout, dd_timeout)
+
+      for fd, event in utils.RetryOnSignal(poller.poll, timeout):
+        if event & (select.POLLIN | event & select.POLLPRI):
+          (from_, to) = fdmap[fd]
+
+          # Read up to 1 KB of data
+          data = from_.read(1024)
+          if data:
+            if to:
+              to.write(data)
+            elif fd == signal_notify.fileno():
+              # Signal handling
+              if signal_handler.called:
+                signal_handler.Clear()
+                if exit_timeout:
+                  logging.info("Child process still has about %0.2f seconds"
+                               " to exit", exit_timeout.Remaining())
+                else:
+                  logging.info("Giving child process %0.2f seconds to exit",
+                               constants.CHILD_LINGER_TIMEOUT)
+                  exit_timeout = \
+                    utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
           else:
-            # Not yet connected, check again in a second
-            timeout = 1000
-
-        if exit_timeout:
-          timeout = exit_timeout.Remaining() * 1000
-          if timeout < 0:
-            logging.info("Child process didn't exit in time")
-            break
-
-        for fd, event in utils.RetryOnSignal(poller.poll, timeout):
-          if event & (select.POLLIN | event & select.POLLPRI):
-            (from_, to) = fdmap[fd]
-
-            # Read up to 1 KB of data
-            data = from_.read(1024)
-            if data:
-              if to:
-                to.write(data)
-              elif fd == signal_notify.fileno():
-                # Signal handling
-                if signal_handler.called:
-                  signal_handler.Clear()
-                  if exit_timeout:
-                    logging.info("Child process still has about %0.2f seconds"
-                                 " to exit", exit_timeout.Remaining())
-                  else:
-                    logging.info("Giving child process %0.2f seconds to exit",
-                                 CHILD_LINGER_TIMEOUT)
-                    exit_timeout = \
-                      locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
-            else:
-              poller.unregister(fd)
-              del fdmap[fd]
-
-          elif event & (select.POLLNVAL | select.POLLHUP |
-                        select.POLLERR):
             poller.unregister(fd)
             del fdmap[fd]
 
-        script_stderr_lines.flush()
-        socat_stderr_lines.flush()
+        elif event & (select.POLLNVAL | select.POLLHUP |
+                      select.POLLERR):
+          poller.unregister(fd)
+          del fdmap[fd]
 
-      # If there was a timeout calculator, we were waiting for the child to
-      # finish, e.g. due to a signal
-      return not bool(exit_timeout)
-    finally:
-      socat_stderr_lines.close()
+      child_io_proc.FlushAll()
+
+    # If there was a timeout calculator, we were waiting for the child to
+    # finish, e.g. due to a signal
+    return not bool(exit_timeout)
   finally:
-    script_stderr_lines.close()
+    child_io_proc.CloseAll()
 
 
 def ParseOptions():
@@ -395,7 +353,7 @@ def ParseOptions():
   @return: Arguments to program
 
   """
-  global options # pylint: disable-msg=W0603
+  global options # pylint: disable=W0603
 
   parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
                                         (constants.IEM_IMPORT,
@@ -410,6 +368,10 @@ def ParseOptions():
                     help="X509 CA file")
   parser.add_option("--bind", dest="bind", action="store", type="string",
                     help="Bind address")
+  parser.add_option("--ipv4", dest="ipv4", action="store_true",
+                    help="Use IPv4 only")
+  parser.add_option("--ipv6", dest="ipv6", action="store_true",
+                    help="Use IPv6 only")
   parser.add_option("--host", dest="host", action="store", type="string",
                     help="Remote hostname")
   parser.add_option("--port", dest="port", action="store", type="int",
@@ -425,6 +387,11 @@ def ParseOptions():
                     type="choice", help="Compression method",
                     metavar="[%s]" % "|".join(constants.IEC_ALL),
                     choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
+  parser.add_option("--expected-size", dest="exp_size", action="store",
+                    type="string", default=None,
+                    help="Expected import/export size (MiB)")
+  parser.add_option("--magic", dest="magic", action="store",
+                    type="string", default=None, help="Magic string")
   parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
                     type="string", help="Command prefix")
   parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
@@ -443,11 +410,37 @@ def ParseOptions():
     # Won't return
     parser.error("Invalid mode: %s" % mode)
 
+  # Normalize and check parameters
+  if options.host is not None and not netutils.IPAddress.IsValid(options.host):
+    try:
+      options.host = netutils.Hostname.GetNormalizedName(options.host)
+    except errors.OpPrereqError, err:
+      parser.error("Invalid hostname '%s': %s" % (options.host, err))
+
+  if options.port is not None:
+    options.port = utils.ValidateServiceName(options.port)
+
+  if (options.exp_size is not None and
+      options.exp_size != constants.IE_CUSTOM_SIZE):
+    try:
+      options.exp_size = int(options.exp_size)
+    except (ValueError, TypeError), err:
+      # Won't return
+      parser.error("Invalid value for --expected-size: %s (%s)" %
+                   (options.exp_size, err))
+
+  if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)):
+    parser.error("Magic must match regular expression %s" %
+                 constants.IE_MAGIC_RE.pattern)
+
+  if options.ipv4 and options.ipv6:
+    parser.error("Can only use one of --ipv4 and --ipv6")
+
   return (status_file_path, mode)
 
 
 class ChildProcess(subprocess.Popen):
-  def __init__(self, cmd, noclose_fds):
+  def __init__(self, env, cmd, noclose_fds):
     """Initializes this class.
 
     """
@@ -455,7 +448,7 @@ class ChildProcess(subprocess.Popen):
 
     # Not using close_fds because doing so would also close the socat stderr
     # pipe, which we still need.
-    subprocess.Popen.__init__(self, cmd, shell=False, close_fds=False,
+    subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
                               stderr=subprocess.PIPE, stdout=None, stdin=None,
                               preexec_fn=self._ChildPreexec)
     self._SetProcessGroup()
@@ -492,7 +485,7 @@ class ChildProcess(subprocess.Popen):
 
     """
     logging.info("Sending signal %s to child process", signum)
-    os.killpg(self.pid, signum)
+    utils.IgnoreProcessNotFound(os.killpg, self.pid, signum)
 
   def ForceQuit(self):
     """Ensure child process is no longer running.
@@ -521,15 +514,34 @@ def main():
       # Pipe to receive socat's stderr output
       (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
 
+      # Pipe to receive dd's stderr output
+      (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()
+
+      # Pipe to receive dd's PID
+      (dd_pid_read_fd, dd_pid_write_fd) = os.pipe()
+
+      # Pipe to receive size predicted by export script
+      (exp_size_read_fd, exp_size_write_fd) = os.pipe()
+
       # Get child process command
-      cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd)
+      cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
+                                           dd_stderr_write_fd, dd_pid_write_fd)
       cmd = cmd_builder.GetCommand()
 
+      # Prepare command environment
+      cmd_env = os.environ.copy()
+
+      if options.exp_size == constants.IE_CUSTOM_SIZE:
+        cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)
+
       logging.debug("Starting command %r", cmd)
 
       # Start child process
-      child = ChildProcess(cmd, [socat_stderr_write_fd])
+      child = ChildProcess(cmd_env, cmd,
+                           [socat_stderr_write_fd, dd_stderr_write_fd,
+                            dd_pid_write_fd, exp_size_write_fd])
       try:
+
         def _ForwardSignal(signum, _):
           """Forwards signals to child process.
 
@@ -549,10 +561,14 @@ def main():
           try:
             # Close child's side
             utils.RetryOnSignal(os.close, socat_stderr_write_fd)
-
-            if ProcessChildIO(child, socat_stderr_read_fd, status_file,
-                              child_logger, signal_wakeup, signal_handler,
-                              mode):
+            utils.RetryOnSignal(os.close, dd_stderr_write_fd)
+            utils.RetryOnSignal(os.close, dd_pid_write_fd)
+            utils.RetryOnSignal(os.close, exp_size_write_fd)
+
+            if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
+                              dd_pid_read_fd, exp_size_read_fd,
+                              status_file, child_logger,
+                              signal_wakeup, signal_handler, mode):
               # The child closed all its file descriptors and there was no
               # signal
               # TODO: Implement timeout instead of waiting indefinitely
@@ -572,7 +588,7 @@ def main():
         errmsg = "Exited with status %s" % (child.returncode, )
 
       status_file.SetExitStatus(child.returncode, errmsg)
-    except Exception, err: # pylint: disable-msg=W0703
+    except Exception, err: # pylint: disable=W0703
       logging.exception("Unhandled error occurred")
       status_file.SetExitStatus(constants.EXIT_FAILURE,
                                 "Unhandled error occurred: %s" % (err, ))