Statistics
| Branch: | Tag: | Revision:

root / lib / impexpd / __init__.py @ e90739d6

History | View | Annotate | Download (15.7 kB)

1 bb44b1ae Michael Hanselmann
#
2 bb44b1ae Michael Hanselmann
#
3 bb44b1ae Michael Hanselmann
4 bb44b1ae Michael Hanselmann
# Copyright (C) 2010 Google Inc.
5 bb44b1ae Michael Hanselmann
#
6 bb44b1ae Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 bb44b1ae Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 bb44b1ae Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 bb44b1ae Michael Hanselmann
# (at your option) any later version.
10 bb44b1ae Michael Hanselmann
#
11 bb44b1ae Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 bb44b1ae Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 bb44b1ae Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 bb44b1ae Michael Hanselmann
# General Public License for more details.
15 bb44b1ae Michael Hanselmann
#
16 bb44b1ae Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 bb44b1ae Michael Hanselmann
# along with this program; if not, write to the Free Software
18 bb44b1ae Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 bb44b1ae Michael Hanselmann
# 02110-1301, USA.
20 bb44b1ae Michael Hanselmann
21 bb44b1ae Michael Hanselmann
22 bb44b1ae Michael Hanselmann
"""Classes and functions for import/export daemon.
23 bb44b1ae Michael Hanselmann

24 bb44b1ae Michael Hanselmann
"""
25 bb44b1ae Michael Hanselmann
26 c08d76f5 Michael Hanselmann
import os
27 34c9ee7b Michael Hanselmann
import re
28 34c9ee7b Michael Hanselmann
import socket
29 53dbf14c Michael Hanselmann
import logging
30 c08d76f5 Michael Hanselmann
import signal
31 c08d76f5 Michael Hanselmann
import errno
32 c08d76f5 Michael Hanselmann
import time
33 bb44b1ae Michael Hanselmann
from cStringIO import StringIO
34 bb44b1ae Michael Hanselmann
35 bb44b1ae Michael Hanselmann
from ganeti import constants
36 bb44b1ae Michael Hanselmann
from ganeti import errors
37 bb44b1ae Michael Hanselmann
from ganeti import utils
38 bb44b1ae Michael Hanselmann
39 bb44b1ae Michael Hanselmann
40 34c9ee7b Michael Hanselmann
#: Used to recognize point at which socat(1) starts to listen on its socket.
41 34c9ee7b Michael Hanselmann
#: The local address is required for the remote peer to connect (in particular
42 34c9ee7b Michael Hanselmann
#: the port number).
43 34c9ee7b Michael Hanselmann
LISTENING_RE = re.compile(r"^listening on\s+"
44 34c9ee7b Michael Hanselmann
                          r"AF=(?P<family>\d+)\s+"
45 34c9ee7b Michael Hanselmann
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
46 34c9ee7b Michael Hanselmann
47 34c9ee7b Michael Hanselmann
#: Used to recognize point at which socat(1) is sending data over the wire
48 34c9ee7b Michael Hanselmann
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
49 34c9ee7b Michael Hanselmann
                              re.I)
50 34c9ee7b Michael Hanselmann
51 34c9ee7b Michael Hanselmann
SOCAT_LOG_DEBUG = "D"
52 34c9ee7b Michael Hanselmann
SOCAT_LOG_INFO = "I"
53 34c9ee7b Michael Hanselmann
SOCAT_LOG_NOTICE = "N"
54 34c9ee7b Michael Hanselmann
SOCAT_LOG_WARNING = "W"
55 34c9ee7b Michael Hanselmann
SOCAT_LOG_ERROR = "E"
56 34c9ee7b Michael Hanselmann
SOCAT_LOG_FATAL = "F"
57 34c9ee7b Michael Hanselmann
58 34c9ee7b Michael Hanselmann
SOCAT_LOG_IGNORE = frozenset([
59 34c9ee7b Michael Hanselmann
  SOCAT_LOG_DEBUG,
60 34c9ee7b Michael Hanselmann
  SOCAT_LOG_INFO,
61 34c9ee7b Michael Hanselmann
  SOCAT_LOG_NOTICE,
62 34c9ee7b Michael Hanselmann
  ])
63 34c9ee7b Michael Hanselmann
64 c08d76f5 Michael Hanselmann
#: Used to parse GNU dd(1) statistics
65 c08d76f5 Michael Hanselmann
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
66 c08d76f5 Michael Hanselmann
                        r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
67 c08d76f5 Michael Hanselmann
68 c08d76f5 Michael Hanselmann
#: Used to ignore "N+N records in/out" on dd(1)'s stderr
69 c08d76f5 Michael Hanselmann
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
70 c08d76f5 Michael Hanselmann
71 c08d76f5 Michael Hanselmann
#: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
72 c08d76f5 Michael Hanselmann
#: unavailable and SIGUSR1 is used instead)
73 c08d76f5 Michael Hanselmann
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)
74 c08d76f5 Michael Hanselmann
75 bb44b1ae Michael Hanselmann
#: Buffer size: at most this many bytes are transferred at once
76 bb44b1ae Michael Hanselmann
BUFSIZE = 1024 * 1024
77 bb44b1ae Michael Hanselmann
78 bb44b1ae Michael Hanselmann
# Common options for socat
79 bb44b1ae Michael Hanselmann
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
80 971bbd84 Michael Hanselmann
SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1",
81 971bbd84 Michael Hanselmann
                      "cipher=%s" % constants.OPENSSL_CIPHERS]
