import logging
import optparse
import os
-import re
import select
import signal
-import socket
import subprocess
import sys
import time
-from cStringIO import StringIO
+import math
from ganeti import constants
from ganeti import cli
from ganeti import utils
+from ganeti import errors
from ganeti import serializer
from ganeti import objects
-from ganeti import locking
+from ganeti import impexpd
+from ganeti import netutils
-#: Used to recognize point at which socat(1) starts to listen on its socket.
-#: The local address is required for the remote peer to connect (in particular
-#: the port number).
-LISTENING_RE = re.compile(r"^listening on\s+"
- r"AF=(?P<family>\d+)\s+"
- r"(?P<address>.+):(?P<port>\d+)$", re.I)
-
-#: Used to recognize point at which socat(1) is sending data over the wire
-TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
- re.I)
-
-SOCAT_LOG_DEBUG = "D"
-SOCAT_LOG_INFO = "I"
-SOCAT_LOG_NOTICE = "N"
-SOCAT_LOG_WARNING = "W"
-SOCAT_LOG_ERROR = "E"
-SOCAT_LOG_FATAL = "F"
-
-SOCAT_LOG_IGNORE = frozenset([
- SOCAT_LOG_DEBUG,
- SOCAT_LOG_INFO,
- SOCAT_LOG_NOTICE,
- ])
-
-#: Socat buffer size: at most this many bytes are transferred per step
-SOCAT_BUFSIZE = 1024 * 1024
-
#: How many lines to keep in the status file
MAX_RECENT_OUTPUT_LINES = 20
#: Don't update status file more than once every 5 seconds (unless forced)
MIN_UPDATE_INTERVAL = 5.0
-#: Give child process up to 5 seconds to exit after sending a signal
-CHILD_LINGER_TIMEOUT = 5.0
-
#: How long to wait for a connection to be established
DEFAULT_CONNECT_TIMEOUT = 60
-# Common options for socat
-SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
-SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
+#: Get dd(1) statistics every few seconds
+DD_STATISTICS_INTERVAL = 5.0
+#: Seconds for throughput calculation
+DD_THROUGHPUT_INTERVAL = 60.0
-# Global variable for options
-options = None
+#: Number of samples for throughput calculation
+DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) /
+ DD_STATISTICS_INTERVAL))
-class Error(Exception):
- """Generic exception"""
+# Global variable for options
+options = None
def SetupLogging():
return child_logger
-def _VerifyListening(family, address, port):
- """Verify address given as listening address by socat.
-
- """
- # TODO: Implement IPv6 support
- if family != socket.AF_INET:
- raise Error("Address family %r not supported" % family)
-
- try:
- packed_address = socket.inet_pton(family, address)
- except socket.error:
- raise Error("Invalid address %r for family %s" % (address, family))
-
- return (socket.inet_ntop(family, packed_address), port)
-
-
class StatusFile:
"""Status file manager.
"""
return self._data.connected
+ def SetProgress(self, mbytes, throughput, percent, eta):
+ """Sets how much data has been transferred so far.
+
+ @type mbytes: number
+ @param mbytes: Transferred amount of data in MiB.
+ @type throughput: float
+ @param throughput: MiB/second
+ @type percent: number
+ @param percent: Percent processed
+ @type eta: number
+ @param eta: Expected number of seconds until done
+
+ """
+ self._data.progress_mbytes = mbytes
+ self._data.progress_throughput = throughput
+ self._data.progress_percent = percent
+ self._data.progress_eta = eta
+
def SetExitStatus(self, exit_status, error_message):
"""Sets the exit status and an error message.
mode=0400)
-def _ProcessSocatOutput(status_file, level, msg):
- """Interprets socat log output.
-
- """
- if level == SOCAT_LOG_NOTICE:
- if status_file.GetListenPort() is None:
- # TODO: Maybe implement timeout to not listen forever
- m = LISTENING_RE.match(msg)
- if m:
- (_, port) = _VerifyListening(int(m.group("family")), m.group("address"),
- int(m.group("port")))
-
- status_file.SetListenPort(port)
- return True
-
- if not status_file.GetConnected():
- m = TRANSFER_LOOP_RE.match(msg)
- if m:
- status_file.SetConnected()
- return True
-
- return False
-
-
-def ProcessOutput(line, status_file, logger, socat):
- """Takes care of child process output.
-
- @param status_file: Status file manager
- @param logger: Child output logger
- @type socat: bool
- @param socat: Whether it's a socat output line
- @type line: string
- @param line: Child output line
-
- """
- force_update = False
- forward_line = line
-
- if socat:
- level = None
- parts = line.split(None, 4)
-
- if len(parts) == 5:
- (_, _, _, level, msg) = parts
-
- force_update = _ProcessSocatOutput(status_file, level, msg)
-
- if options.debug or (level and level not in SOCAT_LOG_IGNORE):
- forward_line = "socat: %s %s" % (level, msg)
- else:
- forward_line = None
- else:
- forward_line = "socat: %s" % line
-
- if forward_line:
- logger.info(forward_line)
- status_file.AddRecentOutput(forward_line)
-
- status_file.Update(force_update)
-
-
-def GetBashCommand(cmd):
- """Prepares a command to be run in Bash.
-
- """
- return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
-
-
-def GetSocatCommand(mode):
- """Returns the socat command.
-
- """
- common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
- "key=%s" % options.key,
- "cert=%s" % options.cert,
- "cafile=%s" % options.ca,
- ]
-
- if options.bind is not None:
- common_addr_opts.append("bind=%s" % options.bind)
-
- if mode == constants.IEM_IMPORT:
- if options.port is None:
- port = 0
- else:
- port = options.port
-
- addr1 = [
- "OPENSSL-LISTEN:%s" % port,
- "reuseaddr",
-
- # Retry to listen if connection wasn't established successfully, up to
- # 100 times a second. Note that this still leaves room for DoS attacks.
- "forever",
- "intervall=0.01",
- ] + common_addr_opts
- addr2 = ["stdout"]
-
- elif mode == constants.IEM_EXPORT:
- addr1 = ["stdin"]
- addr2 = [
- "OPENSSL:%s:%s" % (options.host, options.port),
-
- # How long to wait per connection attempt
- "connect-timeout=%s" % options.connect_timeout,
-
- # Retry a few times before giving up to connect (once per second)
- "retry=%s" % options.connect_retries,
- "intervall=1",
- ] + common_addr_opts
-
- else:
- raise Error("Invalid mode")
-
- for i in [addr1, addr2]:
- for value in i:
- if "," in value:
- raise Error("Comma not allowed in socat option value: %r" % value)
-
- return [
- constants.SOCAT_PATH,
-
- # Log to stderr
- "-ls",
-
- # Log level
- "-d", "-d",
-
- # Buffer size
- "-b%s" % SOCAT_BUFSIZE,
-
- # Unidirectional mode, the first address is only used for reading, and the
- # second address is only used for writing
- "-u",
-
- ",".join(addr1), ",".join(addr2)
- ]
-
-
-def GetTransportCommand(mode, socat_stderr_fd):
- """Returns the command for the transport part of the daemon.
-
- @param mode: Daemon mode (import or export)
- @type socat_stderr_fd: int
- @param socat_stderr_fd: File descriptor socat should write its stderr to
-
- """
- socat_cmd = ("%s 2>&%d" %
- (utils.ShellQuoteArgs(GetSocatCommand(mode)),
- socat_stderr_fd))
-
- # TODO: Make compression configurable
-
- if mode == constants.IEM_IMPORT:
- transport_cmd = "%s | gunzip -c" % socat_cmd
- elif mode == constants.IEM_EXPORT:
- transport_cmd = "gzip -c | %s" % socat_cmd
- else:
- raise Error("Invalid mode")
-
- # TODO: Use "dd" to measure processed data (allows to give an ETA)
-
- # TODO: Run transport as separate user
- # The transport uses its own shell to simplify running it as a separate user
- # in the future.
- return GetBashCommand(transport_cmd)
-
-
-def GetCommand(mode, socat_stderr_fd):
- """Returns the complete child process command.
-
- """
- buf = StringIO()
-
- if options.cmd_prefix:
- buf.write(options.cmd_prefix)
- buf.write(" ")
-
- buf.write(utils.ShellQuoteArgs(GetTransportCommand(mode, socat_stderr_fd)))
-
- if options.cmd_suffix:
- buf.write(" ")
- buf.write(options.cmd_suffix)
-
- return GetBashCommand(buf.getvalue())
-
-
-def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
+def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
+ dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
signal_notify, signal_handler, mode):
"""Handles the child processes' output.
# might buffer data while poll(2) won't mark its file descriptor as
# readable again.
socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
+ dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
+ dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
+ exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)
- script_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
- child_logger, False)
- try:
- socat_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
- child_logger, True)
- try:
- fdmap = {
- child.stderr.fileno(): (child.stderr, script_stderr_lines),
- socat_stderr_read.fileno(): (socat_stderr_read, socat_stderr_lines),
- signal_notify.fileno(): (signal_notify, None),
- }
-
- poller = select.poll()
- for fd in fdmap:
- utils.SetNonblockFlag(fd, True)
- poller.register(fd, select.POLLIN)
-
- if options.connect_timeout and mode == constants.IEM_IMPORT:
- listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
- else:
- listen_timeout = None
+ tp_samples = DD_THROUGHPUT_SAMPLES
- exit_timeout = None
+ if options.exp_size == constants.IE_CUSTOM_SIZE:
+ exp_size = None
+ else:
+ exp_size = options.exp_size
- while True:
- # Break out of loop if only signal notify FD is left
- if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
+ child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
+ child_logger, tp_samples,
+ exp_size)
+ try:
+ fdmap = {
+ child.stderr.fileno():
+ (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
+ socat_stderr_read.fileno():
+ (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
+ dd_pid_read.fileno():
+ (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
+ dd_stderr_read.fileno():
+ (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
+ exp_size_read.fileno():
+ (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
+ signal_notify.fileno(): (signal_notify, None),
+ }
+
+ poller = select.poll()
+ for fd in fdmap:
+ utils.SetNonblockFlag(fd, True)
+ poller.register(fd, select.POLLIN)
+
+ if options.connect_timeout and mode == constants.IEM_IMPORT:
+ listen_timeout = utils.RunningTimeout(options.connect_timeout, True)
+ else:
+ listen_timeout = None
+
+ exit_timeout = None
+ dd_stats_timeout = None
+
+ while True:
+ # Break out of loop if only signal notify FD is left
+ if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
+ break
+
+ timeout = None
+
+ if listen_timeout and not exit_timeout:
+ assert mode == constants.IEM_IMPORT and options.connect_timeout
+ if status_file.GetConnected():
+ listen_timeout = None
+ elif listen_timeout.Remaining() < 0:
+ errmsg = ("Child process didn't establish connection in time"
+ " (%0.0fs), sending SIGTERM" % options.connect_timeout)
+ logging.error(errmsg)
+ status_file.AddRecentOutput(errmsg)
+ status_file.Update(True)
+
+ child.Kill(signal.SIGTERM)
+ exit_timeout = \
+ utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
+ # Next block will calculate timeout
+ else:
+ # Not yet connected, check again in a second
+ timeout = 1000
+
+ if exit_timeout:
+ timeout = exit_timeout.Remaining() * 1000
+ if timeout < 0:
+ logging.info("Child process didn't exit in time")
break
- timeout = None
-
- if listen_timeout and not exit_timeout:
- if status_file.GetConnected():
- listen_timeout = None
- elif listen_timeout.Remaining() < 0:
- logging.info("Child process didn't establish connection in time")
- child.Kill(signal.SIGTERM)
- exit_timeout = \
- locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
- # Next block will calculate timeout
+ if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
+ notify_status = child_io_proc.NotifyDd()
+ if notify_status:
+ # Schedule next notification
+ dd_stats_timeout = utils.RunningTimeout(DD_STATISTICS_INTERVAL, True)
+ else:
+ # Try again soon (dd isn't ready yet)
+ dd_stats_timeout = utils.RunningTimeout(1.0, True)
+
+ if dd_stats_timeout:
+ dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)
+
+ if timeout is None:
+ timeout = dd_timeout
+ else:
+ timeout = min(timeout, dd_timeout)
+
+ for fd, event in utils.RetryOnSignal(poller.poll, timeout):
+ if event & (select.POLLIN | event & select.POLLPRI):
+ (from_, to) = fdmap[fd]
+
+ # Read up to 1 KB of data
+ data = from_.read(1024)
+ if data:
+ if to:
+ to.write(data)
+ elif fd == signal_notify.fileno():
+ # Signal handling
+ if signal_handler.called:
+ signal_handler.Clear()
+ if exit_timeout:
+ logging.info("Child process still has about %0.2f seconds"
+ " to exit", exit_timeout.Remaining())
+ else:
+ logging.info("Giving child process %0.2f seconds to exit",
+ constants.CHILD_LINGER_TIMEOUT)
+ exit_timeout = \
+ utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
else:
- # Not yet connected, check again in a second
- timeout = 1000
-
- if exit_timeout:
- timeout = exit_timeout.Remaining() * 1000
- if timeout < 0:
- logging.info("Child process didn't exit in time")
- break
-
- for fd, event in utils.RetryOnSignal(poller.poll, timeout):
- if event & (select.POLLIN | event & select.POLLPRI):
- (from_, to) = fdmap[fd]
-
- # Read up to 1 KB of data
- data = from_.read(1024)
- if data:
- if to:
- to.write(data)
- elif fd == signal_notify.fileno():
- # Signal handling
- if signal_handler.called:
- signal_handler.Clear()
- if exit_timeout:
- logging.info("Child process still has about %0.2f seconds"
- " to exit", exit_timeout.Remaining())
- else:
- logging.info("Giving child process %0.2f seconds to exit",
- CHILD_LINGER_TIMEOUT)
- exit_timeout = \
- locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
- else:
- poller.unregister(fd)
- del fdmap[fd]
-
- elif event & (select.POLLNVAL | select.POLLHUP |
- select.POLLERR):
poller.unregister(fd)
del fdmap[fd]
- script_stderr_lines.flush()
- socat_stderr_lines.flush()
+ elif event & (select.POLLNVAL | select.POLLHUP |
+ select.POLLERR):
+ poller.unregister(fd)
+ del fdmap[fd]
+
+ child_io_proc.FlushAll()
- # If there was a timeout calculator, we were waiting for the child to
- # finish, e.g. due to a signal
- return not bool(exit_timeout)
- finally:
- socat_stderr_lines.close()
+ # If there was a timeout calculator, we were waiting for the child to
+ # finish, e.g. due to a signal
+ return not bool(exit_timeout)
finally:
- script_stderr_lines.close()
+ child_io_proc.CloseAll()
def ParseOptions():
help="X509 CA file")
parser.add_option("--bind", dest="bind", action="store", type="string",
help="Bind address")
+ parser.add_option("--ipv4", dest="ipv4", action="store_true",
+ help="Use IPv4 only")
+ parser.add_option("--ipv6", dest="ipv6", action="store_true",
+ help="Use IPv6 only")
parser.add_option("--host", dest="host", action="store", type="string",
help="Remote hostname")
parser.add_option("--port", dest="port", action="store", type="int",
parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
type="int", default=DEFAULT_CONNECT_TIMEOUT,
help="Timeout for connection to be established (seconds)")
+ parser.add_option("--compress", dest="compress", action="store",
+ type="choice", help="Compression method",
+ metavar="[%s]" % "|".join(constants.IEC_ALL),
+ choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
+ parser.add_option("--expected-size", dest="exp_size", action="store",
+ type="string", default=None,
+ help="Expected import/export size (MiB)")
+ parser.add_option("--magic", dest="magic", action="store",
+ type="string", default=None, help="Magic string")
parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
type="string", help="Command prefix")
parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
# Won't return
parser.error("Invalid mode: %s" % mode)
+ # Normalize and check parameters
+ if options.host is not None and not netutils.IPAddress.IsValid(options.host):
+ try:
+ options.host = netutils.Hostname.GetNormalizedName(options.host)
+ except errors.OpPrereqError, err:
+ parser.error("Invalid hostname '%s': %s" % (options.host, err))
+
+ if options.port is not None:
+ options.port = utils.ValidateServiceName(options.port)
+
+ if (options.exp_size is not None and
+ options.exp_size != constants.IE_CUSTOM_SIZE):
+ try:
+ options.exp_size = int(options.exp_size)
+ except (ValueError, TypeError), err:
+ # Won't return
+ parser.error("Invalid value for --expected-size: %s (%s)" %
+ (options.exp_size, err))
+
+ if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)):
+ parser.error("Magic must match regular expression %s" %
+ constants.IE_MAGIC_RE.pattern)
+
+ if options.ipv4 and options.ipv6:
+ parser.error("Can only use one of --ipv4 and --ipv6")
+
return (status_file_path, mode)
class ChildProcess(subprocess.Popen):
- def __init__(self, cmd, noclose_fds):
+ def __init__(self, env, cmd, noclose_fds):
"""Initializes this class.
"""
# Not using close_fds because doing so would also close the socat stderr
# pipe, which we still need.
- subprocess.Popen.__init__(self, cmd, shell=False, close_fds=False,
+ subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
stderr=subprocess.PIPE, stdout=None, stdin=None,
preexec_fn=self._ChildPreexec)
self._SetProcessGroup()
"""
logging.info("Sending signal %s to child process", signum)
- os.killpg(self.pid, signum)
+ utils.IgnoreProcessNotFound(os.killpg, self.pid, signum)
def ForceQuit(self):
"""Ensure child process is no longer running.
# Pipe to receive socat's stderr output
(socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
+ # Pipe to receive dd's stderr output
+ (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()
+
+ # Pipe to receive dd's PID
+ (dd_pid_read_fd, dd_pid_write_fd) = os.pipe()
+
+ # Pipe to receive size predicted by export script
+ (exp_size_read_fd, exp_size_write_fd) = os.pipe()
+
# Get child process command
- cmd = GetCommand(mode, socat_stderr_write_fd)
+ cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
+ dd_stderr_write_fd, dd_pid_write_fd)
+ cmd = cmd_builder.GetCommand()
+
+ # Prepare command environment
+ cmd_env = os.environ.copy()
+
+ if options.exp_size == constants.IE_CUSTOM_SIZE:
+ cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)
logging.debug("Starting command %r", cmd)
# Start child process
- child = ChildProcess(cmd, [socat_stderr_write_fd])
+ child = ChildProcess(cmd_env, cmd,
+ [socat_stderr_write_fd, dd_stderr_write_fd,
+ dd_pid_write_fd, exp_size_write_fd])
try:
def _ForwardSignal(signum, _):
"""Forwards signals to child process.
try:
# Close child's side
utils.RetryOnSignal(os.close, socat_stderr_write_fd)
-
- if ProcessChildIO(child, socat_stderr_read_fd, status_file,
- child_logger, signal_wakeup, signal_handler,
- mode):
+ utils.RetryOnSignal(os.close, dd_stderr_write_fd)
+ utils.RetryOnSignal(os.close, dd_pid_write_fd)
+ utils.RetryOnSignal(os.close, exp_size_write_fd)
+
+ if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
+ dd_pid_read_fd, exp_size_read_fd,
+ status_file, child_logger,
+ signal_wakeup, signal_handler, mode):
# The child closed all its file descriptors and there was no
# signal
# TODO: Implement timeout instead of waiting indefinitely