from ganeti import constants
from ganeti import errors
from ganeti import utils
+from ganeti import netutils
#: Used to recognize point at which socat(1) starts to listen on its socket.
# Common options for socat
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
-SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
+SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1",
+ "cipher=%s" % constants.OPENSSL_CIPHERS]
+
+if constants.SOCAT_USE_COMPRESS:
+ # Disables all compression by OpenSSL. Only supported in patched versions
+ # of socat (as of November 2010). See INSTALL for more information.
+ SOCAT_OPENSSL_OPTS.append("compress=none")
+
+SOCAT_OPTION_MAXLEN = 400
(PROG_OTHER,
PROG_SOCAT,
PROG_DD,
- PROG_DD_PID) = range(1, 5)
+ PROG_DD_PID,
+ PROG_EXP_SIZE) = range(1, 6)
PROG_ALL = frozenset([
PROG_OTHER,
PROG_SOCAT,
PROG_DD,
PROG_DD_PID,
+ PROG_EXP_SIZE,
])
self._dd_stderr_fd = dd_stderr_fd
self._dd_pid_fd = dd_pid_fd
+ assert (self._opts.magic is None or
+ constants.IE_MAGIC_RE.match(self._opts.magic))
+
@staticmethod
def GetBashCommand(cmd):
"""Prepares a command to be run in Bash.
if self._opts.bind is not None:
common_addr_opts.append("bind=%s" % self._opts.bind)
+ assert not (self._opts.ipv4 and self._opts.ipv6)
+
+ if self._opts.ipv4:
+ common_addr_opts.append("pf=ipv4")
+ elif self._opts.ipv6:
+ common_addr_opts.append("pf=ipv6")
+
if self._mode == constants.IEM_IMPORT:
if self._opts.port is None:
port = 0
addr2 = ["stdout"]
elif self._mode == constants.IEM_EXPORT:
+ if self._opts.host and netutils.IP6Address.IsValid(self._opts.host):
+ host = "[%s]" % self._opts.host
+ else:
+ host = self._opts.host
+
addr1 = ["stdin"]
addr2 = [
- "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
+ "OPENSSL:%s:%s" % (host, self._opts.port),
# How long to wait per connection attempt
"connect-timeout=%s" % self._opts.connect_timeout,
for i in [addr1, addr2]:
for value in i:
+ if len(value) > SOCAT_OPTION_MAXLEN:
+ raise errors.GenericError("Socat option longer than %s"
+ " characters: %r" %
+ (SOCAT_OPTION_MAXLEN, value))
if "," in value:
raise errors.GenericError("Comma not allowed in socat option"
" value: %r" % value)
",".join(addr1), ",".join(addr2)
]
- def _GetTransportCommand(self):
- """Returns the command for the transport part of the daemon.
+ def _GetMagicCommand(self):
+ """Returns the command to read/write the magic value.
"""
- socat_cmd = ("%s 2>&%d" %
- (utils.ShellQuoteArgs(self._GetSocatCommand()),
- self._socat_stderr_fd))
+ if not self._opts.magic:
+ return None
+
+ # Prefix to ensure magic isn't interpreted as option to "echo"
+ magic = "M=%s" % self._opts.magic
+ cmd = StringIO()
+
+ if self._mode == constants.IEM_IMPORT:
+ cmd.write("{ ")
+ cmd.write(utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"]))
+ cmd.write(" && ")
+ cmd.write("if test \"$magic\" != %s; then" % utils.ShellQuote(magic))
+ cmd.write(" echo %s >&2;" % utils.ShellQuote("Magic value mismatch"))
+ cmd.write(" exit 1;")
+ cmd.write("fi;")
+ cmd.write(" }")
+
+ elif self._mode == constants.IEM_EXPORT:
+ cmd.write(utils.ShellQuoteArgs(["echo", "-E", "-n", magic]))
+
+ else:
+ raise errors.GenericError("Invalid mode '%s'" % self._mode)
+
+ return cmd.getvalue()
+
+ def _GetDdCommand(self):
+ """Returns the command for measuring throughput.
+
+ """
dd_cmd = StringIO()
+
+ magic_cmd = self._GetMagicCommand()
+ if magic_cmd:
+ dd_cmd.write("{ ")
+ dd_cmd.write(magic_cmd)
+ dd_cmd.write(" && ")
+
+ dd_cmd.write("{ ")
# Setting LC_ALL since we want to parse the output and explicitely
# redirecting stdin, as the background process (dd) would have /dev/null as
# stdin otherwise
- dd_cmd.write("{ LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
+ dd_cmd.write("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
(BUFSIZE, self._dd_stderr_fd))
# Send PID to daemon
dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
dd_cmd.write(" wait $pid;")
dd_cmd.write(" }")
+ if magic_cmd:
+ dd_cmd.write(" }")
+
+ return dd_cmd.getvalue()
+
+ def _GetTransportCommand(self):
+ """Returns the command for the transport part of the daemon.
+
+ """
+ socat_cmd = ("%s 2>&%d" %
+ (utils.ShellQuoteArgs(self._GetSocatCommand()),
+ self._socat_stderr_fd))
+ dd_cmd = self._GetDdCommand()
+
compr = self._opts.compress
assert compr in constants.IEC_ALL
+ parts = []
+
if self._mode == constants.IEM_IMPORT:
+ parts.append(socat_cmd)
+
if compr == constants.IEC_GZIP:
- transport_cmd = "%s | gunzip -c" % socat_cmd
- else:
- transport_cmd = socat_cmd
+ parts.append("gunzip -c")
+
+ parts.append(dd_cmd)
- transport_cmd += " | %s" % dd_cmd.getvalue()
elif self._mode == constants.IEM_EXPORT:
+ parts.append(dd_cmd)
+
if compr == constants.IEC_GZIP:
- transport_cmd = "gzip -c | %s" % socat_cmd
- else:
- transport_cmd = socat_cmd
+ parts.append("gzip -c")
+
+ parts.append(socat_cmd)
- transport_cmd = "%s | %s" % (dd_cmd.getvalue(), transport_cmd)
else:
raise errors.GenericError("Invalid mode '%s'" % self._mode)
# TODO: Run transport as separate user
# The transport uses its own shell to simplify running it as a separate user
# in the future.
- return self.GetBashCommand(transport_cmd)
+ return self.GetBashCommand(" | ".join(parts))
def GetCommand(self):
"""Returns the complete child process command.
"""Verify address given as listening address by socat.
"""
- # TODO: Implement IPv6 support
- if family != socket.AF_INET:
+ if family not in (socket.AF_INET, socket.AF_INET6):
raise errors.GenericError("Address family %r not supported" % family)
+ if (family == socket.AF_INET6 and address.startswith("[") and
+ address.endswith("]")):
+ address = address.lstrip("[").rstrip("]")
+
try:
packed_address = socket.inet_pton(family, address)
except socket.error:
class ChildIOProcessor(object):
- def __init__(self, debug, status_file, logger, throughput_samples):
+ def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
"""Initializes this class.
"""
self._dd_progress = []
# Expected size of transferred data
- self._exp_size = None
+ self._exp_size = exp_size
def GetLineSplitter(self, prog):
"""Returns the line splitter for a program.
raise
# Process no longer exists
+ logging.debug("dd exited")
self._dd_pid = None
return True
self._dd_pid = int(line)
forward_line = None
+ elif prog == PROG_EXP_SIZE:
+ logging.debug("Received predicted size %r", line)
+ forward_line = None
+
+ if line:
+ try:
+ exp_size = utils.BytesToMebibyte(int(line))
+ except (ValueError, TypeError), err:
+ logging.error("Failed to convert predicted size %r to number: %s",
+ line, err)
+ exp_size = None
+ else:
+ exp_size = None
+
+ self._exp_size = exp_size
+
if forward_line:
self._logger.info(forward_line)
self._status_file.AddRecentOutput(forward_line)