Statistics
| Branch: | Tag: | Revision:

root / lib / impexpd / __init__.py @ 178ad717

History | View | Annotate | Download (16.3 kB)

1 bb44b1ae Michael Hanselmann
#
2 bb44b1ae Michael Hanselmann
#
3 bb44b1ae Michael Hanselmann
4 bb44b1ae Michael Hanselmann
# Copyright (C) 2010 Google Inc.
5 bb44b1ae Michael Hanselmann
#
6 bb44b1ae Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 bb44b1ae Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 bb44b1ae Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 bb44b1ae Michael Hanselmann
# (at your option) any later version.
10 bb44b1ae Michael Hanselmann
#
11 bb44b1ae Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 bb44b1ae Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 bb44b1ae Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 bb44b1ae Michael Hanselmann
# General Public License for more details.
15 bb44b1ae Michael Hanselmann
#
16 bb44b1ae Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 bb44b1ae Michael Hanselmann
# along with this program; if not, write to the Free Software
18 bb44b1ae Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 bb44b1ae Michael Hanselmann
# 02110-1301, USA.
20 bb44b1ae Michael Hanselmann
21 bb44b1ae Michael Hanselmann
22 bb44b1ae Michael Hanselmann
"""Classes and functions for import/export daemon.
23 bb44b1ae Michael Hanselmann

24 bb44b1ae Michael Hanselmann
"""
25 bb44b1ae Michael Hanselmann
26 c08d76f5 Michael Hanselmann
import os
27 34c9ee7b Michael Hanselmann
import re
28 34c9ee7b Michael Hanselmann
import socket
29 53dbf14c Michael Hanselmann
import logging
30 c08d76f5 Michael Hanselmann
import signal
31 c08d76f5 Michael Hanselmann
import errno
32 c08d76f5 Michael Hanselmann
import time
33 bb44b1ae Michael Hanselmann
from cStringIO import StringIO
34 bb44b1ae Michael Hanselmann
35 bb44b1ae Michael Hanselmann
from ganeti import constants
36 bb44b1ae Michael Hanselmann
from ganeti import errors
37 bb44b1ae Michael Hanselmann
from ganeti import utils
38 58bb385c Michael Hanselmann
from ganeti import netutils
39 b8028dcf Michael Hanselmann
from ganeti import compat
40 bb44b1ae Michael Hanselmann
41 bb44b1ae Michael Hanselmann
42 34c9ee7b Michael Hanselmann
#: Used to recognize point at which socat(1) starts to listen on its socket.
43 34c9ee7b Michael Hanselmann
#: The local address is required for the remote peer to connect (in particular
44 34c9ee7b Michael Hanselmann
#: the port number).
45 34c9ee7b Michael Hanselmann
LISTENING_RE = re.compile(r"^listening on\s+"
46 34c9ee7b Michael Hanselmann
                          r"AF=(?P<family>\d+)\s+"
47 34c9ee7b Michael Hanselmann
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
48 34c9ee7b Michael Hanselmann
49 34c9ee7b Michael Hanselmann
#: Used to recognize point at which socat(1) is sending data over the wire
50 34c9ee7b Michael Hanselmann
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
51 34c9ee7b Michael Hanselmann
                              re.I)
52 34c9ee7b Michael Hanselmann
53 34c9ee7b Michael Hanselmann
SOCAT_LOG_DEBUG = "D"
54 34c9ee7b Michael Hanselmann
SOCAT_LOG_INFO = "I"
55 34c9ee7b Michael Hanselmann
SOCAT_LOG_NOTICE = "N"
56 34c9ee7b Michael Hanselmann
SOCAT_LOG_WARNING = "W"
57 34c9ee7b Michael Hanselmann
SOCAT_LOG_ERROR = "E"
58 34c9ee7b Michael Hanselmann
SOCAT_LOG_FATAL = "F"
59 34c9ee7b Michael Hanselmann
60 b8028dcf Michael Hanselmann
SOCAT_LOG_IGNORE = compat.UniqueFrozenset([
61 34c9ee7b Michael Hanselmann
  SOCAT_LOG_DEBUG,
62 34c9ee7b Michael Hanselmann
  SOCAT_LOG_INFO,
63 34c9ee7b Michael Hanselmann
  SOCAT_LOG_NOTICE,
64 34c9ee7b Michael Hanselmann
  ])
65 34c9ee7b Michael Hanselmann
66 c08d76f5 Michael Hanselmann
#: Used to parse GNU dd(1) statistics
67 c08d76f5 Michael Hanselmann
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
68 c08d76f5 Michael Hanselmann
                        r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
69 c08d76f5 Michael Hanselmann
70 c08d76f5 Michael Hanselmann
#: Used to ignore "N+N records in/out" on dd(1)'s stderr
71 c08d76f5 Michael Hanselmann
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
72 c08d76f5 Michael Hanselmann
73 c08d76f5 Michael Hanselmann
#: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
74 c08d76f5 Michael Hanselmann
#: unavailable and SIGUSR1 is used instead)
75 c08d76f5 Michael Hanselmann
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)
76 c08d76f5 Michael Hanselmann
77 bb44b1ae Michael Hanselmann
#: Buffer size: at most this many bytes are transferred at once
78 bb44b1ae Michael Hanselmann
BUFSIZE = 1024 * 1024
79 bb44b1ae Michael Hanselmann
80 bb44b1ae Michael Hanselmann
# Common options for socat
81 bb44b1ae Michael Hanselmann
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
82 971bbd84 Michael Hanselmann
SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1",
83 971bbd84 Michael Hanselmann
                      "cipher=%s" % constants.OPENSSL_CIPHERS]