82 bb44b1ae Michael Hanselmann
83 e90739d6 Michael Hanselmann
if constants.SOCAT_USE_COMPRESS:
84 e90739d6 Michael Hanselmann
  # Disables all compression in by OpenSSL. Only supported in patched versions
85 e90739d6 Michael Hanselmann
  # of socat (as of November 2010). See INSTALL for more information.
86 e90739d6 Michael Hanselmann
  SOCAT_OPENSSL_OPTS.append("compress=none")
87 e90739d6 Michael Hanselmann
88 0559f745 Michael Hanselmann
SOCAT_OPTION_MAXLEN = 400
89 0559f745 Michael Hanselmann
90 34c9ee7b Michael Hanselmann
(PROG_OTHER,
91 c08d76f5 Michael Hanselmann
 PROG_SOCAT,
92 c08d76f5 Michael Hanselmann
 PROG_DD,
93 f9323011 Michael Hanselmann
 PROG_DD_PID,
94 f9323011 Michael Hanselmann
 PROG_EXP_SIZE) = range(1, 6)
95 34c9ee7b Michael Hanselmann
PROG_ALL = frozenset([
96 34c9ee7b Michael Hanselmann
  PROG_OTHER,
97 34c9ee7b Michael Hanselmann
  PROG_SOCAT,
98 c08d76f5 Michael Hanselmann
  PROG_DD,
99 c08d76f5 Michael Hanselmann
  PROG_DD_PID,
100 f9323011 Michael Hanselmann
  PROG_EXP_SIZE,
101 34c9ee7b Michael Hanselmann
  ])
102 34c9ee7b Michael Hanselmann
103 bb44b1ae Michael Hanselmann
104 bb44b1ae Michael Hanselmann
class CommandBuilder(object):
105 c08d76f5 Michael Hanselmann
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
106 bb44b1ae Michael Hanselmann
    """Initializes this class.
107 bb44b1ae Michael Hanselmann

108 bb44b1ae Michael Hanselmann
    @param mode: Daemon mode (import or export)
109 bb44b1ae Michael Hanselmann
    @param opts: Options object
110 bb44b1ae Michael Hanselmann
    @type socat_stderr_fd: int
111 bb44b1ae Michael Hanselmann
    @param socat_stderr_fd: File descriptor socat should write its stderr to
112 c08d76f5 Michael Hanselmann
    @type dd_stderr_fd: int
113 c08d76f5 Michael Hanselmann
    @param dd_stderr_fd: File descriptor dd should write its stderr to
114 c08d76f5 Michael Hanselmann
    @type dd_pid_fd: int
115 c08d76f5 Michael Hanselmann
    @param dd_pid_fd: File descriptor the child should write dd's PID to
116 bb44b1ae Michael Hanselmann

117 bb44b1ae Michael Hanselmann
    """
118 bb44b1ae Michael Hanselmann
    self._opts = opts
119 bb44b1ae Michael Hanselmann
    self._mode = mode
120 bb44b1ae Michael Hanselmann
    self._socat_stderr_fd = socat_stderr_fd
121 c08d76f5 Michael Hanselmann
    self._dd_stderr_fd = dd_stderr_fd
122 c08d76f5 Michael Hanselmann
    self._dd_pid_fd = dd_pid_fd
123 bb44b1ae Michael Hanselmann
124 1d3dfa29 Michael Hanselmann
    assert (self._opts.magic is None or
125 1d3dfa29 Michael Hanselmann
            constants.IE_MAGIC_RE.match(self._opts.magic))
126 1d3dfa29 Michael Hanselmann
127 bb44b1ae Michael Hanselmann
  @staticmethod
128 bb44b1ae Michael Hanselmann
  def GetBashCommand(cmd):
129 bb44b1ae Michael Hanselmann
    """Prepares a command to be run in Bash.
130 bb44b1ae Michael Hanselmann

131 bb44b1ae Michael Hanselmann
    """
132 bb44b1ae Michael Hanselmann
    return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
133 bb44b1ae Michael Hanselmann
134 bb44b1ae Michael Hanselmann
  def _GetSocatCommand(self):
135 bb44b1ae Michael Hanselmann
    """Returns the socat command.
136 bb44b1ae Michael Hanselmann

137 bb44b1ae Michael Hanselmann
    """
138 bb44b1ae Michael Hanselmann
    common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
139 bb44b1ae Michael Hanselmann
      "key=%s" % self._opts.key,
140 bb44b1ae Michael Hanselmann
      "cert=%s" % self._opts.cert,
141 bb44b1ae Michael Hanselmann
      "cafile=%s" % self._opts.ca,
142 bb44b1ae Michael Hanselmann
      ]
143 bb44b1ae Michael Hanselmann
144 bb44b1ae Michael Hanselmann
    if self._opts.bind is not None:
145 bb44b1ae Michael Hanselmann
      common_addr_opts.append("bind=%s" % self._opts.bind)
