4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Import/export daemon.
26 # pylint: disable=C0103
27 # C0103: Invalid name import-export
40 from ganeti import constants
41 from ganeti import cli
42 from ganeti import utils
43 from ganeti import errors
44 from ganeti import serializer
45 from ganeti import objects
46 from ganeti import impexpd
47 from ganeti import netutils
#: How many lines to keep in the status file
MAX_RECENT_OUTPUT_LINES = 20

#: Don't update status file more than once every 5 seconds (unless forced)
MIN_UPDATE_INTERVAL = 5.0

#: How long to wait for a connection to be established
DEFAULT_CONNECT_TIMEOUT = 60

#: Get dd(1) statistics every few seconds
DD_STATISTICS_INTERVAL = 5.0

#: Seconds for throughput calculation
DD_THROUGHPUT_INTERVAL = 60.0

#: Number of samples for throughput calculation
# math.ceil guarantees at least one sample; with the defaults above this
# works out to 12 samples (60.0 / 5.0).
DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) /
                                      DD_STATISTICS_INTERVAL))
70 # Global variable for options
# NOTE(review): this is the body of SetupLogging(); the "def" line and
# several interior lines are not visible in this excerpt.  It installs a
# stderr handler on the root logger and returns a separate logger for
# child-process output.
"""Configures the logging module.

"""
formatter = logging.Formatter("%(asctime)s: %(message)s")

stderr_handler = logging.StreamHandler()
stderr_handler.setFormatter(formatter)
# NOTSET on the handler: level filtering is delegated to the loggers below.
stderr_handler.setLevel(logging.NOTSET)

root_logger = logging.getLogger("")
root_logger.addHandler(stderr_handler)
# NOTE(review): lines are missing between the three setLevel() calls below;
# presumably they are the branches of a debug/verbose/default if-chain
# (NOTSET when debugging, INFO when verbose, ERROR otherwise) -- confirm
# against the full file.
root_logger.setLevel(logging.NOTSET)
root_logger.setLevel(logging.INFO)
root_logger.setLevel(logging.ERROR)

