cmdlib: Adding hv/disk state dict helper functions
[ganeti-local] / lib / impexpd / __init__.py
index 4f9efda..cf3a8ca 100644 (file)
@@ -35,6 +35,7 @@ from cStringIO import StringIO
 from ganeti import constants
 from ganeti import errors
 from ganeti import utils
+from ganeti import netutils
 
 
 #: Used to recognize point at which socat(1) starts to listen on its socket.
@@ -77,17 +78,27 @@ BUFSIZE = 1024 * 1024
 
 # Common options for socat
 SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
-SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
+SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1",
+                      "cipher=%s" % constants.OPENSSL_CIPHERS]
+
+if constants.SOCAT_USE_COMPRESS:
+  # Disables all compression in by OpenSSL. Only supported in patched versions
+  # of socat (as of November 2010). See INSTALL for more information.
+  SOCAT_OPENSSL_OPTS.append("compress=none")
+
+SOCAT_OPTION_MAXLEN = 400
 
 (PROG_OTHER,
  PROG_SOCAT,
  PROG_DD,
- PROG_DD_PID) = range(1, 5)
+ PROG_DD_PID,
+ PROG_EXP_SIZE) = range(1, 6)
 PROG_ALL = frozenset([
   PROG_OTHER,
   PROG_SOCAT,
   PROG_DD,
   PROG_DD_PID,
+  PROG_EXP_SIZE,
   ])
 
 
@@ -111,6 +122,9 @@ class CommandBuilder(object):
     self._dd_stderr_fd = dd_stderr_fd
     self._dd_pid_fd = dd_pid_fd
 
+    assert (self._opts.magic is None or
+            constants.IE_MAGIC_RE.match(self._opts.magic))
+
   @staticmethod
   def GetBashCommand(cmd):
     """Prepares a command to be run in Bash.
@@ -131,6 +145,13 @@ class CommandBuilder(object):
     if self._opts.bind is not None:
       common_addr_opts.append("bind=%s" % self._opts.bind)
 
+    assert not (self._opts.ipv4 and self._opts.ipv6)
+
+    if self._opts.ipv4:
+      common_addr_opts.append("pf=ipv4")
+    elif self._opts.ipv6:
+      common_addr_opts.append("pf=ipv6")
+
     if self._mode == constants.IEM_IMPORT:
       if self._opts.port is None:
         port = 0
@@ -149,9 +170,14 @@ class CommandBuilder(object):
       addr2 = ["stdout"]
 
     elif self._mode == constants.IEM_EXPORT:
+      if self._opts.host and netutils.IP6Address.IsValid(self._opts.host):
+        host = "[%s]" % self._opts.host
+      else:
+        host = self._opts.host
+
       addr1 = ["stdin"]
       addr2 = [
-        "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
+        "OPENSSL:%s:%s" % (host, self._opts.port),
 
         # How long to wait per connection attempt
         "connect-timeout=%s" % self._opts.connect_timeout,
@@ -166,6 +192,10 @@ class CommandBuilder(object):
 
     for i in [addr1, addr2]:
       for value in i:
+        if len(value) > SOCAT_OPTION_MAXLEN:
+          raise errors.GenericError("Socat option longer than %s"
+                                    " characters: %r" %
+                                    (SOCAT_OPTION_MAXLEN, value))
         if "," in value:
           raise errors.GenericError("Comma not allowed in socat option"
                                     " value: %r" % value)
@@ -189,19 +219,53 @@ class CommandBuilder(object):
       ",".join(addr1), ",".join(addr2)
       ]
 
-  def _GetTransportCommand(self):
-    """Returns the command for the transport part of the daemon.
+  def _GetMagicCommand(self):
+    """Returns the command to read/write the magic value.
 
     """
-    socat_cmd = ("%s 2>&%d" %
-                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
-                  self._socat_stderr_fd))
+    if not self._opts.magic:
+      return None
+
+    # Prefix to ensure magic isn't interpreted as option to "echo"
+    magic = "M=%s" % self._opts.magic
 