146 bb44b1ae Michael Hanselmann
147 bb44b1ae Michael Hanselmann
    if self._mode == constants.IEM_IMPORT:
148 bb44b1ae Michael Hanselmann
      if self._opts.port is None:
149 bb44b1ae Michael Hanselmann
        port = 0
150 bb44b1ae Michael Hanselmann
      else:
151 bb44b1ae Michael Hanselmann
        port = self._opts.port
152 bb44b1ae Michael Hanselmann
153 bb44b1ae Michael Hanselmann
      addr1 = [
154 bb44b1ae Michael Hanselmann
        "OPENSSL-LISTEN:%s" % port,
155 bb44b1ae Michael Hanselmann
        "reuseaddr",
156 bb44b1ae Michael Hanselmann
157 bb44b1ae Michael Hanselmann
        # Retry to listen if connection wasn't established successfully, up to
158 bb44b1ae Michael Hanselmann
        # 100 times a second. Note that this still leaves room for DoS attacks.
159 bb44b1ae Michael Hanselmann
        "forever",
160 bb44b1ae Michael Hanselmann
        "intervall=0.01",
161 bb44b1ae Michael Hanselmann
        ] + common_addr_opts
162 bb44b1ae Michael Hanselmann
      addr2 = ["stdout"]
163 bb44b1ae Michael Hanselmann
164 bb44b1ae Michael Hanselmann
    elif self._mode == constants.IEM_EXPORT:
165 bb44b1ae Michael Hanselmann
      addr1 = ["stdin"]
166 bb44b1ae Michael Hanselmann
      addr2 = [
167 bb44b1ae Michael Hanselmann
        "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
168 bb44b1ae Michael Hanselmann
169 bb44b1ae Michael Hanselmann
        # How long to wait per connection attempt
170 bb44b1ae Michael Hanselmann
        "connect-timeout=%s" % self._opts.connect_timeout,
171 bb44b1ae Michael Hanselmann
172 bb44b1ae Michael Hanselmann
        # Retry a few times before giving up to connect (once per second)
173 bb44b1ae Michael Hanselmann
        "retry=%s" % self._opts.connect_retries,
174 bb44b1ae Michael Hanselmann
        "intervall=1",
175 bb44b1ae Michael Hanselmann
        ] + common_addr_opts
176 bb44b1ae Michael Hanselmann
177 bb44b1ae Michael Hanselmann
    else:
178 bb44b1ae Michael Hanselmann
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
179 bb44b1ae Michael Hanselmann
180 bb44b1ae Michael Hanselmann
    for i in [addr1, addr2]:
181 bb44b1ae Michael Hanselmann
      for value in i:
182 0559f745 Michael Hanselmann
        if len(value) > SOCAT_OPTION_MAXLEN:
183 0559f745 Michael Hanselmann
          raise errors.GenericError("Socat option longer than %s"
184 0559f745 Michael Hanselmann
                                    " characters: %r" %
185 0559f745 Michael Hanselmann
                                    (SOCAT_OPTION_MAXLEN, value))
186 bb44b1ae Michael Hanselmann
        if "," in value:
187 bb44b1ae Michael Hanselmann
          raise errors.GenericError("Comma not allowed in socat option"
188 bb44b1ae Michael Hanselmann
                                    " value: %r" % value)
189 bb44b1ae Michael Hanselmann
190 bb44b1ae Michael Hanselmann
    return [
191 bb44b1ae Michael Hanselmann
      constants.SOCAT_PATH,
192 bb44b1ae Michael Hanselmann
193 bb44b1ae Michael Hanselmann
      # Log to stderr
194 bb44b1ae Michael Hanselmann
      "-ls",
195 bb44b1ae Michael Hanselmann
196 bb44b1ae Michael Hanselmann
      # Log level
197 bb44b1ae Michael Hanselmann
      "-d", "-d",
198 bb44b1ae Michael Hanselmann
199 bb44b1ae Michael Hanselmann
      # Buffer size
200 bb44b1ae Michael Hanselmann
      "-b%s" % BUFSIZE,
201 bb44b1ae Michael Hanselmann
202 bb44b1ae Michael Hanselmann
      # Unidirectional mode, the first address is only used for reading, and the
203 bb44b1ae Michael Hanselmann
      # second address is only used for writing
204 bb44b1ae Michael Hanselmann
      "-u",
205 bb44b1ae Michael Hanselmann
206 bb44b1ae Michael Hanselmann
      ",".join(addr1), ",".join(addr2)
207 bb44b1ae Michael Hanselmann
      ]
208 bb44b1ae Michael Hanselmann
209 1d3dfa29 Michael Hanselmann
  def _GetMagicCommand(self):
210 1d3dfa29 Michael Hanselmann
    """Returns the command to read/write the magic value.
211 1d3dfa29 Michael Hanselmann

212 1d3dfa29 Michael Hanselmann
    """
213 1d3dfa29 Michael Hanselmann
    if not self._opts.magic:
214 1d3dfa29 Michael Hanselmann
      return None
215 1d3dfa29 Michael Hanselmann
216 1d3dfa29 Michael Hanselmann
    # Prefix to ensure magic isn't interpreted as option to "echo"
217 1d3dfa29 Michael Hanselmann
    magic = "M=%s" % self._opts.magic