# Create special logger for child process output
# Instantiated directly (not via getLogger) so it stays outside the logger
# hierarchy and never propagates records to the root logger.
child_logger = logging.Logger("child output")
child_logger.addHandler(stderr_handler)
child_logger.setLevel(logging.NOTSET)
103 """Status file manager.
# Records the status-file path and creates the initial status object with
# its creation time.  NOTE(review): the ImportExportStatus(...) call is
# truncated in this excerpt -- its remaining keyword arguments are not
# visible here.
def __init__(self, path):
  """Initializes class.

  @type path: string
  @param path: Path to status file

  """
  self._data = objects.ImportExportStatus(ctime=time.time(),
# Appends one line to the recent-output buffer and trims it so that only
# the newest MAX_RECENT_OUTPUT_LINES entries are kept.
def AddRecentOutput(self, line):
  """Adds a new line of recent output.

  """
  self._data.recent_output.append(line)

  # Remove old lines; the negative-slice delete keeps only the last
  # MAX_RECENT_OUTPUT_LINES elements.
  del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]
def SetListenPort(self, port):
  """Sets the port the daemon is listening on.

  @type port: int
  @param port: TCP/UDP port

  """
  # Python 2 "long" included in the check; valid ports are 1..65535.
  # NOTE: assert is stripped under "python -O" -- callers must pass a
  # validated port.
  assert isinstance(port, (int, long)) and 0 < port < (2 ** 16)
  self._data.listen_port = port
# Accessor for the port recorded by SetListenPort (presumably None until
# then -- the constructor is truncated in this excerpt, so confirm the
# default against the full file).
def GetListenPort(self):
  """Returns the port the daemon is listening on.

  """
  return self._data.listen_port
# Marks the transfer connection as established; read back via GetConnected
# (used by ProcessChildIO to cancel the listen timeout).
def SetConnected(self):
  """Sets the connected flag.

  """
  self._data.connected = True
# True once SetConnected() has been called on this status object.
def GetConnected(self):
  """Determines whether the daemon is connected.

  """
  return self._data.connected
# Stores transfer-progress figures in the status object; they are only
# persisted to disk when Update() writes the status file.
def SetProgress(self, mbytes, throughput, percent, eta):
  """Sets how much data has been transferred so far.

  @type mbytes: number
  @param mbytes: Transferred amount of data in MiB.
  @type throughput: float
  @param throughput: MiB/second
  @type percent: number
  @param percent: Percent processed
  @type eta: number
  @param eta: Expected number of seconds until done

  """
  self._data.progress_mbytes = mbytes
  self._data.progress_throughput = throughput
  self._data.progress_percent = percent
  self._data.progress_eta = eta
def SetExitStatus(self, exit_status, error_message):
  """Sets the exit status and an error message.

  """
  # Require error message when status isn't 0
  # Invariant relied upon by ExitStatusIsSuccess: a non-zero status always
  # carries a human-readable message.  NOTE: assert is stripped under -O.
  assert exit_status == 0 or error_message

  self._data.exit_status = exit_status
  self._data.error_message = error_message
# Success is defined as "no error message recorded", not "exit_status == 0"
# -- see SetExitStatus, which requires a message for non-zero statuses.
def ExitStatusIsSuccess(self):
  """Returns whether the exit status means "success".

  """
  return not bool(self._data.error_message)
# Serializes the status object to JSON and writes it to the status file,
# rate-limited to once per MIN_UPDATE_INTERVAL unless forced.
# NOTE(review): the "if" line guarding the write and the trailing
# WriteFile() arguments (e.g. mode) are truncated in this excerpt.
def Update(self, force):
  """Updates the status file.

  @type force: bool
  @param force: Write status file in any case, not only when minimum interval
                is expired

  """
  # Write when forced, when never written before (mtime is None), or when
  # the minimum update interval has elapsed.
      self._data.mtime is None or
      time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):

  logging.debug("Updating status file %s", self._path)

  self._data.mtime = time.time()
  utils.WriteFile(self._path,
                  data=serializer.DumpJson(self._data.ToDict()),
# Main I/O multiplexing loop for the child process tree (socat, dd, the
# import/export script).  Watches the status pipes with poll(2), feeds each
# stream into an impexpd.ChildIOProcessor, and manages the connect timeout,
# the dd-statistics timer and the post-signal exit timeout.  Returns True
# when the caller may simply wait() for the child (no exit timeout active).
# NOTE(review): many lines of this function are missing from this excerpt
# (the main "while" header, try/finally structure, several branches);
# comments below are hedged accordingly.
def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                   dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
                   signal_notify, signal_handler, mode):
  """Handles the child processes' output.

  """
  # Only SIGTERM/SIGINT may be registered on the handler -- the loop below
  # assumes any wakeup means one of those two signals.
  assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
         "Other signals are not handled in this function"

  # Buffer size 0 is important, otherwise .read() with a specified length
  # might buffer data while poll(2) won't mark its file descriptor as
  # readable again.
  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
  dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
  dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
  exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)

  tp_samples = DD_THROUGHPUT_SAMPLES

  # When the expected size is the "custom" marker it arrives later via the
  # exp_size pipe; otherwise it comes straight from the command line.
  if options.exp_size == constants.IE_CUSTOM_SIZE:
    exp_size = options.exp_size

  child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
                                           child_logger, tp_samples,
  # fdmap: file descriptor -> (file object, line splitter).  The splitter
  # tags each stream (other/socat/dd-pid/dd/exp-size) for the processor;
  # the signal-notify FD carries no splitter (None).
    child.stderr.fileno():
      (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
    socat_stderr_read.fileno():
      (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
    dd_pid_read.fileno():
      (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
    dd_stderr_read.fileno():
      (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
    exp_size_read.fileno():
      (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
    signal_notify.fileno(): (signal_notify, None),

  poller = select.poll()
  # All watched FDs are made non-blocking so a short read never stalls the
  # loop (presumably inside a "for fd in fdmap" loop -- header not visible).
    utils.SetNonblockFlag(fd, True)
    poller.register(fd, select.POLLIN)

  # Only imports wait for an incoming connection, hence the listen timeout.
  if options.connect_timeout and mode == constants.IEM_IMPORT:
    listen_timeout = utils.RunningTimeout(options.connect_timeout, True)
    listen_timeout = None

  dd_stats_timeout = None

    # Break out of loop if only signal notify FD is left
    if len(fdmap) == 1 and signal_notify.fileno() in fdmap:

    # While still waiting for the connection: drop the listen timeout once
    # connected; if it expires instead, record the error and SIGTERM the
    # child's process group.
    if listen_timeout and not exit_timeout:
      assert mode == constants.IEM_IMPORT and options.connect_timeout
      if status_file.GetConnected():
        listen_timeout = None
      elif listen_timeout.Remaining() < 0:
        errmsg = ("Child process didn't establish connection in time"
                  " (%0.0fs), sending SIGTERM" % options.connect_timeout)
        logging.error(errmsg)
        status_file.AddRecentOutput(errmsg)
        status_file.Update(True)

        child.Kill(signal.SIGTERM)
        # Give the child CHILD_LINGER_TIMEOUT seconds to terminate after
        # the SIGTERM above.
          utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
        # Next block will calculate timeout
        # Not yet connected, check again in a second

      # poll(2) takes its timeout in milliseconds, hence * 1000.
      timeout = exit_timeout.Remaining() * 1000
        logging.info("Child process didn't exit in time")

    # Periodically ask dd for transfer statistics; retry after one second
    # while dd isn't ready yet.
    if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
      notify_status = child_io_proc.NotifyDd()
        # Schedule next notification
        dd_stats_timeout = utils.RunningTimeout(DD_STATISTICS_INTERVAL, True)
        # Try again soon (dd isn't ready yet)
        dd_stats_timeout = utils.RunningTimeout(1.0, True)

      dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)
      timeout = min(timeout, dd_timeout)

    # RetryOnSignal restarts poll() when it is interrupted (EINTR).
    for fd, event in utils.RetryOnSignal(poller.poll, timeout):
      if event & (select.POLLIN | event & select.POLLPRI):
        (from_, to) = fdmap[fd]

          # Read up to 1 KB of data
          data = from_.read(1024)
        elif fd == signal_notify.fileno():
          # Wakeup byte from the signal handler: a SIGTERM/SIGINT was
          # forwarded to the child; start (or report) the linger timeout.
          if signal_handler.called:
            signal_handler.Clear()
              logging.info("Child process still has about %0.2f seconds"
                           " to exit", exit_timeout.Remaining())
              logging.info("Giving child process %0.2f seconds to exit",
                           constants.CHILD_LINGER_TIMEOUT)
                utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)

            poller.unregister(fd)
      # EOF/error on a pipe: stop polling it (presumably it is also removed
      # from fdmap in a line not visible in this excerpt).
      elif event & (select.POLLNVAL | select.POLLHUP |
        poller.unregister(fd)

    child_io_proc.FlushAll()

  # If there was a timeout calculator, we were waiting for the child to
  # finish, e.g. due to a signal
  return not bool(exit_timeout)

    child_io_proc.CloseAll()
# NOTE(review): this is the body of ParseOptions(); the "def" line is not
# visible in this excerpt.  Builds the optparse parser, parses sys.argv
# into the module-level "options" variable and returns the two positional
# arguments (status_file_path, mode) after validating host, port, expected
# size, magic string and the IPv4/IPv6 flags.
"""Parses the options passed to the program.

