Statistics
| Branch: | Tag: | Revision:

root / lib / impexpd / __init__.py @ eaa4c57c

History | View | Annotate | Download (16.2 kB)

1 bb44b1ae Michael Hanselmann
#
2 bb44b1ae Michael Hanselmann
#
3 bb44b1ae Michael Hanselmann
4 bb44b1ae Michael Hanselmann
# Copyright (C) 2010 Google Inc.
5 bb44b1ae Michael Hanselmann
#
6 bb44b1ae Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 bb44b1ae Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 bb44b1ae Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 bb44b1ae Michael Hanselmann
# (at your option) any later version.
10 bb44b1ae Michael Hanselmann
#
11 bb44b1ae Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 bb44b1ae Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 bb44b1ae Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 bb44b1ae Michael Hanselmann
# General Public License for more details.
15 bb44b1ae Michael Hanselmann
#
16 bb44b1ae Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 bb44b1ae Michael Hanselmann
# along with this program; if not, write to the Free Software
18 bb44b1ae Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 bb44b1ae Michael Hanselmann
# 02110-1301, USA.
20 bb44b1ae Michael Hanselmann
21 bb44b1ae Michael Hanselmann
22 bb44b1ae Michael Hanselmann
"""Classes and functions for import/export daemon.
23 bb44b1ae Michael Hanselmann

24 bb44b1ae Michael Hanselmann
"""
25 bb44b1ae Michael Hanselmann
26 c08d76f5 Michael Hanselmann
import os
27 34c9ee7b Michael Hanselmann
import re
28 34c9ee7b Michael Hanselmann
import socket
29 53dbf14c Michael Hanselmann
import logging
30 c08d76f5 Michael Hanselmann
import signal
31 c08d76f5 Michael Hanselmann
import errno
32 c08d76f5 Michael Hanselmann
import time
33 bb44b1ae Michael Hanselmann
from cStringIO import StringIO
34 bb44b1ae Michael Hanselmann
35 bb44b1ae Michael Hanselmann
from ganeti import constants
36 bb44b1ae Michael Hanselmann
from ganeti import errors
37 bb44b1ae Michael Hanselmann
from ganeti import utils
38 58bb385c Michael Hanselmann
from ganeti import netutils
39 bb44b1ae Michael Hanselmann
40 bb44b1ae Michael Hanselmann
41 34c9ee7b Michael Hanselmann
#: Used to recognize point at which socat(1) starts to listen on its socket.
42 34c9ee7b Michael Hanselmann
#: The local address is required for the remote peer to connect (in particular
43 34c9ee7b Michael Hanselmann
#: the port number).
44 34c9ee7b Michael Hanselmann
LISTENING_RE = re.compile(r"^listening on\s+"
45 34c9ee7b Michael Hanselmann
                          r"AF=(?P<family>\d+)\s+"
46 34c9ee7b Michael Hanselmann
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
47 34c9ee7b Michael Hanselmann
48 34c9ee7b Michael Hanselmann
#: Used to recognize point at which socat(1) is sending data over the wire
49 34c9ee7b Michael Hanselmann
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
50 34c9ee7b Michael Hanselmann
                              re.I)
51 34c9ee7b Michael Hanselmann
52 34c9ee7b Michael Hanselmann
SOCAT_LOG_DEBUG = "D"
53 34c9ee7b Michael Hanselmann
SOCAT_LOG_INFO = "I"
54 34c9ee7b Michael Hanselmann
SOCAT_LOG_NOTICE = "N"
55 34c9ee7b Michael Hanselmann
SOCAT_LOG_WARNING = "W"
56 34c9ee7b Michael Hanselmann
SOCAT_LOG_ERROR = "E"
57 34c9ee7b Michael Hanselmann
SOCAT_LOG_FATAL = "F"
58 34c9ee7b Michael Hanselmann
59 34c9ee7b Michael Hanselmann
SOCAT_LOG_IGNORE = frozenset([
60 34c9ee7b Michael Hanselmann
  SOCAT_LOG_DEBUG,
61 34c9ee7b Michael Hanselmann
  SOCAT_LOG_INFO,
62 34c9ee7b Michael Hanselmann
  SOCAT_LOG_NOTICE,
63 34c9ee7b Michael Hanselmann
  ])
64 34c9ee7b Michael Hanselmann
65 c08d76f5 Michael Hanselmann
#: Used to parse GNU dd(1) statistics
66 c08d76f5 Michael Hanselmann
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
67 c08d76f5 Michael Hanselmann
                        r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
68 c08d76f5 Michael Hanselmann
69 c08d76f5 Michael Hanselmann
#: Used to ignore "N+N records in/out" on dd(1)'s stderr
70 c08d76f5 Michael Hanselmann
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
71 c08d76f5 Michael Hanselmann
72 c08d76f5 Michael Hanselmann
#: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
73 c08d76f5 Michael Hanselmann
#: unavailable and SIGUSR1 is used instead)
74 c08d76f5 Michael Hanselmann
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)
75 c08d76f5 Michael Hanselmann
76 bb44b1ae Michael Hanselmann
#: Buffer size: at most this many bytes are transferred at once
77 bb44b1ae Michael Hanselmann
BUFSIZE = 1024 * 1024
78 bb44b1ae Michael Hanselmann
79 bb44b1ae Michael Hanselmann
# Common options for socat
80 bb44b1ae Michael Hanselmann
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
81 971bbd84 Michael Hanselmann
SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1",
82 971bbd84 Michael Hanselmann
                      "cipher=%s" % constants.OPENSSL_CIPHERS]