218 1d3dfa29 Michael Hanselmann
219 1d3dfa29 Michael Hanselmann
    cmd = StringIO()
220 1d3dfa29 Michael Hanselmann
221 1d3dfa29 Michael Hanselmann
    if self._mode == constants.IEM_IMPORT:
222 1d3dfa29 Michael Hanselmann
      cmd.write("{ ")
223 1d3dfa29 Michael Hanselmann
      cmd.write(utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"]))
224 1d3dfa29 Michael Hanselmann
      cmd.write(" && ")
225 1d3dfa29 Michael Hanselmann
      cmd.write("if test \"$magic\" != %s; then" % utils.ShellQuote(magic))
226 1d3dfa29 Michael Hanselmann
      cmd.write(" echo %s >&2;" % utils.ShellQuote("Magic value mismatch"))
227 1d3dfa29 Michael Hanselmann
      cmd.write(" exit 1;")
228 1d3dfa29 Michael Hanselmann
      cmd.write("fi;")
229 1d3dfa29 Michael Hanselmann
      cmd.write(" }")
230 1d3dfa29 Michael Hanselmann
231 1d3dfa29 Michael Hanselmann
    elif self._mode == constants.IEM_EXPORT:
232 1d3dfa29 Michael Hanselmann
      cmd.write(utils.ShellQuoteArgs(["echo", "-E", "-n", magic]))
233 1d3dfa29 Michael Hanselmann
234 1d3dfa29 Michael Hanselmann
    else:
235 1d3dfa29 Michael Hanselmann
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
236 1d3dfa29 Michael Hanselmann
237 1d3dfa29 Michael Hanselmann
    return cmd.getvalue()
238 1d3dfa29 Michael Hanselmann
239 fbb6b864 Michael Hanselmann
  def _GetDdCommand(self):
240 fbb6b864 Michael Hanselmann
    """Returns the command for measuring throughput.
241 bb44b1ae Michael Hanselmann

242 bb44b1ae Michael Hanselmann
    """
243 c08d76f5 Michael Hanselmann
    dd_cmd = StringIO()
244 1d3dfa29 Michael Hanselmann
245 1d3dfa29 Michael Hanselmann
    magic_cmd = self._GetMagicCommand()
246 1d3dfa29 Michael Hanselmann
    if magic_cmd:
247 1d3dfa29 Michael Hanselmann
      dd_cmd.write("{ ")
248 1d3dfa29 Michael Hanselmann
      dd_cmd.write(magic_cmd)
249 1d3dfa29 Michael Hanselmann
      dd_cmd.write(" && ")
250 1d3dfa29 Michael Hanselmann
251 1d3dfa29 Michael Hanselmann
    dd_cmd.write("{ ")
252 c08d76f5 Michael Hanselmann
    # Setting LC_ALL since we want to parse the output and explicitely
253 c08d76f5 Michael Hanselmann
    # redirecting stdin, as the background process (dd) would have /dev/null as
254 c08d76f5 Michael Hanselmann
    # stdin otherwise
255 1d3dfa29 Michael Hanselmann
    dd_cmd.write("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
256 c08d76f5 Michael Hanselmann
                 (BUFSIZE, self._dd_stderr_fd))
257 c08d76f5 Michael Hanselmann
    # Send PID to daemon
258 c08d76f5 Michael Hanselmann
    dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
259 c08d76f5 Michael Hanselmann
    # And wait for dd
260 c08d76f5 Michael Hanselmann
    dd_cmd.write(" wait $pid;")
261 c08d76f5 Michael Hanselmann
    dd_cmd.write(" }")
262 1d3dfa29 Michael Hanselmann
263 1d3dfa29 Michael Hanselmann
    if magic_cmd:
264 1d3dfa29 Michael Hanselmann
      dd_cmd.write(" }")
265 1d3dfa29 Michael Hanselmann
266 fbb6b864 Michael Hanselmann
    return dd_cmd.getvalue()
267 fbb6b864 Michael Hanselmann
268 fbb6b864 Michael Hanselmann
  def _GetTransportCommand(self):
269 fbb6b864 Michael Hanselmann
    """Returns the command for the transport part of the daemon.
270 fbb6b864 Michael Hanselmann

271 fbb6b864 Michael Hanselmann
    """
272 fbb6b864 Michael Hanselmann
    socat_cmd = ("%s 2>&%d" %
273 fbb6b864 Michael Hanselmann
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
274 fbb6b864 Michael Hanselmann
                  self._socat_stderr_fd))
275 fbb6b864 Michael Hanselmann
    dd_cmd = self._GetDdCommand()
276 c08d76f5 Michael Hanselmann
277 bb44b1ae Michael Hanselmann
    compr = self._opts.compress
278 bb44b1ae Michael Hanselmann
279 bb44b1ae Michael Hanselmann
    assert compr in constants.IEC_ALL
280 bb44b1ae Michael Hanselmann
281 fbb6b864 Michael Hanselmann
    parts = []
282 fbb6b864 Michael Hanselmann
283 bb44b1ae Michael Hanselmann
    if self._mode == constants.IEM_IMPORT:
284 fbb6b864 Michael Hanselmann
      parts.append(socat_cmd)