84 bb44b1ae Michael Hanselmann
85 e90739d6 Michael Hanselmann
if constants.SOCAT_USE_COMPRESS:
86 e90739d6 Michael Hanselmann
  # Disables all compression in by OpenSSL. Only supported in patched versions
87 e90739d6 Michael Hanselmann
  # of socat (as of November 2010). See INSTALL for more information.
88 e90739d6 Michael Hanselmann
  SOCAT_OPENSSL_OPTS.append("compress=none")
89 e90739d6 Michael Hanselmann
90 0559f745 Michael Hanselmann
SOCAT_OPTION_MAXLEN = 400
91 0559f745 Michael Hanselmann
92 34c9ee7b Michael Hanselmann
(PROG_OTHER,
93 c08d76f5 Michael Hanselmann
 PROG_SOCAT,
94 c08d76f5 Michael Hanselmann
 PROG_DD,
95 f9323011 Michael Hanselmann
 PROG_DD_PID,
96 f9323011 Michael Hanselmann
 PROG_EXP_SIZE) = range(1, 6)
97 b8028dcf Michael Hanselmann
98 b8028dcf Michael Hanselmann
PROG_ALL = compat.UniqueFrozenset([
99 34c9ee7b Michael Hanselmann
  PROG_OTHER,
100 34c9ee7b Michael Hanselmann
  PROG_SOCAT,
101 c08d76f5 Michael Hanselmann
  PROG_DD,
102 c08d76f5 Michael Hanselmann
  PROG_DD_PID,
103 f9323011 Michael Hanselmann
  PROG_EXP_SIZE,
104 34c9ee7b Michael Hanselmann
  ])
105 34c9ee7b Michael Hanselmann
106 bb44b1ae Michael Hanselmann
107 bb44b1ae Michael Hanselmann
class CommandBuilder(object):
108 c08d76f5 Michael Hanselmann
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
109 bb44b1ae Michael Hanselmann
    """Initializes this class.
110 bb44b1ae Michael Hanselmann

111 bb44b1ae Michael Hanselmann
    @param mode: Daemon mode (import or export)
112 bb44b1ae Michael Hanselmann
    @param opts: Options object
113 bb44b1ae Michael Hanselmann
    @type socat_stderr_fd: int
114 bb44b1ae Michael Hanselmann
    @param socat_stderr_fd: File descriptor socat should write its stderr to
115 c08d76f5 Michael Hanselmann
    @type dd_stderr_fd: int
116 c08d76f5 Michael Hanselmann
    @param dd_stderr_fd: File descriptor dd should write its stderr to
117 c08d76f5 Michael Hanselmann
    @type dd_pid_fd: int
118 c08d76f5 Michael Hanselmann
    @param dd_pid_fd: File descriptor the child should write dd's PID to
119 bb44b1ae Michael Hanselmann

120 bb44b1ae Michael Hanselmann
    """
121 bb44b1ae Michael Hanselmann
    self._opts = opts
122 bb44b1ae Michael Hanselmann
    self._mode = mode
123 bb44b1ae Michael Hanselmann
    self._socat_stderr_fd = socat_stderr_fd
124 c08d76f5 Michael Hanselmann
    self._dd_stderr_fd = dd_stderr_fd
125 c08d76f5 Michael Hanselmann
    self._dd_pid_fd = dd_pid_fd
126 bb44b1ae Michael Hanselmann
127 1d3dfa29 Michael Hanselmann
    assert (self._opts.magic is None or
128 1d3dfa29 Michael Hanselmann
            constants.IE_MAGIC_RE.match(self._opts.magic))
129 1d3dfa29 Michael Hanselmann
130 bb44b1ae Michael Hanselmann
  @staticmethod
131 bb44b1ae Michael Hanselmann
  def GetBashCommand(cmd):
132 bb44b1ae Michael Hanselmann
    """Prepares a command to be run in Bash.
133 bb44b1ae Michael Hanselmann

134 bb44b1ae Michael Hanselmann
    """
135 bb44b1ae Michael Hanselmann
    return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
136 bb44b1ae Michael Hanselmann
137 bb44b1ae Michael Hanselmann
  def _GetSocatCommand(self):
138 bb44b1ae Michael Hanselmann
    """Returns the socat command.
139 bb44b1ae Michael Hanselmann

140 bb44b1ae Michael Hanselmann
    """
141 bb44b1ae Michael Hanselmann
    common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
142 bb44b1ae Michael Hanselmann
      "key=%s" % self._opts.key,
143 bb44b1ae Michael Hanselmann
      "cert=%s" % self._opts.cert,
144 bb44b1ae Michael Hanselmann
      "cafile=%s" % self._opts.ca,
145 bb44b1ae Michael Hanselmann
      ]
146 bb44b1ae Michael Hanselmann
147 bb44b1ae Michael Hanselmann
    if self._opts.bind is not None:
148 bb44b1ae Michael Hanselmann
      common_addr_opts.append("bind=%s" % self._opts.bind)
149 bb44b1ae Michael Hanselmann
150 58bb385c Michael Hanselmann
    assert not (self._opts.ipv4 and self._opts.ipv6)
