# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Import/export daemon.
26 # pylint: disable-msg=C0103
27 # C0103: Invalid name import-export
41 from ganeti import constants
42 from ganeti import cli
43 from ganeti import utils
44 from ganeti import serializer
45 from ganeti import objects
46 from ganeti import locking
47 from ganeti import impexpd
#: Used to recognize point at which socat(1) starts to listen on its socket.
#: The local address is required for the remote peer to connect (in particular
#: its port number). Captures the numeric address family, the address and the
#: port as the named groups "family", "address" and "port".
LISTENING_RE = re.compile(r"^listening on\s+"
                          r"AF=(?P<family>\d+)\s+"
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
#: Used to recognize point at which socat(1) is sending data over the wire
# NOTE(review): the flags argument was missing from the damaged source; re.I
# is restored to match LISTENING_RE -- confirm against the pristine file.
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
                              re.I)
#: Severity letters compared against the level field of socat(1) log lines
SOCAT_LOG_NOTICE = "N"
SOCAT_LOG_WARNING = "W"
68 SOCAT_LOG_IGNORE = frozenset([
#: How many lines to keep in the status file
MAX_RECENT_OUTPUT_LINES = 20

#: Don't update status file more than once every 5 seconds (unless forced)
MIN_UPDATE_INTERVAL = 5.0

#: Give child process up to 5 seconds to exit after sending a signal
CHILD_LINGER_TIMEOUT = 5.0

#: How long to wait for a connection to be established (seconds); default
#: for the --connect-timeout option
DEFAULT_CONNECT_TIMEOUT = 60
87 # Global variable for options
# Raised by this daemon's own checks, e.g. when validating the address socat
# reports as its listening address.
class Error(Exception):
  """Generic exception"""
def SetupLogging():
  """Configures the logging module.

  A single stderr handler is installed on the root logger, whose level follows
  the global C{options}: everything with debug, C{INFO} with verbose and
  C{ERROR} otherwise. A second, stand-alone logger sharing the same handler is
  created for child process output; it is never filtered by level.

  @return: Logger to be used for child process output

  """
  formatter = logging.Formatter("%(asctime)s: %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  stderr_handler.setLevel(logging.NOTSET)

  root_logger = logging.getLogger("")
  root_logger.addHandler(stderr_handler)

  # NOTE(review): the "if"/"else" lines and the return statement were missing
  # from the damaged source and are reconstructed from the surviving "elif"
  # branch and from the caller, which uses the returned logger.
  if options.debug:
    root_logger.setLevel(logging.NOTSET)
  elif options.verbose:
    root_logger.setLevel(logging.INFO)
  else:
    root_logger.setLevel(logging.ERROR)

  # Create special logger for child process output
  # (instantiated directly, hence not part of the root logger's hierarchy)
  child_logger = logging.Logger("child output")
  child_logger.addHandler(stderr_handler)
  child_logger.setLevel(logging.NOTSET)

  return child_logger
def _VerifyListening(family, address, port):
  """Verify address given as listening address by socat.

  @type family: int
  @param family: Address family; only C{socket.AF_INET} is accepted
  @type address: string
  @param address: Address in presentation format
  @param port: Port number, returned unchanged
  @rtype: tuple
  @return: Address (normalized by a pton/ntop round trip) and port
  @raise Error: If the address family is not supported or the address can't
    be parsed

  """
  # TODO: Implement IPv6 support
  if family != socket.AF_INET:
    raise Error("Address family %r not supported" % family)

  try:
    packed_address = socket.inet_pton(family, address)
  except socket.error:
    raise Error("Invalid address %r for family %s" % (address, family))

  return (socket.inet_ntop(family, packed_address), port)
140 """Status file manager.
143 def __init__(self, path):
144 """Initializes class.
148 self._data = objects.ImportExportStatus(ctime=time.time(),
def AddRecentOutput(self, line):
  """Adds a new line of recent output.

  @param line: Output line to remember

  """
  self._data.recent_output.append(line)

  # Trim in place, keeping only the newest MAX_RECENT_OUTPUT_LINES entries
  del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]
def SetListenPort(self, port):
  """Sets the port the daemon is listening on.

  @type port: int
  @param port: TCP/UDP port

  """
  # Internal sanity check only: integer strictly between 0 and 65536
  assert isinstance(port, (int, long)) and 0 < port < 2**16
  self._data.listen_port = port
def GetListenPort(self):
  """Returns the port the daemon is listening on.

  @return: Value last stored by L{SetListenPort}; callers test it against
    C{None} to tell whether a port has been recorded yet

  """
  return self._data.listen_port
def SetConnected(self):
  """Sets the connected flag.

  The flag is only ever set to C{True} here; nothing in this method clears it.

  """
  self._data.connected = True
def GetConnected(self):
  """Determines whether the daemon is connected.

  @return: Current value of the connected flag

  """
  return self._data.connected
def SetExitStatus(self, exit_status, error_message):
  """Sets the exit status and an error message.

  @param exit_status: Exit status to record
  @param error_message: Error message; must be non-empty unless the exit
    status is 0

  """
  # Require error message when status isn't 0
  assert exit_status == 0 or error_message

  self._data.exit_status = exit_status
  self._data.error_message = error_message
def ExitStatusIsSuccess(self):
  """Returns whether the exit status means "success".

  Success is judged solely by the absence of an error message; the numeric
  exit status is not consulted here.

  @rtype: bool

  """
  return not bool(self._data.error_message)
205 def Update(self, force):
206 """Updates the status file.
209 @param force: Write status file in any case, not only when minimum interval
214 self._data.mtime is None or
215 time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
218 logging.debug("Updating status file %s", self._path)
220 self._data.mtime = time.time()
221 utils.WriteFile(self._path,
222 data=serializer.DumpJson(self._data.ToDict(), indent=True),
def _ProcessSocatOutput(status_file, level, msg):
  """Interprets socat log output.

  Only notice-level messages are inspected. Until a listening port has been
  recorded, the message is checked for socat's listening announcement; until
  the connected flag is set, it is checked for the start of the data transfer
  loop.

  @param status_file: Status file manager
  @param level: Severity letter of the socat log line
  @param msg: Message part of the socat log line
  @return: Value passed by the caller as the "force" argument when updating
    the status file

  """
  # NOTE(review): the "if m:" guards and all return statements were missing
  # from the damaged source. They are reconstructed on the assumption that a
  # state change should force a status file update -- confirm against the
  # pristine file.
  if level == SOCAT_LOG_NOTICE:
    if status_file.GetListenPort() is None:
      # TODO: Maybe implement timeout to not listen forever
      m = LISTENING_RE.match(msg)
      if m:
        # Only the port is kept; the verified address is discarded
        (_, port) = _VerifyListening(int(m.group("family")),
                                     m.group("address"),
                                     int(m.group("port")))

        status_file.SetListenPort(port)
        return True

    if not status_file.GetConnected():
      m = TRANSFER_LOOP_RE.match(msg)
      if m:
        status_file.SetConnected()
        return True

  return False
250 def ProcessOutput(line, status_file, logger, socat):
251 """Takes care of child process output.
253 @param status_file: Status file manager
254 @param logger: Child output logger
256 @param socat: Whether it's a socat output line
258 @param line: Child output line
266 parts = line.split(None, 4)
269 (_, _, _, level, msg) = parts
271 force_update = _ProcessSocatOutput(status_file, level, msg)
273 if options.debug or (level and level not in SOCAT_LOG_IGNORE):
274 forward_line = "socat: %s %s" % (level, msg)
278 forward_line = "socat: %s" % line
281 logger.info(forward_line)
282 status_file.AddRecentOutput(forward_line)
284 status_file.Update(force_update)
287 def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
288 signal_notify, signal_handler, mode):
289 """Handles the child processes' output.
292 assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
293 "Other signals are not handled in this function"
295 # Buffer size 0 is important, otherwise .read() with a specified length
296 # might buffer data while poll(2) won't mark its file descriptor as
298 socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
300 script_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
303 socat_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
307 child.stderr.fileno(): (child.stderr, script_stderr_lines),
308 socat_stderr_read.fileno(): (socat_stderr_read, socat_stderr_lines),
309 signal_notify.fileno(): (signal_notify, None),
312 poller = select.poll()
314 utils.SetNonblockFlag(fd, True)
315 poller.register(fd, select.POLLIN)
317 if options.connect_timeout and mode == constants.IEM_IMPORT:
318 listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
320 listen_timeout = None
325 # Break out of loop if only signal notify FD is left
326 if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
331 if listen_timeout and not exit_timeout:
332 if status_file.GetConnected():
333 listen_timeout = None
334 elif listen_timeout.Remaining() < 0:
335 logging.info("Child process didn't establish connection in time")
336 child.Kill(signal.SIGTERM)
338 locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
339 # Next block will calculate timeout
341 # Not yet connected, check again in a second
345 timeout = exit_timeout.Remaining() * 1000
347 logging.info("Child process didn't exit in time")
350 for fd, event in utils.RetryOnSignal(poller.poll, timeout):
351 if event & (select.POLLIN | event & select.POLLPRI):
352 (from_, to) = fdmap[fd]
354 # Read up to 1 KB of data
355 data = from_.read(1024)
359 elif fd == signal_notify.fileno():
361 if signal_handler.called:
362 signal_handler.Clear()
364 logging.info("Child process still has about %0.2f seconds"
365 " to exit", exit_timeout.Remaining())
367 logging.info("Giving child process %0.2f seconds to exit",
368 CHILD_LINGER_TIMEOUT)
370 locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
372 poller.unregister(fd)
375 elif event & (select.POLLNVAL | select.POLLHUP |
377 poller.unregister(fd)
380 script_stderr_lines.flush()
381 socat_stderr_lines.flush()
383 # If there was a timeout calculator, we were waiting for the child to
384 # finish, e.g. due to a signal
385 return not bool(exit_timeout)
387 socat_stderr_lines.close()
389 script_stderr_lines.close()
393 """Parses the options passed to the program.
395 @return: Arguments to program
398 global options # pylint: disable-msg=W0603
400 parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
401 (constants.IEM_IMPORT,
402 constants.IEM_EXPORT)))
403 parser.add_option(cli.DEBUG_OPT)
404 parser.add_option(cli.VERBOSE_OPT)
405 parser.add_option("--key", dest="key", action="store", type="string",
407 parser.add_option("--cert", dest="cert", action="store", type="string",
408 help="X509 certificate file")
409 parser.add_option("--ca", dest="ca", action="store", type="string",
411 parser.add_option("--bind", dest="bind", action="store", type="string",
413 parser.add_option("--host", dest="host", action="store", type="string",
414 help="Remote hostname")
415 parser.add_option("--port", dest="port", action="store", type="int",
417 parser.add_option("--connect-retries", dest="connect_retries", action="store",
418 type="int", default=0,
419 help=("How many times the connection should be retried"
421 parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
422 type="int", default=DEFAULT_CONNECT_TIMEOUT,
423 help="Timeout for connection to be established (seconds)")
424 parser.add_option("--compress", dest="compress", action="store",
425 type="choice", help="Compression method",
426 metavar="[%s]" % "|".join(constants.IEC_ALL),
427 choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
428 parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
429 type="string", help="Command prefix")
430 parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
431 type="string", help="Command suffix")
433 (options, args) = parser.parse_args()
437 parser.error("Expected exactly two arguments")
439 (status_file_path, mode) = args
441 if mode not in (constants.IEM_IMPORT,
442 constants.IEM_EXPORT):
444 parser.error("Invalid mode: %s" % mode)
446 return (status_file_path, mode)
449 class ChildProcess(subprocess.Popen):
450 def __init__(self, cmd, noclose_fds):
451 """Initializes this class.
454 self._noclose_fds = noclose_fds
456 # Not using close_fds because doing so would also close the socat stderr
457 # pipe, which we still need.
458 subprocess.Popen.__init__(self, cmd, shell=False, close_fds=False,
459 stderr=subprocess.PIPE, stdout=None, stdin=None,
460 preexec_fn=self._ChildPreexec)
461 self._SetProcessGroup()
463 def _ChildPreexec(self):
464 """Called before child executable is execve'd.
467 # Move to separate process group. By sending a signal to its process group
468 # we can kill the child process and all grandchildren.
471 # Close almost all file descriptors
472 utils.CloseFDs(noclose_fds=self._noclose_fds)
474 def _SetProcessGroup(self):
475 """Sets the child's process group.
478 assert self.pid, "Can't be called in child process"
480 # Avoid race condition by setting child's process group (as good as
481 # possible in Python) before sending signals to child. For an
482 # explanation, see preexec function for child.
484 os.setpgid(self.pid, self.pid)
485 except EnvironmentError, err:
486 # If the child process was faster we receive EPERM or EACCES
487 if err.errno not in (errno.EPERM, errno.EACCES):
490 def Kill(self, signum):
491 """Sends signal to child process.
494 logging.info("Sending signal %s to child process", signum)
495 os.killpg(self.pid, signum)
498 """Ensure child process is no longer running.
501 # Final check if child process is still alive
502 if utils.RetryOnSignal(self.poll) is None:
503 logging.error("Child process still alive, sending SIGKILL")
504 self.Kill(signal.SIGKILL)
505 utils.RetryOnSignal(self.wait)
513 (status_file_path, mode) = ParseOptions()
516 child_logger = SetupLogging()
518 status_file = StatusFile(status_file_path)
521 # Pipe to receive socat's stderr output
522 (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
524 # Get child process command
525 cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd)
526 cmd = cmd_builder.GetCommand()
528 logging.debug("Starting command %r", cmd)
530 # Start child process
531 child = ChildProcess(cmd, [socat_stderr_write_fd])
533 def _ForwardSignal(signum, _):
534 """Forwards signals to child process.
539 signal_wakeup = utils.SignalWakeupFd()
541 # TODO: There is a race condition between starting the child and
542 # handling the signals here. While there might be a way to work around
543 # it by registering the handlers before starting the child and
544 # deferring sent signals until the child is available, doing so can be
546 signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
547 handler_fn=_ForwardSignal,
548 wakeup=signal_wakeup)
551 utils.RetryOnSignal(os.close, socat_stderr_write_fd)
553 if ProcessChildIO(child, socat_stderr_read_fd, status_file,
554 child_logger, signal_wakeup, signal_handler,
556 # The child closed all its file descriptors and there was no
558 # TODO: Implement timeout instead of waiting indefinitely
559 utils.RetryOnSignal(child.wait)
561 signal_handler.Reset()
563 signal_wakeup.Reset()
567 if child.returncode == 0:
569 elif child.returncode < 0:
570 errmsg = "Exited due to signal %s" % (-child.returncode, )
572 errmsg = "Exited with status %s" % (child.returncode, )
574 status_file.SetExitStatus(child.returncode, errmsg)
575 except Exception, err: # pylint: disable-msg=W0703
576 logging.exception("Unhandled error occurred")
577 status_file.SetExitStatus(constants.EXIT_FAILURE,
578 "Unhandled error occurred: %s" % (err, ))
580 if status_file.ExitStatusIsSuccess():
581 sys.exit(constants.EXIT_SUCCESS)
583 sys.exit(constants.EXIT_FAILURE)
585 status_file.Update(True)
588 if __name__ == "__main__":