285 fbb6b864 Michael Hanselmann
286 bb44b1ae Michael Hanselmann
      if compr == constants.IEC_GZIP:
287 fbb6b864 Michael Hanselmann
        parts.append("gunzip -c")
288 fbb6b864 Michael Hanselmann
289 fbb6b864 Michael Hanselmann
      parts.append(dd_cmd)
290 c08d76f5 Michael Hanselmann
291 bb44b1ae Michael Hanselmann
    elif self._mode == constants.IEM_EXPORT:
292 fbb6b864 Michael Hanselmann
      parts.append(dd_cmd)
293 fbb6b864 Michael Hanselmann
294 bb44b1ae Michael Hanselmann
      if compr == constants.IEC_GZIP:
295 fbb6b864 Michael Hanselmann
        parts.append("gzip -c")
296 fbb6b864 Michael Hanselmann
297 fbb6b864 Michael Hanselmann
      parts.append(socat_cmd)
298 c08d76f5 Michael Hanselmann
299 bb44b1ae Michael Hanselmann
    else:
300 bb44b1ae Michael Hanselmann
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
301 bb44b1ae Michael Hanselmann
302 bb44b1ae Michael Hanselmann
    # TODO: Run transport as separate user
303 bb44b1ae Michael Hanselmann
    # The transport uses its own shell to simplify running it as a separate user
304 bb44b1ae Michael Hanselmann
    # in the future.
305 fbb6b864 Michael Hanselmann
    return self.GetBashCommand(" | ".join(parts))
306 bb44b1ae Michael Hanselmann
307 bb44b1ae Michael Hanselmann
  def GetCommand(self):
308 bb44b1ae Michael Hanselmann
    """Returns the complete child process command.
309 bb44b1ae Michael Hanselmann

310 bb44b1ae Michael Hanselmann
    """
311 bb44b1ae Michael Hanselmann
    transport_cmd = self._GetTransportCommand()
312 bb44b1ae Michael Hanselmann
313 bb44b1ae Michael Hanselmann
    buf = StringIO()
314 bb44b1ae Michael Hanselmann
315 bb44b1ae Michael Hanselmann
    if self._opts.cmd_prefix:
316 bb44b1ae Michael Hanselmann
      buf.write(self._opts.cmd_prefix)
317 bb44b1ae Michael Hanselmann
      buf.write(" ")
318 bb44b1ae Michael Hanselmann
319 bb44b1ae Michael Hanselmann
    buf.write(utils.ShellQuoteArgs(transport_cmd))
320 bb44b1ae Michael Hanselmann
321 bb44b1ae Michael Hanselmann
    if self._opts.cmd_suffix:
322 bb44b1ae Michael Hanselmann
      buf.write(" ")
323 bb44b1ae Michael Hanselmann
      buf.write(self._opts.cmd_suffix)
324 bb44b1ae Michael Hanselmann
325 bb44b1ae Michael Hanselmann
    return self.GetBashCommand(buf.getvalue())
326 34c9ee7b Michael Hanselmann
327 34c9ee7b Michael Hanselmann
328 34c9ee7b Michael Hanselmann
def _VerifyListening(family, address, port):
329 34c9ee7b Michael Hanselmann
  """Verify address given as listening address by socat.
330 34c9ee7b Michael Hanselmann

331 34c9ee7b Michael Hanselmann
  """
332 34c9ee7b Michael Hanselmann
  # TODO: Implement IPv6 support
333 34c9ee7b Michael Hanselmann
  if family != socket.AF_INET:
334 34c9ee7b Michael Hanselmann
    raise errors.GenericError("Address family %r not supported" % family)
335 34c9ee7b Michael Hanselmann
336 34c9ee7b Michael Hanselmann
  try:
337 34c9ee7b Michael Hanselmann
    packed_address = socket.inet_pton(family, address)
338 34c9ee7b Michael Hanselmann
  except socket.error:
339 34c9ee7b Michael Hanselmann
    raise errors.GenericError("Invalid address %r for family %s" %
340 34c9ee7b Michael Hanselmann
                              (address, family))
341 34c9ee7b Michael Hanselmann
342 34c9ee7b Michael Hanselmann
  return (socket.inet_ntop(family, packed_address), port)
343 34c9ee7b Michael Hanselmann
344 34c9ee7b Michael Hanselmann
345 34c9ee7b Michael Hanselmann
class ChildIOProcessor(object):
346 f9323011 Michael Hanselmann
  def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
347 34c9ee7b Michael Hanselmann
    """Initializes this class.
348 34c9ee7b Michael Hanselmann

349 34c9ee7b Michael Hanselmann
    """
350 34c9ee7b Michael Hanselmann
    self._debug = debug
351 34c9ee7b Michael Hanselmann
    self._status_file = status_file
352 34c9ee7b Michael Hanselmann
    self._logger = logger
353 34c9ee7b Michael Hanselmann
354 34c9ee7b Michael Hanselmann
    self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
355 34c9ee7b Michael Hanselmann
                           for prog in PROG_ALL])
356 34c9ee7b Michael Hanselmann
357 c08d76f5 Michael Hanselmann
    self._dd_pid = None
358 c08d76f5 Michael Hanselmann
    self._dd_ready = False
359 c08d76f5 Michael Hanselmann
    self._dd_tp_samples = throughput_samples
