"""
-# pylint: disable-msg=C0103
+# pylint: disable=C0103
# C0103: Invalid name import-export
import errno
from ganeti import constants
from ganeti import cli
from ganeti import utils
+from ganeti import errors
from ganeti import serializer
from ganeti import objects
-from ganeti import locking
from ganeti import impexpd
+from ganeti import netutils
#: How many lines to keep in the status file
#: Don't update status file more than once every 5 seconds (unless forced)
MIN_UPDATE_INTERVAL = 5.0
-#: Give child process up to 5 seconds to exit after sending a signal
-CHILD_LINGER_TIMEOUT = 5.0
-
#: How long to wait for a connection to be established
DEFAULT_CONNECT_TIMEOUT = 60
@param port: TCP/UDP port
"""
- assert isinstance(port, (int, long)) and 0 < port < 2**16
+ assert isinstance(port, (int, long)) and 0 < port < (2 ** 16)
self._data.listen_port = port
def GetListenPort(self):
def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
- dd_pid_read_fd, status_file, child_logger,
+ dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
signal_notify, signal_handler, mode):
"""Handles the child processes' output.
socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
+ exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)
tp_samples = DD_THROUGHPUT_SAMPLES
+ if options.exp_size == constants.IE_CUSTOM_SIZE:
+ exp_size = None
+ else:
+ exp_size = options.exp_size
+
child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
- child_logger,
- throughput_samples=tp_samples)
+ child_logger, tp_samples,
+ exp_size)
try:
fdmap = {
child.stderr.fileno():
(dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
dd_stderr_read.fileno():
(dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
+ exp_size_read.fileno():
+ (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
signal_notify.fileno(): (signal_notify, None),
}
poller.register(fd, select.POLLIN)
if options.connect_timeout and mode == constants.IEM_IMPORT:
- listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
+ listen_timeout = utils.RunningTimeout(options.connect_timeout, True)
else:
listen_timeout = None
timeout = None
if listen_timeout and not exit_timeout:
+ assert mode == constants.IEM_IMPORT and options.connect_timeout
if status_file.GetConnected():
listen_timeout = None
elif listen_timeout.Remaining() < 0:
- logging.info("Child process didn't establish connection in time")
+ errmsg = ("Child process didn't establish connection in time"
+ " (%0.0fs), sending SIGTERM" % options.connect_timeout)
+ logging.error(errmsg)
+ status_file.AddRecentOutput(errmsg)
+ status_file.Update(True)
+
child.Kill(signal.SIGTERM)
exit_timeout = \
- locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
+ utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
# Next block will calculate timeout
else:
# Not yet connected, check again in a second
notify_status = child_io_proc.NotifyDd()
if notify_status:
# Schedule next notification
- dd_stats_timeout = locking.RunningTimeout(DD_STATISTICS_INTERVAL,
- True)
+ dd_stats_timeout = utils.RunningTimeout(DD_STATISTICS_INTERVAL, True)
else:
# Try again soon (dd isn't ready yet)
- dd_stats_timeout = locking.RunningTimeout(1.0, True)
+ dd_stats_timeout = utils.RunningTimeout(1.0, True)
if dd_stats_timeout:
dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)
" to exit", exit_timeout.Remaining())
else:
logging.info("Giving child process %0.2f seconds to exit",
- CHILD_LINGER_TIMEOUT)
+ constants.CHILD_LINGER_TIMEOUT)
exit_timeout = \
- locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
+ utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
else:
poller.unregister(fd)
del fdmap[fd]
@return: Arguments to program
"""
- global options # pylint: disable-msg=W0603
+ global options # pylint: disable=W0603
parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
(constants.IEM_IMPORT,
help="X509 CA file")
parser.add_option("--bind", dest="bind", action="store", type="string",
help="Bind address")
+ parser.add_option("--ipv4", dest="ipv4", action="store_true",
+ help="Use IPv4 only")
+ parser.add_option("--ipv6", dest="ipv6", action="store_true",
+ help="Use IPv6 only")
parser.add_option("--host", dest="host", action="store", type="string",
help="Remote hostname")
parser.add_option("--port", dest="port", action="store", type="int",
type="choice", help="Compression method",
metavar="[%s]" % "|".join(constants.IEC_ALL),
choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
+ parser.add_option("--expected-size", dest="exp_size", action="store",
+ type="string", default=None,
+ help="Expected import/export size (MiB)")
+ parser.add_option("--magic", dest="magic", action="store",
+ type="string", default=None, help="Magic string")
parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
type="string", help="Command prefix")
parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
# Won't return
parser.error("Invalid mode: %s" % mode)
+ # Normalize and check parameters
+ if options.host is not None and not netutils.IPAddress.IsValid(options.host):
+ try:
+ options.host = netutils.Hostname.GetNormalizedName(options.host)
+ except errors.OpPrereqError, err:
+ parser.error("Invalid hostname '%s': %s" % (options.host, err))
+
+ if options.port is not None:
+ options.port = utils.ValidateServiceName(options.port)
+
+ if (options.exp_size is not None and
+ options.exp_size != constants.IE_CUSTOM_SIZE):
+ try:
+ options.exp_size = int(options.exp_size)
+ except (ValueError, TypeError), err:
+ # Won't return
+ parser.error("Invalid value for --expected-size: %s (%s)" %
+ (options.exp_size, err))
+
+ if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)):
+ parser.error("Magic must match regular expression %s" %
+ constants.IE_MAGIC_RE.pattern)
+
+ if options.ipv4 and options.ipv6:
+ parser.error("Can only use one of --ipv4 and --ipv6")
+
return (status_file_path, mode)
class ChildProcess(subprocess.Popen):
- def __init__(self, cmd, noclose_fds):
+ def __init__(self, env, cmd, noclose_fds):
"""Initializes this class.
"""
# Not using close_fds because doing so would also close the socat stderr
# pipe, which we still need.
- subprocess.Popen.__init__(self, cmd, shell=False, close_fds=False,
+ subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
stderr=subprocess.PIPE, stdout=None, stdin=None,
preexec_fn=self._ChildPreexec)
self._SetProcessGroup()
"""
logging.info("Sending signal %s to child process", signum)
- os.killpg(self.pid, signum)
+ utils.IgnoreProcessNotFound(os.killpg, self.pid, signum)
def ForceQuit(self):
"""Ensure child process is no longer running.
# Pipe to receive dd's PID
(dd_pid_read_fd, dd_pid_write_fd) = os.pipe()
+ # Pipe to receive size predicted by export script
+ (exp_size_read_fd, exp_size_write_fd) = os.pipe()
+
# Get child process command
cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
dd_stderr_write_fd, dd_pid_write_fd)
cmd = cmd_builder.GetCommand()
+ # Prepare command environment
+ cmd_env = os.environ.copy()
+
+ if options.exp_size == constants.IE_CUSTOM_SIZE:
+ cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)
+
logging.debug("Starting command %r", cmd)
# Start child process
- child = ChildProcess(cmd, [socat_stderr_write_fd, dd_stderr_write_fd,
- dd_pid_write_fd])
+ child = ChildProcess(cmd_env, cmd,
+ [socat_stderr_write_fd, dd_stderr_write_fd,
+ dd_pid_write_fd, exp_size_write_fd])
try:
def _ForwardSignal(signum, _):
"""Forwards signals to child process.
utils.RetryOnSignal(os.close, socat_stderr_write_fd)
utils.RetryOnSignal(os.close, dd_stderr_write_fd)
utils.RetryOnSignal(os.close, dd_pid_write_fd)
+ utils.RetryOnSignal(os.close, exp_size_write_fd)
if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
- dd_pid_read_fd, status_file, child_logger,
+ dd_pid_read_fd, exp_size_read_fd,
+ status_file, child_logger,
signal_wakeup, signal_handler, mode):
# The child closed all its file descriptors and there was no
# signal
errmsg = "Exited with status %s" % (child.returncode, )
status_file.SetExitStatus(child.returncode, errmsg)
- except Exception, err: # pylint: disable-msg=W0703
+ except Exception, err: # pylint: disable=W0703
logging.exception("Unhandled error occurred")
status_file.SetExitStatus(constants.EXIT_FAILURE,
"Unhandled error occurred: %s" % (err, ))