151 58bb385c Michael Hanselmann
152 58bb385c Michael Hanselmann
    if self._opts.ipv4:
153 58bb385c Michael Hanselmann
      common_addr_opts.append("pf=ipv4")
154 58bb385c Michael Hanselmann
    elif self._opts.ipv6:
155 58bb385c Michael Hanselmann
      common_addr_opts.append("pf=ipv6")
156 58bb385c Michael Hanselmann
157 bb44b1ae Michael Hanselmann
    if self._mode == constants.IEM_IMPORT:
158 bb44b1ae Michael Hanselmann
      if self._opts.port is None:
159 bb44b1ae Michael Hanselmann
        port = 0
160 bb44b1ae Michael Hanselmann
      else:
161 bb44b1ae Michael Hanselmann
        port = self._opts.port
162 bb44b1ae Michael Hanselmann
163 bb44b1ae Michael Hanselmann
      addr1 = [
164 bb44b1ae Michael Hanselmann
        "OPENSSL-LISTEN:%s" % port,
165 bb44b1ae Michael Hanselmann
        "reuseaddr",
166 bb44b1ae Michael Hanselmann
167 bb44b1ae Michael Hanselmann
        # Retry to listen if connection wasn't established successfully, up to
168 bb44b1ae Michael Hanselmann
        # 100 times a second. Note that this still leaves room for DoS attacks.
169 bb44b1ae Michael Hanselmann
        "forever",
170 bb44b1ae Michael Hanselmann
        "intervall=0.01",
171 bb44b1ae Michael Hanselmann
        ] + common_addr_opts
172 bb44b1ae Michael Hanselmann
      addr2 = ["stdout"]
173 bb44b1ae Michael Hanselmann
174 bb44b1ae Michael Hanselmann
    elif self._mode == constants.IEM_EXPORT:
175 58bb385c Michael Hanselmann
      if self._opts.host and netutils.IP6Address.IsValid(self._opts.host):
176 58bb385c Michael Hanselmann
        host = "[%s]" % self._opts.host
177 58bb385c Michael Hanselmann
      else:
178 58bb385c Michael Hanselmann
        host = self._opts.host
179 58bb385c Michael Hanselmann
180 bb44b1ae Michael Hanselmann
      addr1 = ["stdin"]
181 bb44b1ae Michael Hanselmann
      addr2 = [
182 58bb385c Michael Hanselmann
        "OPENSSL:%s:%s" % (host, self._opts.port),
183 bb44b1ae Michael Hanselmann
184 bb44b1ae Michael Hanselmann
        # How long to wait per connection attempt
185 bb44b1ae Michael Hanselmann
        "connect-timeout=%s" % self._opts.connect_timeout,
186 bb44b1ae Michael Hanselmann
187 bb44b1ae Michael Hanselmann
        # Retry a few times before giving up to connect (once per second)
188 bb44b1ae Michael Hanselmann
        "retry=%s" % self._opts.connect_retries,
189 bb44b1ae Michael Hanselmann
        "intervall=1",
190 bb44b1ae Michael Hanselmann
        ] + common_addr_opts
191 bb44b1ae Michael Hanselmann
192 bb44b1ae Michael Hanselmann
    else:
193 bb44b1ae Michael Hanselmann
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
194 bb44b1ae Michael Hanselmann
195 bb44b1ae Michael Hanselmann
    for i in [addr1, addr2]:
196 bb44b1ae Michael Hanselmann
      for value in i:
197 0559f745 Michael Hanselmann
        if len(value) > SOCAT_OPTION_MAXLEN:
198 0559f745 Michael Hanselmann
          raise errors.GenericError("Socat option longer than %s"
199 0559f745 Michael Hanselmann
                                    " characters: %r" %
200 0559f745 Michael Hanselmann
                                    (SOCAT_OPTION_MAXLEN, value))
201 bb44b1ae Michael Hanselmann
        if "," in value:
202 bb44b1ae Michael Hanselmann
          raise errors.GenericError("Comma not allowed in socat option"
203 bb44b1ae Michael Hanselmann
                                    " value: %r" % value)
204 bb44b1ae Michael Hanselmann
205 bb44b1ae Michael Hanselmann
    return [
206 bb44b1ae Michael Hanselmann
      constants.SOCAT_PATH,
207 bb44b1ae Michael Hanselmann
208 bb44b1ae Michael Hanselmann
      # Log to stderr
209 bb44b1ae Michael Hanselmann
      "-ls",
210 bb44b1ae Michael Hanselmann
211 bb44b1ae Michael Hanselmann
      # Log level
212 bb44b1ae Michael Hanselmann
      "-d", "-d",
213 bb44b1ae Michael Hanselmann
214 bb44b1ae Michael Hanselmann
      # Buffer size
215 bb44b1ae Michael Hanselmann
      "-b%s" % BUFSIZE,
216 bb44b1ae Michael Hanselmann
217 bb44b1ae Michael Hanselmann
      # Unidirectional mode, the first address is only used for reading, and the
218 bb44b1ae Michael Hanselmann
      # second address is only used for writing
219 bb44b1ae Michael Hanselmann
      "-u",
220 bb44b1ae Michael Hanselmann
221 3c286190 Dimitris Aragiorgis
      ",".join(addr1), ",".join(addr2),
222 bb44b1ae Michael Hanselmann
      ]
223 bb44b1ae Michael Hanselmann
224 1d3dfa29 Michael Hanselmann
  def _GetMagicCommand(self):
