Document DRBD dynamic resync params in man pages
[ganeti-local] / daemons / import-export
index 53c4e28..8603460 100755 (executable)
@@ -23,7 +23,7 @@
 
 """
 
-# pylint: disable-msg=C0103
+# pylint: disable=C0103
 # C0103: Invalid name import-export
 
 import errno
@@ -40,10 +40,11 @@ import math
 from ganeti import constants
 from ganeti import cli
 from ganeti import utils
+from ganeti import errors
 from ganeti import serializer
 from ganeti import objects
-from ganeti import locking
 from ganeti import impexpd
+from ganeti import netutils
 
 
 #: How many lines to keep in the status file
@@ -52,9 +53,6 @@ MAX_RECENT_OUTPUT_LINES = 20
 #: Don't update status file more than once every 5 seconds (unless forced)
 MIN_UPDATE_INTERVAL = 5.0
 
-#: Give child process up to 5 seconds to exit after sending a signal
-CHILD_LINGER_TIMEOUT = 5.0
-
 #: How long to wait for a connection to be established
 DEFAULT_CONNECT_TIMEOUT = 60
 
@@ -130,7 +128,7 @@ class StatusFile:
     @param port: TCP/UDP port
 
     """
-    assert isinstance(port, (int, long)) and 0 < port < 2**16
+    assert isinstance(port, (int, long)) and 0 < port < (2 ** 16)
     self._data.listen_port = port
 
   def GetListenPort(self):
@@ -207,7 +205,7 @@ class StatusFile:
 
 
 def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
-                   dd_pid_read_fd, status_file, child_logger,
+                   dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
                    signal_notify, signal_handler, mode):
   """Handles the child processes' output.
 
@@ -221,12 +219,18 @@ def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
   socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
   dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
   dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
+  exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)
 
   tp_samples = DD_THROUGHPUT_SAMPLES
 
+  if options.exp_size == constants.IE_CUSTOM_SIZE:
+    exp_size = None
+  else:
+    exp_size = options.exp_size
+
   child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
-                                           child_logger,
-                                           throughput_samples=tp_samples)
+                                           child_logger, tp_samples,
+                                           exp_size)
   try:
     fdmap = {
       child.stderr.fileno():
@@ -237,6 +241,8 @@ def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
         (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
       dd_stderr_read.fileno():
         (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
+      exp_size_read.fileno():
+        (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
       signal_notify.fileno(): (signal_notify, None),
       }
 
@@ -246,7 +252,7 @@ def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
       poller.register(fd, select.POLLIN)
 
     if options.connect_timeout and mode == constants.IEM_IMPORT:
-      listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
+      listen_timeout = utils.RunningTimeout(options.connect_timeout, True)
     else:
       listen_timeout = None
 
@@ -261,13 +267,19 @@ def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
       timeout = None
 
       if listen_timeout and not exit_timeout:
+        assert mode == constants.IEM_IMPORT and options.connect_timeout
         if status_file.GetConnected():
           listen_timeout = None
         elif listen_timeout.Remaining() < 0:
-          logging.info("Child process didn't establish connection in time")
+          errmsg = ("Child process didn't establish connection in time"
+                    " (%0.0fs), sending SIGTERM" % options.connect_timeout)
+          logging.error(errmsg)
+          status_file.AddRecentOutput(errmsg)
+          status_file.Update(True)
+
           child.Kill(signal.SIGTERM)
           exit_timeout = \
-            locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
+            utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
           # Next block will calculate timeout
         else:
           # Not yet connected, check again in a second
@@ -283,11 +295,10 @@ def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
         notify_status = child_io_proc.NotifyDd()
         if notify_status:
           # Schedule next notification
-          dd_stats_timeout = locking.RunningTimeout(DD_STATISTICS_INTERVAL,
-                                                    True)
+          dd_stats_timeout = utils.RunningTimeout(DD_STATISTICS_INTERVAL, True)
         else:
           # Try again soon (dd isn't ready yet)
-          dd_stats_timeout = locking.RunningTimeout(1.0, True)
+          dd_stats_timeout = utils.RunningTimeout(1.0, True)
 
       if dd_stats_timeout:
         dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)
@@ -315,9 +326,9 @@ def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                                " to exit", exit_timeout.Remaining())
                 else:
                   logging.info("Giving child process %0.2f seconds to exit",
-                               CHILD_LINGER_TIMEOUT)
+                               constants.CHILD_LINGER_TIMEOUT)
                   exit_timeout = \
-                    locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
+                    utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
           else:
             poller.unregister(fd)
             del fdmap[fd]
@@ -342,7 +353,7 @@ def ParseOptions():
   @return: Arguments to program
 
   """
-  global options # pylint: disable-msg=W0603
+  global options # pylint: disable=W0603
 
   parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
                                         (constants.IEM_IMPORT,
@@ -357,6 +368,10 @@ def ParseOptions():
                     help="X509 CA file")
   parser.add_option("--bind", dest="bind", action="store", type="string",
                     help="Bind address")
