4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Classes and functions for import/export daemon.
33 from cStringIO import StringIO
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import utils
38 from ganeti import netutils
41 #: Used to recognize point at which socat(1) starts to listen on its socket.
42 #: The local address is required for the remote peer to connect (in particular
44 LISTENING_RE = re.compile(r"^listening on\s+"
45 r"AF=(?P<family>\d+)\s+"
46 r"(?P<address>.+):(?P<port>\d+)$", re.I)
48 #: Used to recognize point at which socat(1) is sending data over the wire
49 TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
54 SOCAT_LOG_NOTICE = "N"
55 SOCAT_LOG_WARNING = "W"
59 SOCAT_LOG_IGNORE = frozenset([
65 #: Used to parse GNU dd(1) statistics
66 DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
67 r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
69 #: Used to ignore "N+N records in/out" on dd(1)'s stderr
70 DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
72 #: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
73 #: unavailable and SIGUSR1 is used instead)
74 DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)
76 #: Buffer size: at most this many bytes are transferred at once
79 # Common options for socat
80 SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
81 SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1",
82 "cipher=%s" % constants.OPENSSL_CIPHERS]
84 if constants.SOCAT_USE_COMPRESS:
85 # Disables all compression in by OpenSSL. Only supported in patched versions
86 # of socat (as of November 2010). See INSTALL for more information.
87 SOCAT_OPENSSL_OPTS.append("compress=none")
89 SOCAT_OPTION_MAXLEN = 400
95 PROG_EXP_SIZE) = range(1, 6)
96 PROG_ALL = frozenset([
105 class CommandBuilder(object):
106 def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
107 """Initializes this class.
109 @param mode: Daemon mode (import or export)
110 @param opts: Options object
111 @type socat_stderr_fd: int
112 @param socat_stderr_fd: File descriptor socat should write its stderr to
113 @type dd_stderr_fd: int
114 @param dd_stderr_fd: File descriptor dd should write its stderr to
116 @param dd_pid_fd: File descriptor the child should write dd's PID to
121 self._socat_stderr_fd = socat_stderr_fd
122 self._dd_stderr_fd = dd_stderr_fd
123 self._dd_pid_fd = dd_pid_fd
125 assert (self._opts.magic is None or
126 constants.IE_MAGIC_RE.match(self._opts.magic))
129 def GetBashCommand(cmd):
130 """Prepares a command to be run in Bash.
133 return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
135 def _GetSocatCommand(self):
136 """Returns the socat command.
139 common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
140 "key=%s" % self._opts.key,
141 "cert=%s" % self._opts.cert,
142 "cafile=%s" % self._opts.ca,
145 if self._opts.bind is not None:
146 common_addr_opts.append("bind=%s" % self._opts.bind)
148 assert not (self._opts.ipv4 and self._opts.ipv6)
151 common_addr_opts.append("pf=ipv4")
152 elif self._opts.ipv6:
153 common_addr_opts.append("pf=ipv6")
155 if self._mode == constants.IEM_IMPORT:
156 if self._opts.port is None:
159 port = self._opts.port
162 "OPENSSL-LISTEN:%s" % port,
165 # Retry to listen if connection wasn't established successfully, up to
166 # 100 times a second. Note that this still leaves room for DoS attacks.
172 elif self._mode == constants.IEM_EXPORT:
173 if self._opts.host and netutils.IP6Address.IsValid(self._opts.host):
174 host = "[%s]" % self._opts.host
176 host = self._opts.host
180 "OPENSSL:%s:%s" % (host, self._opts.port),
182 # How long to wait per connection attempt
183 "connect-timeout=%s" % self._opts.connect_timeout,
185 # Retry a few times before giving up to connect (once per second)
186 "retry=%s" % self._opts.connect_retries,
191 raise errors.GenericError("Invalid mode '%s'" % self._mode)
193 for i in [addr1, addr2]:
195 if len(value) > SOCAT_OPTION_MAXLEN:
196 raise errors.GenericError("Socat option longer than %s"
198 (SOCAT_OPTION_MAXLEN, value))
200 raise errors.GenericError("Comma not allowed in socat option"
201 " value: %r" % value)
204 constants.SOCAT_PATH,
215 # Unidirectional mode, the first address is only used for reading, and the
216 # second address is only used for writing
219 ",".join(addr1), ",".join(addr2)
222 def _GetMagicCommand(self):
223 """Returns the command to read/write the magic value.
226 if not self._opts.magic:
229 # Prefix to ensure magic isn't interpreted as option to "echo"
230 magic = "M=%s" % self._opts.magic
234 if self._mode == constants.IEM_IMPORT:
236 cmd.write(utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"]))
238 cmd.write("if test \"$magic\" != %s; then" % utils.ShellQuote(magic))
239 cmd.write(" echo %s >&2;" % utils.ShellQuote("Magic value mismatch"))
240 cmd.write(" exit 1;")
244 elif self._mode == constants.IEM_EXPORT:
245 cmd.write(utils.ShellQuoteArgs(["echo", "-E", "-n", magic]))
248 raise errors.GenericError("Invalid mode '%s'" % self._mode)
250 return cmd.getvalue()
252 def _GetDdCommand(self):
253 """Returns the command for measuring throughput.
258 magic_cmd = self._GetMagicCommand()
261 dd_cmd.write(magic_cmd)
265 # Setting LC_ALL since we want to parse the output and explicitely
266 # redirecting stdin, as the background process (dd) would have /dev/null as
268 dd_cmd.write("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
269 (BUFSIZE, self._dd_stderr_fd))
271 dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
273 dd_cmd.write(" wait $pid;")
279 return dd_cmd.getvalue()
281 def _GetTransportCommand(self):
282 """Returns the command for the transport part of the daemon.
285 socat_cmd = ("%s 2>&%d" %
286 (utils.ShellQuoteArgs(self._GetSocatCommand()),
287 self._socat_stderr_fd))
288 dd_cmd = self._GetDdCommand()
290 compr = self._opts.compress
292 assert compr in constants.IEC_ALL
296 if self._mode == constants.IEM_IMPORT:
297 parts.append(socat_cmd)
299 if compr == constants.IEC_GZIP:
300 parts.append("gunzip -c")
304 elif self._mode == constants.IEM_EXPORT:
307 if compr == constants.IEC_GZIP:
308 parts.append("gzip -c")
310 parts.append(socat_cmd)
313 raise errors.GenericError("Invalid mode '%s'" % self._mode)
315 # TODO: Run transport as separate user
316 # The transport uses its own shell to simplify running it as a separate user
318 return self.GetBashCommand(" | ".join(parts))
320 def GetCommand(self):
321 """Returns the complete child process command.
324 transport_cmd = self._GetTransportCommand()
328 if self._opts.cmd_prefix:
329 buf.write(self._opts.cmd_prefix)
332 buf.write(utils.ShellQuoteArgs(transport_cmd))
334 if self._opts.cmd_suffix:
336 buf.write(self._opts.cmd_suffix)
338 return self.GetBashCommand(buf.getvalue())
341 def _VerifyListening(family, address, port):
342 """Verify address given as listening address by socat.
345 if family not in (socket.AF_INET, socket.AF_INET6):
346 raise errors.GenericError("Address family %r not supported" % family)
348 if (family == socket.AF_INET6 and address.startswith("[") and
349 address.endswith("]")):
350 address = address.lstrip("[").rstrip("]")
353 packed_address = socket.inet_pton(family, address)
355 raise errors.GenericError("Invalid address %r for family %s" %
358 return (socket.inet_ntop(family, packed_address), port)
361 class ChildIOProcessor(object):
362 def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
363 """Initializes this class.
367 self._status_file = status_file
368 self._logger = logger
370 self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
371 for prog in PROG_ALL])
374 self._dd_ready = False
375 self._dd_tp_samples = throughput_samples
376 self._dd_progress = []
378 # Expected size of transferred data
379 self._exp_size = exp_size
381 def GetLineSplitter(self, prog):
382 """Returns the line splitter for a program.
385 return self._splitter[prog]
388 """Flushes all line splitters.
391 for ls in self._splitter.itervalues():
395 """Closes all line splitters.
398 for ls in self._splitter.itervalues():
400 self._splitter.clear()
403 """Tells dd(1) to write statistics.
406 if self._dd_pid is None:
410 if not self._dd_ready:
411 # There's a race condition between starting the program and sending
412 # signals. The signal handler is only registered after some time, so we
413 # have to check whether the program is ready. If it isn't, sending a
414 # signal will invoke the default handler (and usually abort the program).
415 if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
416 logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
419 logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
420 self._dd_ready = True
422 logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
424 os.kill(self._dd_pid, DD_INFO_SIGNAL)
425 except EnvironmentError, err:
426 if err.errno != errno.ESRCH:
429 # Process no longer exists
430 logging.debug("dd exited")
435 def _ProcessOutput(self, line, prog):
436 """Takes care of child process output.
439 @param line: Child output line
441 @param prog: Program from which the line originates
447 if prog == PROG_SOCAT:
449 parts = line.split(None, 4)
452 (_, _, _, level, msg) = parts
454 force_update = self._ProcessSocatOutput(self._status_file, level, msg)
456 if self._debug or (level and level not in SOCAT_LOG_IGNORE):
457 forward_line = "socat: %s %s" % (level, msg)
461 forward_line = "socat: %s" % line
463 elif prog == PROG_DD:
464 (should_forward, force_update) = self._ProcessDdOutput(line)
466 if should_forward or self._debug:
467 forward_line = "dd: %s" % line
471 elif prog == PROG_DD_PID:
473 raise RuntimeError("dd PID reported more than once")
474 logging.debug("Received dd PID %r", line)
475 self._dd_pid = int(line)
478 elif prog == PROG_EXP_SIZE:
479 logging.debug("Received predicted size %r", line)
484 exp_size = utils.BytesToMebibyte(int(line))
485 except (ValueError, TypeError), err:
486 logging.error("Failed to convert predicted size %r to number: %s",
492 self._exp_size = exp_size
495 self._logger.info(forward_line)
496 self._status_file.AddRecentOutput(forward_line)
498 self._status_file.Update(force_update)
501 def _ProcessSocatOutput(status_file, level, msg):
502 """Interprets socat log output.
505 if level == SOCAT_LOG_NOTICE:
506 if status_file.GetListenPort() is None:
507 # TODO: Maybe implement timeout to not listen forever
508 m = LISTENING_RE.match(msg)
510 (_, port) = _VerifyListening(int(m.group("family")),
512 int(m.group("port")))
514 status_file.SetListenPort(port)
517 if not status_file.GetConnected():
518 m = TRANSFER_LOOP_RE.match(msg)
520 logging.debug("Connection established")
521 status_file.SetConnected()
526 def _ProcessDdOutput(self, line):
527 """Interprets a line of dd(1)'s output.
530 m = DD_INFO_RE.match(line)
532 seconds = float(m.group("seconds"))
533 mbytes = utils.BytesToMebibyte(int(m.group("bytes")))
534 self._UpdateDdProgress(seconds, mbytes)
537 m = DD_STDERR_IGNORE.match(line)
540 return (False, False)
545 def _UpdateDdProgress(self, seconds, mbytes):
546 """Updates the internal status variables for dd(1) progress.
549 @param seconds: Timestamp of this update
551 @param mbytes: Total number of MiB transferred so far
555 self._dd_progress.append((seconds, mbytes))
558 del self._dd_progress[:-self._dd_tp_samples]
560 # Calculate throughput
561 throughput = _CalcThroughput(self._dd_progress)
563 # Calculate percent and ETA
567 if self._exp_size is not None:
568 if self._exp_size != 0:
569 percent = max(0, min(100, (100.0 * mbytes) / self._exp_size))
572 eta = max(0, float(self._exp_size - mbytes) / throughput)
574 self._status_file.SetProgress(mbytes, throughput, percent, eta)
577 def _CalcThroughput(samples):
578 """Calculates the throughput in MiB/second.
580 @type samples: sequence
581 @param samples: List of samples, each consisting of a (timestamp, mbytes)
583 @rtype: float or None
584 @return: Throughput in MiB/second
588 # Can't calculate throughput
591 (start_time, start_mbytes) = samples[0]
592 (end_time, end_mbytes) = samples[-1]
594 return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)