225 1d3dfa29 Michael Hanselmann
    """Returns the command to read/write the magic value.
226 1d3dfa29 Michael Hanselmann

227 1d3dfa29 Michael Hanselmann
    """
228 1d3dfa29 Michael Hanselmann
    if not self._opts.magic:
229 1d3dfa29 Michael Hanselmann
      return None
230 1d3dfa29 Michael Hanselmann
231 1d3dfa29 Michael Hanselmann
    # Prefix to ensure magic isn't interpreted as option to "echo"
232 1d3dfa29 Michael Hanselmann
    magic = "M=%s" % self._opts.magic
233 1d3dfa29 Michael Hanselmann
234 1d3dfa29 Michael Hanselmann
    cmd = StringIO()
235 1d3dfa29 Michael Hanselmann
236 1d3dfa29 Michael Hanselmann
    if self._mode == constants.IEM_IMPORT:
237 1d3dfa29 Michael Hanselmann
      cmd.write("{ ")
238 1d3dfa29 Michael Hanselmann
      cmd.write(utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"]))
239 1d3dfa29 Michael Hanselmann
      cmd.write(" && ")
240 1d3dfa29 Michael Hanselmann
      cmd.write("if test \"$magic\" != %s; then" % utils.ShellQuote(magic))
241 1d3dfa29 Michael Hanselmann
      cmd.write(" echo %s >&2;" % utils.ShellQuote("Magic value mismatch"))
242 1d3dfa29 Michael Hanselmann
      cmd.write(" exit 1;")
243 1d3dfa29 Michael Hanselmann
      cmd.write("fi;")
244 1d3dfa29 Michael Hanselmann
      cmd.write(" }")
245 1d3dfa29 Michael Hanselmann
246 1d3dfa29 Michael Hanselmann
    elif self._mode == constants.IEM_EXPORT:
247 1d3dfa29 Michael Hanselmann
      cmd.write(utils.ShellQuoteArgs(["echo", "-E", "-n", magic]))
248 1d3dfa29 Michael Hanselmann
249 1d3dfa29 Michael Hanselmann
    else:
250 1d3dfa29 Michael Hanselmann
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
251 1d3dfa29 Michael Hanselmann
252 1d3dfa29 Michael Hanselmann
    return cmd.getvalue()
253 1d3dfa29 Michael Hanselmann
254 fbb6b864 Michael Hanselmann
  def _GetDdCommand(self):
255 fbb6b864 Michael Hanselmann
    """Returns the command for measuring throughput.
256 bb44b1ae Michael Hanselmann

257 bb44b1ae Michael Hanselmann
    """
258 c08d76f5 Michael Hanselmann
    dd_cmd = StringIO()
259 1d3dfa29 Michael Hanselmann
260 1d3dfa29 Michael Hanselmann
    magic_cmd = self._GetMagicCommand()
261 1d3dfa29 Michael Hanselmann
    if magic_cmd:
262 1d3dfa29 Michael Hanselmann
      dd_cmd.write("{ ")
263 1d3dfa29 Michael Hanselmann
      dd_cmd.write(magic_cmd)
264 1d3dfa29 Michael Hanselmann
      dd_cmd.write(" && ")
265 1d3dfa29 Michael Hanselmann
266 1d3dfa29 Michael Hanselmann
    dd_cmd.write("{ ")
267 2ed0e208 Iustin Pop
    # Setting LC_ALL since we want to parse the output and explicitly
268 2ed0e208 Iustin Pop
    # redirecting stdin, as the background process (dd) would have
269 2ed0e208 Iustin Pop
    # /dev/null as stdin otherwise
270 1d3dfa29 Michael Hanselmann
    dd_cmd.write("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
271 c08d76f5 Michael Hanselmann
                 (BUFSIZE, self._dd_stderr_fd))
272 c08d76f5 Michael Hanselmann
    # Send PID to daemon
273 c08d76f5 Michael Hanselmann
    dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
274 c08d76f5 Michael Hanselmann
    # And wait for dd
275 c08d76f5 Michael Hanselmann
    dd_cmd.write(" wait $pid;")
276 c08d76f5 Michael Hanselmann
    dd_cmd.write(" }")
277 1d3dfa29 Michael Hanselmann
278 1d3dfa29 Michael Hanselmann
    if magic_cmd:
279 1d3dfa29 Michael Hanselmann
      dd_cmd.write(" }")
280 1d3dfa29 Michael Hanselmann
281 fbb6b864 Michael Hanselmann
    return dd_cmd.getvalue()
282 fbb6b864 Michael Hanselmann
283 fbb6b864 Michael Hanselmann
  def _GetTransportCommand(self):
284 fbb6b864 Michael Hanselmann
    """Returns the command for the transport part of the daemon.
285 fbb6b864 Michael Hanselmann

286 fbb6b864 Michael Hanselmann
    """
287 fbb6b864 Michael Hanselmann
    socat_cmd = ("%s 2>&%d" %
288 fbb6b864 Michael Hanselmann
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
289 fbb6b864 Michael Hanselmann
                  self._socat_stderr_fd))
290 fbb6b864 Michael Hanselmann
    dd_cmd = self._GetDdCommand()
291 c08d76f5 Michael Hanselmann
292 bb44b1ae Michael Hanselmann
    compr = self._opts.compress
293 bb44b1ae Michael Hanselmann
294 bb44b1ae Michael Hanselmann
    assert compr in constants.IEC_ALL