83 bb44b1ae Michael Hanselmann
84 e90739d6 Michael Hanselmann
if constants.SOCAT_USE_COMPRESS:
85 e90739d6 Michael Hanselmann
  # Disables all compression in by OpenSSL. Only supported in patched versions
86 e90739d6 Michael Hanselmann
  # of socat (as of November 2010). See INSTALL for more information.
87 e90739d6 Michael Hanselmann
  SOCAT_OPENSSL_OPTS.append("compress=none")
88 e90739d6 Michael Hanselmann
89 0559f745 Michael Hanselmann
SOCAT_OPTION_MAXLEN = 400
90 0559f745 Michael Hanselmann
91 34c9ee7b Michael Hanselmann
(PROG_OTHER,
92 c08d76f5 Michael Hanselmann
 PROG_SOCAT,
93 c08d76f5 Michael Hanselmann
 PROG_DD,
94 f9323011 Michael Hanselmann
 PROG_DD_PID,
95 f9323011 Michael Hanselmann
 PROG_EXP_SIZE) = range(1, 6)
96 34c9ee7b Michael Hanselmann
PROG_ALL = frozenset([
97 34c9ee7b Michael Hanselmann
  PROG_OTHER,
98 34c9ee7b Michael Hanselmann
  PROG_SOCAT,
99 c08d76f5 Michael Hanselmann
  PROG_DD,
100 c08d76f5 Michael Hanselmann
  PROG_DD_PID,
101 f9323011 Michael Hanselmann
  PROG_EXP_SIZE,
102 34c9ee7b Michael Hanselmann
  ])
103 34c9ee7b Michael Hanselmann
104 bb44b1ae Michael Hanselmann
105 bb44b1ae Michael Hanselmann
class CommandBuilder(object):
106 c08d76f5 Michael Hanselmann
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
107 bb44b1ae Michael Hanselmann
    """Initializes this class.
108 bb44b1ae Michael Hanselmann

109 bb44b1ae Michael Hanselmann
    @param mode: Daemon mode (import or export)
110 bb44b1ae Michael Hanselmann
    @param opts: Options object
111 bb44b1ae Michael Hanselmann
    @type socat_stderr_fd: int
112 bb44b1ae Michael Hanselmann
    @param socat_stderr_fd: File descriptor socat should write its stderr to
113 c08d76f5 Michael Hanselmann
    @type dd_stderr_fd: int
114 c08d76f5 Michael Hanselmann
    @param dd_stderr_fd: File descriptor dd should write its stderr to
115 c08d76f5 Michael Hanselmann
    @type dd_pid_fd: int
116 c08d76f5 Michael Hanselmann
    @param dd_pid_fd: File descriptor the child should write dd's PID to
117 bb44b1ae Michael Hanselmann

118 bb44b1ae Michael Hanselmann
    """
119 bb44b1ae Michael Hanselmann
    self._opts = opts
120 bb44b1ae Michael Hanselmann
    self._mode = mode
121 bb44b1ae Michael Hanselmann
    self._socat_stderr_fd = socat_stderr_fd
122 c08d76f5 Michael Hanselmann
    self._dd_stderr_fd = dd_stderr_fd
123 c08d76f5 Michael Hanselmann
    self._dd_pid_fd = dd_pid_fd
124 bb44b1ae Michael Hanselmann
125 1d3dfa29 Michael Hanselmann
    assert (self._opts.magic is None or
126 1d3dfa29 Michael Hanselmann
            constants.IE_MAGIC_RE.match(self._opts.magic))
127 1d3dfa29 Michael Hanselmann
128 bb44b1ae Michael Hanselmann
  @staticmethod
129 bb44b1ae Michael Hanselmann
  def GetBashCommand(cmd):
130 bb44b1ae Michael Hanselmann
    """Prepares a command to be run in Bash.
131 bb44b1ae Michael Hanselmann

132 bb44b1ae Michael Hanselmann
    """
133 bb44b1ae Michael Hanselmann
    return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
134 bb44b1ae Michael Hanselmann
135 bb44b1ae Michael Hanselmann
  def _GetSocatCommand(self):
136 bb44b1ae Michael Hanselmann
    """Returns the socat command.
137 bb44b1ae Michael Hanselmann

138 bb44b1ae Michael Hanselmann
    """
139 bb44b1ae Michael Hanselmann
    common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
140 bb44b1ae Michael Hanselmann
      "key=%s" % self._opts.key,
141 bb44b1ae Michael Hanselmann
      "cert=%s" % self._opts.cert,
142 bb44b1ae Michael Hanselmann
      "cafile=%s" % self._opts.ca,
143 bb44b1ae Michael Hanselmann
      ]
144 bb44b1ae Michael Hanselmann
145 bb44b1ae Michael Hanselmann
    if self._opts.bind is not None:
146 bb44b1ae Michael Hanselmann
      common_addr_opts.append("bind=%s" % self._opts.bind)
147 bb44b1ae Michael Hanselmann
148 58bb385c Michael Hanselmann
    assert not (self._opts.ipv4 and self._opts.ipv6)
149 58bb385c Michael Hanselmann
150 58bb385c Michael Hanselmann
    if self._opts.ipv4:
