#!/usr/bin/python # # Copyright (C) 2010 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301, USA. """Import/export daemon. """ # pylint: disable-msg=C0103 # C0103: Invalid name import-export import errno import logging import optparse import os import re import select import signal import socket import subprocess import sys import time from cStringIO import StringIO from ganeti import constants from ganeti import cli from ganeti import utils from ganeti import serializer from ganeti import objects from ganeti import locking #: Used to recognize point at which socat(1) starts to listen on its socket. #: The local address is required for the remote peer to connect (in particular #: the port number). LISTENING_RE = re.compile(r"^listening on\s+" r"AF=(?P\d+)\s+" r"(?P
.+):(?P\d+)$", re.I) #: Used to recognize point at which socat(1) is sending data over the wire TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$", re.I) SOCAT_LOG_DEBUG = "D" SOCAT_LOG_INFO = "I" SOCAT_LOG_NOTICE = "N" SOCAT_LOG_WARNING = "W" SOCAT_LOG_ERROR = "E" SOCAT_LOG_FATAL = "F" SOCAT_LOG_IGNORE = frozenset([ SOCAT_LOG_DEBUG, SOCAT_LOG_INFO, SOCAT_LOG_NOTICE, ]) #: Socat buffer size: at most this many bytes are transferred per step SOCAT_BUFSIZE = 1024 * 1024 #: How many lines to keep in the status file MAX_RECENT_OUTPUT_LINES = 20 #: Don't update status file more than once every 5 seconds (unless forced) MIN_UPDATE_INTERVAL = 5.0 #: Give child process up to 5 seconds to exit after sending a signal CHILD_LINGER_TIMEOUT = 5.0 # Common options for socat SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"] SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"] SOCAT_CONNECT_TIMEOUT = 60 # Global variable for options options = None class Error(Exception): """Generic exception""" def SetupLogging(): """Configures the logging module. """ formatter = logging.Formatter("%(asctime)s: %(message)s") stderr_handler = logging.StreamHandler() stderr_handler.setFormatter(formatter) stderr_handler.setLevel(logging.NOTSET) root_logger = logging.getLogger("") root_logger.addHandler(stderr_handler) if options.debug: root_logger.setLevel(logging.NOTSET) elif options.verbose: root_logger.setLevel(logging.INFO) else: root_logger.setLevel(logging.ERROR) # Create special logger for child process output child_logger = logging.Logger("child output") child_logger.addHandler(stderr_handler) child_logger.setLevel(logging.NOTSET) return child_logger def _VerifyListening(family, address, port): """Verify address given as listening address by socat. """ # TODO: Implement IPv6 support if family != socket.AF_INET: raise Error("Address family %r not supported" % family) try: packed_address = socket.inet_pton(family, address) except socket.error: raise Error("Invalid address %r for family %s" % (address, family)) return (socket.inet_ntop(family, packed_address), port) class StatusFile: """Status file manager. """ def __init__(self, path): """Initializes class. """ self._path = path self._data = objects.ImportExportStatus(ctime=time.time(), mtime=None, recent_output=[]) def AddRecentOutput(self, line): """Adds a new line of recent output. """ self._data.recent_output.append(line) # Remove old lines del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES] def SetListenPort(self, port): """Sets the port the daemon is listening on. @type port: int @param port: TCP/UDP port """ assert isinstance(port, (int, long)) and 0 < port < 2**16 self._data.listen_port = port def GetListenPort(self): """Returns the port the daemon is listening on. """ return self._data.listen_port def SetConnected(self): """Sets the connected flag. """ self._data.connected = True def SetExitStatus(self, exit_status, error_message): """Sets the exit status and an error message. """ # Require error message when status isn't 0 assert exit_status == 0 or error_message self._data.exit_status = exit_status self._data.error_message = error_message def ExitStatusIsSuccess(self): """Returns whether the exit status means "success". """ return not bool(self._data.error_message) def Update(self, force): """Updates the status file. @type force: bool @param force: Write status file in any case, not only when minimum interval is expired """ if not (force or self._data.mtime is None or time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)): return logging.debug("Updating status file %s", self._path) self._data.mtime = time.time() utils.WriteFile(self._path, data=serializer.DumpJson(self._data.ToDict(), indent=True), mode=0400) def _ProcessSocatOutput(status_file, level, msg): """Interprets socat log output. """ if level == SOCAT_LOG_NOTICE: if status_file.GetListenPort() is None: # TODO: Maybe implement timeout to not listen forever m = LISTENING_RE.match(msg) if m: (_, port) = _VerifyListening(int(m.group("family")), m.group("address"), int(m.group("port"))) status_file.SetListenPort(port) return True m = TRANSFER_LOOP_RE.match(msg) if m: status_file.SetConnected() return True return False def ProcessOutput(line, status_file, logger, socat): """Takes care of child process output. @param status_file: Status file manager @param logger: Child output logger @type socat: bool @param socat: Whether it's a socat output line @type line: string @param line: Child output line """ force_update = False forward_line = line if socat: level = None parts = line.split(None, 4) if len(parts) == 5: (_, _, _, level, msg) = parts force_update = _ProcessSocatOutput(status_file, level, msg) if options.debug or (level and level not in SOCAT_LOG_IGNORE): forward_line = "socat: %s %s" % (level, msg) else: forward_line = None else: forward_line = "socat: %s" % line if forward_line: logger.info(forward_line) status_file.AddRecentOutput(forward_line) status_file.Update(force_update) def GetBashCommand(cmd): """Prepares a command to be run in Bash. """ return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd] def GetSocatCommand(mode): """Returns the socat command. """ common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [ "key=%s" % options.key, "cert=%s" % options.cert, "cafile=%s" % options.ca, ] if options.bind is not None: common_addr_opts.append("bind=%s" % options.bind) if mode == constants.IEM_IMPORT: if options.port is None: port = 0 else: port = options.port addr1 = [ "OPENSSL-LISTEN:%s" % port, "reuseaddr", ] + common_addr_opts addr2 = ["stdout"] elif mode == constants.IEM_EXPORT: addr1 = ["stdin"] addr2 = [ "OPENSSL:%s:%s" % (options.host, options.port), "connect-timeout=%s" % SOCAT_CONNECT_TIMEOUT, ] + common_addr_opts else: raise Error("Invalid mode") for i in [addr1, addr2]: for value in i: if "," in value: raise Error("Comma not allowed in socat option value: %r" % value) return [ constants.SOCAT_PATH, # Log to stderr "-ls", # Log level "-d", "-d", # Buffer size "-b%s" % SOCAT_BUFSIZE, # Unidirectional mode, the first address is only used for reading, and the # second address is only used for writing "-u", ",".join(addr1), ",".join(addr2) ] def GetTransportCommand(mode, socat_stderr_fd): """Returns the command for the transport part of the daemon. @param mode: Daemon mode (import or export) @type socat_stderr_fd: int @param socat_stderr_fd: File descriptor socat should write its stderr to """ socat_cmd = ("%s 2>&%d" % (utils.ShellQuoteArgs(GetSocatCommand(mode)), socat_stderr_fd)) # TODO: Make compression configurable if mode == constants.IEM_IMPORT: transport_cmd = "%s | gunzip -c" % socat_cmd elif mode == constants.IEM_EXPORT: transport_cmd = "gzip -c | %s" % socat_cmd else: raise Error("Invalid mode") # TODO: Use "dd" to measure processed data (allows to give an ETA) # TODO: If a connection to socat is dropped (e.g. due to a wrong # certificate), socat should be restarted # TODO: Run transport as separate user # The transport uses its own shell to simplify running it as a separate user # in the future. return GetBashCommand(transport_cmd) def GetCommand(mode, socat_stderr_fd): """Returns the complete child process command. """ buf = StringIO() if options.cmd_prefix: buf.write(options.cmd_prefix) buf.write(" ") buf.write(utils.ShellQuoteArgs(GetTransportCommand(mode, socat_stderr_fd))) if options.cmd_suffix: buf.write(" ") buf.write(options.cmd_suffix) return GetBashCommand(buf.getvalue()) def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger, signal_notify, signal_handler): """Handles the child processes' output. """ assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \ "Other signals are not handled in this function" # Buffer size 0 is important, otherwise .read() with a specified length # might buffer data while poll(2) won't mark its file descriptor as # readable again. socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0) script_stderr_lines = utils.LineSplitter(ProcessOutput, status_file, child_logger, False) try: socat_stderr_lines = utils.LineSplitter(ProcessOutput, status_file, child_logger, True) try: fdmap = { child.stderr.fileno(): (child.stderr, script_stderr_lines), socat_stderr_read.fileno(): (socat_stderr_read, socat_stderr_lines), signal_notify.fileno(): (signal_notify, None), } poller = select.poll() for fd in fdmap: utils.SetNonblockFlag(fd, True) poller.register(fd, select.POLLIN) timeout_calculator = None while True: # Break out of loop if only signal notify FD is left if len(fdmap) == 1 and signal_notify.fileno() in fdmap: break if timeout_calculator: timeout = timeout_calculator.Remaining() * 1000 if timeout < 0: logging.info("Child process didn't exit in time") break else: timeout = None for fd, event in utils.RetryOnSignal(poller.poll, timeout): if event & (select.POLLIN | event & select.POLLPRI): (from_, to) = fdmap[fd] # Read up to 1 KB of data data = from_.read(1024) if data: if to: to.write(data) elif fd == signal_notify.fileno(): # Signal handling if signal_handler.called: signal_handler.Clear() if timeout_calculator: logging.info("Child process still has about %0.2f seconds" " to exit", timeout_calculator.Remaining()) else: logging.info("Giving child process %0.2f seconds to exit", CHILD_LINGER_TIMEOUT) timeout_calculator = \ locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True) else: poller.unregister(fd) del fdmap[fd] elif event & (select.POLLNVAL | select.POLLHUP | select.POLLERR): poller.unregister(fd) del fdmap[fd] script_stderr_lines.flush() socat_stderr_lines.flush() # If there was a timeout calculator, we were waiting for the child to # finish, e.g. due to a signal return not bool(timeout_calculator) finally: socat_stderr_lines.close() finally: script_stderr_lines.close() def ParseOptions(): """Parses the options passed to the program. @return: Arguments to program """ global options # pylint: disable-msg=W0603 parser = optparse.OptionParser(usage=("%%prog {%s|%s}" % (constants.IEM_IMPORT, constants.IEM_EXPORT))) parser.add_option(cli.DEBUG_OPT) parser.add_option(cli.VERBOSE_OPT) parser.add_option("--key", dest="key", action="store", type="string", help="RSA key file") parser.add_option("--cert", dest="cert", action="store", type="string", help="X509 certificate file") parser.add_option("--ca", dest="ca", action="store", type="string", help="X509 CA file") parser.add_option("--bind", dest="bind", action="store", type="string", help="Bind address") parser.add_option("--host", dest="host", action="store", type="string", help="Remote hostname") parser.add_option("--port", dest="port", action="store", type="int", help="Remote port") parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store", type="string", help="Command prefix") parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store", type="string", help="Command suffix") (options, args) = parser.parse_args() if len(args) != 2: # Won't return parser.error("Expected exactly two arguments") (status_file_path, mode) = args if mode not in (constants.IEM_IMPORT, constants.IEM_EXPORT): # Won't return parser.error("Invalid mode: %s" % mode) return (status_file_path, mode) class ChildProcess(subprocess.Popen): def __init__(self, cmd, noclose_fds): """Initializes this class. """ self._noclose_fds = noclose_fds # Not using close_fds because doing so would also close the socat stderr # pipe, which we still need. subprocess.Popen.__init__(self, cmd, shell=False, close_fds=False, stderr=subprocess.PIPE, stdout=None, stdin=None, preexec_fn=self._ChildPreexec) self._SetProcessGroup() def _ChildPreexec(self): """Called before child executable is execve'd. """ # Move to separate process group. By sending a signal to its process group # we can kill the child process and all grandchildren. os.setpgid(0, 0) # Close almost all file descriptors utils.CloseFDs(noclose_fds=self._noclose_fds) def _SetProcessGroup(self): """Sets the child's process group. """ assert self.pid, "Can't be called in child process" # Avoid race condition by setting child's process group (as good as # possible in Python) before sending signals to child. For an # explanation, see preexec function for child. try: os.setpgid(self.pid, self.pid) except EnvironmentError, err: # If the child process was faster we receive EPERM or EACCES if err.errno not in (errno.EPERM, errno.EACCES): raise def Kill(self, signum): """Sends signal to child process. """ logging.info("Sending signal %s to child process", signum) os.killpg(self.pid, signum) def ForceQuit(self): """Ensure child process is no longer running. """ # Final check if child process is still alive if utils.RetryOnSignal(self.poll) is None: logging.error("Child process still alive, sending SIGKILL") self.Kill(signal.SIGKILL) utils.RetryOnSignal(self.wait) def main(): """Main function. """ # Option parsing (status_file_path, mode) = ParseOptions() # Configure logging child_logger = SetupLogging() status_file = StatusFile(status_file_path) try: try: # Pipe to receive socat's stderr output (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe() # Get child process command cmd = GetCommand(mode, socat_stderr_write_fd) logging.debug("Starting command %r", cmd) # Start child process child = ChildProcess(cmd, [socat_stderr_write_fd]) try: def _ForwardSignal(signum, _): """Forwards signals to child process. """ child.Kill(signum) signal_wakeup = utils.SignalWakeupFd() try: # TODO: There is a race condition between starting the child and # handling the signals here. While there might be a way to work around # it by registering the handlers before starting the child and # deferring sent signals until the child is available, doing so can be # complicated. signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT], handler_fn=_ForwardSignal, wakeup=signal_wakeup) try: # Close child's side utils.RetryOnSignal(os.close, socat_stderr_write_fd) if ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger, signal_wakeup, signal_handler): # The child closed all its file descriptors and there was no # signal # TODO: Implement timeout instead of waiting indefinitely utils.RetryOnSignal(child.wait) finally: signal_handler.Reset() finally: signal_wakeup.Reset() finally: child.ForceQuit() if child.returncode == 0: errmsg = None elif child.returncode < 0: errmsg = "Exited due to signal %s" % (-child.returncode, ) else: errmsg = "Exited with status %s" % (child.returncode, ) status_file.SetExitStatus(child.returncode, errmsg) except Exception, err: # pylint: disable-msg=W0703 logging.exception("Unhandled error occurred") status_file.SetExitStatus(constants.EXIT_FAILURE, "Unhandled error occurred: %s" % (err, )) if status_file.ExitStatusIsSuccess(): sys.exit(constants.EXIT_SUCCESS) sys.exit(constants.EXIT_FAILURE) finally: status_file.Update(True) if __name__ == "__main__": main()