295 bb44b1ae Michael Hanselmann
296 fbb6b864 Michael Hanselmann
    parts = []
297 fbb6b864 Michael Hanselmann
298 bb44b1ae Michael Hanselmann
    if self._mode == constants.IEM_IMPORT:
299 fbb6b864 Michael Hanselmann
      parts.append(socat_cmd)
300 fbb6b864 Michael Hanselmann
301 bb44b1ae Michael Hanselmann
      if compr == constants.IEC_GZIP:
302 fbb6b864 Michael Hanselmann
        parts.append("gunzip -c")
303 fbb6b864 Michael Hanselmann
304 fbb6b864 Michael Hanselmann
      parts.append(dd_cmd)
305 c08d76f5 Michael Hanselmann
306 bb44b1ae Michael Hanselmann
    elif self._mode == constants.IEM_EXPORT:
307 fbb6b864 Michael Hanselmann
      parts.append(dd_cmd)
308 fbb6b864 Michael Hanselmann
309 bb44b1ae Michael Hanselmann
      if compr == constants.IEC_GZIP:
310 fbb6b864 Michael Hanselmann
        parts.append("gzip -c")
311 fbb6b864 Michael Hanselmann
312 fbb6b864 Michael Hanselmann
      parts.append(socat_cmd)
313 c08d76f5 Michael Hanselmann
314 bb44b1ae Michael Hanselmann
    else:
315 bb44b1ae Michael Hanselmann
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
316 bb44b1ae Michael Hanselmann
317 bb44b1ae Michael Hanselmann
    # TODO: Run transport as separate user
318 bb44b1ae Michael Hanselmann
    # The transport uses its own shell to simplify running it as a separate user
319 bb44b1ae Michael Hanselmann
    # in the future.
320 fbb6b864 Michael Hanselmann
    return self.GetBashCommand(" | ".join(parts))
321 bb44b1ae Michael Hanselmann
322 bb44b1ae Michael Hanselmann
  def GetCommand(self):
323 bb44b1ae Michael Hanselmann
    """Returns the complete child process command.
324 bb44b1ae Michael Hanselmann

325 bb44b1ae Michael Hanselmann
    """
326 bb44b1ae Michael Hanselmann
    transport_cmd = self._GetTransportCommand()
327 bb44b1ae Michael Hanselmann
328 bb44b1ae Michael Hanselmann
    buf = StringIO()
329 bb44b1ae Michael Hanselmann
330 bb44b1ae Michael Hanselmann
    if self._opts.cmd_prefix:
331 bb44b1ae Michael Hanselmann
      buf.write(self._opts.cmd_prefix)
332 bb44b1ae Michael Hanselmann
      buf.write(" ")
333 bb44b1ae Michael Hanselmann
334 bb44b1ae Michael Hanselmann
    buf.write(utils.ShellQuoteArgs(transport_cmd))
335 bb44b1ae Michael Hanselmann
336 bb44b1ae Michael Hanselmann
    if self._opts.cmd_suffix:
337 bb44b1ae Michael Hanselmann
      buf.write(" ")
338 bb44b1ae Michael Hanselmann
      buf.write(self._opts.cmd_suffix)
339 bb44b1ae Michael Hanselmann
340 bb44b1ae Michael Hanselmann
    return self.GetBashCommand(buf.getvalue())
341 34c9ee7b Michael Hanselmann
342 34c9ee7b Michael Hanselmann
343 34c9ee7b Michael Hanselmann
def _VerifyListening(family, address, port):
344 34c9ee7b Michael Hanselmann
  """Verify address given as listening address by socat.
345 34c9ee7b Michael Hanselmann

346 34c9ee7b Michael Hanselmann
  """
347 58bb385c Michael Hanselmann
  if family not in (socket.AF_INET, socket.AF_INET6):
348 34c9ee7b Michael Hanselmann
    raise errors.GenericError("Address family %r not supported" % family)
349 34c9ee7b Michael Hanselmann
350 58bb385c Michael Hanselmann
  if (family == socket.AF_INET6 and address.startswith("[") and
351 58bb385c Michael Hanselmann
      address.endswith("]")):
352 58bb385c Michael Hanselmann
    address = address.lstrip("[").rstrip("]")
353 58bb385c Michael Hanselmann
354 34c9ee7b Michael Hanselmann
  try:
355 34c9ee7b Michael Hanselmann
    packed_address = socket.inet_pton(family, address)
356 34c9ee7b Michael Hanselmann
  except socket.error:
357 34c9ee7b Michael Hanselmann
    raise errors.GenericError("Invalid address %r for family %s" %
358 34c9ee7b Michael Hanselmann
                              (address, family))
359 34c9ee7b Michael Hanselmann
360 34c9ee7b Michael Hanselmann
  return (socket.inet_ntop(family, packed_address), port)
361 34c9ee7b Michael Hanselmann
362 34c9ee7b Michael Hanselmann
363 34c9ee7b Michael Hanselmann
class ChildIOProcessor(object):
364 f9323011 Michael Hanselmann
  def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
365 34c9ee7b Michael Hanselmann
    """Initializes this class.
366 34c9ee7b Michael Hanselmann

367 34c9ee7b Michael Hanselmann
    """
368 34c9ee7b Michael Hanselmann
    self._debug = debug
369 34c9ee7b Michael Hanselmann
    self._status_file = status_file