+  parser.add_option("--ipv4", dest="ipv4", action="store_true",
+                    help="Use IPv4 only")
+  parser.add_option("--ipv6", dest="ipv6", action="store_true",
+                    help="Use IPv6 only")
   parser.add_option("--host", dest="host", action="store", type="string",
                     help="Remote hostname")
   parser.add_option("--port", dest="port", action="store", type="int",
@@ -372,6 +387,11 @@ def ParseOptions():
                     type="choice", help="Compression method",
                     metavar="[%s]" % "|".join(constants.IEC_ALL),
                     choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
+  parser.add_option("--expected-size", dest="exp_size", action="store",
+                    type="string", default=None,
+                    help="Expected import/export size (MiB)")
+  parser.add_option("--magic", dest="magic", action="store",
+                    type="string", default=None, help="Magic string")
   parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
                     type="string", help="Command prefix")
   parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
@@ -390,11 +410,37 @@ def ParseOptions():
     # Won't return
     parser.error("Invalid mode: %s" % mode)
 
+  # Normalize and check parameters
+  if options.host is not None and not netutils.IPAddress.IsValid(options.host):
+    try:
+      options.host = netutils.Hostname.GetNormalizedName(options.host)
+    except errors.OpPrereqError, err:
+      parser.error("Invalid hostname '%s': %s" % (options.host, err))
+
+  if options.port is not None:
+    options.port = utils.ValidateServiceName(options.port)
+
+  if (options.exp_size is not None and
+      options.exp_size != constants.IE_CUSTOM_SIZE):
+    try:
+      options.exp_size = int(options.exp_size)
+    except (ValueError, TypeError), err:
+      # Won't return
+      parser.error("Invalid value for --expected-size: %s (%s)" %
+                   (options.exp_size, err))
+
+  if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)):
+    parser.error("Magic must match regular expression %s" %
+                 constants.IE_MAGIC_RE.pattern)
+
+  if options.ipv4 and options.ipv6:
+    parser.error("Can only use one of --ipv4 and --ipv6")
+
   return (status_file_path, mode)
 
 
 class ChildProcess(subprocess.Popen):
-  def __init__(self, cmd, noclose_fds):
+  def __init__(self, env, cmd, noclose_fds):
     """Initializes this class.
 
     """
@@ -402,7 +448,7 @@ class ChildProcess(subprocess.Popen):
 
     # Not using close_fds because doing so would also close the socat stderr
     # pipe, which we still need.
-    subprocess.Popen.__init__(self, cmd, shell=False, close_fds=False,
+    subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
                               stderr=subprocess.PIPE, stdout=None, stdin=None,
                               preexec_fn=self._ChildPreexec)
     self._SetProcessGroup()
@@ -439,7 +485,7 @@ class ChildProcess(subprocess.Popen):
 
     """
     logging.info("Sending signal %s to child process", signum)
-    os.killpg(self.pid, signum)
+    utils.IgnoreProcessNotFound(os.killpg, self.pid, signum)
 
   def ForceQuit(self):
     """Ensure child process is no longer running.
@@ -474,16 +520,26 @@ def main():
       # Pipe to receive dd's PID
       (dd_pid_read_fd, dd_pid_write_fd) = os.pipe()
 
+      # Pipe to receive size predicted by export script
+      (exp_size_read_fd, exp_size_write_fd) = os.pipe()
+
       # Get child process command
       cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
                                            dd_stderr_write_fd, dd_pid_write_fd)
       cmd = cmd_builder.GetCommand()
 
+      # Prepare command environment
+      cmd_env = os.environ.copy()
+
+      if options.exp_size == constants.IE_CUSTOM_SIZE:
+        cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)
+
       logging.debug("Starting command %r", cmd)
 
       # Start child process
-      child = ChildProcess(cmd, [socat_stderr_write_fd, dd_stderr_write_fd,
-                                 dd_pid_write_fd])
+      child = ChildProcess(cmd_env, cmd,
+                           [socat_stderr_write_fd, dd_stderr_write_fd,
+                            dd_pid_write_fd, exp_size_write_fd])
       try:
         def _ForwardSignal(signum, _):
           """Forwards signals to child process.
@@ -506,9 +562,11 @@ def main():
             utils.RetryOnSignal(os.close, socat_stderr_write_fd)
             utils.RetryOnSignal(os.close, dd_stderr_write_fd)
             utils.RetryOnSignal(os.close, dd_pid_write_fd)
+            utils.RetryOnSignal(os.close, exp_size_write_fd)
 
             if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
-                              dd_pid_read_fd, status_file, child_logger,
+                              dd_pid_read_fd, exp_size_read_fd,
+                              status_file, child_logger,
                               signal_wakeup, signal_handler, mode):
               # The child closed all its file descriptors and there was no
               # signal
@@ -529,7 +587,7 @@ def main():
         errmsg = "Exited with status %s" % (child.returncode, )
 
       status_file.SetExitStatus(child.returncode, errmsg)
-    except Exception, err: # pylint: disable-msg=W0703
+    except Exception, err: # pylint: disable=W0703
       logging.exception("Unhandled error occurred")
       status_file.SetExitStatus(constants.EXIT_FAILURE,
                                 "Unhandled error occurred: %s" % (err, ))