4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Import/export daemon.
26 # pylint: disable-msg=C0103
27 # C0103: Invalid name import-export
40 from cStringIO import StringIO
42 from ganeti import constants
43 from ganeti import cli
44 from ganeti import utils
45 from ganeti import serializer
46 from ganeti import objects
47 from ganeti import locking
50 #: Used to recognize point at which socat(1) starts to listen on its socket.
51 #: The local address is required for the remote peer to connect (in particular
53 LISTENING_RE = re.compile(r"^listening on\s+"
54 r"AF=(?P<family>\d+)\s+"
55 r"(?P<address>.+):(?P<port>\d+)$", re.I)
57 #: Used to recognize point at which socat(1) is sending data over the wire
58 TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
63 SOCAT_LOG_NOTICE = "N"
64 SOCAT_LOG_WARNING = "W"
68 SOCAT_LOG_IGNORE = frozenset([
#: Socat buffer size (1 MiB): at most this many bytes are transferred per step
SOCAT_BUFSIZE = 1 << 20

#: Number of recent child output lines kept in the status file
MAX_RECENT_OUTPUT_LINES = 20

#: Minimum number of seconds between two status file updates (unless forced)
MIN_UPDATE_INTERVAL = 5.0

#: Seconds the child process is given to exit after it was sent a signal
CHILD_LINGER_TIMEOUT = 5.0

#: Default number of seconds to wait for a connection to be established
DEFAULT_CONNECT_TIMEOUT = 60

# Address options shared by every socat invocation
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
# NOTE(review): "method=TLSv1" pins the TLS protocol version; confirm this is
# still the intended choice for the socat/OpenSSL versions in use
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
94 # Global variable for options
class Error(Exception):
    """Generic error raised by the import/export daemon."""
103 """Configures the logging module.
106 formatter = logging.Formatter("%(asctime)s: %(message)s")
108 stderr_handler = logging.StreamHandler()
109 stderr_handler.setFormatter(formatter)
110 stderr_handler.setLevel(logging.NOTSET)
112 root_logger = logging.getLogger("")
113 root_logger.addHandler(stderr_handler)
116 root_logger.setLevel(logging.NOTSET)
117 elif options.verbose:
118 root_logger.setLevel(logging.INFO)
120 root_logger.setLevel(logging.ERROR)
122 # Create special logger for child process output
123 child_logger = logging.Logger("child output")
124 child_logger.addHandler(stderr_handler)
125 child_logger.setLevel(logging.NOTSET)
130 def _VerifyListening(family, address, port):
131 """Verify address given as listening address by socat.
134 # TODO: Implement IPv6 support
135 if family != socket.AF_INET:
136 raise Error("Address family %r not supported" % family)
139 packed_address = socket.inet_pton(family, address)
141 raise Error("Invalid address %r for family %s" % (address, family))
143 return (socket.inet_ntop(family, packed_address), port)
147 """Status file manager.
150 def __init__(self, path):
151 """Initializes class.
155 self._data = objects.ImportExportStatus(ctime=time.time(),
def AddRecentOutput(self, line):
    """Appends one line to the recent child output kept in the status file.

    Older entries are discarded so that at most MAX_RECENT_OUTPUT_LINES
    lines are retained.

    @type line: string
    @param line: Output line to record

    """
    recent = self._data.recent_output
    recent.append(line)

    # Drop everything except the newest MAX_RECENT_OUTPUT_LINES entries
    del recent[:-MAX_RECENT_OUTPUT_LINES]
def SetListenPort(self, port):
    """Records the port the daemon is listening on.

    @type port: int
    @param port: TCP/UDP port, must lie within 1..65535

    """
    assert isinstance(port, (int, long)) and 1 <= port <= 0xFFFF
    self._data.listen_port = port
def GetListenPort(self):
    """Returns the port recorded via SetListenPort.

    Callers treat a value of None as "not listening yet".

    """
    data = self._data
    return data.listen_port
def SetConnected(self):
    """Records that a connection was established.

    Set once socat reports that its data transfer loop has started.

    """
    data = self._data
    data.connected = True
def GetConnected(self):
    """Returns the stored connection flag (see SetConnected)."""
    status = self._data
    return status.connected
def SetExitStatus(self, exit_status, error_message):
    """Records the final exit status and the accompanying error message.

    @param exit_status: Exit status; 0 is the only value allowed without an
                        error message
    @param error_message: Error description; must be non-empty whenever
                          exit_status is not 0

    """
    # A non-zero status always needs an explanation
    assert exit_status == 0 or error_message

    data = self._data
    data.exit_status = exit_status
    data.error_message = error_message
def ExitStatusIsSuccess(self):
    """Tells whether the recorded exit status denotes success.

    Only the error message is consulted: the result is True exactly when no
    (or an empty) error message is stored.

    @rtype: bool

    """
    return not self._data.error_message
212 def Update(self, force):
213 """Updates the status file.
216 @param force: Write status file in any case, not only when minimum interval
221 self._data.mtime is None or
222 time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
225 logging.debug("Updating status file %s", self._path)
227 self._data.mtime = time.time()
228 utils.WriteFile(self._path,
229 data=serializer.DumpJson(self._data.ToDict(), indent=True),
233 def _ProcessSocatOutput(status_file, level, msg):
234 """Interprets socat log output.
237 if level == SOCAT_LOG_NOTICE:
238 if status_file.GetListenPort() is None:
239 # TODO: Maybe implement timeout to not listen forever
240 m = LISTENING_RE.match(msg)
242 (_, port) = _VerifyListening(int(m.group("family")), m.group("address"),
243 int(m.group("port")))
245 status_file.SetListenPort(port)
248 if not status_file.GetConnected():
249 m = TRANSFER_LOOP_RE.match(msg)
251 status_file.SetConnected()
257 def ProcessOutput(line, status_file, logger, socat):
258 """Takes care of child process output.
260 @param status_file: Status file manager
261 @param logger: Child output logger
263 @param socat: Whether it's a socat output line
265 @param line: Child output line
273 parts = line.split(None, 4)
276 (_, _, _, level, msg) = parts
278 force_update = _ProcessSocatOutput(status_file, level, msg)
280 if options.debug or (level and level not in SOCAT_LOG_IGNORE):
281 forward_line = "socat: %s %s" % (level, msg)
285 forward_line = "socat: %s" % line
288 logger.info(forward_line)
289 status_file.AddRecentOutput(forward_line)
291 status_file.Update(force_update)
def GetBashCommand(cmd):
    """Builds the argument vector for running a command line through Bash.

    Bash is started with the "errexit" and "pipefail" options. The script
    therefore aborts when a command fails, and a pipeline fails when any of
    its commands fails, not only the last one.

    @type cmd: string
    @param cmd: Shell command line
    @rtype: list
    @return: Argument vector starting with "bash"

    """
    shell_opts = []
    for opt_name in ("errexit", "pipefail"):
        shell_opts.extend(["-o", opt_name])
    return ["bash"] + shell_opts + ["-c", cmd]
301 def GetSocatCommand(mode):
302 """Returns the socat command.
305 common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
306 "key=%s" % options.key,
307 "cert=%s" % options.cert,
308 "cafile=%s" % options.ca,
311 if options.bind is not None:
312 common_addr_opts.append("bind=%s" % options.bind)
314 if mode == constants.IEM_IMPORT:
315 if options.port is None:
321 "OPENSSL-LISTEN:%s" % port,
324 # Retry to listen if connection wasn't established successfully, up to
325 # 100 times a second. Note that this still leaves room for DoS attacks.
331 elif mode == constants.IEM_EXPORT:
334 "OPENSSL:%s:%s" % (options.host, options.port),
336 # How long to wait per connection attempt
337 "connect-timeout=%s" % options.connect_timeout,
339 # Retry a few times before giving up to connect (once per second)
340 "retry=%s" % options.connect_retries,
345 raise Error("Invalid mode")
347 for i in [addr1, addr2]:
350 raise Error("Comma not allowed in socat option value: %r" % value)
353 constants.SOCAT_PATH,
362 "-b%s" % SOCAT_BUFSIZE,
364 # Unidirectional mode, the first address is only used for reading, and the
365 # second address is only used for writing
368 ",".join(addr1), ",".join(addr2)
372 def GetTransportCommand(mode, socat_stderr_fd):
373 """Returns the command for the transport part of the daemon.
375 @param mode: Daemon mode (import or export)
376 @type socat_stderr_fd: int
377 @param socat_stderr_fd: File descriptor socat should write its stderr to
380 socat_cmd = ("%s 2>&%d" %
381 (utils.ShellQuoteArgs(GetSocatCommand(mode)),
384 # TODO: Make compression configurable
386 if mode == constants.IEM_IMPORT:
387 transport_cmd = "%s | gunzip -c" % socat_cmd
388 elif mode == constants.IEM_EXPORT:
389 transport_cmd = "gzip -c | %s" % socat_cmd
391 raise Error("Invalid mode")
393 # TODO: Use "dd" to measure processed data (allows to give an ETA)
395 # TODO: Run transport as separate user
396 # The transport uses its own shell to simplify running it as a separate user
398 return GetBashCommand(transport_cmd)
401 def GetCommand(mode, socat_stderr_fd):
402 """Returns the complete child process command.
407 if options.cmd_prefix:
408 buf.write(options.cmd_prefix)
411 buf.write(utils.ShellQuoteArgs(GetTransportCommand(mode, socat_stderr_fd)))
413 if options.cmd_suffix:
415 buf.write(options.cmd_suffix)
417 return GetBashCommand(buf.getvalue())
420 def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
421 signal_notify, signal_handler, mode):
422 """Handles the child processes' output.
425 assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
426 "Other signals are not handled in this function"
428 # Buffer size 0 is important, otherwise .read() with a specified length
429 # might buffer data while poll(2) won't mark its file descriptor as
431 socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
433 script_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
436 socat_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
440 child.stderr.fileno(): (child.stderr, script_stderr_lines),
441 socat_stderr_read.fileno(): (socat_stderr_read, socat_stderr_lines),
442 signal_notify.fileno(): (signal_notify, None),
445 poller = select.poll()
447 utils.SetNonblockFlag(fd, True)
448 poller.register(fd, select.POLLIN)
450 if options.connect_timeout and mode == constants.IEM_IMPORT:
451 listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
453 listen_timeout = None
458 # Break out of loop if only signal notify FD is left
459 if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
464 if listen_timeout and not exit_timeout:
465 if status_file.GetConnected():
466 listen_timeout = None
467 elif listen_timeout.Remaining() < 0:
468 logging.info("Child process didn't establish connection in time")
469 child.Kill(signal.SIGTERM)
471 locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
472 # Next block will calculate timeout
474 # Not yet connected, check again in a second
478 timeout = exit_timeout.Remaining() * 1000
480 logging.info("Child process didn't exit in time")
483 for fd, event in utils.RetryOnSignal(poller.poll, timeout):
484 if event & (select.POLLIN | event & select.POLLPRI):
485 (from_, to) = fdmap[fd]
487 # Read up to 1 KB of data
488 data = from_.read(1024)
492 elif fd == signal_notify.fileno():
494 if signal_handler.called:
495 signal_handler.Clear()
497 logging.info("Child process still has about %0.2f seconds"
498 " to exit", exit_timeout.Remaining())
500 logging.info("Giving child process %0.2f seconds to exit",
501 CHILD_LINGER_TIMEOUT)
503 locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
505 poller.unregister(fd)
508 elif event & (select.POLLNVAL | select.POLLHUP |
510 poller.unregister(fd)
513 script_stderr_lines.flush()
514 socat_stderr_lines.flush()
516 # If there was a timeout calculator, we were waiting for the child to
517 # finish, e.g. due to a signal
518 return not bool(exit_timeout)
520 socat_stderr_lines.close()
522 script_stderr_lines.close()
526 """Parses the options passed to the program.
528 @return: Arguments to program
531 global options # pylint: disable-msg=W0603
533 parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
534 (constants.IEM_IMPORT,
535 constants.IEM_EXPORT)))
536 parser.add_option(cli.DEBUG_OPT)
537 parser.add_option(cli.VERBOSE_OPT)
538 parser.add_option("--key", dest="key", action="store", type="string",
540 parser.add_option("--cert", dest="cert", action="store", type="string",
541 help="X509 certificate file")
542 parser.add_option("--ca", dest="ca", action="store", type="string",
544 parser.add_option("--bind", dest="bind", action="store", type="string",
546 parser.add_option("--host", dest="host", action="store", type="string",
547 help="Remote hostname")
548 parser.add_option("--port", dest="port", action="store", type="int",
550 parser.add_option("--connect-retries", dest="connect_retries", action="store",
551 type="int", default=0,
552 help=("How many times the connection should be retried"
554 parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
555 type="int", default=DEFAULT_CONNECT_TIMEOUT,
556 help="Timeout for connection to be established (seconds)")
557 parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
558 type="string", help="Command prefix")
559 parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
560 type="string", help="Command suffix")
562 (options, args) = parser.parse_args()
566 parser.error("Expected exactly two arguments")
568 (status_file_path, mode) = args
570 if mode not in (constants.IEM_IMPORT,
571 constants.IEM_EXPORT):
573 parser.error("Invalid mode: %s" % mode)
575 return (status_file_path, mode)
578 class ChildProcess(subprocess.Popen):
def __init__(self, cmd, noclose_fds):
    """Starts the child process.

    @type cmd: list
    @param cmd: Argument vector of the command to run
    @type noclose_fds: list
    @param noclose_fds: File descriptors which must stay open in the child

    """
    self._noclose_fds = noclose_fds

    # close_fds=True would also close the write end of the socat stderr pipe,
    # which the child still needs; _ChildPreexec closes the other descriptors.
    popen_kwargs = dict(shell=False,
                        close_fds=False,
                        stdin=None,
                        stdout=None,
                        stderr=subprocess.PIPE,
                        preexec_fn=self._ChildPreexec)
    subprocess.Popen.__init__(self, cmd, **popen_kwargs)

    # Also set the process group from the parent side before any signal is sent
    self._SetProcessGroup()
592 def _ChildPreexec(self):
593 """Called before child executable is execve'd.
596 # Move to separate process group. By sending a signal to its process group
597 # we can kill the child process and all grandchildren.
600 # Close almost all file descriptors
601 utils.CloseFDs(noclose_fds=self._noclose_fds)
603 def _SetProcessGroup(self):
604 """Sets the child's process group.
607 assert self.pid, "Can't be called in child process"
609 # Avoid race condition by setting child's process group (as good as
610 # possible in Python) before sending signals to child. For an
611 # explanation, see preexec function for child.
613 os.setpgid(self.pid, self.pid)
614 except EnvironmentError, err:
615 # If the child process was faster we receive EPERM or EACCES
616 if err.errno not in (errno.EPERM, errno.EACCES):
def Kill(self, signum):
    """Sends a signal to the child's process group.

    The group ID equals the child's PID, because the child is placed into
    its own process group; grandchildren therefore receive the signal too.

    @type signum: int
    @param signum: Signal number

    """
    logging.info("Sending signal %s to child process", signum)
    process_group = self.pid
    os.killpg(process_group, signum)
627 """Ensure child process is no longer running.
630 # Final check if child process is still alive
631 if utils.RetryOnSignal(self.poll) is None:
632 logging.error("Child process still alive, sending SIGKILL")
633 self.Kill(signal.SIGKILL)
634 utils.RetryOnSignal(self.wait)
642 (status_file_path, mode) = ParseOptions()
645 child_logger = SetupLogging()
647 status_file = StatusFile(status_file_path)
650 # Pipe to receive socat's stderr output
651 (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
653 # Get child process command
654 cmd = GetCommand(mode, socat_stderr_write_fd)
656 logging.debug("Starting command %r", cmd)
658 # Start child process
659 child = ChildProcess(cmd, [socat_stderr_write_fd])
661 def _ForwardSignal(signum, _):
662 """Forwards signals to child process.
667 signal_wakeup = utils.SignalWakeupFd()
669 # TODO: There is a race condition between starting the child and
670 # handling the signals here. While there might be a way to work around
671 # it by registering the handlers before starting the child and
672 # deferring sent signals until the child is available, doing so can be
674 signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
675 handler_fn=_ForwardSignal,
676 wakeup=signal_wakeup)
679 utils.RetryOnSignal(os.close, socat_stderr_write_fd)
681 if ProcessChildIO(child, socat_stderr_read_fd, status_file,
682 child_logger, signal_wakeup, signal_handler,
684 # The child closed all its file descriptors and there was no
686 # TODO: Implement timeout instead of waiting indefinitely
687 utils.RetryOnSignal(child.wait)
689 signal_handler.Reset()
691 signal_wakeup.Reset()
695 if child.returncode == 0:
697 elif child.returncode < 0:
698 errmsg = "Exited due to signal %s" % (-child.returncode, )
700 errmsg = "Exited with status %s" % (child.returncode, )
702 status_file.SetExitStatus(child.returncode, errmsg)
703 except Exception, err: # pylint: disable-msg=W0703
704 logging.exception("Unhandled error occurred")
705 status_file.SetExitStatus(constants.EXIT_FAILURE,
706 "Unhandled error occurred: %s" % (err, ))
708 if status_file.ExitStatusIsSuccess():
709 sys.exit(constants.EXIT_SUCCESS)
711 sys.exit(constants.EXIT_FAILURE)
713 status_file.Update(True)
716 if __name__ == "__main__":