370 34c9ee7b Michael Hanselmann
    self._logger = logger
371 34c9ee7b Michael Hanselmann
372 34c9ee7b Michael Hanselmann
    self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
373 34c9ee7b Michael Hanselmann
                           for prog in PROG_ALL])
374 34c9ee7b Michael Hanselmann
375 c08d76f5 Michael Hanselmann
    self._dd_pid = None
376 c08d76f5 Michael Hanselmann
    self._dd_ready = False
377 c08d76f5 Michael Hanselmann
    self._dd_tp_samples = throughput_samples
378 c08d76f5 Michael Hanselmann
    self._dd_progress = []
379 c08d76f5 Michael Hanselmann
380 c08d76f5 Michael Hanselmann
    # Expected size of transferred data
381 f9323011 Michael Hanselmann
    self._exp_size = exp_size
382 c08d76f5 Michael Hanselmann
383 34c9ee7b Michael Hanselmann
  def GetLineSplitter(self, prog):
384 34c9ee7b Michael Hanselmann
    """Returns the line splitter for a program.
385 34c9ee7b Michael Hanselmann

386 34c9ee7b Michael Hanselmann
    """
387 34c9ee7b Michael Hanselmann
    return self._splitter[prog]
388 34c9ee7b Michael Hanselmann
389 34c9ee7b Michael Hanselmann
  def FlushAll(self):
390 34c9ee7b Michael Hanselmann
    """Flushes all line splitters.
391 34c9ee7b Michael Hanselmann

392 34c9ee7b Michael Hanselmann
    """
393 34c9ee7b Michael Hanselmann
    for ls in self._splitter.itervalues():
394 34c9ee7b Michael Hanselmann
      ls.flush()
395 34c9ee7b Michael Hanselmann
396 34c9ee7b Michael Hanselmann
  def CloseAll(self):
397 34c9ee7b Michael Hanselmann
    """Closes all line splitters.
398 34c9ee7b Michael Hanselmann

399 34c9ee7b Michael Hanselmann
    """
400 34c9ee7b Michael Hanselmann
    for ls in self._splitter.itervalues():
401 34c9ee7b Michael Hanselmann
      ls.close()
402 34c9ee7b Michael Hanselmann
    self._splitter.clear()
403 34c9ee7b Michael Hanselmann
404 c08d76f5 Michael Hanselmann
  def NotifyDd(self):
405 c08d76f5 Michael Hanselmann
    """Tells dd(1) to write statistics.
406 c08d76f5 Michael Hanselmann

407 c08d76f5 Michael Hanselmann
    """
408 c08d76f5 Michael Hanselmann
    if self._dd_pid is None:
409 c08d76f5 Michael Hanselmann
      # Can't notify
410 c08d76f5 Michael Hanselmann
      return False
411 c08d76f5 Michael Hanselmann
412 c08d76f5 Michael Hanselmann
    if not self._dd_ready:
413 c08d76f5 Michael Hanselmann
      # There's a race condition between starting the program and sending
414 c08d76f5 Michael Hanselmann
      # signals.  The signal handler is only registered after some time, so we
415 c08d76f5 Michael Hanselmann
      # have to check whether the program is ready. If it isn't, sending a
416 c08d76f5 Michael Hanselmann
      # signal will invoke the default handler (and usually abort the program).
417 c08d76f5 Michael Hanselmann
      if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
418 c08d76f5 Michael Hanselmann
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
419 c08d76f5 Michael Hanselmann
        return False
420 c08d76f5 Michael Hanselmann
421 c08d76f5 Michael Hanselmann
      logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
422 c08d76f5 Michael Hanselmann
      self._dd_ready = True
423 c08d76f5 Michael Hanselmann
424 c08d76f5 Michael Hanselmann
    logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
425 c08d76f5 Michael Hanselmann
    try:
426 c08d76f5 Michael Hanselmann
      os.kill(self._dd_pid, DD_INFO_SIGNAL)
427 c08d76f5 Michael Hanselmann
    except EnvironmentError, err:
428 c08d76f5 Michael Hanselmann
      if err.errno != errno.ESRCH:
429 c08d76f5 Michael Hanselmann
        raise
430 c08d76f5 Michael Hanselmann
431 c08d76f5 Michael Hanselmann
      # Process no longer exists
432 560cbec1 Michael Hanselmann
      logging.debug("dd exited")
433 c08d76f5 Michael Hanselmann
      self._dd_pid = None
434 c08d76f5 Michael Hanselmann
435 c08d76f5 Michael Hanselmann
    return True
436 c08d76f5 Michael Hanselmann
437 34c9ee7b Michael Hanselmann
  def _ProcessOutput(self, line, prog):
438 34c9ee7b Michael Hanselmann
    """Takes care of child process output.
439 34c9ee7b Michael Hanselmann

440 34c9ee7b Michael Hanselmann
    @type line: string
441 34c9ee7b Michael Hanselmann
    @param line: Child output line
442 34c9ee7b Michael Hanselmann
    @type prog: number
443 34c9ee7b Michael Hanselmann
    @param prog: Program from which the line originates
444 34c9ee7b Michael Hanselmann

445 34c9ee7b Michael Hanselmann
    """
446 34c9ee7b Michael Hanselmann
    force_update = False
447 34c9ee7b Michael Hanselmann
    forward_line = line
448 34c9ee7b Michael Hanselmann
449 34c9ee7b Michael Hanselmann
    if prog == PROG_SOCAT:
450 34c9ee7b Michael Hanselmann
      level = None
451 34c9ee7b Michael Hanselmann
      parts = line.split(None, 4)
452 34c9ee7b Michael Hanselmann
453 34c9ee7b Michael Hanselmann
      if len(parts) == 5:
454 34c9ee7b Michael Hanselmann
        (_, _, _, level, msg) = parts
455 34c9ee7b Michael Hanselmann
456 34c9ee7b Michael Hanselmann
        force_update = self._ProcessSocatOutput(self._status_file, level, msg)
457 34c9ee7b Michael Hanselmann
458 34c9ee7b Michael Hanselmann
        if self._debug or (level and level not in SOCAT_LOG_IGNORE):
459 34c9ee7b Michael Hanselmann
          forward_line = "socat: %s %s" % (level, msg)
460 34c9ee7b Michael Hanselmann
        else:
461 34c9ee7b Michael Hanselmann
          forward_line = None
462 34c9ee7b Michael Hanselmann
      else:
463 34c9ee7b Michael Hanselmann
        forward_line = "socat: %s" % line
464 34c9ee7b Michael Hanselmann
465 c08d76f5 Michael Hanselmann
    elif prog == PROG_DD:
466 c08d76f5 Michael Hanselmann
      (should_forward, force_update) = self._ProcessDdOutput(line)
467 c08d76f5 Michael Hanselmann
468 c08d76f5 Michael Hanselmann
      if should_forward or self._debug:
469 c08d76f5 Michael Hanselmann
        forward_line = "dd: %s" % line
470 c08d76f5 Michael Hanselmann
      else:
471 c08d76f5 Michael Hanselmann
        forward_line = None
472 c08d76f5 Michael Hanselmann
473 c08d76f5 Michael Hanselmann
    elif prog == PROG_DD_PID:
474 c08d76f5 Michael Hanselmann
      if self._dd_pid:
475 c08d76f5 Michael Hanselmann
        raise RuntimeError("dd PID reported more than once")
476 c08d76f5 Michael Hanselmann
      logging.debug("Received dd PID %r", line)
477 c08d76f5 Michael Hanselmann
      self._dd_pid = int(line)
478 c08d76f5 Michael Hanselmann
      forward_line = None
479 c08d76f5 Michael Hanselmann
480 f9323011 Michael Hanselmann
    elif prog == PROG_EXP_SIZE:
481 f9323011 Michael Hanselmann
      logging.debug("Received predicted size %r", line)
482 f9323011 Michael Hanselmann
      forward_line = None
483 f9323011 Michael Hanselmann
484 f9323011 Michael Hanselmann
      if line:
485 f9323011 Michael Hanselmann
        try:
486 f9323011 Michael Hanselmann
          exp_size = utils.BytesToMebibyte(int(line))
487 f9323011 Michael Hanselmann
        except (ValueError, TypeError), err:
488 f9323011 Michael Hanselmann
          logging.error("Failed to convert predicted size %r to number: %s",
489 f9323011 Michael Hanselmann
                        line, err)
490 f9323011 Michael Hanselmann
          exp_size = None
491 f9323011 Michael Hanselmann
      else:
492 f9323011 Michael Hanselmann
        exp_size = None
493 f9323011 Michael Hanselmann
494 f9323011 Michael Hanselmann
      self._exp_size = exp_size
495 f9323011 Michael Hanselmann
496 34c9ee7b Michael Hanselmann
    if forward_line:
497 34c9ee7b Michael Hanselmann
      self._logger.info(forward_line)
498 34c9ee7b Michael Hanselmann
      self._status_file.AddRecentOutput(forward_line)
499 34c9ee7b Michael Hanselmann
500 34c9ee7b Michael Hanselmann
    self._status_file.Update(force_update)
501 34c9ee7b Michael Hanselmann
502 34c9ee7b Michael Hanselmann
  @staticmethod
503 34c9ee7b Michael Hanselmann
  def _ProcessSocatOutput(status_file, level, msg):
504 34c9ee7b Michael Hanselmann
    """Interprets socat log output.
505 34c9ee7b Michael Hanselmann

506 34c9ee7b Michael Hanselmann
    """
507 34c9ee7b Michael Hanselmann
    if level == SOCAT_LOG_NOTICE:
508 34c9ee7b Michael Hanselmann
      if status_file.GetListenPort() is None:
509 34c9ee7b Michael Hanselmann
        # TODO: Maybe implement timeout to not listen forever
510 34c9ee7b Michael Hanselmann
        m = LISTENING_RE.match(msg)
511 34c9ee7b Michael Hanselmann
        if m:
512 34c9ee7b Michael Hanselmann
          (_, port) = _VerifyListening(int(m.group("family")),
513 34c9ee7b Michael Hanselmann
                                       m.group("address"),
514 34c9ee7b Michael Hanselmann
                                       int(m.group("port")))
515 34c9ee7b Michael Hanselmann
516 34c9ee7b Michael Hanselmann
          status_file.SetListenPort(port)
517 34c9ee7b Michael Hanselmann
          return True
518 34c9ee7b Michael Hanselmann
519 34c9ee7b Michael Hanselmann
      if not status_file.GetConnected():
520 34c9ee7b Michael Hanselmann
        m = TRANSFER_LOOP_RE.match(msg)
521 34c9ee7b Michael Hanselmann
        if m:
