#!/usr/bin/python # # Copyright (C) 2010 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301, USA. """Import/export daemon. """ # pylint: disable=C0103 # C0103: Invalid name import-export import errno import logging import optparse import os import select import signal import subprocess import sys import time import math from ganeti import constants from ganeti import cli from ganeti import utils from ganeti import errors from ganeti import serializer from ganeti import objects from ganeti import impexpd from ganeti import netutils #: How many lines to keep in the status file MAX_RECENT_OUTPUT_LINES = 20 #: Don't update status file more than once every 5 seconds (unless forced) MIN_UPDATE_INTERVAL = 5.0 #: How long to wait for a connection to be established DEFAULT_CONNECT_TIMEOUT = 60 #: Get dd(1) statistics every few seconds DD_STATISTICS_INTERVAL = 5.0 #: Seconds for throughput calculation DD_THROUGHPUT_INTERVAL = 60.0 #: Number of samples for throughput calculation DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) / DD_STATISTICS_INTERVAL)) # Global variable for options options = None def SetupLogging(): """Configures the logging module. """ formatter = logging.Formatter("%(asctime)s: %(message)s") stderr_handler = logging.StreamHandler() stderr_handler.setFormatter(formatter) stderr_handler.setLevel(logging.NOTSET) root_logger = logging.getLogger("") root_logger.addHandler(stderr_handler) if options.debug: root_logger.setLevel(logging.NOTSET) elif options.verbose: root_logger.setLevel(logging.INFO) else: root_logger.setLevel(logging.ERROR) # Create special logger for child process output child_logger = logging.Logger("child output") child_logger.addHandler(stderr_handler) child_logger.setLevel(logging.NOTSET) return child_logger class StatusFile: """Status file manager. """ def __init__(self, path): """Initializes class. """ self._path = path self._data = objects.ImportExportStatus(ctime=time.time(), mtime=None, recent_output=[]) def AddRecentOutput(self, line): """Adds a new line of recent output. """ self._data.recent_output.append(line) # Remove old lines del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES] def SetListenPort(self, port): """Sets the port the daemon is listening on. @type port: int @param port: TCP/UDP port """ assert isinstance(port, (int, long)) and 0 < port < (2 ** 16) self._data.listen_port = port def GetListenPort(self): """Returns the port the daemon is listening on. """ return self._data.listen_port def SetConnected(self): """Sets the connected flag. """ self._data.connected = True def GetConnected(self): """Determines whether the daemon is connected. """ return self._data.connected def SetProgress(self, mbytes, throughput, percent, eta): """Sets how much data has been transferred so far. @type mbytes: number @param mbytes: Transferred amount of data in MiB. @type throughput: float @param throughput: MiB/second @type percent: number @param percent: Percent processed @type eta: number @param eta: Expected number of seconds until done """ self._data.progress_mbytes = mbytes self._data.progress_throughput = throughput self._data.progress_percent = percent self._data.progress_eta = eta def SetExitStatus(self, exit_status, error_message): """Sets the exit status and an error message. """ # Require error message when status isn't 0 assert exit_status == 0 or error_message self._data.exit_status = exit_status self._data.error_message = error_message def ExitStatusIsSuccess(self): """Returns whether the exit status means "success". """ return not bool(self._data.error_message) def Update(self, force): """Updates the status file. @type force: bool @param force: Write status file in any case, not only when minimum interval is expired """ if not (force or self._data.mtime is None or time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)): return logging.debug("Updating status file %s", self._path) self._data.mtime = time.time() utils.WriteFile(self._path, data=serializer.DumpJson(self._data.ToDict()), mode=0400) def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, dd_pid_read_fd, exp_size_read_fd, status_file, child_logger, signal_notify, signal_handler, mode): """Handles the child processes' output. """ assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \ "Other signals are not handled in this function" # Buffer size 0 is important, otherwise .read() with a specified length # might buffer data while poll(2) won't mark its file descriptor as # readable again. socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0) dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0) dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0) exp_size_read = os.fdopen(exp_size_read_fd, "r", 0) tp_samples = DD_THROUGHPUT_SAMPLES if options.exp_size == constants.IE_CUSTOM_SIZE: exp_size = None else: exp_size = options.exp_size child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file, child_logger, tp_samples, exp_size) try: fdmap = { child.stderr.fileno(): (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)), socat_stderr_read.fileno(): (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)), dd_pid_read.fileno(): (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)), dd_stderr_read.fileno(): (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)), exp_size_read.fileno(): (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)), signal_notify.fileno(): (signal_notify, None), } poller = select.poll() for fd in fdmap: utils.SetNonblockFlag(fd, True) poller.register(fd, select.POLLIN) if options.connect_timeout and mode == constants.IEM_IMPORT: listen_timeout = utils.RunningTimeout(options.connect_timeout, True) else: listen_timeout = None exit_timeout = None dd_stats_timeout = None while True: # Break out of loop if only signal notify FD is left if len(fdmap) == 1 and signal_notify.fileno() in fdmap: break timeout = None if listen_timeout and not exit_timeout: assert mode == constants.IEM_IMPORT and options.connect_timeout if status_file.GetConnected(): listen_timeout = None elif listen_timeout.Remaining() < 0: errmsg = ("Child process didn't establish connection in time" " (%0.0fs), sending SIGTERM" % options.connect_timeout) logging.error(errmsg) status_file.AddRecentOutput(errmsg) status_file.Update(True) child.Kill(signal.SIGTERM) exit_timeout = \ utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True) # Next block will calculate timeout else: # Not yet connected, check again in a second timeout = 1000 if exit_timeout: timeout = exit_timeout.Remaining() * 1000 if timeout < 0: logging.info("Child process didn't exit in time") break if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0: notify_status = child_io_proc.NotifyDd() if notify_status: # Schedule next notification dd_stats_timeout = utils.RunningTimeout(DD_STATISTICS_INTERVAL, True) else: # Try again soon (dd isn't ready yet) dd_stats_timeout = utils.RunningTimeout(1.0, True) if dd_stats_timeout: dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000) if timeout is None: timeout = dd_timeout else: timeout = min(timeout, dd_timeout) for fd, event in utils.RetryOnSignal(poller.poll, timeout): if event & (select.POLLIN | event & select.POLLPRI): (from_, to) = fdmap[fd] # Read up to 1 KB of data data = from_.read(1024) if data: if to: to.write(data) elif fd == signal_notify.fileno(): # Signal handling if signal_handler.called: signal_handler.Clear() if exit_timeout: logging.info("Child process still has about %0.2f seconds" " to exit", exit_timeout.Remaining()) else: logging.info("Giving child process %0.2f seconds to exit", constants.CHILD_LINGER_TIMEOUT) exit_timeout = \ utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True) else: poller.unregister(fd) del fdmap[fd] elif event & (select.POLLNVAL | select.POLLHUP | select.POLLERR): poller.unregister(fd) del fdmap[fd] child_io_proc.FlushAll() # If there was a timeout calculator, we were waiting for the child to # finish, e.g. due to a signal return not bool(exit_timeout) finally: child_io_proc.CloseAll() def ParseOptions(): """Parses the options passed to the program. @return: Arguments to program """ global options # pylint: disable=W0603 parser = optparse.OptionParser(usage=("%%prog {%s|%s}" % (constants.IEM_IMPORT, constants.IEM_EXPORT))) parser.add_option(cli.DEBUG_OPT) parser.add_option(cli.VERBOSE_OPT) parser.add_option("--key", dest="key", action="store", type="string", help="RSA key file") parser.add_option("--cert", dest="cert", action="store", type="string", help="X509 certificate file") parser.add_option("--ca", dest="ca", action="store", type="string", help="X509 CA file") parser.add_option("--bind", dest="bind", action="store", type="string", help="Bind address") parser.add_option("--ipv4", dest="ipv4", action="store_true", help="Use IPv4 only") parser.add_option("--ipv6", dest="ipv6", action="store_true", help="Use IPv6 only") parser.add_option("--host", dest="host", action="store", type="string", help="Remote hostname") parser.add_option("--port", dest="port", action="store", type="int", help="Remote port") parser.add_option("--connect-retries", dest="connect_retries", action="store", type="int", default=0, help=("How many times the connection should be retried" " (export only)")) parser.add_option("--connect-timeout", dest="connect_timeout", action="store", type="int", default=DEFAULT_CONNECT_TIMEOUT, help="Timeout for connection to be established (seconds)") parser.add_option("--compress", dest="compress", action="store", type="choice", help="Compression method", metavar="[%s]" % "|".join(constants.IEC_ALL), choices=list(constants.IEC_ALL), default=constants.IEC_GZIP) parser.add_option("--expected-size", dest="exp_size", action="store", type="string", default=None, help="Expected import/export size (MiB)") parser.add_option("--magic", dest="magic", action="store", type="string", default=None, help="Magic string") parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store", type="string", help="Command prefix") parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store", type="string", help="Command suffix") (options, args) = parser.parse_args() if len(args) != 2: # Won't return parser.error("Expected exactly two arguments") (status_file_path, mode) = args if mode not in (constants.IEM_IMPORT, constants.IEM_EXPORT): # Won't return parser.error("Invalid mode: %s" % mode) # Normalize and check parameters if options.host is not None and not netutils.IPAddress.IsValid(options.host): try: options.host = netutils.Hostname.GetNormalizedName(options.host) except errors.OpPrereqError, err: parser.error("Invalid hostname '%s': %s" % (options.host, err)) if options.port is not None: options.port = utils.ValidateServiceName(options.port) if (options.exp_size is not None and options.exp_size != constants.IE_CUSTOM_SIZE): try: options.exp_size = int(options.exp_size) except (ValueError, TypeError), err: # Won't return parser.error("Invalid value for --expected-size: %s (%s)" % (options.exp_size, err)) if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)): parser.error("Magic must match regular expression %s" % constants.IE_MAGIC_RE.pattern) if options.ipv4 and options.ipv6: parser.error("Can only use one of --ipv4 and --ipv6") return (status_file_path, mode) class ChildProcess(subprocess.Popen): def __init__(self, env, cmd, noclose_fds): """Initializes this class. """ self._noclose_fds = noclose_fds # Not using close_fds because doing so would also close the socat stderr # pipe, which we still need. subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False, stderr=subprocess.PIPE, stdout=None, stdin=None, preexec_fn=self._ChildPreexec) self._SetProcessGroup() def _ChildPreexec(self): """Called before child executable is execve'd. """ # Move to separate process group. By sending a signal to its process group # we can kill the child process and all grandchildren. os.setpgid(0, 0) # Close almost all file descriptors utils.CloseFDs(noclose_fds=self._noclose_fds) def _SetProcessGroup(self): """Sets the child's process group. """ assert self.pid, "Can't be called in child process" # Avoid race condition by setting child's process group (as good as # possible in Python) before sending signals to child. For an # explanation, see preexec function for child. try: os.setpgid(self.pid, self.pid) except EnvironmentError, err: # If the child process was faster we receive EPERM or EACCES if err.errno not in (errno.EPERM, errno.EACCES): raise def Kill(self, signum): """Sends signal to child process. """ logging.info("Sending signal %s to child process", signum) utils.IgnoreProcessNotFound(os.killpg, self.pid, signum) def ForceQuit(self): """Ensure child process is no longer running. """ # Final check if child process is still alive if utils.RetryOnSignal(self.poll) is None: logging.error("Child process still alive, sending SIGKILL") self.Kill(signal.SIGKILL) utils.RetryOnSignal(self.wait) def main(): """Main function. """ # Option parsing (status_file_path, mode) = ParseOptions() # Configure logging child_logger = SetupLogging() status_file = StatusFile(status_file_path) try: try: # Pipe to receive socat's stderr output (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe() # Pipe to receive dd's stderr output (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe() # Pipe to receive dd's PID (dd_pid_read_fd, dd_pid_write_fd) = os.pipe() # Pipe to receive size predicted by export script (exp_size_read_fd, exp_size_write_fd) = os.pipe() # Get child process command cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd, dd_stderr_write_fd, dd_pid_write_fd) cmd = cmd_builder.GetCommand() # Prepare command environment cmd_env = os.environ.copy() if options.exp_size == constants.IE_CUSTOM_SIZE: cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd) logging.debug("Starting command %r", cmd) # Start child process child = ChildProcess(cmd_env, cmd, [socat_stderr_write_fd, dd_stderr_write_fd, dd_pid_write_fd, exp_size_write_fd]) try: def _ForwardSignal(signum, _): """Forwards signals to child process. """ child.Kill(signum) signal_wakeup = utils.SignalWakeupFd() try: # TODO: There is a race condition between starting the child and # handling the signals here. While there might be a way to work around # it by registering the handlers before starting the child and # deferring sent signals until the child is available, doing so can be # complicated. signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT], handler_fn=_ForwardSignal, wakeup=signal_wakeup) try: # Close child's side utils.RetryOnSignal(os.close, socat_stderr_write_fd) utils.RetryOnSignal(os.close, dd_stderr_write_fd) utils.RetryOnSignal(os.close, dd_pid_write_fd) utils.RetryOnSignal(os.close, exp_size_write_fd) if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, dd_pid_read_fd, exp_size_read_fd, status_file, child_logger, signal_wakeup, signal_handler, mode): # The child closed all its file descriptors and there was no # signal # TODO: Implement timeout instead of waiting indefinitely utils.RetryOnSignal(child.wait) finally: signal_handler.Reset() finally: signal_wakeup.Reset() finally: child.ForceQuit() if child.returncode == 0: errmsg = None elif child.returncode < 0: errmsg = "Exited due to signal %s" % (-child.returncode, ) else: errmsg = "Exited with status %s" % (child.returncode, ) status_file.SetExitStatus(child.returncode, errmsg) except Exception, err: # pylint: disable=W0703 logging.exception("Unhandled error occurred") status_file.SetExitStatus(constants.EXIT_FAILURE, "Unhandled error occurred: %s" % (err, )) if status_file.ExitStatusIsSuccess(): sys.exit(constants.EXIT_SUCCESS) sys.exit(constants.EXIT_FAILURE) finally: status_file.Update(True) if __name__ == "__main__": main()