151 58bb385c Michael Hanselmann
      common_addr_opts.append("pf=ipv4")
152 58bb385c Michael Hanselmann
    elif self._opts.ipv6:
153 58bb385c Michael Hanselmann
      common_addr_opts.append("pf=ipv6")
154 58bb385c Michael Hanselmann
155 bb44b1ae Michael Hanselmann
    if self._mode == constants.IEM_IMPORT:
156 bb44b1ae Michael Hanselmann
      if self._opts.port is None:
157 bb44b1ae Michael Hanselmann
        port = 0
158 bb44b1ae Michael Hanselmann
      else:
159 bb44b1ae Michael Hanselmann
        port = self._opts.port
160 bb44b1ae Michael Hanselmann
161 bb44b1ae Michael Hanselmann
      addr1 = [
162 bb44b1ae Michael Hanselmann
        "OPENSSL-LISTEN:%s" % port,
163 bb44b1ae Michael Hanselmann
        "reuseaddr",
164 bb44b1ae Michael Hanselmann
165 bb44b1ae Michael Hanselmann
        # Retry to listen if connection wasn't established successfully, up to
166 bb44b1ae Michael Hanselmann
        # 100 times a second. Note that this still leaves room for DoS attacks.
167 bb44b1ae Michael Hanselmann
        "forever",
168 bb44b1ae Michael Hanselmann
        "intervall=0.01",
169 bb44b1ae Michael Hanselmann
        ] + common_addr_opts
170 bb44b1ae Michael Hanselmann
      addr2 = ["stdout"]
171 bb44b1ae Michael Hanselmann
172 bb44b1ae Michael Hanselmann
    elif self._mode == constants.IEM_EXPORT:
173 58bb385c Michael Hanselmann
      if self._opts.host and netutils.IP6Address.IsValid(self._opts.host):
174 58bb385c Michael Hanselmann
        host = "[%s]" % self._opts.host
175 58bb385c Michael Hanselmann
      else:
176 58bb385c Michael Hanselmann
        host = self._opts.host
177 58bb385c Michael Hanselmann
178 bb44b1ae Michael Hanselmann
      addr1 = ["stdin"]
179 bb44b1ae Michael Hanselmann
      addr2 = [
180 58bb385c Michael Hanselmann
        "OPENSSL:%s:%s" % (host, self._opts.port),
181 bb44b1ae Michael Hanselmann
182 bb44b1ae Michael Hanselmann
        # How long to wait per connection attempt
183 bb44b1ae Michael Hanselmann
        "connect-timeout=%s" % self._opts.connect_timeout,
184 bb44b1ae Michael Hanselmann
185 bb44b1ae Michael Hanselmann
        # Retry a few times before giving up to connect (once per second)
186 bb44b1ae Michael Hanselmann
        "retry=%s" % self._opts.connect_retries,
187 bb44b1ae Michael Hanselmann
        "intervall=1",
188 bb44b1ae Michael Hanselmann
        ] + common_addr_opts
189 bb44b1ae Michael Hanselmann
190 bb44b1ae Michael Hanselmann
    else:
191 bb44b1ae Michael Hanselmann
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
192 bb44b1ae Michael Hanselmann
193 bb44b1ae Michael Hanselmann
    for i in [addr1, addr2]:
194 bb44b1ae Michael Hanselmann
      for value in i:
195 0559f745 Michael Hanselmann
        if len(value) > SOCAT_OPTION_MAXLEN:
196 0559f745 Michael Hanselmann
          raise errors.GenericError("Socat option longer than %s"
197 0559f745 Michael Hanselmann
                                    " characters: %r" %
198 0559f745 Michael Hanselmann
                                    (SOCAT_OPTION_MAXLEN, value))
199 bb44b1ae Michael Hanselmann
        if "," in value:
200 bb44b1ae Michael Hanselmann
          raise errors.GenericError("Comma not allowed in socat option"
201 bb44b1ae Michael Hanselmann
                                    " value: %r" % value)
202 bb44b1ae Michael Hanselmann
203 bb44b1ae Michael Hanselmann
    return [
204 bb44b1ae Michael Hanselmann
      constants.SOCAT_PATH,
205 bb44b1ae Michael Hanselmann
206 bb44b1ae Michael Hanselmann
      # Log to stderr
207 bb44b1ae Michael Hanselmann
      "-ls",
208 bb44b1ae Michael Hanselmann
209 bb44b1ae Michael Hanselmann
      # Log level
210 bb44b1ae Michael Hanselmann
      "-d", "-d",
211 bb44b1ae Michael Hanselmann
212 bb44b1ae Michael Hanselmann
      # Buffer size
213 bb44b1ae Michael Hanselmann
      "-b%s" % BUFSIZE,
214 bb44b1ae Michael Hanselmann
215 bb44b1ae Michael Hanselmann
      # Unidirectional mode, the first address is only used for reading, and the
216 bb44b1ae Michael Hanselmann
      # second address is only used for writing
217 bb44b1ae Michael Hanselmann
      "-u",
218 bb44b1ae Michael Hanselmann
219 bb44b1ae Michael Hanselmann
      ",".join(addr1), ",".join(addr2)
220 bb44b1ae Michael Hanselmann
      ]
221 bb44b1ae Michael Hanselmann
222 1d3dfa29 Michael Hanselmann
  def _GetMagicCommand(self):
