4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Import/export daemon.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name import-export
import errno
import logging
import math
import optparse
import os
import select
import signal
import subprocess
import sys
import time

from ganeti import constants
from ganeti import cli
from ganeti import utils
from ganeti import errors
from ganeti import serializer
from ganeti import objects
from ganeti import locking
from ganeti import impexpd
from ganeti import netutils
#: How many lines to keep in the status file
MAX_RECENT_OUTPUT_LINES = 20

#: Don't update status file more than once every 5 seconds (unless forced)
MIN_UPDATE_INTERVAL = 5.0

#: Give child process up to 5 seconds to exit after sending a signal
CHILD_LINGER_TIMEOUT = 5.0

#: How long to wait for a connection to be established (seconds)
DEFAULT_CONNECT_TIMEOUT = 60

#: Get dd(1) statistics every few seconds
DD_STATISTICS_INTERVAL = 5.0

#: Seconds for throughput calculation
DD_THROUGHPUT_INTERVAL = 60.0

#: Number of samples for throughput calculation (interval divided by the
#: statistics polling period, rounded up)
DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) /
                                      DD_STATISTICS_INTERVAL))
74 # Global variable for options
  """Configures the logging module.

  NOTE(review): the "def SetupLogging():" header and the final
  "return child_logger" statement are not visible in this view of the file;
  main() assigns this function's return value, so it presumably returns the
  child-output logger -- confirm against the full source.

  """
  formatter = logging.Formatter("%(asctime)s: %(message)s")

  # Everything is written to stderr; level filtering is done on the loggers
  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  stderr_handler.setLevel(logging.NOTSET)

  root_logger = logging.getLogger("")
  root_logger.addHandler(stderr_handler)
  # NOTE(review): the three setLevel calls below were almost certainly
  # guarded by an options.debug/options.verbose conditional whose lines are
  # missing from this view; as written only the last call would take effect
  root_logger.setLevel(logging.NOTSET)
  root_logger.setLevel(logging.INFO)
  root_logger.setLevel(logging.ERROR)

  # Create special logger for child process output
  child_logger = logging.Logger("child output")
  child_logger.addHandler(stderr_handler)
  child_logger.setLevel(logging.NOTSET)