360 c08d76f5 Michael Hanselmann
    self._dd_progress = []
361 c08d76f5 Michael Hanselmann
362 c08d76f5 Michael Hanselmann
    # Expected size of transferred data
363 f9323011 Michael Hanselmann
    self._exp_size = exp_size
364 c08d76f5 Michael Hanselmann
365 34c9ee7b Michael Hanselmann
  def GetLineSplitter(self, prog):
366 34c9ee7b Michael Hanselmann
    """Returns the line splitter for a program.
367 34c9ee7b Michael Hanselmann

368 34c9ee7b Michael Hanselmann
    """
369 34c9ee7b Michael Hanselmann
    return self._splitter[prog]
370 34c9ee7b Michael Hanselmann
371 34c9ee7b Michael Hanselmann
  def FlushAll(self):
372 34c9ee7b Michael Hanselmann
    """Flushes all line splitters.
373 34c9ee7b Michael Hanselmann

374 34c9ee7b Michael Hanselmann
    """
375 34c9ee7b Michael Hanselmann
    for ls in self._splitter.itervalues():
376 34c9ee7b Michael Hanselmann
      ls.flush()
377 34c9ee7b Michael Hanselmann
378 34c9ee7b Michael Hanselmann
  def CloseAll(self):
379 34c9ee7b Michael Hanselmann
    """Closes all line splitters.
380 34c9ee7b Michael Hanselmann

381 34c9ee7b Michael Hanselmann
    """
382 34c9ee7b Michael Hanselmann
    for ls in self._splitter.itervalues():
383 34c9ee7b Michael Hanselmann
      ls.close()
384 34c9ee7b Michael Hanselmann
    self._splitter.clear()
385 34c9ee7b Michael Hanselmann
386 c08d76f5 Michael Hanselmann
  def NotifyDd(self):
387 c08d76f5 Michael Hanselmann
    """Tells dd(1) to write statistics.
388 c08d76f5 Michael Hanselmann

389 c08d76f5 Michael Hanselmann
    """
390 c08d76f5 Michael Hanselmann
    if self._dd_pid is None:
391 c08d76f5 Michael Hanselmann
      # Can't notify
392 c08d76f5 Michael Hanselmann
      return False
393 c08d76f5 Michael Hanselmann
394 c08d76f5 Michael Hanselmann
    if not self._dd_ready:
395 c08d76f5 Michael Hanselmann
      # There's a race condition between starting the program and sending
396 c08d76f5 Michael Hanselmann
      # signals.  The signal handler is only registered after some time, so we
397 c08d76f5 Michael Hanselmann
      # have to check whether the program is ready. If it isn't, sending a
398 c08d76f5 Michael Hanselmann
      # signal will invoke the default handler (and usually abort the program).
399 c08d76f5 Michael Hanselmann
      if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
400 c08d76f5 Michael Hanselmann
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
401 c08d76f5 Michael Hanselmann
        return False
402 c08d76f5 Michael Hanselmann
403 c08d76f5 Michael Hanselmann
      logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
404 c08d76f5 Michael Hanselmann
      self._dd_ready = True
405 c08d76f5 Michael Hanselmann
406 c08d76f5 Michael Hanselmann
    logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
407 c08d76f5 Michael Hanselmann
    try:
408 c08d76f5 Michael Hanselmann
      os.kill(self._dd_pid, DD_INFO_SIGNAL)
409 c08d76f5 Michael Hanselmann
    except EnvironmentError, err:
410 c08d76f5 Michael Hanselmann
      if err.errno != errno.ESRCH:
411 c08d76f5 Michael Hanselmann
        raise
412 c08d76f5 Michael Hanselmann
413 c08d76f5 Michael Hanselmann
      # Process no longer exists
414 560cbec1 Michael Hanselmann
      logging.debug("dd exited")
415 c08d76f5 Michael Hanselmann
      self._dd_pid = None
416 c08d76f5 Michael Hanselmann
417 c08d76f5 Michael Hanselmann
    return True
418 c08d76f5 Michael Hanselmann
419 34c9ee7b Michael Hanselmann
  def _ProcessOutput(self, line, prog):
420 34c9ee7b Michael Hanselmann
    """Takes care of child process output.
421 34c9ee7b Michael Hanselmann

422 34c9ee7b Michael Hanselmann
    @type line: string
423 34c9ee7b Michael Hanselmann
    @param line: Child output line
424 34c9ee7b Michael Hanselmann
    @type prog: number
425 34c9ee7b Michael Hanselmann
    @param prog: Program from which the line originates
426 34c9ee7b Michael Hanselmann

427 34c9ee7b Michael Hanselmann
    """
428 34c9ee7b Michael Hanselmann
    force_update = False
429 34c9ee7b Michael Hanselmann
    forward_line = line
430 34c9ee7b Michael Hanselmann
431 34c9ee7b Michael Hanselmann
    if prog == PROG_SOCAT:
432 34c9ee7b Michael Hanselmann
      level = None
433 34c9ee7b Michael Hanselmann
      parts = line.split(None, 4)
434 34c9ee7b Michael Hanselmann
435 34c9ee7b Michael Hanselmann
      if len(parts) == 5:
436 34c9ee7b Michael Hanselmann
        (_, _, _, level, msg) = parts
437 34c9ee7b Michael Hanselmann
438 34c9ee7b Michael Hanselmann
        force_update = self._ProcessSocatOutput(self._status_file, level, msg)
439 34c9ee7b Michael Hanselmann
440 34c9ee7b Michael Hanselmann
        if self._debug or (level and level not in SOCAT_LOG_IGNORE):
441 34c9ee7b Michael Hanselmann
          forward_line = "socat: %s %s" % (level, msg)
442 34c9ee7b Michael Hanselmann
        else:
443 34c9ee7b Michael Hanselmann
          forward_line = None
444 34c9ee7b Michael Hanselmann
      else:
445 34c9ee7b Michael Hanselmann
        forward_line = "socat: %s" % line
446 34c9ee7b Michael Hanselmann
447 c08d76f5 Michael Hanselmann
    elif prog == PROG_DD:
448 c08d76f5 Michael Hanselmann
      (should_forward, force_update) = self._ProcessDdOutput(line)
449 c08d76f5 Michael Hanselmann
450 c08d76f5 Michael Hanselmann
      if should_forward or self._debug:
451 c08d76f5 Michael Hanselmann
        forward_line = "dd: %s" % line
452 c08d76f5 Michael Hanselmann
      else:
453 c08d76f5 Michael Hanselmann
        forward_line = None
454 c08d76f5 Michael Hanselmann
455 c08d76f5 Michael Hanselmann
    elif prog == PROG_DD_PID:
456 c08d76f5 Michael Hanselmann
      if self._dd_pid:
457 c08d76f5 Michael Hanselmann
        raise RuntimeError("dd PID reported more than once")
458 c08d76f5 Michael Hanselmann
      logging.debug("Received dd PID %r", line)
459 c08d76f5 Michael Hanselmann
      self._dd_pid = int(line)
460 c08d76f5 Michael Hanselmann
      forward_line = None
461 c08d76f5 Michael Hanselmann
462 f9323011 Michael Hanselmann
    elif prog == PROG_EXP_SIZE:
463 f9323011 Michael Hanselmann
      logging.debug("Received predicted size %r", line)
464 f9323011 Michael Hanselmann
      forward_line = None
465 f9323011 Michael Hanselmann
466 f9323011 Michael Hanselmann
      if line:
467 f9323011 Michael Hanselmann
        try:
468 f9323011 Michael Hanselmann
          exp_size = utils.BytesToMebibyte(int(line))
469 f9323011 Michael Hanselmann
        except (ValueError, TypeError), err:
470 f9323011 Michael Hanselmann
          logging.error("Failed to convert predicted size %r to number: %s",
471 f9323011 Michael Hanselmann
                        line, err)
472 f9323011 Michael Hanselmann
          exp_size = None
473 f9323011 Michael Hanselmann
      else:
474 f9323011 Michael Hanselmann
        exp_size = None
475 f9323011 Michael Hanselmann
476 f9323011 Michael Hanselmann
      self._exp_size = exp_size
477 f9323011 Michael Hanselmann
478 34c9ee7b Michael Hanselmann
    if forward_line:
479 34c9ee7b Michael Hanselmann
      self._logger.info(forward_line)
480 34c9ee7b Michael Hanselmann
      self._status_file.AddRecentOutput(forward_line)
481 34c9ee7b Michael Hanselmann
482 34c9ee7b Michael Hanselmann
    self._status_file.Update(force_update)
483 34c9ee7b Michael Hanselmann
484 34c9ee7b Michael Hanselmann
  @staticmethod
485 34c9ee7b Michael Hanselmann
  def _ProcessSocatOutput(status_file, level, msg):
486 34c9ee7b Michael Hanselmann
    """Interprets socat log output.
487 34c9ee7b Michael Hanselmann

488 34c9ee7b Michael Hanselmann
    """
489 34c9ee7b Michael Hanselmann
    if level == SOCAT_LOG_NOTICE:
490 34c9ee7b Michael Hanselmann
      if status_file.GetListenPort() is None:
491 34c9ee7b Michael Hanselmann
        # TODO: Maybe implement timeout to not listen forever
492 34c9ee7b Michael Hanselmann
        m = LISTENING_RE.match(msg)
493 34c9ee7b Michael Hanselmann
        if m:
494 34c9ee7b Michael Hanselmann
          (_, port) = _VerifyListening(int(m.group("family")),
495 34c9ee7b Michael Hanselmann
                                       m.group("address"),
496 34c9ee7b Michael Hanselmann
                                       int(m.group("port")))
497 34c9ee7b Michael Hanselmann
498 34c9ee7b Michael Hanselmann
          status_file.SetListenPort(port)
499 34c9ee7b Michael Hanselmann
          return True
500 34c9ee7b Michael Hanselmann
501 34c9ee7b Michael Hanselmann
      if not status_file.GetConnected():
502 34c9ee7b Michael Hanselmann
        m = TRANSFER_LOOP_RE.match(msg)
503 34c9ee7b Michael Hanselmann
        if m:
504 53dbf14c Michael Hanselmann
          logging.debug("Connection established")
505 34c9ee7b Michael Hanselmann
          status_file.SetConnected()