223 1d3dfa29 Michael Hanselmann
    """Returns the command to read/write the magic value.
224 1d3dfa29 Michael Hanselmann

225 1d3dfa29 Michael Hanselmann
    """
226 1d3dfa29 Michael Hanselmann
    if not self._opts.magic:
227 1d3dfa29 Michael Hanselmann
      return None
228 1d3dfa29 Michael Hanselmann
229 1d3dfa29 Michael Hanselmann
    # Prefix to ensure magic isn't interpreted as option to "echo"
230 1d3dfa29 Michael Hanselmann
    magic = "M=%s" % self._opts.magic
231 1d3dfa29 Michael Hanselmann
232 1d3dfa29 Michael Hanselmann
    cmd = StringIO()
233 1d3dfa29 Michael Hanselmann
234 1d3dfa29 Michael Hanselmann
    if self._mode == constants.IEM_IMPORT:
235 1d3dfa29 Michael Hanselmann
      cmd.write("{ ")
236 1d3dfa29 Michael Hanselmann
      cmd.write(utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"]))
237 1d3dfa29 Michael Hanselmann
      cmd.write(" && ")
238 1d3dfa29 Michael Hanselmann
      cmd.write("if test \"$magic\" != %s; then" % utils.ShellQuote(magic))
239 1d3dfa29 Michael Hanselmann
      cmd.write(" echo %s >&2;" % utils.ShellQuote("Magic value mismatch"))
240 1d3dfa29 Michael Hanselmann
      cmd.write(" exit 1;")
241 1d3dfa29 Michael Hanselmann
      cmd.write("fi;")
242 1d3dfa29 Michael Hanselmann
      cmd.write(" }")
243 1d3dfa29 Michael Hanselmann
244 1d3dfa29 Michael Hanselmann
    elif self._mode == constants.IEM_EXPORT:
245 1d3dfa29 Michael Hanselmann
      cmd.write(utils.ShellQuoteArgs(["echo", "-E", "-n", magic]))
246 1d3dfa29 Michael Hanselmann
247 1d3dfa29 Michael Hanselmann
    else:
248 1d3dfa29 Michael Hanselmann
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
249 1d3dfa29 Michael Hanselmann
250 1d3dfa29 Michael Hanselmann
    return cmd.getvalue()
251 1d3dfa29 Michael Hanselmann
252 fbb6b864 Michael Hanselmann
  def _GetDdCommand(self):
253 fbb6b864 Michael Hanselmann
    """Returns the command for measuring throughput.
254 bb44b1ae Michael Hanselmann

255 bb44b1ae Michael Hanselmann
    """
256 c08d76f5 Michael Hanselmann
    dd_cmd = StringIO()
257 1d3dfa29 Michael Hanselmann
258 1d3dfa29 Michael Hanselmann
    magic_cmd = self._GetMagicCommand()
259 1d3dfa29 Michael Hanselmann
    if magic_cmd:
260 1d3dfa29 Michael Hanselmann
      dd_cmd.write("{ ")
261 1d3dfa29 Michael Hanselmann
      dd_cmd.write(magic_cmd)
262 1d3dfa29 Michael Hanselmann
      dd_cmd.write(" && ")
263 1d3dfa29 Michael Hanselmann
264 1d3dfa29 Michael Hanselmann
    dd_cmd.write("{ ")
265 2ed0e208 Iustin Pop
    # Setting LC_ALL since we want to parse the output and explicitly
266 2ed0e208 Iustin Pop
    # redirecting stdin, as the background process (dd) would have
267 2ed0e208 Iustin Pop
    # /dev/null as stdin otherwise
268 1d3dfa29 Michael Hanselmann
    dd_cmd.write("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
269 c08d76f5 Michael Hanselmann
                 (BUFSIZE, self._dd_stderr_fd))
270 c08d76f5 Michael Hanselmann
    # Send PID to daemon
271 c08d76f5 Michael Hanselmann
    dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
272 c08d76f5 Michael Hanselmann
    # And wait for dd
273 c08d76f5 Michael Hanselmann
    dd_cmd.write(" wait $pid;")
274 c08d76f5 Michael Hanselmann
    dd_cmd.write(" }")
275 1d3dfa29 Michael Hanselmann
276 1d3dfa29 Michael Hanselmann
    if magic_cmd:
277 1d3dfa29 Michael Hanselmann
      dd_cmd.write(" }")
278 1d3dfa29 Michael Hanselmann
279 fbb6b864 Michael Hanselmann
    return dd_cmd.getvalue()
280 fbb6b864 Michael Hanselmann
281 fbb6b864 Michael Hanselmann
  def _GetTransportCommand(self):
282 fbb6b864 Michael Hanselmann
    """Returns the command for the transport part of the daemon.
283 fbb6b864 Michael Hanselmann

284 fbb6b864 Michael Hanselmann
    """
285 fbb6b864 Michael Hanselmann
    socat_cmd = ("%s 2>&%d" %
286 fbb6b864 Michael Hanselmann
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
287 fbb6b864 Michael Hanselmann
                  self._socat_stderr_fd))
288 fbb6b864 Michael Hanselmann
    dd_cmd = self._GetDdCommand()
289 c08d76f5 Michael Hanselmann
290 bb44b1ae Michael Hanselmann
    compr = self._opts.compress
291 bb44b1ae Michael Hanselmann
292 bb44b1ae Michael Hanselmann
    assert compr in constants.IEC_ALL
