4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Import/export daemon.
26 # pylint: disable-msg=C0103
27 # C0103: Invalid name import-export
40 from ganeti import constants
41 from ganeti import cli
42 from ganeti import utils
43 from ganeti import errors
44 from ganeti import serializer
45 from ganeti import objects
46 from ganeti import impexpd
47 from ganeti import netutils
#: How many lines to keep in the status file
MAX_RECENT_OUTPUT_LINES = 20

#: Don't update status file more than once every 5 seconds (unless forced)
MIN_UPDATE_INTERVAL = 5.0

#: Give child process up to 5 seconds to exit after sending a signal
CHILD_LINGER_TIMEOUT = 5.0

#: How long to wait for a connection to be established
DEFAULT_CONNECT_TIMEOUT = 60

#: Get dd(1) statistics every few seconds
DD_STATISTICS_INTERVAL = 5.0

#: Seconds for throughput calculation
DD_THROUGHPUT_INTERVAL = 60.0

#: Number of samples for throughput calculation
# (e.g. 60.0 / 5.0 -> 12 samples; ceil ensures at least one sample even for
# very short throughput intervals)
DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) /
                                      DD_STATISTICS_INTERVAL))
73 # Global variable for options
78 """Configures the logging module.
81 formatter = logging.Formatter("%(asctime)s: %(message)s")
83 stderr_handler = logging.StreamHandler()
84 stderr_handler.setFormatter(formatter)
85 stderr_handler.setLevel(logging.NOTSET)
87 root_logger = logging.getLogger("")
88 root_logger.addHandler(stderr_handler)
91 root_logger.setLevel(logging.NOTSET)
93 root_logger.setLevel(logging.INFO)
95 root_logger.setLevel(logging.ERROR)
97 # Create special logger for child process output
98 child_logger = logging.Logger("child output")
99 child_logger.addHandler(stderr_handler)
100 child_logger.setLevel(logging.NOTSET)
106 """Status file manager.
def __init__(self, path):
  """Initializes class.

  @type path: string
  @param path: Path to the status file (stored for later writes; see
    C{self._path} usage in L{Update})

  """
  # NOTE(review): the constructor call continues on lines elided from this
  # view (remaining ImportExportStatus fields and the self._path assignment)
  self._data = objects.ImportExportStatus(ctime=time.time(),
def AddRecentOutput(self, line):
  """Remembers one line of recent child process output.

  @type line: string
  @param line: New output line

  """
  recent = self._data.recent_output
  recent.append(line)

  # Drop everything but the newest MAX_RECENT_OUTPUT_LINES entries
  del recent[:-MAX_RECENT_OUTPUT_LINES]
def SetListenPort(self, port):
  """Records the TCP/UDP port the daemon listens on.

  @type port: int
  @param port: TCP/UDP port number (must be in the range 1..65535)

  """
  assert isinstance(port, (int, long))
  assert 0 < port < 2**16

  self._data.listen_port = port
def GetListenPort(self):
  """Returns the port the daemon is listening on.

  @return: Last port recorded via L{SetListenPort}

  """
  data = self._data
  return data.listen_port
def SetConnected(self):
  """Marks the daemon as having an established connection.

  """
  # One-way flag: nothing in this class ever resets it
  self._data.connected = True
def GetConnected(self):
  """Determines whether the daemon is connected.

  @return: Current value of the connected flag

  """
  data = self._data
  return data.connected
def SetProgress(self, mbytes, throughput, percent, eta):
  """Sets how much data has been transferred so far.

  @type mbytes: number
  @param mbytes: Transferred amount of data in MiB
  @type throughput: float
  @param throughput: MiB/second
  @type percent: number
  @param percent: Percent processed
  @type eta: number
  @param eta: Expected number of seconds until done

  """
  data = self._data
  data.progress_mbytes = mbytes
  data.progress_throughput = throughput
  data.progress_percent = percent
  data.progress_eta = eta
def SetExitStatus(self, exit_status, error_message):
  """Sets the exit status and an error message.

  @type exit_status: number
  @param exit_status: Exit status of the child process
  @type error_message: string
  @param error_message: Error description; mandatory for non-zero status

  """
  # A failing status must always be accompanied by an explanation
  assert exit_status == 0 or error_message

  data = self._data
  data.exit_status = exit_status
  data.error_message = error_message
def ExitStatusIsSuccess(self):
  """Returns whether the exit status means "success".

  @rtype: bool
  @return: True unless an error message was recorded

  """
  # Success is defined by the absence of an error message, not by the
  # numeric exit status itself
  return not self._data.error_message
def Update(self, force):
  """Updates the status file.

  @type force: bool
  @param force: Write status file in any case, not only when minimum interval
    between updates has elapsed

  """
  # NOTE(review): the opening of the rate-limit condition ("if force or ...")
  # is elided from this view; the visible clauses permit a write when no
  # mtime was recorded yet, or MIN_UPDATE_INTERVAL seconds have passed
  self._data.mtime is None or
      time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
    logging.debug("Updating status file %s", self._path)

    # Record write time before serializing so it lands in the file
    self._data.mtime = time.time()
    utils.WriteFile(self._path,
                    data=serializer.DumpJson(self._data.ToDict(), indent=True),
def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                   dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
                   signal_notify, signal_handler, mode):
  """Handles the child processes' output.

  @param child: Child process handle (L{ChildProcess})
  @param status_file: Status file manager the progress is reported to
  @param child_logger: Logger dedicated to child process output
  @param signal_notify: Wakeup file object written to by the signal handler
  @param signal_handler: Handler for SIGTERM/SIGINT (asserted below)
  @param mode: constants.IEM_IMPORT or constants.IEM_EXPORT

  """
  assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
         "Other signals are not handled in this function"

  # Buffer size 0 is important, otherwise .read() with a specified length
  # might buffer data while poll(2) won't mark its file descriptor as
  # readable again
  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
  dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
  dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
  exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)

  tp_samples = DD_THROUGHPUT_SAMPLES

  # NOTE(review): branch bodies of this exp_size selection are partially
  # elided from this view
  if options.exp_size == constants.IE_CUSTOM_SIZE:
  exp_size = options.exp_size

  child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
                                           child_logger, tp_samples,
  # Map of readable file descriptors to (file object, line splitter) pairs;
  # the dict opening ("fdmap = {") is elided from this view
  child.stderr.fileno():
    (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
  socat_stderr_read.fileno():
    (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
  dd_pid_read.fileno():
    (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
  dd_stderr_read.fileno():
    (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
  exp_size_read.fileno():
    (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
  signal_notify.fileno(): (signal_notify, None),

  poller = select.poll()
  # Register every tracked descriptor as a non-blocking poll source
  # (the loop over fdmap is elided from this view)
  utils.SetNonblockFlag(fd, True)
  poller.register(fd, select.POLLIN)

  # Imports wait for an incoming connection and therefore get a deadline
  if options.connect_timeout and mode == constants.IEM_IMPORT:
    listen_timeout = utils.RunningTimeout(options.connect_timeout, True)
    listen_timeout = None

  dd_stats_timeout = None

  # NOTE(review): the main "while True:" poll loop opening is elided; the
  # statements below are its body
  # Break out of loop if only signal notify FD is left
  if len(fdmap) == 1 and signal_notify.fileno() in fdmap:

  if listen_timeout and not exit_timeout:
    if status_file.GetConnected():
      listen_timeout = None
    elif listen_timeout.Remaining() < 0:
      logging.info("Child process didn't establish connection in time")
      child.Kill(signal.SIGTERM)
      utils.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
      # Next block will calculate timeout
      # Not yet connected, check again in a second

  timeout = exit_timeout.Remaining() * 1000
  logging.info("Child process didn't exit in time")

  # Poll dd for statistics at most every DD_STATISTICS_INTERVAL seconds
  if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
    notify_status = child_io_proc.NotifyDd()
    # Schedule next notification
    dd_stats_timeout = utils.RunningTimeout(DD_STATISTICS_INTERVAL, True)
    # Try again soon (dd isn't ready yet)
    dd_stats_timeout = utils.RunningTimeout(1.0, True)

  dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)
  timeout = min(timeout, dd_timeout)

  for fd, event in utils.RetryOnSignal(poller.poll, timeout):
    # NOTE(review): this expression looks garbled; conventional form would be
    # "event & (select.POLLIN | select.POLLPRI)" -- confirm against full file
    if event & (select.POLLIN | event & select.POLLPRI):
      (from_, to) = fdmap[fd]

      # Read up to 1 KB of data
      data = from_.read(1024)
    elif fd == signal_notify.fileno():
      if signal_handler.called:
        signal_handler.Clear()
        logging.info("Child process still has about %0.2f seconds"
                     " to exit", exit_timeout.Remaining())
        logging.info("Giving child process %0.2f seconds to exit",
                     CHILD_LINGER_TIMEOUT)
        utils.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
      poller.unregister(fd)
    elif event & (select.POLLNVAL | select.POLLHUP |
      poller.unregister(fd)

  child_io_proc.FlushAll()

  # If there was a timeout calculator, we were waiting for the child to
  # finish, e.g. due to a signal
  return not bool(exit_timeout)

  # NOTE(review): presumably inside a "finally:" clause elided from this view
  child_io_proc.CloseAll()
348 """Parses the options passed to the program.
350 @return: Arguments to program
353 global options # pylint: disable-msg=W0603
355 parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
356 (constants.IEM_IMPORT,
357 constants.IEM_EXPORT)))
358 parser.add_option(cli.DEBUG_OPT)
359 parser.add_option(cli.VERBOSE_OPT)
360 parser.add_option("--key", dest="key", action="store", type="string",
362 parser.add_option("--cert", dest="cert", action="store", type="string",
363 help="X509 certificate file")
364 parser.add_option("--ca", dest="ca", action="store", type="string",
366 parser.add_option("--bind", dest="bind", action="store", type="string",
368 parser.add_option("--host", dest="host", action="store", type="string",
369 help="Remote hostname")
370 parser.add_option("--port", dest="port", action="store", type="int",
372 parser.add_option("--connect-retries", dest="connect_retries", action="store",
373 type="int", default=0,
374 help=("How many times the connection should be retried"
376 parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
377 type="int", default=DEFAULT_CONNECT_TIMEOUT,
378 help="Timeout for connection to be established (seconds)")
379 parser.add_option("--compress", dest="compress", action="store",
380 type="choice", help="Compression method",
381 metavar="[%s]" % "|".join(constants.IEC_ALL),
382 choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
383 parser.add_option("--expected-size", dest="exp_size", action="store",
384 type="string", default=None,
385 help="Expected import/export size (MiB)")
386 parser.add_option("--magic", dest="magic", action="store",
387 type="string", default=None, help="Magic string")
388 parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
389 type="string", help="Command prefix")
390 parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
391 type="string", help="Command suffix")
393 (options, args) = parser.parse_args()
397 parser.error("Expected exactly two arguments")
399 (status_file_path, mode) = args
401 if mode not in (constants.IEM_IMPORT,
402 constants.IEM_EXPORT):
404 parser.error("Invalid mode: %s" % mode)
406 # Normalize and check parameters
407 if options.host is not None:
409 options.host = netutils.Hostname.GetNormalizedName(options.host)
410 except errors.OpPrereqError, err:
411 parser.error("Invalid hostname '%s': %s" % (options.host, err))
413 if options.port is not None:
414 options.port = utils.ValidateServiceName(options.port)
416 if (options.exp_size is not None and
417 options.exp_size != constants.IE_CUSTOM_SIZE):
419 options.exp_size = int(options.exp_size)
420 except (ValueError, TypeError), err:
422 parser.error("Invalid value for --expected-size: %s (%s)" %
423 (options.exp_size, err))
425 if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)):
426 parser.error("Magic must match regular expression %s" %
427 constants.IE_MAGIC_RE.pattern)
429 return (status_file_path, mode)
class ChildProcess(subprocess.Popen):
  # Popen subclass that keeps selected file descriptors open in the child,
  # puts the child in its own process group and signals the whole group.
  def __init__(self, env, cmd, noclose_fds):
    """Initializes this class.

    @param env: Environment variables for the child process
    @param cmd: Command to run (argument list, no shell)
    @param noclose_fds: File descriptors the child must keep open

    """
    self._noclose_fds = noclose_fds

    # Not using close_fds because doing so would also close the socat stderr
    # pipe, which we still need.
    subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
                              stderr=subprocess.PIPE, stdout=None, stdin=None,
                              preexec_fn=self._ChildPreexec)
    self._SetProcessGroup()

  def _ChildPreexec(self):
    """Called before child executable is execve'd.

    """
    # Move to separate process group. By sending a signal to its process group
    # we can kill the child process and all grandchildren.
    # NOTE(review): the setpgid call itself is elided from this view

    # Close almost all file descriptors
    utils.CloseFDs(noclose_fds=self._noclose_fds)

  def _SetProcessGroup(self):
    """Sets the child's process group.

    """
    assert self.pid, "Can't be called in child process"

    # Avoid race condition by setting child's process group (as good as
    # possible in Python) before sending signals to child. For an
    # explanation, see preexec function for child.
    # NOTE(review): enclosing "try:" line is elided from this view
    os.setpgid(self.pid, self.pid)
    except EnvironmentError, err:
      # If the child process was faster we receive EPERM or EACCES
      if err.errno not in (errno.EPERM, errno.EACCES):

  def Kill(self, signum):
    """Sends signal to child process.

    @param signum: Signal number, delivered to the child's process group

    """
    logging.info("Sending signal %s to child process", signum)
    utils.IgnoreProcessNotFound(os.killpg, self.pid, signum)

    # NOTE(review): the "def ForceQuit(self):" line is elided from this view
    """Ensure child process is no longer running.

    """
    # Final check if child process is still alive
    if utils.RetryOnSignal(self.poll) is None:
      logging.error("Child process still alive, sending SIGKILL")
      self.Kill(signal.SIGKILL)
      utils.RetryOnSignal(self.wait)
# NOTE(review): the enclosing "def main():" line and its try/finally
# scaffolding are elided from this view
(status_file_path, mode) = ParseOptions()

# Configure logging before doing anything else
child_logger = SetupLogging()

status_file = StatusFile(status_file_path)

# Pipe to receive socat's stderr output
(socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()

# Pipe to receive dd's stderr output
(dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()

# Pipe to receive dd's PID
(dd_pid_read_fd, dd_pid_write_fd) = os.pipe()

# Pipe to receive size predicted by export script
(exp_size_read_fd, exp_size_write_fd) = os.pipe()

# Get child process command
cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
                                     dd_stderr_write_fd, dd_pid_write_fd)
cmd = cmd_builder.GetCommand()

# Prepare command environment
cmd_env = os.environ.copy()

if options.exp_size == constants.IE_CUSTOM_SIZE:
  cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)

logging.debug("Starting command %r", cmd)

# Start child process; the listed write FDs must stay open in the child
child = ChildProcess(cmd_env, cmd,
                     [socat_stderr_write_fd, dd_stderr_write_fd,
                      dd_pid_write_fd, exp_size_write_fd])

def _ForwardSignal(signum, _):
  """Forwards signals to child process.

  """

signal_wakeup = utils.SignalWakeupFd()

# TODO: There is a race condition between starting the child and
# handling the signals here. While there might be a way to work around
# it by registering the handlers before starting the child and
# deferring sent signals until the child is available, doing so can be
# complicated.
signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
                                     handler_fn=_ForwardSignal,
                                     wakeup=signal_wakeup)

# Close the write ends in the parent; only the child writes to these pipes,
# so the parent sees EOF once the child exits
utils.RetryOnSignal(os.close, socat_stderr_write_fd)
utils.RetryOnSignal(os.close, dd_stderr_write_fd)
utils.RetryOnSignal(os.close, dd_pid_write_fd)
utils.RetryOnSignal(os.close, exp_size_write_fd)

if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                  dd_pid_read_fd, exp_size_read_fd,
                  status_file, child_logger,
                  signal_wakeup, signal_handler, mode):
  # The child closed all its file descriptors and there was no
  # signal (comment continuation elided from this view)
  # TODO: Implement timeout instead of waiting indefinitely
  utils.RetryOnSignal(child.wait)

signal_handler.Reset()

signal_wakeup.Reset()

# Translate the child's return code into an error message; a negative
# returncode means the child was terminated by that signal (Popen semantics)
# NOTE(review): the "errmsg = None" and "else:" branch lines are elided
if child.returncode == 0:
elif child.returncode < 0:
  errmsg = "Exited due to signal %s" % (-child.returncode, )
  errmsg = "Exited with status %s" % (child.returncode, )

status_file.SetExitStatus(child.returncode, errmsg)
except Exception, err: # pylint: disable-msg=W0703
  logging.exception("Unhandled error occurred")
  status_file.SetExitStatus(constants.EXIT_FAILURE,
                            "Unhandled error occurred: %s" % (err, ))

if status_file.ExitStatusIsSuccess():
  sys.exit(constants.EXIT_SUCCESS)

sys.exit(constants.EXIT_FAILURE)

# NOTE(review): presumably inside a "finally:" clause -- the status file is
# always written one last time before exiting
status_file.Update(True)


if __name__ == "__main__":