506 34c9ee7b Michael Hanselmann
          return True
507 34c9ee7b Michael Hanselmann
508 34c9ee7b Michael Hanselmann
    return False
509 c08d76f5 Michael Hanselmann
510 c08d76f5 Michael Hanselmann
  def _ProcessDdOutput(self, line):
511 c08d76f5 Michael Hanselmann
    """Interprets a line of dd(1)'s output.
512 c08d76f5 Michael Hanselmann

513 c08d76f5 Michael Hanselmann
    """
514 c08d76f5 Michael Hanselmann
    m = DD_INFO_RE.match(line)
515 c08d76f5 Michael Hanselmann
    if m:
516 c08d76f5 Michael Hanselmann
      seconds = float(m.group("seconds"))
517 c08d76f5 Michael Hanselmann
      mbytes = utils.BytesToMebibyte(int(m.group("bytes")))
518 c08d76f5 Michael Hanselmann
      self._UpdateDdProgress(seconds, mbytes)
519 c08d76f5 Michael Hanselmann
      return (False, True)
520 c08d76f5 Michael Hanselmann
521 c08d76f5 Michael Hanselmann
    m = DD_STDERR_IGNORE.match(line)
522 c08d76f5 Michael Hanselmann
    if m:
523 c08d76f5 Michael Hanselmann
      # Ignore
524 c08d76f5 Michael Hanselmann
      return (False, False)
525 c08d76f5 Michael Hanselmann
526 c08d76f5 Michael Hanselmann
    # Forward line
527 c08d76f5 Michael Hanselmann
    return (True, False)
528 c08d76f5 Michael Hanselmann
529 c08d76f5 Michael Hanselmann
  def _UpdateDdProgress(self, seconds, mbytes):
530 c08d76f5 Michael Hanselmann
    """Updates the internal status variables for dd(1) progress.
531 c08d76f5 Michael Hanselmann

532 c08d76f5 Michael Hanselmann
    @type seconds: float
533 c08d76f5 Michael Hanselmann
    @param seconds: Timestamp of this update
534 c08d76f5 Michael Hanselmann
    @type mbytes: float
535 c08d76f5 Michael Hanselmann
    @param mbytes: Total number of MiB transferred so far
536 c08d76f5 Michael Hanselmann

537 c08d76f5 Michael Hanselmann
    """
538 c08d76f5 Michael Hanselmann
    # Add latest sample
539 c08d76f5 Michael Hanselmann
    self._dd_progress.append((seconds, mbytes))
540 c08d76f5 Michael Hanselmann
541 c08d76f5 Michael Hanselmann
    # Remove old samples
542 c08d76f5 Michael Hanselmann
    del self._dd_progress[:-self._dd_tp_samples]
543 c08d76f5 Michael Hanselmann
544 c08d76f5 Michael Hanselmann
    # Calculate throughput
545 c08d76f5 Michael Hanselmann
    throughput = _CalcThroughput(self._dd_progress)
546 c08d76f5 Michael Hanselmann
547 c08d76f5 Michael Hanselmann
    # Calculate percent and ETA
548 c08d76f5 Michael Hanselmann
    percent = None
549 c08d76f5 Michael Hanselmann
    eta = None
550 c08d76f5 Michael Hanselmann
551 c08d76f5 Michael Hanselmann
    if self._exp_size is not None:
552 c08d76f5 Michael Hanselmann
      if self._exp_size != 0:
553 c08d76f5 Michael Hanselmann
        percent = max(0, min(100, (100.0 * mbytes) / self._exp_size))
554 c08d76f5 Michael Hanselmann
555 c08d76f5 Michael Hanselmann
      if throughput:
556 c08d76f5 Michael Hanselmann
        eta = max(0, float(self._exp_size - mbytes) / throughput)
557 c08d76f5 Michael Hanselmann
558 c08d76f5 Michael Hanselmann
    self._status_file.SetProgress(mbytes, throughput, percent, eta)
559 c08d76f5 Michael Hanselmann
560 c08d76f5 Michael Hanselmann
561 c08d76f5 Michael Hanselmann
def _CalcThroughput(samples):
562 c08d76f5 Michael Hanselmann
  """Calculates the throughput in MiB/second.
563 c08d76f5 Michael Hanselmann

564 c08d76f5 Michael Hanselmann
  @type samples: sequence
565 c08d76f5 Michael Hanselmann
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
566 c08d76f5 Michael Hanselmann
                  tuple
567 c08d76f5 Michael Hanselmann
  @rtype: float or None
568 c08d76f5 Michael Hanselmann
  @return: Throughput in MiB/second
569 c08d76f5 Michael Hanselmann

570 c08d76f5 Michael Hanselmann
  """
571 c08d76f5 Michael Hanselmann
  if len(samples) < 2:
572 c08d76f5 Michael Hanselmann
    # Can't calculate throughput
573 c08d76f5 Michael Hanselmann
    return None
574 c08d76f5 Michael Hanselmann
575 c08d76f5 Michael Hanselmann
  (start_time, start_mbytes) = samples[0]
576 c08d76f5 Michael Hanselmann
  (end_time, end_mbytes) = samples[-1]
577 c08d76f5 Michael Hanselmann
578 c08d76f5 Michael Hanselmann
  return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)