+    cmd = StringIO()
+
+    if self._mode == constants.IEM_IMPORT:
+      cmd.write("{ ")
+      cmd.write(utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"]))
+      cmd.write(" && ")
+      cmd.write("if test \"$magic\" != %s; then" % utils.ShellQuote(magic))
+      cmd.write(" echo %s >&2;" % utils.ShellQuote("Magic value mismatch"))
+      cmd.write(" exit 1;")
+      cmd.write("fi;")
+      cmd.write(" }")
+
+    elif self._mode == constants.IEM_EXPORT:
+      cmd.write(utils.ShellQuoteArgs(["echo", "-E", "-n", magic]))
+
+    else:
+      raise errors.GenericError("Invalid mode '%s'" % self._mode)
+
+    return cmd.getvalue()
+
+  def _GetDdCommand(self):
+    """Returns the command for measuring throughput.
+
+    """
     dd_cmd = StringIO()
+
+    magic_cmd = self._GetMagicCommand()
+    if magic_cmd:
+      dd_cmd.write("{ ")
+      dd_cmd.write(magic_cmd)
+      dd_cmd.write(" && ")
+
+    dd_cmd.write("{ ")
     # Setting LC_ALL since we want to parse the output and explicitely
     # redirecting stdin, as the background process (dd) would have /dev/null as
     # stdin otherwise
-    dd_cmd.write("{ LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
+    dd_cmd.write("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
                  (BUFSIZE, self._dd_stderr_fd))
     # Send PID to daemon
     dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
@@ -209,31 +273,49 @@ class CommandBuilder(object):
     dd_cmd.write(" wait $pid;")
     dd_cmd.write(" }")
 
+    if magic_cmd:
+      dd_cmd.write(" }")
+
+    return dd_cmd.getvalue()
+
+  def _GetTransportCommand(self):
+    """Returns the command for the transport part of the daemon.
+
+    """
+    socat_cmd = ("%s 2>&%d" %
+                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
+                  self._socat_stderr_fd))
+    dd_cmd = self._GetDdCommand()
+
     compr = self._opts.compress
 
     assert compr in constants.IEC_ALL
 
+    parts = []
+
     if self._mode == constants.IEM_IMPORT:
+      parts.append(socat_cmd)
+
       if compr == constants.IEC_GZIP:
-        transport_cmd = "%s | gunzip -c" % socat_cmd
-      else:
-        transport_cmd = socat_cmd
+        parts.append("gunzip -c")
+
+      parts.append(dd_cmd)
 
-      transport_cmd += " | %s" % dd_cmd.getvalue()
     elif self._mode == constants.IEM_EXPORT:
+      parts.append(dd_cmd)
+
       if compr == constants.IEC_GZIP:
-        transport_cmd = "gzip -c | %s" % socat_cmd
-      else:
-        transport_cmd = socat_cmd
+        parts.append("gzip -c")
+
+      parts.append(socat_cmd)
 
-      transport_cmd = "%s | %s" % (dd_cmd.getvalue(), transport_cmd)
     else:
       raise errors.GenericError("Invalid mode '%s'" % self._mode)
 
     # TODO: Run transport as separate user
     # The transport uses its own shell to simplify running it as a separate user
     # in the future.
-    return self.GetBashCommand(transport_cmd)
+    return self.GetBashCommand(" | ".join(parts))
 
   def GetCommand(self):
     """Returns the complete child process command.
@@ -260,10 +342,13 @@ def _VerifyListening(family, address, port):
   """Verify address given as listening address by socat.
 
   """
-  # TODO: Implement IPv6 support
-  if family != socket.AF_INET:
+  if family not in (socket.AF_INET, socket.AF_INET6):
     raise errors.GenericError("Address family %r not supported" % family)
 
+  if (family == socket.AF_INET6 and address.startswith("[") and
+      address.endswith("]")):
+    address = address.lstrip("[").rstrip("]")
+
   try:
     packed_address = socket.inet_pton(family, address)
   except socket.error:
@@ -274,7 +359,7 @@ def _VerifyListening(family, address, port):
 
 
 class ChildIOProcessor(object):
-  def __init__(self, debug, status_file, logger, throughput_samples):
+  def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
     """Initializes this class.
 
     """
@@ -291,7 +376,7 @@ class ChildIOProcessor(object):
     self._dd_progress = []
 
     # Expected size of transferred data
-    self._exp_size = None
+    self._exp_size = exp_size
 
   def GetLineSplitter(self, prog):
     """Returns the line splitter for a program.
@@ -342,6 +427,7 @@ class ChildIOProcessor(object):
         raise
 
       # Process no longer exists
+      logging.debug("dd exited")
       self._dd_pid = None
 
     return True
@@ -389,6 +475,22 @@ class ChildIOProcessor(object):
       self._dd_pid = int(line)
       forward_line = None
 
+    elif prog == PROG_EXP_SIZE:
+      logging.debug("Received predicted size %r", line)
+      forward_line = None
+
+      if line:
+        try:
+          exp_size = utils.BytesToMebibyte(int(line))
+        except (ValueError, TypeError), err:
+          logging.error("Failed to convert predicted size %r to number: %s",
+                        line, err)
+          exp_size = None
+      else:
+        exp_size = None
+
+      self._exp_size = exp_size
+
     if forward_line:
       self._logger.info(forward_line)
       self._status_file.AddRecentOutput(forward_line)