X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/4ca693ca220138a4bcc47b19b9766c9e3a4c6c1f..2e6469a14ae2b2c7b547076efe3fdc8eacae6050:/daemons/import-export diff --git a/daemons/import-export b/daemons/import-export index 748ab39..ce9c3bd 100755 --- a/daemons/import-export +++ b/daemons/import-export @@ -30,50 +30,24 @@ import errno import logging import optparse import os -import re import select import signal -import socket import subprocess import sys import time -from cStringIO import StringIO +import math from ganeti import constants from ganeti import cli from ganeti import utils +from ganeti import errors from ganeti import serializer from ganeti import objects from ganeti import locking +from ganeti import impexpd +from ganeti import netutils -#: Used to recognize point at which socat(1) starts to listen on its socket. -#: The local address is required for the remote peer to connect (in particular -#: the port number). -LISTENING_RE = re.compile(r"^listening on\s+" - r"AF=(?P\d+)\s+" - r"(?P
.+):(?P\d+)$", re.I) - -#: Used to recognize point at which socat(1) is sending data over the wire -TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$", - re.I) - -SOCAT_LOG_DEBUG = "D" -SOCAT_LOG_INFO = "I" -SOCAT_LOG_NOTICE = "N" -SOCAT_LOG_WARNING = "W" -SOCAT_LOG_ERROR = "E" -SOCAT_LOG_FATAL = "F" - -SOCAT_LOG_IGNORE = frozenset([ - SOCAT_LOG_DEBUG, - SOCAT_LOG_INFO, - SOCAT_LOG_NOTICE, - ]) - -#: Socat buffer size: at most this many bytes are transferred per step -SOCAT_BUFSIZE = 1024 * 1024 - #: How many lines to keep in the status file MAX_RECENT_OUTPUT_LINES = 20 @@ -83,18 +57,22 @@ MIN_UPDATE_INTERVAL = 5.0 #: Give child process up to 5 seconds to exit after sending a signal CHILD_LINGER_TIMEOUT = 5.0 -# Common options for socat -SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"] -SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"] -SOCAT_CONNECT_TIMEOUT = 60 +#: How long to wait for a connection to be established +DEFAULT_CONNECT_TIMEOUT = 60 +#: Get dd(1) statistics every few seconds +DD_STATISTICS_INTERVAL = 5.0 -# Global variable for options -options = None +#: Seconds for throughput calculation +DD_THROUGHPUT_INTERVAL = 60.0 + +#: Number of samples for throughput calculation +DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) / + DD_STATISTICS_INTERVAL)) -class Error(Exception): - """Generic exception""" +# Global variable for options +options = None def SetupLogging(): @@ -125,22 +103,6 @@ def SetupLogging(): return child_logger -def _VerifyListening(family, address, port): - """Verify address given as listening address by socat. - - """ - # TODO: Implement IPv6 support - if family != socket.AF_INET: - raise Error("Address family %r not supported" % family) - - try: - packed_address = socket.inet_pton(family, address) - except socket.error: - raise Error("Invalid address %r for family %s" % (address, family)) - - return (socket.inet_ntop(family, packed_address), port) - - class StatusFile: """Status file manager. @@ -185,6 +147,30 @@ class StatusFile: """ self._data.connected = True + def GetConnected(self): + """Determines whether the daemon is connected. + + """ + return self._data.connected + + def SetProgress(self, mbytes, throughput, percent, eta): + """Sets how much data has been transferred so far. + + @type mbytes: number + @param mbytes: Transferred amount of data in MiB. + @type throughput: float + @param throughput: MiB/second + @type percent: number + @param percent: Percent processed + @type eta: number + @param eta: Expected number of seconds until done + + """ + self._data.progress_mbytes = mbytes + self._data.progress_throughput = throughput + self._data.progress_percent = percent + self._data.progress_eta = eta + def SetExitStatus(self, exit_status, error_message): """Sets the exit status and an error message. @@ -222,260 +208,142 @@ class StatusFile: mode=0400) -def _ProcessSocatOutput(status_file, level, msg): - """Interprets socat log output. - - """ - if level == SOCAT_LOG_NOTICE: - if status_file.GetListenPort() is None: - # TODO: Maybe implement timeout to not listen forever - m = LISTENING_RE.match(msg) - if m: - (_, port) = _VerifyListening(int(m.group("family")), m.group("address"), - int(m.group("port"))) - - status_file.SetListenPort(port) - return True - - m = TRANSFER_LOOP_RE.match(msg) - if m: - status_file.SetConnected() - return True - - return False - - -def ProcessOutput(line, status_file, logger, socat): - """Takes care of child process output. - - @param status_file: Status file manager - @param logger: Child output logger - @type socat: bool - @param socat: Whether it's a socat output line - @type line: string - @param line: Child output line - - """ - force_update = False - forward_line = line - - if socat: - level = None - parts = line.split(None, 4) - - if len(parts) == 5: - (_, _, _, level, msg) = parts - - force_update = _ProcessSocatOutput(status_file, level, msg) - - if options.debug or (level and level not in SOCAT_LOG_IGNORE): - forward_line = "socat: %s %s" % (level, msg) - else: - forward_line = None - else: - forward_line = "socat: %s" % line - - if forward_line: - logger.info(forward_line) - status_file.AddRecentOutput(forward_line) - - status_file.Update(force_update) - - -def GetBashCommand(cmd): - """Prepares a command to be run in Bash. - - """ - return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd] - - -def GetSocatCommand(mode): - """Returns the socat command. +def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, + dd_pid_read_fd, exp_size_read_fd, status_file, child_logger, + signal_notify, signal_handler, mode): + """Handles the child processes' output. """ - common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [ - "key=%s" % options.key, - "cert=%s" % options.cert, - "cafile=%s" % options.ca, - ] - - if options.bind is not None: - common_addr_opts.append("bind=%s" % options.bind) - - if mode == constants.IEM_IMPORT: - if options.port is None: - port = 0 - else: - port = options.port - - addr1 = [ - "OPENSSL-LISTEN:%s" % port, - "reuseaddr", - ] + common_addr_opts - addr2 = ["stdout"] - - elif mode == constants.IEM_EXPORT: - addr1 = ["stdin"] - addr2 = [ - "OPENSSL:%s:%s" % (options.host, options.port), - "connect-timeout=%s" % SOCAT_CONNECT_TIMEOUT, - ] + common_addr_opts - - else: - raise Error("Invalid mode") - - for i in [addr1, addr2]: - for value in i: - if "," in value: - raise Error("Comma not allowed in socat option value: %r" % value) - - return [ - constants.SOCAT_PATH, - - # Log to stderr - "-ls", - - # Log level - "-d", "-d", - - # Buffer size - "-b%s" % SOCAT_BUFSIZE, - - # Unidirectional mode, the first address is only used for reading, and the - # second address is only used for writing - "-u", + assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \ + "Other signals are not handled in this function" - ",".join(addr1), ",".join(addr2) - ] + # Buffer size 0 is important, otherwise .read() with a specified length + # might buffer data while poll(2) won't mark its file descriptor as + # readable again. + socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0) + dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0) + dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0) + exp_size_read = os.fdopen(exp_size_read_fd, "r", 0) + tp_samples = DD_THROUGHPUT_SAMPLES -def GetTransportCommand(mode, socat_stderr_fd): - """Returns the command for the transport part of the daemon. - - @param mode: Daemon mode (import or export) - @type socat_stderr_fd: int - @param socat_stderr_fd: File descriptor socat should write its stderr to - - """ - socat_cmd = ("%s 2>&%d" % - (utils.ShellQuoteArgs(GetSocatCommand(mode)), - socat_stderr_fd)) - - # TODO: Make compression configurable - - if mode == constants.IEM_IMPORT: - transport_cmd = "%s | gunzip -c" % socat_cmd - elif mode == constants.IEM_EXPORT: - transport_cmd = "gzip -c | %s" % socat_cmd + if options.exp_size == constants.IE_CUSTOM_SIZE: + exp_size = None else: - raise Error("Invalid mode") - - # TODO: Use "dd" to measure processed data (allows to give an ETA) - # TODO: If a connection to socat is dropped (e.g. due to a wrong - # certificate), socat should be restarted - - # TODO: Run transport as separate user - # The transport uses its own shell to simplify running it as a separate user - # in the future. - return GetBashCommand(transport_cmd) - - -def GetCommand(mode, socat_stderr_fd): - """Returns the complete child process command. - - """ - buf = StringIO() - - if options.cmd_prefix: - buf.write(options.cmd_prefix) - buf.write(" ") - - buf.write(utils.ShellQuoteArgs(GetTransportCommand(mode, socat_stderr_fd))) - - if options.cmd_suffix: - buf.write(" ") - buf.write(options.cmd_suffix) - - return GetBashCommand(buf.getvalue()) + exp_size = options.exp_size + child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file, + child_logger, tp_samples, + exp_size) + try: + fdmap = { + child.stderr.fileno(): + (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)), + socat_stderr_read.fileno(): + (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)), + dd_pid_read.fileno(): + (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)), + dd_stderr_read.fileno(): + (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)), + exp_size_read.fileno(): + (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)), + signal_notify.fileno(): (signal_notify, None), + } + + poller = select.poll() + for fd in fdmap: + utils.SetNonblockFlag(fd, True) + poller.register(fd, select.POLLIN) + + if options.connect_timeout and mode == constants.IEM_IMPORT: + listen_timeout = locking.RunningTimeout(options.connect_timeout, True) + else: + listen_timeout = None + + exit_timeout = None + dd_stats_timeout = None + + while True: + # Break out of loop if only signal notify FD is left + if len(fdmap) == 1 and signal_notify.fileno() in fdmap: + break + + timeout = None + + if listen_timeout and not exit_timeout: + if status_file.GetConnected(): + listen_timeout = None + elif listen_timeout.Remaining() < 0: + logging.info("Child process didn't establish connection in time") + child.Kill(signal.SIGTERM) + exit_timeout = \ + locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True) + # Next block will calculate timeout + else: + # Not yet connected, check again in a second + timeout = 1000 -def ProcessChildIO(child, socat_stderr_read, status_file, child_logger, - signal_notify, signal_handler): - """Handles the child processes' output. + if exit_timeout: + timeout = exit_timeout.Remaining() * 1000 + if timeout < 0: + logging.info("Child process didn't exit in time") + break - """ - poller = select.poll() + if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0: + notify_status = child_io_proc.NotifyDd() + if notify_status: + # Schedule next notification + dd_stats_timeout = locking.RunningTimeout(DD_STATISTICS_INTERVAL, + True) + else: + # Try again soon (dd isn't ready yet) + dd_stats_timeout = locking.RunningTimeout(1.0, True) - script_stderr_lines = utils.LineSplitter(ProcessOutput, status_file, - child_logger, False) - try: - socat_stderr_lines = utils.LineSplitter(ProcessOutput, status_file, - child_logger, True) - try: - fdmap = { - child.stderr.fileno(): (child.stderr, script_stderr_lines), - socat_stderr_read.fileno(): (socat_stderr_read, socat_stderr_lines), - signal_notify.fileno(): (signal_notify, None), - } - - for fd in fdmap: - utils.SetNonblockFlag(fd, True) - poller.register(fd, select.POLLIN) - - timeout_calculator = None - while True: - # Break out of loop if only signal notify FD is left - if len(fdmap) == 1 and signal_notify.fileno() in fdmap: - break + if dd_stats_timeout: + dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000) - if timeout_calculator: - timeout = timeout_calculator.Remaining() * 1000 - if timeout < 0: - logging.info("Child process didn't exit in time") - break + if timeout is None: + timeout = dd_timeout else: - timeout = None - - for fd, event in utils.RetryOnSignal(poller.poll, timeout): - if event & (select.POLLIN | event & select.POLLPRI): - (from_, to) = fdmap[fd] - - # Read up to 1 KB of data - data = from_.read(1024) - if data: - if to: - to.write(data) - elif fd == signal_notify.fileno(): - # Signal handling - if signal_handler.called: - signal_handler.Clear() - if timeout_calculator: - logging.info("Child process still has about %0.2f seconds" - " to exit", timeout_calculator.Remaining()) - else: - logging.info("Giving child process %0.2f seconds to exit", - CHILD_LINGER_TIMEOUT) - timeout_calculator = \ - locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True) - else: - poller.unregister(fd) - del fdmap[fd] - - elif event & (select.POLLNVAL | select.POLLHUP | - select.POLLERR): + timeout = min(timeout, dd_timeout) + + for fd, event in utils.RetryOnSignal(poller.poll, timeout): + if event & (select.POLLIN | event & select.POLLPRI): + (from_, to) = fdmap[fd] + + # Read up to 1 KB of data + data = from_.read(1024) + if data: + if to: + to.write(data) + elif fd == signal_notify.fileno(): + # Signal handling + if signal_handler.called: + signal_handler.Clear() + if exit_timeout: + logging.info("Child process still has about %0.2f seconds" + " to exit", exit_timeout.Remaining()) + else: + logging.info("Giving child process %0.2f seconds to exit", + CHILD_LINGER_TIMEOUT) + exit_timeout = \ + locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True) + else: poller.unregister(fd) del fdmap[fd] - script_stderr_lines.flush() - socat_stderr_lines.flush() + elif event & (select.POLLNVAL | select.POLLHUP | + select.POLLERR): + poller.unregister(fd) + del fdmap[fd] + + child_io_proc.FlushAll() - # If there was a timeout calculator, we were waiting for the child to - # finish, e.g. due to a signal - return not bool(timeout_calculator) - finally: - socat_stderr_lines.close() + # If there was a timeout calculator, we were waiting for the child to + # finish, e.g. due to a signal + return not bool(exit_timeout) finally: - script_stderr_lines.close() + child_io_proc.CloseAll() def ParseOptions(): @@ -503,6 +371,22 @@ def ParseOptions(): help="Remote hostname") parser.add_option("--port", dest="port", action="store", type="int", help="Remote port") + parser.add_option("--connect-retries", dest="connect_retries", action="store", + type="int", default=0, + help=("How many times the connection should be retried" + " (export only)")) + parser.add_option("--connect-timeout", dest="connect_timeout", action="store", + type="int", default=DEFAULT_CONNECT_TIMEOUT, + help="Timeout for connection to be established (seconds)") + parser.add_option("--compress", dest="compress", action="store", + type="choice", help="Compression method", + metavar="[%s]" % "|".join(constants.IEC_ALL), + choices=list(constants.IEC_ALL), default=constants.IEC_GZIP) + parser.add_option("--expected-size", dest="exp_size", action="store", + type="string", default=None, + help="Expected import/export size (MiB)") + parser.add_option("--magic", dest="magic", action="store", + type="string", default=None, help="Magic string") parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store", type="string", help="Command prefix") parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store", @@ -521,11 +405,34 @@ def ParseOptions(): # Won't return parser.error("Invalid mode: %s" % mode) + # Normalize and check parameters + if options.host is not None: + try: + options.host = netutils.Hostname.GetNormalizedName(options.host) + except errors.OpPrereqError, err: + parser.error("Invalid hostname '%s': %s" % (options.host, err)) + + if options.port is not None: + options.port = utils.ValidateServiceName(options.port) + + if (options.exp_size is not None and + options.exp_size != constants.IE_CUSTOM_SIZE): + try: + options.exp_size = int(options.exp_size) + except (ValueError, TypeError), err: + # Won't return + parser.error("Invalid value for --expected-size: %s (%s)" % + (options.exp_size, err)) + + if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)): + parser.error("Magic must match regular expression %s" % + constants.IE_MAGIC_RE.pattern) + return (status_file_path, mode) class ChildProcess(subprocess.Popen): - def __init__(self, cmd, noclose_fds): + def __init__(self, env, cmd, noclose_fds): """Initializes this class. """ @@ -533,7 +440,7 @@ class ChildProcess(subprocess.Popen): # Not using close_fds because doing so would also close the socat stderr # pipe, which we still need. - subprocess.Popen.__init__(self, cmd, shell=False, close_fds=False, + subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False, stderr=subprocess.PIPE, stdout=None, stdin=None, preexec_fn=self._ChildPreexec) self._SetProcessGroup() @@ -570,7 +477,7 @@ class ChildProcess(subprocess.Popen): """ logging.info("Sending signal %s to child process", signum) - os.killpg(self.pid, signum) + utils.IgnoreProcessNotFound(os.killpg, self.pid, signum) def ForceQuit(self): """Ensure child process is no longer running. @@ -599,59 +506,68 @@ def main(): # Pipe to receive socat's stderr output (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe() - # Pipe to notify on signals - (signal_notify_read_fd, signal_notify_write_fd) = os.pipe() + # Pipe to receive dd's stderr output + (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe() - # Configure signal module's notifier - try: - # This is only supported in Python 2.5 and above (some distributions - # backported it to Python 2.4) - set_wakeup_fd_fn = signal.set_wakeup_fd - except AttributeError: - pass - else: - set_wakeup_fd_fn(signal_notify_write_fd) + # Pipe to receive dd's PID + (dd_pid_read_fd, dd_pid_write_fd) = os.pipe() - # Buffer size 0 is important, otherwise .read() with a specified length - # might buffer data while poll(2) won't mark its file descriptor as - # readable again. - socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0) - signal_notify_read = os.fdopen(signal_notify_read_fd, "r", 0) + # Pipe to receive size predicted by export script + (exp_size_read_fd, exp_size_write_fd) = os.pipe() # Get child process command - cmd = GetCommand(mode, socat_stderr_write_fd) + cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd, + dd_stderr_write_fd, dd_pid_write_fd) + cmd = cmd_builder.GetCommand() + + # Prepare command environment + cmd_env = os.environ.copy() + + if options.exp_size == constants.IE_CUSTOM_SIZE: + cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd) logging.debug("Starting command %r", cmd) # Start child process - child = ChildProcess(cmd, [socat_stderr_write_fd]) + child = ChildProcess(cmd_env, cmd, + [socat_stderr_write_fd, dd_stderr_write_fd, + dd_pid_write_fd, exp_size_write_fd]) try: - # Forward signals to child process def _ForwardSignal(signum, _): - # Wake up poll(2) - os.write(signal_notify_write_fd, "\0") + """Forwards signals to child process. - # Send signal to child + """ child.Kill(signum) - # TODO: There is a race condition between starting the child and - # handling the signals here. While there might be a way to work around - # it by registering the handlers before starting the child and - # deferring sent signals until the child is available, doing so can be - # complicated. - signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT], - handler_fn=_ForwardSignal) + signal_wakeup = utils.SignalWakeupFd() try: - # Close child's side - utils.RetryOnSignal(os.close, socat_stderr_write_fd) - - if ProcessChildIO(child, socat_stderr_read, status_file, child_logger, - signal_notify_read, signal_handler): - # The child closed all its file descriptors and there was no signal - # TODO: Implement timeout instead of waiting indefinitely - utils.RetryOnSignal(child.wait) + # TODO: There is a race condition between starting the child and + # handling the signals here. While there might be a way to work around + # it by registering the handlers before starting the child and + # deferring sent signals until the child is available, doing so can be + # complicated. + signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT], + handler_fn=_ForwardSignal, + wakeup=signal_wakeup) + try: + # Close child's side + utils.RetryOnSignal(os.close, socat_stderr_write_fd) + utils.RetryOnSignal(os.close, dd_stderr_write_fd) + utils.RetryOnSignal(os.close, dd_pid_write_fd) + utils.RetryOnSignal(os.close, exp_size_write_fd) + + if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, + dd_pid_read_fd, exp_size_read_fd, + status_file, child_logger, + signal_wakeup, signal_handler, mode): + # The child closed all its file descriptors and there was no + # signal + # TODO: Implement timeout instead of waiting indefinitely + utils.RetryOnSignal(child.wait) + finally: + signal_handler.Reset() finally: - signal_handler.Reset() + signal_wakeup.Reset() finally: child.ForceQuit()