293 bb44b1ae Michael Hanselmann
294 fbb6b864 Michael Hanselmann
    parts = []
295 fbb6b864 Michael Hanselmann
296 bb44b1ae Michael Hanselmann
    if self._mode == constants.IEM_IMPORT:
297 fbb6b864 Michael Hanselmann
      parts.append(socat_cmd)
298 fbb6b864 Michael Hanselmann
299 bb44b1ae Michael Hanselmann
      if compr == constants.IEC_GZIP:
300 fbb6b864 Michael Hanselmann
        parts.append("gunzip -c")
301 fbb6b864 Michael Hanselmann
302 fbb6b864 Michael Hanselmann
      parts.append(dd_cmd)
303 c08d76f5 Michael Hanselmann
304 bb44b1ae Michael Hanselmann
    elif self._mode == constants.IEM_EXPORT:
305 fbb6b864 Michael Hanselmann
      parts.append(dd_cmd)
306 fbb6b864 Michael Hanselmann
307 bb44b1ae Michael Hanselmann
      if compr == constants.IEC_GZIP:
308 fbb6b864 Michael Hanselmann
        parts.append("gzip -c")
309 fbb6b864 Michael Hanselmann
310 fbb6b864 Michael Hanselmann
      parts.append(socat_cmd)
311 c08d76f5 Michael Hanselmann
312 bb44b1ae Michael Hanselmann
    else:
313 bb44b1ae Michael Hanselmann
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
314 bb44b1ae Michael Hanselmann
315 bb44b1ae Michael Hanselmann
    # TODO: Run transport as separate user
316 bb44b1ae Michael Hanselmann
    # The transport uses its own shell to simplify running it as a separate user
317 bb44b1ae Michael Hanselmann
    # in the future.
318 fbb6b864 Michael Hanselmann
    return self.GetBashCommand(" | ".join(parts))
319 bb44b1ae Michael Hanselmann
320 bb44b1ae Michael Hanselmann
  def GetCommand(self):
321 bb44b1ae Michael Hanselmann
    """Returns the complete child process command.
322 bb44b1ae Michael Hanselmann

323 bb44b1ae Michael Hanselmann
    """
324 bb44b1ae Michael Hanselmann
    transport_cmd = self._GetTransportCommand()
325 bb44b1ae Michael Hanselmann
326 bb44b1ae Michael Hanselmann
    buf = StringIO()
327 bb44b1ae Michael Hanselmann
328 bb44b1ae Michael Hanselmann
    if self._opts.cmd_prefix:
329 bb44b1ae Michael Hanselmann
      buf.write(self._opts.cmd_prefix)
330 bb44b1ae Michael Hanselmann
      buf.write(" ")
331 bb44b1ae Michael Hanselmann
332 bb44b1ae Michael Hanselmann
    buf.write(utils.ShellQuoteArgs(transport_cmd))
333 bb44b1ae Michael Hanselmann
334 bb44b1ae Michael Hanselmann
    if self._opts.cmd_suffix:
335 bb44b1ae Michael Hanselmann
      buf.write(" ")
336 bb44b1ae Michael Hanselmann
      buf.write(self._opts.cmd_suffix)
337 bb44b1ae Michael Hanselmann
338 bb44b1ae Michael Hanselmann
    return self.GetBashCommand(buf.getvalue())
339 34c9ee7b Michael Hanselmann
340 34c9ee7b Michael Hanselmann
341 34c9ee7b Michael Hanselmann
def _VerifyListening(family, address, port):
342 34c9ee7b Michael Hanselmann
  """Verify address given as listening address by socat.
343 34c9ee7b Michael Hanselmann

344 34c9ee7b Michael Hanselmann
  """
345 58bb385c Michael Hanselmann
  if family not in (socket.AF_INET, socket.AF_INET6):
346 34c9ee7b Michael Hanselmann
    raise errors.GenericError("Address family %r not supported" % family)
347 34c9ee7b Michael Hanselmann
348 58bb385c Michael Hanselmann
  if (family == socket.AF_INET6 and address.startswith("[") and
349 58bb385c Michael Hanselmann
      address.endswith("]")):
350 58bb385c Michael Hanselmann
    address = address.lstrip("[").rstrip("]")
351 58bb385c Michael Hanselmann
352 34c9ee7b Michael Hanselmann
  try:
353 34c9ee7b Michael Hanselmann
    packed_address = socket.inet_pton(family, address)
354 34c9ee7b Michael Hanselmann
  except socket.error:
355 34c9ee7b Michael Hanselmann
    raise errors.GenericError("Invalid address %r for family %s" %
356 34c9ee7b Michael Hanselmann
                              (address, family))
357 34c9ee7b Michael Hanselmann
358 34c9ee7b Michael Hanselmann
  return (socket.inet_ntop(family, packed_address), port)
359 34c9ee7b Michael Hanselmann
360 34c9ee7b Michael Hanselmann
361 34c9ee7b Michael Hanselmann
class ChildIOProcessor(object):
362 f9323011 Michael Hanselmann
  def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
363 34c9ee7b Michael Hanselmann
    """Initializes this class.
364 34c9ee7b Michael Hanselmann

365 34c9ee7b Michael Hanselmann
    """
366 34c9ee7b Michael Hanselmann
    self._debug = debug
367 34c9ee7b Michael Hanselmann
    self._status_file = status_file
368 34c9ee7b Michael Hanselmann
    self._logger = logger