@return: Arguments to program

"""
# "options" is deliberately a module-level global so the rest of the daemon
# (ProcessChildIO, main) can read it without it being threaded through
# every call.
global options # pylint: disable=W0603

parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
                                      (constants.IEM_IMPORT,
                                       constants.IEM_EXPORT)))
parser.add_option(cli.DEBUG_OPT)
parser.add_option(cli.VERBOSE_OPT)
parser.add_option("--key", dest="key", action="store", type="string",
parser.add_option("--cert", dest="cert", action="store", type="string",
                  help="X509 certificate file")
parser.add_option("--ca", dest="ca", action="store", type="string",
parser.add_option("--bind", dest="bind", action="store", type="string",
parser.add_option("--ipv4", dest="ipv4", action="store_true",
                  help="Use IPv4 only")
parser.add_option("--ipv6", dest="ipv6", action="store_true",
                  help="Use IPv6 only")
parser.add_option("--host", dest="host", action="store", type="string",
                  help="Remote hostname")
parser.add_option("--port", dest="port", action="store", type="int",
parser.add_option("--connect-retries", dest="connect_retries", action="store",
                  type="int", default=0,
                  help=("How many times the connection should be retried"
parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
                  type="int", default=DEFAULT_CONNECT_TIMEOUT,
                  help="Timeout for connection to be established (seconds)")
parser.add_option("--compress", dest="compress", action="store",
                  type="choice", help="Compression method",
                  metavar="[%s]" % "|".join(constants.IEC_ALL),
                  choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
# Stored as a string because it may be the special IE_CUSTOM_SIZE marker;
# converted to int further below when it is an actual size.
parser.add_option("--expected-size", dest="exp_size", action="store",
                  type="string", default=None,
                  help="Expected import/export size (MiB)")
parser.add_option("--magic", dest="magic", action="store",
                  type="string", default=None, help="Magic string")
parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
                  type="string", help="Command prefix")
parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
                  type="string", help="Command suffix")

(options, args) = parser.parse_args()

# parser.error() prints the message to stderr and exits with status 2.
  parser.error("Expected exactly two arguments")

(status_file_path, mode) = args

if mode not in (constants.IEM_IMPORT,
                constants.IEM_EXPORT):
  parser.error("Invalid mode: %s" % mode)

# Normalize and check parameters
# Hostnames (but not literal IP addresses) are normalized to their full
# name; OpPrereqError indicates resolution/normalization failure.
if options.host is not None and not netutils.IPAddress.IsValid(options.host):
    options.host = netutils.Hostname.GetNormalizedName(options.host)
  except errors.OpPrereqError, err:
    parser.error("Invalid hostname '%s': %s" % (options.host, err))

if options.port is not None:
  options.port = utils.ValidateServiceName(options.port)

# A concrete expected size must be a valid integer (MiB); the custom-size
# marker is passed through untouched and resolved later via the child.
if (options.exp_size is not None and
    options.exp_size != constants.IE_CUSTOM_SIZE):
    options.exp_size = int(options.exp_size)
  except (ValueError, TypeError), err:
    # Can't handle negative numbers
    parser.error("Invalid value for --expected-size: %s (%s)" %
                 (options.exp_size, err))

if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)):
  parser.error("Magic must match regular expression %s" %
               constants.IE_MAGIC_RE.pattern)

if options.ipv4 and options.ipv6:
  parser.error("Can only use one of --ipv4 and --ipv6")

return (status_file_path, mode)
# subprocess.Popen subclass that runs the import/export pipeline in its own
# process group so the whole tree (socat, dd, grandchildren) can be
# signalled at once via killpg.  NOTE(review): several lines of this class
# (docstring closers, the setpgid call in _ChildPreexec, a "raise" in
# _SetProcessGroup, and one method's "def" line) are missing from this
# excerpt.
class ChildProcess(subprocess.Popen):
  def __init__(self, env, cmd, noclose_fds):
    """Initializes this class.

    """
    # File descriptors that must survive the pre-exec CloseFDs() call (the
    # write ends of the status pipes passed to the child).
    self._noclose_fds = noclose_fds

    # Not using close_fds because doing so would also close the socat stderr
    # pipe, which we still need.
    subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
                              stderr=subprocess.PIPE, stdout=None, stdin=None,
                              preexec_fn=self._ChildPreexec)
    self._SetProcessGroup()

  def _ChildPreexec(self):
    """Called before child executable is execve'd.

    """
    # Move to separate process group. By sending a signal to its process group
    # we can kill the child process and all grandchildren.

    # Close almost all file descriptors
    utils.CloseFDs(noclose_fds=self._noclose_fds)

  def _SetProcessGroup(self):
    """Sets the child's process group.

    """
    assert self.pid, "Can't be called in child process"

    # Avoid race condition by setting child's process group (as good as
    # possible in Python) before sending signals to child. For an
    # explanation, see preexec function for child.
      os.setpgid(self.pid, self.pid)
    except EnvironmentError, err:
      # If the child process was faster we receive EPERM or EACCES
      if err.errno not in (errno.EPERM, errno.EACCES):

  def Kill(self, signum):
    """Sends signal to child process.

    """
    # os.killpg targets the whole process group; IgnoreProcessNotFound
    # swallows the error when the group is already gone.
    logging.info("Sending signal %s to child process", signum)
    utils.IgnoreProcessNotFound(os.killpg, self.pid, signum)

  # NOTE(review): the "def" line of this method is missing from this
  # excerpt (presumably ForceQuit or similar): last-resort cleanup that
  # SIGKILLs the process group if the child is still running, then reaps it.
    """Ensure child process is no longer running.

    """
    # Final check if child process is still alive
    if utils.RetryOnSignal(self.poll) is None:
      logging.error("Child process still alive, sending SIGKILL")
      self.Kill(signal.SIGKILL)
      utils.RetryOnSignal(self.wait)
# NOTE(review): body of the program's main function -- the "def" line and
# its try/finally structure are not visible in this excerpt.  Flow: parse
# options, set up logging and the status file, create the four status
# pipes, start the child pipeline in its own process group, forward
# SIGTERM/SIGINT to it, run ProcessChildIO, then record the final exit
# status and terminate the process accordingly.
(status_file_path, mode) = ParseOptions()

child_logger = SetupLogging()

status_file = StatusFile(status_file_path)

# Pipe to receive socat's stderr output
(socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()

# Pipe to receive dd's stderr output
(dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()

# Pipe to receive dd's PID
(dd_pid_read_fd, dd_pid_write_fd) = os.pipe()

# Pipe to receive size predicted by export script
(exp_size_read_fd, exp_size_write_fd) = os.pipe()

# Get child process command
cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
                                     dd_stderr_write_fd, dd_pid_write_fd)
cmd = cmd_builder.GetCommand()

# Prepare command environment
cmd_env = os.environ.copy()

# The export script learns where to write its size prediction via the
# EXP_SIZE_FD environment variable.
if options.exp_size == constants.IE_CUSTOM_SIZE:
  cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)

logging.debug("Starting command %r", cmd)

# Start child process
# The pipe write ends are listed as noclose_fds so the child keeps them
# open across its pre-exec CloseFDs() call.
child = ChildProcess(cmd_env, cmd,
                     [socat_stderr_write_fd, dd_stderr_write_fd,
                      dd_pid_write_fd, exp_size_write_fd])

  def _ForwardSignal(signum, _):
    """Forwards signals to child process.

    """
  # SignalWakeupFd gives ProcessChildIO's poll(2) loop a readable FD
  # whenever a signal arrives.
  signal_wakeup = utils.SignalWakeupFd()

  # TODO: There is a race condition between starting the child and
  # handling the signals here. While there might be a way to work around
  # it by registering the handlers before starting the child and
  # deferring sent signals until the child is available, doing so can be
  # complicated.
  signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
                                       handler_fn=_ForwardSignal,
                                       wakeup=signal_wakeup)

    # Close the parent's copies of the write ends so EOF is observed on the
    # read ends once the child exits.
    utils.RetryOnSignal(os.close, socat_stderr_write_fd)
    utils.RetryOnSignal(os.close, dd_stderr_write_fd)
    utils.RetryOnSignal(os.close, dd_pid_write_fd)
    utils.RetryOnSignal(os.close, exp_size_write_fd)

    if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                      dd_pid_read_fd, exp_size_read_fd,
                      status_file, child_logger,
                      signal_wakeup, signal_handler, mode):
      # The child closed all its file descriptors and there was no
      # signal
      # TODO: Implement timeout instead of waiting indefinitely
      utils.RetryOnSignal(child.wait)

    signal_handler.Reset()

    signal_wakeup.Reset()

  # Translate the child's return code into the status-file exit record
  # (negative returncode means the child died from a signal).
  if child.returncode == 0:
  elif child.returncode < 0:
    errmsg = "Exited due to signal %s" % (-child.returncode, )
    errmsg = "Exited with status %s" % (child.returncode, )

  status_file.SetExitStatus(child.returncode, errmsg)
# Top-level boundary: any unhandled error is logged with traceback and
# recorded in the status file before the process exits.
except Exception, err: # pylint: disable=W0703
  logging.exception("Unhandled error occurred")
  status_file.SetExitStatus(constants.EXIT_FAILURE,
                            "Unhandled error occurred: %s" % (err, ))

if status_file.ExitStatusIsSuccess():
  sys.exit(constants.EXIT_SUCCESS)

sys.exit(constants.EXIT_FAILURE)

  # Presumably inside a finally: block -- always persist the final status
  # to disk, bypassing the rate limit.
  status_file.Update(True)
604 if __name__ == "__main__":