522 53dbf14c Michael Hanselmann
          logging.debug("Connection established")
523 34c9ee7b Michael Hanselmann
          status_file.SetConnected()
524 34c9ee7b Michael Hanselmann
          return True
525 34c9ee7b Michael Hanselmann
526 34c9ee7b Michael Hanselmann
    return False
527 c08d76f5 Michael Hanselmann
528 c08d76f5 Michael Hanselmann
  def _ProcessDdOutput(self, line):
529 c08d76f5 Michael Hanselmann
    """Interprets a line of dd(1)'s output.
530 c08d76f5 Michael Hanselmann

531 c08d76f5 Michael Hanselmann
    """
532 c08d76f5 Michael Hanselmann
    m = DD_INFO_RE.match(line)
533 c08d76f5 Michael Hanselmann
    if m:
534 c08d76f5 Michael Hanselmann
      seconds = float(m.group("seconds"))
535 c08d76f5 Michael Hanselmann
      mbytes = utils.BytesToMebibyte(int(m.group("bytes")))
536 c08d76f5 Michael Hanselmann
      self._UpdateDdProgress(seconds, mbytes)
537 c08d76f5 Michael Hanselmann
      return (False, True)
538 c08d76f5 Michael Hanselmann
539 c08d76f5 Michael Hanselmann
    m = DD_STDERR_IGNORE.match(line)
540 c08d76f5 Michael Hanselmann
    if m:
541 c08d76f5 Michael Hanselmann
      # Ignore
542 c08d76f5 Michael Hanselmann
      return (False, False)
543 c08d76f5 Michael Hanselmann
544 c08d76f5 Michael Hanselmann
    # Forward line
545 c08d76f5 Michael Hanselmann
    return (True, False)
546 c08d76f5 Michael Hanselmann
547 c08d76f5 Michael Hanselmann
  def _UpdateDdProgress(self, seconds, mbytes):
548 c08d76f5 Michael Hanselmann
    """Updates the internal status variables for dd(1) progress.
549 c08d76f5 Michael Hanselmann

550 c08d76f5 Michael Hanselmann
    @type seconds: float
551 c08d76f5 Michael Hanselmann
    @param seconds: Timestamp of this update
552 c08d76f5 Michael Hanselmann
    @type mbytes: float
553 c08d76f5 Michael Hanselmann
    @param mbytes: Total number of MiB transferred so far
554 c08d76f5 Michael Hanselmann

555 c08d76f5 Michael Hanselmann
    """
556 c08d76f5 Michael Hanselmann
    # Add latest sample
557 c08d76f5 Michael Hanselmann
    self._dd_progress.append((seconds, mbytes))
558 c08d76f5 Michael Hanselmann
559 c08d76f5 Michael Hanselmann
    # Remove old samples
560 c08d76f5 Michael Hanselmann
    del self._dd_progress[:-self._dd_tp_samples]
561 c08d76f5 Michael Hanselmann
562 c08d76f5 Michael Hanselmann
    # Calculate throughput
563 c08d76f5 Michael Hanselmann
    throughput = _CalcThroughput(self._dd_progress)
564 c08d76f5 Michael Hanselmann
565 c08d76f5 Michael Hanselmann
    # Calculate percent and ETA
566 c08d76f5 Michael Hanselmann
    percent = None
567 c08d76f5 Michael Hanselmann
    eta = None
568 c08d76f5 Michael Hanselmann
569 c08d76f5 Michael Hanselmann
    if self._exp_size is not None:
570 c08d76f5 Michael Hanselmann
      if self._exp_size != 0:
571 c08d76f5 Michael Hanselmann
        percent = max(0, min(100, (100.0 * mbytes) / self._exp_size))
572 c08d76f5 Michael Hanselmann
573 c08d76f5 Michael Hanselmann
      if throughput:
574 c08d76f5 Michael Hanselmann
        eta = max(0, float(self._exp_size - mbytes) / throughput)
575 c08d76f5 Michael Hanselmann
576 c08d76f5 Michael Hanselmann
    self._status_file.SetProgress(mbytes, throughput, percent, eta)
577 c08d76f5 Michael Hanselmann
578 c08d76f5 Michael Hanselmann
579 c08d76f5 Michael Hanselmann
def _CalcThroughput(samples):
580 c08d76f5 Michael Hanselmann
  """Calculates the throughput in MiB/second.
581 c08d76f5 Michael Hanselmann

582 c08d76f5 Michael Hanselmann
  @type samples: sequence
583 c08d76f5 Michael Hanselmann
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
584 c08d76f5 Michael Hanselmann
                  tuple
585 c08d76f5 Michael Hanselmann
  @rtype: float or None
586 c08d76f5 Michael Hanselmann
  @return: Throughput in MiB/second
587 c08d76f5 Michael Hanselmann

588 c08d76f5 Michael Hanselmann
  """
589 c08d76f5 Michael Hanselmann
  if len(samples) < 2:
590 c08d76f5 Michael Hanselmann
    # Can't calculate throughput
591 c08d76f5 Michael Hanselmann
    return None
592 c08d76f5 Michael Hanselmann
593 c08d76f5 Michael Hanselmann
  (start_time, start_mbytes) = samples[0]
594 c08d76f5 Michael Hanselmann
  (end_time, end_mbytes) = samples[-1]
595 c08d76f5 Michael Hanselmann
596 c08d76f5 Michael Hanselmann
  return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)