X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/c08d76f5f297ead72045c8dc7212bc76195f47e6..9475189d3cba70537155ba00783ed75baf3ae2b8:/daemons/import-export diff --git a/daemons/import-export b/daemons/import-export index 53c4e28..8603460 100755 --- a/daemons/import-export +++ b/daemons/import-export @@ -23,7 +23,7 @@ """ -# pylint: disable-msg=C0103 +# pylint: disable=C0103 # C0103: Invalid name import-export import errno @@ -40,10 +40,11 @@ import math from ganeti import constants from ganeti import cli from ganeti import utils +from ganeti import errors from ganeti import serializer from ganeti import objects -from ganeti import locking from ganeti import impexpd +from ganeti import netutils #: How many lines to keep in the status file @@ -52,9 +53,6 @@ MAX_RECENT_OUTPUT_LINES = 20 #: Don't update status file more than once every 5 seconds (unless forced) MIN_UPDATE_INTERVAL = 5.0 -#: Give child process up to 5 seconds to exit after sending a signal -CHILD_LINGER_TIMEOUT = 5.0 - #: How long to wait for a connection to be established DEFAULT_CONNECT_TIMEOUT = 60 @@ -130,7 +128,7 @@ class StatusFile: @param port: TCP/UDP port """ - assert isinstance(port, (int, long)) and 0 < port < 2**16 + assert isinstance(port, (int, long)) and 0 < port < (2 ** 16) self._data.listen_port = port def GetListenPort(self): @@ -207,7 +205,7 @@ class StatusFile: def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, - dd_pid_read_fd, status_file, child_logger, + dd_pid_read_fd, exp_size_read_fd, status_file, child_logger, signal_notify, signal_handler, mode): """Handles the child processes' output. @@ -221,12 +219,18 @@ def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0) dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0) dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0) + exp_size_read = os.fdopen(exp_size_read_fd, "r", 0) tp_samples = DD_THROUGHPUT_SAMPLES + if options.exp_size == constants.IE_CUSTOM_SIZE: + exp_size = None + else: + exp_size = options.exp_size + child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file, - child_logger, - throughput_samples=tp_samples) + child_logger, tp_samples, + exp_size) try: fdmap = { child.stderr.fileno(): @@ -237,6 +241,8 @@ def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)), dd_stderr_read.fileno(): (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)), + exp_size_read.fileno(): + (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)), signal_notify.fileno(): (signal_notify, None), } @@ -246,7 +252,7 @@ def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, poller.register(fd, select.POLLIN) if options.connect_timeout and mode == constants.IEM_IMPORT: - listen_timeout = locking.RunningTimeout(options.connect_timeout, True) + listen_timeout = utils.RunningTimeout(options.connect_timeout, True) else: listen_timeout = None @@ -261,13 +267,19 @@ def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, timeout = None if listen_timeout and not exit_timeout: + assert mode == constants.IEM_IMPORT and options.connect_timeout if status_file.GetConnected(): listen_timeout = None elif listen_timeout.Remaining() < 0: - logging.info("Child process didn't establish connection in time") + errmsg = ("Child process didn't establish connection in time" + " (%0.0fs), sending SIGTERM" % options.connect_timeout) + logging.error(errmsg) + status_file.AddRecentOutput(errmsg) + status_file.Update(True) + child.Kill(signal.SIGTERM) exit_timeout = \ - locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True) + utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True) # Next block will calculate timeout else: # Not yet connected, check again in a second @@ -283,11 +295,10 @@ def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, notify_status = child_io_proc.NotifyDd() if notify_status: # Schedule next notification - dd_stats_timeout = locking.RunningTimeout(DD_STATISTICS_INTERVAL, - True) + dd_stats_timeout = utils.RunningTimeout(DD_STATISTICS_INTERVAL, True) else: # Try again soon (dd isn't ready yet) - dd_stats_timeout = locking.RunningTimeout(1.0, True) + dd_stats_timeout = utils.RunningTimeout(1.0, True) if dd_stats_timeout: dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000) @@ -315,9 +326,9 @@ def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, " to exit", exit_timeout.Remaining()) else: logging.info("Giving child process %0.2f seconds to exit", - CHILD_LINGER_TIMEOUT) + constants.CHILD_LINGER_TIMEOUT) exit_timeout = \ - locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True) + utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True) else: poller.unregister(fd) del fdmap[fd] @@ -342,7 +353,7 @@ def ParseOptions(): @return: Arguments to program """ - global options # pylint: disable-msg=W0603 + global options # pylint: disable=W0603 parser = optparse.OptionParser(usage=("%%prog {%s|%s}" % (constants.IEM_IMPORT, @@ -357,6 +368,10 @@ def ParseOptions(): help="X509 CA file") parser.add_option("--bind", dest="bind", action="store", type="string", help="Bind address") + parser.add_option("--ipv4", dest="ipv4", action="store_true", + help="Use IPv4 only") + parser.add_option("--ipv6", dest="ipv6", action="store_true", + help="Use IPv6 only") parser.add_option("--host", dest="host", action="store", type="string", help="Remote hostname") parser.add_option("--port", dest="port", action="store", type="int", @@ -372,6 +387,11 @@ def ParseOptions(): type="choice", help="Compression method", metavar="[%s]" % "|".join(constants.IEC_ALL), choices=list(constants.IEC_ALL), default=constants.IEC_GZIP) + parser.add_option("--expected-size", dest="exp_size", action="store", + type="string", default=None, + help="Expected import/export size (MiB)") + parser.add_option("--magic", dest="magic", action="store", + type="string", default=None, help="Magic string") parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store", type="string", help="Command prefix") parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store", @@ -390,11 +410,37 @@ def ParseOptions(): # Won't return parser.error("Invalid mode: %s" % mode) + # Normalize and check parameters + if options.host is not None and not netutils.IPAddress.IsValid(options.host): + try: + options.host = netutils.Hostname.GetNormalizedName(options.host) + except errors.OpPrereqError, err: + parser.error("Invalid hostname '%s': %s" % (options.host, err)) + + if options.port is not None: + options.port = utils.ValidateServiceName(options.port) + + if (options.exp_size is not None and + options.exp_size != constants.IE_CUSTOM_SIZE): + try: + options.exp_size = int(options.exp_size) + except (ValueError, TypeError), err: + # Won't return + parser.error("Invalid value for --expected-size: %s (%s)" % + (options.exp_size, err)) + + if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)): + parser.error("Magic must match regular expression %s" % + constants.IE_MAGIC_RE.pattern) + + if options.ipv4 and options.ipv6: + parser.error("Can only use one of --ipv4 and --ipv6") + return (status_file_path, mode) class ChildProcess(subprocess.Popen): - def __init__(self, cmd, noclose_fds): + def __init__(self, env, cmd, noclose_fds): """Initializes this class. """ @@ -402,7 +448,7 @@ class ChildProcess(subprocess.Popen): # Not using close_fds because doing so would also close the socat stderr # pipe, which we still need. - subprocess.Popen.__init__(self, cmd, shell=False, close_fds=False, + subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False, stderr=subprocess.PIPE, stdout=None, stdin=None, preexec_fn=self._ChildPreexec) self._SetProcessGroup() @@ -439,7 +485,7 @@ class ChildProcess(subprocess.Popen): """ logging.info("Sending signal %s to child process", signum) - os.killpg(self.pid, signum) + utils.IgnoreProcessNotFound(os.killpg, self.pid, signum) def ForceQuit(self): """Ensure child process is no longer running. @@ -474,16 +520,26 @@ def main(): # Pipe to receive dd's PID (dd_pid_read_fd, dd_pid_write_fd) = os.pipe() + # Pipe to receive size predicted by export script + (exp_size_read_fd, exp_size_write_fd) = os.pipe() + # Get child process command cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd, dd_stderr_write_fd, dd_pid_write_fd) cmd = cmd_builder.GetCommand() + # Prepare command environment + cmd_env = os.environ.copy() + + if options.exp_size == constants.IE_CUSTOM_SIZE: + cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd) + logging.debug("Starting command %r", cmd) # Start child process - child = ChildProcess(cmd, [socat_stderr_write_fd, dd_stderr_write_fd, - dd_pid_write_fd]) + child = ChildProcess(cmd_env, cmd, + [socat_stderr_write_fd, dd_stderr_write_fd, + dd_pid_write_fd, exp_size_write_fd]) try: def _ForwardSignal(signum, _): """Forwards signals to child process. @@ -506,9 +562,11 @@ def main(): utils.RetryOnSignal(os.close, socat_stderr_write_fd) utils.RetryOnSignal(os.close, dd_stderr_write_fd) utils.RetryOnSignal(os.close, dd_pid_write_fd) + utils.RetryOnSignal(os.close, exp_size_write_fd) if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, - dd_pid_read_fd, status_file, child_logger, + dd_pid_read_fd, exp_size_read_fd, + status_file, child_logger, signal_wakeup, signal_handler, mode): # The child closed all its file descriptors and there was no # signal @@ -529,7 +587,7 @@ def main(): errmsg = "Exited with status %s" % (child.returncode, ) status_file.SetExitStatus(child.returncode, errmsg) - except Exception, err: # pylint: disable-msg=W0703 + except Exception, err: # pylint: disable=W0703 logging.exception("Unhandled error occurred") status_file.SetExitStatus(constants.EXIT_FAILURE, "Unhandled error occurred: %s" % (err, ))