107 """Status file manager.
  def __init__(self, path):
    """Initializes class.

    @type path: string
    @param path: Path to status file (written by L{Update})

    """
    # NOTE(review): the remaining keyword arguments and the closing
    # parenthesis of this constructor call are not visible in this view
    self._data = objects.ImportExportStatus(ctime=time.time(),
119 def AddRecentOutput(self, line):
120 """Adds a new line of recent output.
123 self._data.recent_output.append(line)
126 del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]
128 def SetListenPort(self, port):
129 """Sets the port the daemon is listening on.
132 @param port: TCP/UDP port
135 assert isinstance(port, (int, long)) and 0 < port < 2**16
136 self._data.listen_port = port
138 def GetListenPort(self):
139 """Returns the port the daemon is listening on.
142 return self._data.listen_port
144 def SetConnected(self):
145 """Sets the connected flag.
148 self._data.connected = True
150 def GetConnected(self):
151 """Determines whether the daemon is connected.
154 return self._data.connected
156 def SetProgress(self, mbytes, throughput, percent, eta):
157 """Sets how much data has been transferred so far.
160 @param mbytes: Transferred amount of data in MiB.
161 @type throughput: float
162 @param throughput: MiB/second
163 @type percent: number
164 @param percent: Percent processed
166 @param eta: Expected number of seconds until done
169 self._data.progress_mbytes = mbytes
170 self._data.progress_throughput = throughput
171 self._data.progress_percent = percent
172 self._data.progress_eta = eta
174 def SetExitStatus(self, exit_status, error_message):
175 """Sets the exit status and an error message.
178 # Require error message when status isn't 0
179 assert exit_status == 0 or error_message
181 self._data.exit_status = exit_status
182 self._data.error_message = error_message
184 def ExitStatusIsSuccess(self):
185 """Returns whether the exit status means "success".
188 return not bool(self._data.error_message)
190 def Update(self, force):
191 """Updates the status file.
194 @param force: Write status file in any case, not only when minimum interval
199 self._data.mtime is None or
200 time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
203 logging.debug("Updating status file %s", self._path)
205 self._data.mtime = time.time()
206 utils.WriteFile(self._path,
207 data=serializer.DumpJson(self._data.ToDict(), indent=True),
def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                   dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
                   signal_notify, signal_handler, mode):
  """Handles the child processes' output.

  Multiplexes over the child's stderr, the socat/dd/dd-pid/export-size pipes
  and the signal notification FD using poll(2), feeding the data into
  per-program line splitters of L{impexpd.ChildIOProcessor}.

  NOTE(review): several source lines are missing from this view of the
  function (loop headers, else branches, the "fdmap = {" opener, various
  closing brackets and "break"/"continue" statements); the comments below
  annotate only what is visible -- consult the full source before editing.

  """
  assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
         "Other signals are not handled in this function"

  # Buffer size 0 is important, otherwise .read() with a specified length
  # might buffer data while poll(2) won't mark its file descriptor as
  # readable again
  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
  dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
  dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
  exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)

  tp_samples = DD_THROUGHPUT_SAMPLES

  # A "custom" expected size is reported by the export script at runtime
  if options.exp_size == constants.IE_CUSTOM_SIZE:
    # NOTE(review): the body of this branch and the "else:" line are not
    # visible here; the assignment below presumably belonged to the else
    # branch -- confirm against the full source
    exp_size = options.exp_size

  child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
                                           child_logger, tp_samples,
  # Map each readable file descriptor to (stream, line splitter); the
  # signal notification FD has no splitter
    child.stderr.fileno():
      (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
    socat_stderr_read.fileno():
      (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
    dd_pid_read.fileno():
      (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
    dd_stderr_read.fileno():
      (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
    exp_size_read.fileno():
      (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
    signal_notify.fileno(): (signal_notify, None),

  poller = select.poll()
  # Register every tracked descriptor as a non-blocking poll(2) input
  # (NOTE(review): the "for fd in fdmap:" header is not visible here)
  utils.SetNonblockFlag(fd, True)
  poller.register(fd, select.POLLIN)

  # On import, enforce a deadline for the remote end to connect
  if options.connect_timeout and mode == constants.IEM_IMPORT:
    listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
    listen_timeout = None

  dd_stats_timeout = None

  # Break out of loop if only signal notify FD is left
  if len(fdmap) == 1 and signal_notify.fileno() in fdmap:

  # While waiting for the connection (and not already shutting down),
  # check whether the listen deadline expired
  if listen_timeout and not exit_timeout:
    if status_file.GetConnected():
      listen_timeout = None
    elif listen_timeout.Remaining() < 0:
      logging.info("Child process didn't establish connection in time")
      child.Kill(signal.SIGTERM)
      locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
      # Next block will calculate timeout
    # Not yet connected, check again in a second

  # poll(2) takes milliseconds
  timeout = exit_timeout.Remaining() * 1000
  logging.info("Child process didn't exit in time")

  # Periodically ask dd(1) for statistics (via signal)
  if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
    notify_status = child_io_proc.NotifyDd()
    # Schedule next notification
    dd_stats_timeout = locking.RunningTimeout(DD_STATISTICS_INTERVAL,
    # Try again soon (dd isn't ready yet)
    dd_stats_timeout = locking.RunningTimeout(1.0, True)

  dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)
  timeout = min(timeout, dd_timeout)

  for fd, event in utils.RetryOnSignal(poller.poll, timeout):
    # NOTE(review): the inner "event & select.POLLPRI" is redundant; this
    # condition is equivalent to "event & (select.POLLIN | select.POLLPRI)"
    if event & (select.POLLIN | event & select.POLLPRI):
      (from_, to) = fdmap[fd]

      # Read up to 1 KB of data
      data = from_.read(1024)
    elif fd == signal_notify.fileno():
      # Signal handling thread woke us up
      if signal_handler.called:
        signal_handler.Clear()
        logging.info("Child process still has about %0.2f seconds"
                     " to exit", exit_timeout.Remaining())
        logging.info("Giving child process %0.2f seconds to exit",
                     CHILD_LINGER_TIMEOUT)
        locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
      poller.unregister(fd)
    # Error or hangup on the descriptor: stop watching it
    elif event & (select.POLLNVAL | select.POLLHUP |
      poller.unregister(fd)

  child_io_proc.FlushAll()

  # If there was a timeout calculator, we were waiting for the child to
  # finish, e.g. due to a signal
  return not bool(exit_timeout)

  child_io_proc.CloseAll()
  """Parses the options passed to the program.

  @return: Arguments to program

  NOTE(review): the "def ParseOptions():" header, several option help
  strings, the argument-count check and the "try:" lines are not visible in
  this view of the file; the code below is annotated as-is.

  """
  global options # pylint: disable-msg=W0603

  parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
                                        (constants.IEM_IMPORT,
                                         constants.IEM_EXPORT)))
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option("--key", dest="key", action="store", type="string",
  parser.add_option("--cert", dest="cert", action="store", type="string",
                    help="X509 certificate file")
  parser.add_option("--ca", dest="ca", action="store", type="string",
  parser.add_option("--bind", dest="bind", action="store", type="string",
  parser.add_option("--host", dest="host", action="store", type="string",
                    help="Remote hostname")
  parser.add_option("--port", dest="port", action="store", type="int",
  parser.add_option("--connect-retries", dest="connect_retries", action="store",
                    type="int", default=0,
                    help=("How many times the connection should be retried"
  parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
                    type="int", default=DEFAULT_CONNECT_TIMEOUT,
                    help="Timeout for connection to be established (seconds)")
  parser.add_option("--compress", dest="compress", action="store",
                    type="choice", help="Compression method",
                    metavar="[%s]" % "|".join(constants.IEC_ALL),
                    choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
  parser.add_option("--expected-size", dest="exp_size", action="store",
                    type="string", default=None,
                    help="Expected import/export size (MiB)")
  parser.add_option("--magic", dest="magic", action="store",
                    type="string", default=None, help="Magic string")
  parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
                    type="string", help="Command prefix")
  parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
                    type="string", help="Command suffix")

  (options, args) = parser.parse_args()

  # NOTE(review): the "if len(args) != 2:" guard is not visible here
  parser.error("Expected exactly two arguments")

  (status_file_path, mode) = args

  if mode not in (constants.IEM_IMPORT,
                  constants.IEM_EXPORT):
    parser.error("Invalid mode: %s" % mode)

  # Normalize and check parameters
  if options.host is not None:
    # NOTE(review): the "try:" opening this block is not visible here
    options.host = netutils.Hostname.GetNormalizedName(options.host)
    except errors.OpPrereqError, err:
      parser.error("Invalid hostname '%s': %s" % (options.host, err))

  if options.port is not None:
    options.port = utils.ValidateServiceName(options.port)

  if (options.exp_size is not None and
      options.exp_size != constants.IE_CUSTOM_SIZE):
    # NOTE(review): the "try:" opening this block is not visible here
    options.exp_size = int(options.exp_size)
    except (ValueError, TypeError), err:
      parser.error("Invalid value for --expected-size: %s (%s)" %
                   (options.exp_size, err))

  if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)):
    parser.error("Magic must match regular expression %s" %
                 constants.IE_MAGIC_RE.pattern)

  return (status_file_path, mode)
class ChildProcess(subprocess.Popen):
  """L{subprocess.Popen} subclass placing the child in its own process group.

  Signalling the process group (see L{Kill}) reaches the child and all of
  its grandchildren.

  NOTE(review): a few lines are missing from this view (docstring
  terminators, a "raise" statement, the os.setpgid(0, 0) call in the
  preexec function and the "def ForceQuit" header); see inline notes.

  """
  def __init__(self, env, cmd, noclose_fds):
    """Initializes this class.

    @param env: Environment for the child process
    @param cmd: Command to run
    @param noclose_fds: File descriptors the child must keep open

    """
    self._noclose_fds = noclose_fds

    # Not using close_fds because doing so would also close the socat stderr
    # pipe, which we still need.
    subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
                              stderr=subprocess.PIPE, stdout=None, stdin=None,
                              preexec_fn=self._ChildPreexec)
    self._SetProcessGroup()

  def _ChildPreexec(self):
    """Called before child executable is execve'd.

    """
    # Move to separate process group. By sending a signal to its process group
    # we can kill the child process and all grandchildren.
    # NOTE(review): the call performing the move (presumably
    # os.setpgid(0, 0)) is not visible in this view of the file.

    # Close almost all file descriptors
    utils.CloseFDs(noclose_fds=self._noclose_fds)

  def _SetProcessGroup(self):
    """Sets the child's process group.

    """
    assert self.pid, "Can't be called in child process"

    # Avoid race condition by setting child's process group (as good as
    # possible in Python) before sending signals to child. For an
    # explanation, see preexec function for child.
    # NOTE(review): the "try:" opening this block is not visible here
    os.setpgid(self.pid, self.pid)
    except EnvironmentError, err:
      # If the child process was faster we receive EPERM or EACCES
      if err.errno not in (errno.EPERM, errno.EACCES):
        # NOTE(review): the "raise" re-raising unexpected errors is not
        # visible in this view

  def Kill(self, signum):
    """Sends signal to child process.

    @type signum: int
    @param signum: Signal number

    """
    logging.info("Sending signal %s to child process", signum)
    # Signal the whole process group; ignore "no such process"
    utils.IgnoreProcessNotFound(os.killpg, self.pid, signum)

    # NOTE(review): the "def ForceQuit(self):" header is not visible in
    # this view; the docstring and body below belong to it
    """Ensure child process is no longer running.

    """
    # Final check if child process is still alive
    if utils.RetryOnSignal(self.poll) is None:
      logging.error("Child process still alive, sending SIGKILL")
      self.Kill(signal.SIGKILL)
      utils.RetryOnSignal(self.wait)
  # NOTE(review): the "def main():" header, its docstring and the
  # try/except/finally scaffolding of this function are not visible in this
  # view of the file; the comments below annotate only visible statements.
  (status_file_path, mode) = ParseOptions()

  # Configure root logger and obtain the child-output logger
  child_logger = SetupLogging()

  status_file = StatusFile(status_file_path)

  # Pipe to receive socat's stderr output
  (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()

  # Pipe to receive dd's stderr output
  (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()

  # Pipe to receive dd's PID
  (dd_pid_read_fd, dd_pid_write_fd) = os.pipe()

  # Pipe to receive size predicted by export script
  (exp_size_read_fd, exp_size_write_fd) = os.pipe()

  # Get child process command
  cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
                                       dd_stderr_write_fd, dd_pid_write_fd)
  cmd = cmd_builder.GetCommand()

  # Prepare command environment
  cmd_env = os.environ.copy()

  # Tell the export script which FD to report the expected size on
  if options.exp_size == constants.IE_CUSTOM_SIZE:
    cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)

  logging.debug("Starting command %r", cmd)

  # Start child process
  child = ChildProcess(cmd_env, cmd,
                       [socat_stderr_write_fd, dd_stderr_write_fd,
                        dd_pid_write_fd, exp_size_write_fd])

  def _ForwardSignal(signum, _):
    """Forwards signals to child process.

    """
    # NOTE(review): the forwarding call (presumably child.Kill(signum)) is
    # not visible in this view of the file

  signal_wakeup = utils.SignalWakeupFd()

  # TODO: There is a race condition between starting the child and
  # handling the signals here. While there might be a way to work around
  # it by registering the handlers before starting the child and
  # deferring sent signals until the child is available, doing so can be
  # complicated.
  signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
                                       handler_fn=_ForwardSignal,
                                       wakeup=signal_wakeup)

  # Close child's sides of the pipes in the parent process
  utils.RetryOnSignal(os.close, socat_stderr_write_fd)
  utils.RetryOnSignal(os.close, dd_stderr_write_fd)
  utils.RetryOnSignal(os.close, dd_pid_write_fd)
  utils.RetryOnSignal(os.close, exp_size_write_fd)

  if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                    dd_pid_read_fd, exp_size_read_fd,
                    status_file, child_logger,
                    signal_wakeup, signal_handler, mode):
    # The child closed all its file descriptors and there was no
    # signal
    # TODO: Implement timeout instead of waiting indefinitely
    utils.RetryOnSignal(child.wait)

  signal_handler.Reset()
  signal_wakeup.Reset()

  # Translate the child's return code into an error message
  # (NOTE(review): the body of the "== 0" branch is not visible here)
  if child.returncode == 0:
  elif child.returncode < 0:
    errmsg = "Exited due to signal %s" % (-child.returncode, )
    errmsg = "Exited with status %s" % (child.returncode, )

  status_file.SetExitStatus(child.returncode, errmsg)
  # NOTE(review): the "try:" matching this handler is not visible here
  except Exception, err: # pylint: disable-msg=W0703
  logging.exception("Unhandled error occurred")
  status_file.SetExitStatus(constants.EXIT_FAILURE,
                            "Unhandled error occurred: %s" % (err, ))

  if status_file.ExitStatusIsSuccess():
    sys.exit(constants.EXIT_SUCCESS)
  sys.exit(constants.EXIT_FAILURE)

  # Presumably inside a "finally:" block in the full source: make sure the
  # final status hits the file regardless of the update interval
  status_file.Update(True)
595 if __name__ == "__main__":