369 34c9ee7b Michael Hanselmann
370 34c9ee7b Michael Hanselmann
    self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
371 34c9ee7b Michael Hanselmann
                           for prog in PROG_ALL])
372 34c9ee7b Michael Hanselmann
373 c08d76f5 Michael Hanselmann
    self._dd_pid = None
374 c08d76f5 Michael Hanselmann
    self._dd_ready = False
375 c08d76f5 Michael Hanselmann
    self._dd_tp_samples = throughput_samples
376 c08d76f5 Michael Hanselmann
    self._dd_progress = []
377 c08d76f5 Michael Hanselmann
378 c08d76f5 Michael Hanselmann
    # Expected size of transferred data
379 f9323011 Michael Hanselmann
    self._exp_size = exp_size
380 c08d76f5 Michael Hanselmann
381 34c9ee7b Michael Hanselmann
  def GetLineSplitter(self, prog):
382 34c9ee7b Michael Hanselmann
    """Returns the line splitter for a program.
383 34c9ee7b Michael Hanselmann

384 34c9ee7b Michael Hanselmann
    """
385 34c9ee7b Michael Hanselmann
    return self._splitter[prog]
386 34c9ee7b Michael Hanselmann
387 34c9ee7b Michael Hanselmann
  def FlushAll(self):
388 34c9ee7b Michael Hanselmann
    """Flushes all line splitters.
389 34c9ee7b Michael Hanselmann

390 34c9ee7b Michael Hanselmann
    """
391 34c9ee7b Michael Hanselmann
    for ls in self._splitter.itervalues():
392 34c9ee7b Michael Hanselmann
      ls.flush()
393 34c9ee7b Michael Hanselmann
394 34c9ee7b Michael Hanselmann
  def CloseAll(self):
395 34c9ee7b Michael Hanselmann
    """Closes all line splitters.
396 34c9ee7b Michael Hanselmann

397 34c9ee7b Michael Hanselmann
    """
398 34c9ee7b Michael Hanselmann
    for ls in self._splitter.itervalues():
399 34c9ee7b Michael Hanselmann
      ls.close()
400 34c9ee7b Michael Hanselmann
    self._splitter.clear()
401 34c9ee7b Michael Hanselmann
402 c08d76f5 Michael Hanselmann
  def NotifyDd(self):
403 c08d76f5 Michael Hanselmann
    """Tells dd(1) to write statistics.
404 c08d76f5 Michael Hanselmann

405 c08d76f5 Michael Hanselmann
    """
406 c08d76f5 Michael Hanselmann
    if self._dd_pid is None:
407 c08d76f5 Michael Hanselmann
      # Can't notify
408 c08d76f5 Michael Hanselmann
      return False
409 c08d76f5 Michael Hanselmann
410 c08d76f5 Michael Hanselmann
    if not self._dd_ready:
411 c08d76f5 Michael Hanselmann
      # There's a race condition between starting the program and sending
412 c08d76f5 Michael Hanselmann
      # signals.  The signal handler is only registered after some time, so we
413 c08d76f5 Michael Hanselmann
      # have to check whether the program is ready. If it isn't, sending a
414 c08d76f5 Michael Hanselmann
      # signal will invoke the default handler (and usually abort the program).
415 c08d76f5 Michael Hanselmann
      if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
416 c08d76f5 Michael Hanselmann
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
417 c08d76f5 Michael Hanselmann
        return False
418 c08d76f5 Michael Hanselmann
419 c08d76f5 Michael Hanselmann
      logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
420 c08d76f5 Michael Hanselmann
      self._dd_ready = True
421 c08d76f5 Michael Hanselmann
422 c08d76f5 Michael Hanselmann
    logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
423 c08d76f5 Michael Hanselmann
    try:
424 c08d76f5 Michael Hanselmann
      os.kill(self._dd_pid, DD_INFO_SIGNAL)
425 c08d76f5 Michael Hanselmann
    except EnvironmentError, err:
426 c08d76f5 Michael Hanselmann
      if err.errno != errno.ESRCH:
427 c08d76f5 Michael Hanselmann
        raise
428 c08d76f5 Michael Hanselmann
429 c08d76f5 Michael Hanselmann
      # Process no longer exists
430 560cbec1 Michael Hanselmann
      logging.debug("dd exited")
431 c08d76f5 Michael Hanselmann
      self._dd_pid = None
432 c08d76f5 Michael Hanselmann
433 c08d76f5 Michael Hanselmann
    return True
434 c08d76f5 Michael Hanselmann
435 34c9ee7b Michael Hanselmann
  def _ProcessOutput(self, line, prog):
436 34c9ee7b Michael Hanselmann
    """Takes care of child process output.
437 34c9ee7b Michael Hanselmann

438 34c9ee7b Michael Hanselmann
    @type line: string
439 34c9ee7b Michael Hanselmann
    @param line: Child output line
440 34c9ee7b Michael Hanselmann
    @type prog: number
441 34c9ee7b Michael Hanselmann
    @param prog: Program from which the line originates
442 34c9ee7b Michael Hanselmann

443 34c9ee7b Michael Hanselmann
    """
444 34c9ee7b Michael Hanselmann
    force_update = False
445 34c9ee7b Michael Hanselmann
    forward_line = line
446 34c9ee7b Michael Hanselmann
447 34c9ee7b Michael Hanselmann
    if prog == PROG_SOCAT:
448 34c9ee7b Michael Hanselmann
      level = None
449 34c9ee7b Michael Hanselmann
      parts = line.split(None, 4)
450 34c9ee7b Michael Hanselmann
451 34c9ee7b Michael Hanselmann
      if len(parts) == 5:
452 34c9ee7b Michael Hanselmann
        (_, _, _, level, msg) = parts
453 34c9ee7b Michael Hanselmann
454 34c9ee7b Michael Hanselmann
        force_update = self._ProcessSocatOutput(self._status_file, level, msg)
455 34c9ee7b Michael Hanselmann
456 34c9ee7b Michael Hanselmann
        if self._debug or (level and level not in SOCAT_LOG_IGNORE):
457 34c9ee7b Michael Hanselmann
          forward_line = "socat: %s %s" % (level, msg)
458 34c9ee7b Michael Hanselmann
        else:
459 34c9ee7b Michael Hanselmann
          forward_line = None
460 34c9ee7b Michael Hanselmann
      else:
461 34c9ee7b Michael Hanselmann
        forward_line = "socat: %s" % line
462 34c9ee7b Michael Hanselmann
463 c08d76f5 Michael Hanselmann
    elif prog == PROG_DD:
464 c08d76f5 Michael Hanselmann
      (should_forward, force_update) = self._ProcessDdOutput(line)
465 c08d76f5 Michael Hanselmann
466 c08d76f5 Michael Hanselmann
      if should_forward or self._debug:
467 c08d76f5 Michael Hanselmann
        forward_line = "dd: %s" % line
468 c08d76f5 Michael Hanselmann
      else:
469 c08d76f5 Michael Hanselmann
        forward_line = None
470 c08d76f5 Michael Hanselmann
471 c08d76f5 Michael Hanselmann
    elif prog == PROG_DD_PID:
472 c08d76f5 Michael Hanselmann
      if self._dd_pid:
473 c08d76f5 Michael Hanselmann
        raise RuntimeError("dd PID reported more than once")
474 c08d76f5 Michael Hanselmann
      logging.debug("Received dd PID %r", line)
475 c08d76f5 Michael Hanselmann
      self._dd_pid = int(line)
476 c08d76f5 Michael Hanselmann
      forward_line = None
477 c08d76f5 Michael Hanselmann
478 f9323011 Michael Hanselmann
    elif prog == PROG_EXP_SIZE:
479 f9323011 Michael Hanselmann
      logging.debug("Received predicted size %r", line)
480 f9323011 Michael Hanselmann
      forward_line = None
481 f9323011 Michael Hanselmann
482 f9323011 Michael Hanselmann
      if line:
483 f9323011 Michael Hanselmann
        try:
484 f9323011 Michael Hanselmann
          exp_size = utils.BytesToMebibyte(int(line))
485 f9323011 Michael Hanselmann
        except (ValueError, TypeError), err:
486 f9323011 Michael Hanselmann
          logging.error("Failed to convert predicted size %r to number: %s",
487 f9323011 Michael Hanselmann
                        line, err)
488 f9323011 Michael Hanselmann
          exp_size = None
489 f9323011 Michael Hanselmann
      else:
490 f9323011 Michael Hanselmann
        exp_size = None
491 f9323011 Michael Hanselmann
492 f9323011 Michael Hanselmann
      self._exp_size = exp_size
493 f9323011 Michael Hanselmann
494 34c9ee7b Michael Hanselmann
    if forward_line:
495 34c9ee7b Michael Hanselmann
      self._logger.info(forward_line)
496 34c9ee7b Michael Hanselmann
      self._status_file.AddRecentOutput(forward_line)
497 34c9ee7b Michael Hanselmann
498 34c9ee7b Michael Hanselmann
    self._status_file.Update(force_update)
499 34c9ee7b Michael Hanselmann
500 34c9ee7b Michael Hanselmann
  @staticmethod
501 34c9ee7b Michael Hanselmann
  def _ProcessSocatOutput(status_file, level, msg):
502 34c9ee7b Michael Hanselmann
    """Interprets socat log output.
503 34c9ee7b Michael Hanselmann

504 34c9ee7b Michael Hanselmann
    """
505 34c9ee7b Michael Hanselmann
    if level == SOCAT_LOG_NOTICE:
506 34c9ee7b Michael Hanselmann
      if status_file.GetListenPort() is None:
507 34c9ee7b Michael Hanselmann
        # TODO: Maybe implement timeout to not listen forever
508 34c9ee7b Michael Hanselmann
        m = LISTENING_RE.match(msg)
509 34c9ee7b Michael Hanselmann
        if m:
510 34c9ee7b Michael Hanselmann
          (_, port) = _VerifyListening(int(m.group("family")),
511 34c9ee7b Michael Hanselmann
                                       m.group("address"),
512 34c9ee7b Michael Hanselmann
                                       int(m.group("port")))
513 34c9ee7b Michael Hanselmann
514 34c9ee7b Michael Hanselmann
          status_file.SetListenPort(port)
515 34c9ee7b Michael Hanselmann
          return True
516 34c9ee7b Michael Hanselmann
517 34c9ee7b Michael Hanselmann
      if not status_file.GetConnected():
518 34c9ee7b Michael Hanselmann
        m = TRANSFER_LOOP_RE.match(msg)
519 34c9ee7b Michael Hanselmann
        if m:
520 53dbf14c Michael Hanselmann
          logging.debug("Connection established")
521 34c9ee7b Michael Hanselmann
          status_file.SetConnected()
522 34c9ee7b Michael Hanselmann
          return True
523 34c9ee7b Michael Hanselmann
524 34c9ee7b Michael Hanselmann
    return False
525 c08d76f5 Michael Hanselmann
526 c08d76f5 Michael Hanselmann
  def _ProcessDdOutput(self, line):
527 c08d76f5 Michael Hanselmann
    """Interprets a line of dd(1)'s output.
528 c08d76f5 Michael Hanselmann

529 c08d76f5 Michael Hanselmann
    """
530 c08d76f5 Michael Hanselmann
    m = DD_INFO_RE.match(line)
531 c08d76f5 Michael Hanselmann
    if m:
532 c08d76f5 Michael Hanselmann
      seconds = float(m.group("seconds"))
533 c08d76f5 Michael Hanselmann
      mbytes = utils.BytesToMebibyte(int(m.group("bytes")))
534 c08d76f5 Michael Hanselmann
      self._UpdateDdProgress(seconds, mbytes)
535 c08d76f5 Michael Hanselmann
      return (False, True)
536 c08d76f5 Michael Hanselmann
537 c08d76f5 Michael Hanselmann
    m = DD_STDERR_IGNORE.match(line)
538 c08d76f5 Michael Hanselmann
    if m:
539 c08d76f5 Michael Hanselmann
      # Ignore
540 c08d76f5 Michael Hanselmann
      return (False, False)
541 c08d76f5 Michael Hanselmann
542 c08d76f5 Michael Hanselmann
    # Forward line
543 c08d76f5 Michael Hanselmann
    return (True, False)
544 c08d76f5 Michael Hanselmann
545 c08d76f5 Michael Hanselmann
  def _UpdateDdProgress(self, seconds, mbytes):
546 c08d76f5 Michael Hanselmann
    """Updates the internal status variables for dd(1) progress.
547 c08d76f5 Michael Hanselmann

548 c08d76f5 Michael Hanselmann
    @type seconds: float
549 c08d76f5 Michael Hanselmann
    @param seconds: Timestamp of this update
550 c08d76f5 Michael Hanselmann
    @type mbytes: float
551 c08d76f5 Michael Hanselmann
    @param mbytes: Total number of MiB transferred so far
552 c08d76f5 Michael Hanselmann

553 c08d76f5 Michael Hanselmann
    """
554 c08d76f5 Michael Hanselmann
    # Add latest sample
555 c08d76f5 Michael Hanselmann
    self._dd_progress.append((seconds, mbytes))
556 c08d76f5 Michael Hanselmann
557 c08d76f5 Michael Hanselmann
    # Remove old samples
558 c08d76f5 Michael Hanselmann
    del self._dd_progress[:-self._dd_tp_samples]
559 c08d76f5 Michael Hanselmann
560 c08d76f5 Michael Hanselmann
    # Calculate throughput
561 c08d76f5 Michael Hanselmann
    throughput = _CalcThroughput(self._dd_progress)
562 c08d76f5 Michael Hanselmann
563 c08d76f5 Michael Hanselmann
    # Calculate percent and ETA
564 c08d76f5 Michael Hanselmann
    percent = None
565 c08d76f5 Michael Hanselmann
    eta = None
566 c08d76f5 Michael Hanselmann
567 c08d76f5 Michael Hanselmann
    if self._exp_size is not None:
568 c08d76f5 Michael Hanselmann
      if self._exp_size != 0:
569 c08d76f5 Michael Hanselmann
        percent = max(0, min(100, (100.0 * mbytes) / self._exp_size))
570 c08d76f5 Michael Hanselmann
571 c08d76f5 Michael Hanselmann
      if throughput:
572 c08d76f5 Michael Hanselmann
        eta = max(0, float(self._exp_size - mbytes) / throughput)
573 c08d76f5 Michael Hanselmann
574 c08d76f5 Michael Hanselmann
    self._status_file.SetProgress(mbytes, throughput, percent, eta)
575 c08d76f5 Michael Hanselmann
576 c08d76f5 Michael Hanselmann
577 c08d76f5 Michael Hanselmann
def _CalcThroughput(samples):
578 c08d76f5 Michael Hanselmann
  """Calculates the throughput in MiB/second.
579 c08d76f5 Michael Hanselmann

580 c08d76f5 Michael Hanselmann
  @type samples: sequence
581 c08d76f5 Michael Hanselmann
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
582 c08d76f5 Michael Hanselmann
                  tuple
583 c08d76f5 Michael Hanselmann
  @rtype: float or None
584 c08d76f5 Michael Hanselmann
  @return: Throughput in MiB/second
585 c08d76f5 Michael Hanselmann

586 c08d76f5 Michael Hanselmann
  """
587 c08d76f5 Michael Hanselmann
  if len(samples) < 2:
588 c08d76f5 Michael Hanselmann
    # Can't calculate throughput
589 c08d76f5 Michael Hanselmann
    return None
590 c08d76f5 Michael Hanselmann
591 c08d76f5 Michael Hanselmann
  (start_time, start_mbytes) = samples[0]
592 c08d76f5 Michael Hanselmann
  (end_time, end_mbytes) = samples[-1]
593 c08d76f5 Michael Hanselmann
594 c08d76f5 Michael Hanselmann
  return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)