Statistics
| Branch: | Tag: | Revision:

root / lib / impexpd / __init__.py @ b43dcc5a

History | View | Annotate | Download (15.5 kB)

1 bb44b1ae Michael Hanselmann
#
2 bb44b1ae Michael Hanselmann
#
3 bb44b1ae Michael Hanselmann
4 bb44b1ae Michael Hanselmann
# Copyright (C) 2010 Google Inc.
5 bb44b1ae Michael Hanselmann
#
6 bb44b1ae Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 bb44b1ae Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 bb44b1ae Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 bb44b1ae Michael Hanselmann
# (at your option) any later version.
10 bb44b1ae Michael Hanselmann
#
11 bb44b1ae Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 bb44b1ae Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 bb44b1ae Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 bb44b1ae Michael Hanselmann
# General Public License for more details.
15 bb44b1ae Michael Hanselmann
#
16 bb44b1ae Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 bb44b1ae Michael Hanselmann
# along with this program; if not, write to the Free Software
18 bb44b1ae Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 bb44b1ae Michael Hanselmann
# 02110-1301, USA.
20 bb44b1ae Michael Hanselmann
21 bb44b1ae Michael Hanselmann
22 bb44b1ae Michael Hanselmann
"""Classes and functions for import/export daemon.
23 bb44b1ae Michael Hanselmann

24 bb44b1ae Michael Hanselmann
"""
25 bb44b1ae Michael Hanselmann
26 c08d76f5 Michael Hanselmann
import os
27 34c9ee7b Michael Hanselmann
import re
28 34c9ee7b Michael Hanselmann
import socket
29 53dbf14c Michael Hanselmann
import logging
30 c08d76f5 Michael Hanselmann
import signal
31 c08d76f5 Michael Hanselmann
import errno
32 c08d76f5 Michael Hanselmann
import time
33 bb44b1ae Michael Hanselmann
from cStringIO import StringIO
34 bb44b1ae Michael Hanselmann
35 bb44b1ae Michael Hanselmann
from ganeti import constants
36 bb44b1ae Michael Hanselmann
from ganeti import errors
37 bb44b1ae Michael Hanselmann
from ganeti import utils
38 bb44b1ae Michael Hanselmann
39 bb44b1ae Michael Hanselmann
40 34c9ee7b Michael Hanselmann
#: Used to recognize point at which socat(1) starts to listen on its socket.
41 34c9ee7b Michael Hanselmann
#: The local address is required for the remote peer to connect (in particular
42 34c9ee7b Michael Hanselmann
#: the port number).
43 34c9ee7b Michael Hanselmann
LISTENING_RE = re.compile(r"^listening on\s+"
44 34c9ee7b Michael Hanselmann
                          r"AF=(?P<family>\d+)\s+"
45 34c9ee7b Michael Hanselmann
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
46 34c9ee7b Michael Hanselmann
47 34c9ee7b Michael Hanselmann
#: Used to recognize point at which socat(1) is sending data over the wire
48 34c9ee7b Michael Hanselmann
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
49 34c9ee7b Michael Hanselmann
                              re.I)
50 34c9ee7b Michael Hanselmann
51 34c9ee7b Michael Hanselmann
SOCAT_LOG_DEBUG = "D"
52 34c9ee7b Michael Hanselmann
SOCAT_LOG_INFO = "I"
53 34c9ee7b Michael Hanselmann
SOCAT_LOG_NOTICE = "N"
54 34c9ee7b Michael Hanselmann
SOCAT_LOG_WARNING = "W"
55 34c9ee7b Michael Hanselmann
SOCAT_LOG_ERROR = "E"
56 34c9ee7b Michael Hanselmann
SOCAT_LOG_FATAL = "F"
57 34c9ee7b Michael Hanselmann
58 34c9ee7b Michael Hanselmann
SOCAT_LOG_IGNORE = frozenset([
59 34c9ee7b Michael Hanselmann
  SOCAT_LOG_DEBUG,
60 34c9ee7b Michael Hanselmann
  SOCAT_LOG_INFO,
61 34c9ee7b Michael Hanselmann
  SOCAT_LOG_NOTICE,
62 34c9ee7b Michael Hanselmann
  ])
63 34c9ee7b Michael Hanselmann
64 c08d76f5 Michael Hanselmann
#: Used to parse GNU dd(1) statistics
65 c08d76f5 Michael Hanselmann
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
66 c08d76f5 Michael Hanselmann
                        r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
67 c08d76f5 Michael Hanselmann
68 c08d76f5 Michael Hanselmann
#: Used to ignore "N+N records in/out" on dd(1)'s stderr
69 c08d76f5 Michael Hanselmann
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
70 c08d76f5 Michael Hanselmann
71 c08d76f5 Michael Hanselmann
#: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
72 c08d76f5 Michael Hanselmann
#: unavailable and SIGUSR1 is used instead)
73 c08d76f5 Michael Hanselmann
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)
74 c08d76f5 Michael Hanselmann
75 bb44b1ae Michael Hanselmann
#: Buffer size: at most this many bytes are transferred at once
76 bb44b1ae Michael Hanselmann
BUFSIZE = 1024 * 1024
77 bb44b1ae Michael Hanselmann
78 bb44b1ae Michael Hanselmann
# Common options for socat
79 bb44b1ae Michael Hanselmann
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
80 971bbd84 Michael Hanselmann
SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1",
81 971bbd84 Michael Hanselmann
                      "cipher=%s" % constants.OPENSSL_CIPHERS]
82 bb44b1ae Michael Hanselmann
83 0559f745 Michael Hanselmann
SOCAT_OPTION_MAXLEN = 400
84 0559f745 Michael Hanselmann
85 34c9ee7b Michael Hanselmann
(PROG_OTHER,
86 c08d76f5 Michael Hanselmann
 PROG_SOCAT,
87 c08d76f5 Michael Hanselmann
 PROG_DD,
88 f9323011 Michael Hanselmann
 PROG_DD_PID,
89 f9323011 Michael Hanselmann
 PROG_EXP_SIZE) = range(1, 6)
90 34c9ee7b Michael Hanselmann
PROG_ALL = frozenset([
91 34c9ee7b Michael Hanselmann
  PROG_OTHER,
92 34c9ee7b Michael Hanselmann
  PROG_SOCAT,
93 c08d76f5 Michael Hanselmann
  PROG_DD,
94 c08d76f5 Michael Hanselmann
  PROG_DD_PID,
95 f9323011 Michael Hanselmann
  PROG_EXP_SIZE,
96 34c9ee7b Michael Hanselmann
  ])
97 34c9ee7b Michael Hanselmann
98 bb44b1ae Michael Hanselmann
99 bb44b1ae Michael Hanselmann
class CommandBuilder(object):
100 c08d76f5 Michael Hanselmann
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
101 bb44b1ae Michael Hanselmann
    """Initializes this class.
102 bb44b1ae Michael Hanselmann

103 bb44b1ae Michael Hanselmann
    @param mode: Daemon mode (import or export)
104 bb44b1ae Michael Hanselmann
    @param opts: Options object
105 bb44b1ae Michael Hanselmann
    @type socat_stderr_fd: int
106 bb44b1ae Michael Hanselmann
    @param socat_stderr_fd: File descriptor socat should write its stderr to
107 c08d76f5 Michael Hanselmann
    @type dd_stderr_fd: int
108 c08d76f5 Michael Hanselmann
    @param dd_stderr_fd: File descriptor dd should write its stderr to
109 c08d76f5 Michael Hanselmann
    @type dd_pid_fd: int
110 c08d76f5 Michael Hanselmann
    @param dd_pid_fd: File descriptor the child should write dd's PID to
111 bb44b1ae Michael Hanselmann

112 bb44b1ae Michael Hanselmann
    """
113 bb44b1ae Michael Hanselmann
    self._opts = opts
114 bb44b1ae Michael Hanselmann
    self._mode = mode
115 bb44b1ae Michael Hanselmann
    self._socat_stderr_fd = socat_stderr_fd
116 c08d76f5 Michael Hanselmann
    self._dd_stderr_fd = dd_stderr_fd
117 c08d76f5 Michael Hanselmann
    self._dd_pid_fd = dd_pid_fd
118 bb44b1ae Michael Hanselmann
119 1d3dfa29 Michael Hanselmann
    assert (self._opts.magic is None or
120 1d3dfa29 Michael Hanselmann
            constants.IE_MAGIC_RE.match(self._opts.magic))
121 1d3dfa29 Michael Hanselmann
122 bb44b1ae Michael Hanselmann
  @staticmethod
123 bb44b1ae Michael Hanselmann
  def GetBashCommand(cmd):
124 bb44b1ae Michael Hanselmann
    """Prepares a command to be run in Bash.
125 bb44b1ae Michael Hanselmann

126 bb44b1ae Michael Hanselmann
    """
127 bb44b1ae Michael Hanselmann
    return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
128 bb44b1ae Michael Hanselmann
129 bb44b1ae Michael Hanselmann
  def _GetSocatCommand(self):
130 bb44b1ae Michael Hanselmann
    """Returns the socat command.
131 bb44b1ae Michael Hanselmann

132 bb44b1ae Michael Hanselmann
    """
133 bb44b1ae Michael Hanselmann
    common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
134 bb44b1ae Michael Hanselmann
      "key=%s" % self._opts.key,
135 bb44b1ae Michael Hanselmann
      "cert=%s" % self._opts.cert,
136 bb44b1ae Michael Hanselmann
      "cafile=%s" % self._opts.ca,
137 bb44b1ae Michael Hanselmann
      ]
138 bb44b1ae Michael Hanselmann
139 bb44b1ae Michael Hanselmann
    if self._opts.bind is not None:
140 bb44b1ae Michael Hanselmann
      common_addr_opts.append("bind=%s" % self._opts.bind)
141 bb44b1ae Michael Hanselmann
142 bb44b1ae Michael Hanselmann
    if self._mode == constants.IEM_IMPORT:
143 bb44b1ae Michael Hanselmann
      if self._opts.port is None:
144 bb44b1ae Michael Hanselmann
        port = 0
145 bb44b1ae Michael Hanselmann
      else:
146 bb44b1ae Michael Hanselmann
        port = self._opts.port
147 bb44b1ae Michael Hanselmann
148 bb44b1ae Michael Hanselmann
      addr1 = [
149 bb44b1ae Michael Hanselmann
        "OPENSSL-LISTEN:%s" % port,
150 bb44b1ae Michael Hanselmann
        "reuseaddr",
151 bb44b1ae Michael Hanselmann
152 bb44b1ae Michael Hanselmann
        # Retry to listen if connection wasn't established successfully, up to
153 bb44b1ae Michael Hanselmann
        # 100 times a second. Note that this still leaves room for DoS attacks.
154 bb44b1ae Michael Hanselmann
        "forever",
155 bb44b1ae Michael Hanselmann
        "intervall=0.01",
156 bb44b1ae Michael Hanselmann
        ] + common_addr_opts
157 bb44b1ae Michael Hanselmann
      addr2 = ["stdout"]
158 bb44b1ae Michael Hanselmann
159 bb44b1ae Michael Hanselmann
    elif self._mode == constants.IEM_EXPORT:
160 bb44b1ae Michael Hanselmann
      addr1 = ["stdin"]
161 bb44b1ae Michael Hanselmann
      addr2 = [
162 bb44b1ae Michael Hanselmann
        "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
163 bb44b1ae Michael Hanselmann
164 bb44b1ae Michael Hanselmann
        # How long to wait per connection attempt
165 bb44b1ae Michael Hanselmann
        "connect-timeout=%s" % self._opts.connect_timeout,
166 bb44b1ae Michael Hanselmann
167 bb44b1ae Michael Hanselmann
        # Retry a few times before giving up to connect (once per second)
168 bb44b1ae Michael Hanselmann
        "retry=%s" % self._opts.connect_retries,
169 bb44b1ae Michael Hanselmann
        "intervall=1",
170 bb44b1ae Michael Hanselmann
        ] + common_addr_opts
171 bb44b1ae Michael Hanselmann
172 bb44b1ae Michael Hanselmann
    else:
173 bb44b1ae Michael Hanselmann
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
174 bb44b1ae Michael Hanselmann
175 bb44b1ae Michael Hanselmann
    for i in [addr1, addr2]:
176 bb44b1ae Michael Hanselmann
      for value in i:
177 0559f745 Michael Hanselmann
        if len(value) > SOCAT_OPTION_MAXLEN:
178 0559f745 Michael Hanselmann
          raise errors.GenericError("Socat option longer than %s"
179 0559f745 Michael Hanselmann
                                    " characters: %r" %
180 0559f745 Michael Hanselmann
                                    (SOCAT_OPTION_MAXLEN, value))
181 bb44b1ae Michael Hanselmann
        if "," in value:
182 bb44b1ae Michael Hanselmann
          raise errors.GenericError("Comma not allowed in socat option"
183 bb44b1ae Michael Hanselmann
                                    " value: %r" % value)
184 bb44b1ae Michael Hanselmann
185 bb44b1ae Michael Hanselmann
    return [
186 bb44b1ae Michael Hanselmann
      constants.SOCAT_PATH,
187 bb44b1ae Michael Hanselmann
188 bb44b1ae Michael Hanselmann
      # Log to stderr
189 bb44b1ae Michael Hanselmann
      "-ls",
190 bb44b1ae Michael Hanselmann
191 bb44b1ae Michael Hanselmann
      # Log level
192 bb44b1ae Michael Hanselmann
      "-d", "-d",
193 bb44b1ae Michael Hanselmann
194 bb44b1ae Michael Hanselmann
      # Buffer size
195 bb44b1ae Michael Hanselmann
      "-b%s" % BUFSIZE,
196 bb44b1ae Michael Hanselmann
197 bb44b1ae Michael Hanselmann
      # Unidirectional mode, the first address is only used for reading, and the
198 bb44b1ae Michael Hanselmann
      # second address is only used for writing
199 bb44b1ae Michael Hanselmann
      "-u",
200 bb44b1ae Michael Hanselmann
201 bb44b1ae Michael Hanselmann
      ",".join(addr1), ",".join(addr2)
202 bb44b1ae Michael Hanselmann
      ]
203 bb44b1ae Michael Hanselmann
204 1d3dfa29 Michael Hanselmann
  def _GetMagicCommand(self):
205 1d3dfa29 Michael Hanselmann
    """Returns the command to read/write the magic value.
206 1d3dfa29 Michael Hanselmann

207 1d3dfa29 Michael Hanselmann
    """
208 1d3dfa29 Michael Hanselmann
    if not self._opts.magic:
209 1d3dfa29 Michael Hanselmann
      return None
210 1d3dfa29 Michael Hanselmann
211 1d3dfa29 Michael Hanselmann
    # Prefix to ensure magic isn't interpreted as option to "echo"
212 1d3dfa29 Michael Hanselmann
    magic = "M=%s" % self._opts.magic
213 1d3dfa29 Michael Hanselmann
214 1d3dfa29 Michael Hanselmann
    cmd = StringIO()
215 1d3dfa29 Michael Hanselmann
216 1d3dfa29 Michael Hanselmann
    if self._mode == constants.IEM_IMPORT:
217 1d3dfa29 Michael Hanselmann
      cmd.write("{ ")
218 1d3dfa29 Michael Hanselmann
      cmd.write(utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"]))
219 1d3dfa29 Michael Hanselmann
      cmd.write(" && ")
220 1d3dfa29 Michael Hanselmann
      cmd.write("if test \"$magic\" != %s; then" % utils.ShellQuote(magic))
221 1d3dfa29 Michael Hanselmann
      cmd.write(" echo %s >&2;" % utils.ShellQuote("Magic value mismatch"))
222 1d3dfa29 Michael Hanselmann
      cmd.write(" exit 1;")
223 1d3dfa29 Michael Hanselmann
      cmd.write("fi;")
224 1d3dfa29 Michael Hanselmann
      cmd.write(" }")
225 1d3dfa29 Michael Hanselmann
226 1d3dfa29 Michael Hanselmann
    elif self._mode == constants.IEM_EXPORT:
227 1d3dfa29 Michael Hanselmann
      cmd.write(utils.ShellQuoteArgs(["echo", "-E", "-n", magic]))
228 1d3dfa29 Michael Hanselmann
229 1d3dfa29 Michael Hanselmann
    else:
230 1d3dfa29 Michael Hanselmann
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
231 1d3dfa29 Michael Hanselmann
232 1d3dfa29 Michael Hanselmann
    return cmd.getvalue()
233 1d3dfa29 Michael Hanselmann
234 fbb6b864 Michael Hanselmann
  def _GetDdCommand(self):
235 fbb6b864 Michael Hanselmann
    """Returns the command for measuring throughput.
236 bb44b1ae Michael Hanselmann

237 bb44b1ae Michael Hanselmann
    """
238 c08d76f5 Michael Hanselmann
    dd_cmd = StringIO()
239 1d3dfa29 Michael Hanselmann
240 1d3dfa29 Michael Hanselmann
    magic_cmd = self._GetMagicCommand()
241 1d3dfa29 Michael Hanselmann
    if magic_cmd:
242 1d3dfa29 Michael Hanselmann
      dd_cmd.write("{ ")
243 1d3dfa29 Michael Hanselmann
      dd_cmd.write(magic_cmd)
244 1d3dfa29 Michael Hanselmann
      dd_cmd.write(" && ")
245 1d3dfa29 Michael Hanselmann
246 1d3dfa29 Michael Hanselmann
    dd_cmd.write("{ ")
247 c08d76f5 Michael Hanselmann
    # Setting LC_ALL since we want to parse the output and explicitely
248 c08d76f5 Michael Hanselmann
    # redirecting stdin, as the background process (dd) would have /dev/null as
249 c08d76f5 Michael Hanselmann
    # stdin otherwise
250 1d3dfa29 Michael Hanselmann
    dd_cmd.write("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
251 c08d76f5 Michael Hanselmann
                 (BUFSIZE, self._dd_stderr_fd))
252 c08d76f5 Michael Hanselmann
    # Send PID to daemon
253 c08d76f5 Michael Hanselmann
    dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
254 c08d76f5 Michael Hanselmann
    # And wait for dd
255 c08d76f5 Michael Hanselmann
    dd_cmd.write(" wait $pid;")
256 c08d76f5 Michael Hanselmann
    dd_cmd.write(" }")
257 1d3dfa29 Michael Hanselmann
258 1d3dfa29 Michael Hanselmann
    if magic_cmd:
259 1d3dfa29 Michael Hanselmann
      dd_cmd.write(" }")
260 1d3dfa29 Michael Hanselmann
261 fbb6b864 Michael Hanselmann
    return dd_cmd.getvalue()
262 fbb6b864 Michael Hanselmann
263 fbb6b864 Michael Hanselmann
  def _GetTransportCommand(self):
264 fbb6b864 Michael Hanselmann
    """Returns the command for the transport part of the daemon.
265 fbb6b864 Michael Hanselmann

266 fbb6b864 Michael Hanselmann
    """
267 fbb6b864 Michael Hanselmann
    socat_cmd = ("%s 2>&%d" %
268 fbb6b864 Michael Hanselmann
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
269 fbb6b864 Michael Hanselmann
                  self._socat_stderr_fd))
270 fbb6b864 Michael Hanselmann
    dd_cmd = self._GetDdCommand()
271 c08d76f5 Michael Hanselmann
272 bb44b1ae Michael Hanselmann
    compr = self._opts.compress
273 bb44b1ae Michael Hanselmann
274 bb44b1ae Michael Hanselmann
    assert compr in constants.IEC_ALL
275 bb44b1ae Michael Hanselmann
276 fbb6b864 Michael Hanselmann
    parts = []
277 fbb6b864 Michael Hanselmann
278 bb44b1ae Michael Hanselmann
    if self._mode == constants.IEM_IMPORT:
279 fbb6b864 Michael Hanselmann
      parts.append(socat_cmd)
280 fbb6b864 Michael Hanselmann
281 bb44b1ae Michael Hanselmann
      if compr == constants.IEC_GZIP:
282 fbb6b864 Michael Hanselmann
        parts.append("gunzip -c")
283 fbb6b864 Michael Hanselmann
284 fbb6b864 Michael Hanselmann
      parts.append(dd_cmd)
285 c08d76f5 Michael Hanselmann
286 bb44b1ae Michael Hanselmann
    elif self._mode == constants.IEM_EXPORT:
287 fbb6b864 Michael Hanselmann
      parts.append(dd_cmd)
288 fbb6b864 Michael Hanselmann
289 bb44b1ae Michael Hanselmann
      if compr == constants.IEC_GZIP:
290 fbb6b864 Michael Hanselmann
        parts.append("gzip -c")
291 fbb6b864 Michael Hanselmann
292 fbb6b864 Michael Hanselmann
      parts.append(socat_cmd)
293 c08d76f5 Michael Hanselmann
294 bb44b1ae Michael Hanselmann
    else:
295 bb44b1ae Michael Hanselmann
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
296 bb44b1ae Michael Hanselmann
297 bb44b1ae Michael Hanselmann
    # TODO: Run transport as separate user
298 bb44b1ae Michael Hanselmann
    # The transport uses its own shell to simplify running it as a separate user
299 bb44b1ae Michael Hanselmann
    # in the future.
300 fbb6b864 Michael Hanselmann
    return self.GetBashCommand(" | ".join(parts))
301 bb44b1ae Michael Hanselmann
302 bb44b1ae Michael Hanselmann
  def GetCommand(self):
303 bb44b1ae Michael Hanselmann
    """Returns the complete child process command.
304 bb44b1ae Michael Hanselmann

305 bb44b1ae Michael Hanselmann
    """
306 bb44b1ae Michael Hanselmann
    transport_cmd = self._GetTransportCommand()
307 bb44b1ae Michael Hanselmann
308 bb44b1ae Michael Hanselmann
    buf = StringIO()
309 bb44b1ae Michael Hanselmann
310 bb44b1ae Michael Hanselmann
    if self._opts.cmd_prefix:
311 bb44b1ae Michael Hanselmann
      buf.write(self._opts.cmd_prefix)
312 bb44b1ae Michael Hanselmann
      buf.write(" ")
313 bb44b1ae Michael Hanselmann
314 bb44b1ae Michael Hanselmann
    buf.write(utils.ShellQuoteArgs(transport_cmd))
315 bb44b1ae Michael Hanselmann
316 bb44b1ae Michael Hanselmann
    if self._opts.cmd_suffix:
317 bb44b1ae Michael Hanselmann
      buf.write(" ")
318 bb44b1ae Michael Hanselmann
      buf.write(self._opts.cmd_suffix)
319 bb44b1ae Michael Hanselmann
320 bb44b1ae Michael Hanselmann
    return self.GetBashCommand(buf.getvalue())
321 34c9ee7b Michael Hanselmann
322 34c9ee7b Michael Hanselmann
323 34c9ee7b Michael Hanselmann
def _VerifyListening(family, address, port):
324 34c9ee7b Michael Hanselmann
  """Verify address given as listening address by socat.
325 34c9ee7b Michael Hanselmann

326 34c9ee7b Michael Hanselmann
  """
327 34c9ee7b Michael Hanselmann
  # TODO: Implement IPv6 support
328 34c9ee7b Michael Hanselmann
  if family != socket.AF_INET:
329 34c9ee7b Michael Hanselmann
    raise errors.GenericError("Address family %r not supported" % family)
330 34c9ee7b Michael Hanselmann
331 34c9ee7b Michael Hanselmann
  try:
332 34c9ee7b Michael Hanselmann
    packed_address = socket.inet_pton(family, address)
333 34c9ee7b Michael Hanselmann
  except socket.error:
334 34c9ee7b Michael Hanselmann
    raise errors.GenericError("Invalid address %r for family %s" %
335 34c9ee7b Michael Hanselmann
                              (address, family))
336 34c9ee7b Michael Hanselmann
337 34c9ee7b Michael Hanselmann
  return (socket.inet_ntop(family, packed_address), port)
338 34c9ee7b Michael Hanselmann
339 34c9ee7b Michael Hanselmann
340 34c9ee7b Michael Hanselmann
class ChildIOProcessor(object):
341 f9323011 Michael Hanselmann
  def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
342 34c9ee7b Michael Hanselmann
    """Initializes this class.
343 34c9ee7b Michael Hanselmann

344 34c9ee7b Michael Hanselmann
    """
345 34c9ee7b Michael Hanselmann
    self._debug = debug
346 34c9ee7b Michael Hanselmann
    self._status_file = status_file
347 34c9ee7b Michael Hanselmann
    self._logger = logger
348 34c9ee7b Michael Hanselmann
349 34c9ee7b Michael Hanselmann
    self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
350 34c9ee7b Michael Hanselmann
                           for prog in PROG_ALL])
351 34c9ee7b Michael Hanselmann
352 c08d76f5 Michael Hanselmann
    self._dd_pid = None
353 c08d76f5 Michael Hanselmann
    self._dd_ready = False
354 c08d76f5 Michael Hanselmann
    self._dd_tp_samples = throughput_samples
355 c08d76f5 Michael Hanselmann
    self._dd_progress = []
356 c08d76f5 Michael Hanselmann
357 c08d76f5 Michael Hanselmann
    # Expected size of transferred data
358 f9323011 Michael Hanselmann
    self._exp_size = exp_size
359 c08d76f5 Michael Hanselmann
360 34c9ee7b Michael Hanselmann
  def GetLineSplitter(self, prog):
361 34c9ee7b Michael Hanselmann
    """Returns the line splitter for a program.
362 34c9ee7b Michael Hanselmann

363 34c9ee7b Michael Hanselmann
    """
364 34c9ee7b Michael Hanselmann
    return self._splitter[prog]
365 34c9ee7b Michael Hanselmann
366 34c9ee7b Michael Hanselmann
  def FlushAll(self):
367 34c9ee7b Michael Hanselmann
    """Flushes all line splitters.
368 34c9ee7b Michael Hanselmann

369 34c9ee7b Michael Hanselmann
    """
370 34c9ee7b Michael Hanselmann
    for ls in self._splitter.itervalues():
371 34c9ee7b Michael Hanselmann
      ls.flush()
372 34c9ee7b Michael Hanselmann
373 34c9ee7b Michael Hanselmann
  def CloseAll(self):
374 34c9ee7b Michael Hanselmann
    """Closes all line splitters.
375 34c9ee7b Michael Hanselmann

376 34c9ee7b Michael Hanselmann
    """
377 34c9ee7b Michael Hanselmann
    for ls in self._splitter.itervalues():
378 34c9ee7b Michael Hanselmann
      ls.close()
379 34c9ee7b Michael Hanselmann
    self._splitter.clear()
380 34c9ee7b Michael Hanselmann
381 c08d76f5 Michael Hanselmann
  def NotifyDd(self):
382 c08d76f5 Michael Hanselmann
    """Tells dd(1) to write statistics.
383 c08d76f5 Michael Hanselmann

384 c08d76f5 Michael Hanselmann
    """
385 c08d76f5 Michael Hanselmann
    if self._dd_pid is None:
386 c08d76f5 Michael Hanselmann
      # Can't notify
387 c08d76f5 Michael Hanselmann
      return False
388 c08d76f5 Michael Hanselmann
389 c08d76f5 Michael Hanselmann
    if not self._dd_ready:
390 c08d76f5 Michael Hanselmann
      # There's a race condition between starting the program and sending
391 c08d76f5 Michael Hanselmann
      # signals.  The signal handler is only registered after some time, so we
392 c08d76f5 Michael Hanselmann
      # have to check whether the program is ready. If it isn't, sending a
393 c08d76f5 Michael Hanselmann
      # signal will invoke the default handler (and usually abort the program).
394 c08d76f5 Michael Hanselmann
      if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
395 c08d76f5 Michael Hanselmann
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
396 c08d76f5 Michael Hanselmann
        return False
397 c08d76f5 Michael Hanselmann
398 c08d76f5 Michael Hanselmann
      logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
399 c08d76f5 Michael Hanselmann
      self._dd_ready = True
400 c08d76f5 Michael Hanselmann
401 c08d76f5 Michael Hanselmann
    logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
402 c08d76f5 Michael Hanselmann
    try:
403 c08d76f5 Michael Hanselmann
      os.kill(self._dd_pid, DD_INFO_SIGNAL)
404 c08d76f5 Michael Hanselmann
    except EnvironmentError, err:
405 c08d76f5 Michael Hanselmann
      if err.errno != errno.ESRCH:
406 c08d76f5 Michael Hanselmann
        raise
407 c08d76f5 Michael Hanselmann
408 c08d76f5 Michael Hanselmann
      # Process no longer exists
409 560cbec1 Michael Hanselmann
      logging.debug("dd exited")
410 c08d76f5 Michael Hanselmann
      self._dd_pid = None
411 c08d76f5 Michael Hanselmann
412 c08d76f5 Michael Hanselmann
    return True
413 c08d76f5 Michael Hanselmann
414 34c9ee7b Michael Hanselmann
  def _ProcessOutput(self, line, prog):
415 34c9ee7b Michael Hanselmann
    """Takes care of child process output.
416 34c9ee7b Michael Hanselmann

417 34c9ee7b Michael Hanselmann
    @type line: string
418 34c9ee7b Michael Hanselmann
    @param line: Child output line
419 34c9ee7b Michael Hanselmann
    @type prog: number
420 34c9ee7b Michael Hanselmann
    @param prog: Program from which the line originates
421 34c9ee7b Michael Hanselmann

422 34c9ee7b Michael Hanselmann
    """
423 34c9ee7b Michael Hanselmann
    force_update = False
424 34c9ee7b Michael Hanselmann
    forward_line = line
425 34c9ee7b Michael Hanselmann
426 34c9ee7b Michael Hanselmann
    if prog == PROG_SOCAT:
427 34c9ee7b Michael Hanselmann
      level = None
428 34c9ee7b Michael Hanselmann
      parts = line.split(None, 4)
429 34c9ee7b Michael Hanselmann
430 34c9ee7b Michael Hanselmann
      if len(parts) == 5:
431 34c9ee7b Michael Hanselmann
        (_, _, _, level, msg) = parts
432 34c9ee7b Michael Hanselmann
433 34c9ee7b Michael Hanselmann
        force_update = self._ProcessSocatOutput(self._status_file, level, msg)
434 34c9ee7b Michael Hanselmann
435 34c9ee7b Michael Hanselmann
        if self._debug or (level and level not in SOCAT_LOG_IGNORE):
436 34c9ee7b Michael Hanselmann
          forward_line = "socat: %s %s" % (level, msg)
437 34c9ee7b Michael Hanselmann
        else:
438 34c9ee7b Michael Hanselmann
          forward_line = None
439 34c9ee7b Michael Hanselmann
      else:
440 34c9ee7b Michael Hanselmann
        forward_line = "socat: %s" % line
441 34c9ee7b Michael Hanselmann
442 c08d76f5 Michael Hanselmann
    elif prog == PROG_DD:
443 c08d76f5 Michael Hanselmann
      (should_forward, force_update) = self._ProcessDdOutput(line)
444 c08d76f5 Michael Hanselmann
445 c08d76f5 Michael Hanselmann
      if should_forward or self._debug:
446 c08d76f5 Michael Hanselmann
        forward_line = "dd: %s" % line
447 c08d76f5 Michael Hanselmann
      else:
448 c08d76f5 Michael Hanselmann
        forward_line = None
449 c08d76f5 Michael Hanselmann
450 c08d76f5 Michael Hanselmann
    elif prog == PROG_DD_PID:
451 c08d76f5 Michael Hanselmann
      if self._dd_pid:
452 c08d76f5 Michael Hanselmann
        raise RuntimeError("dd PID reported more than once")
453 c08d76f5 Michael Hanselmann
      logging.debug("Received dd PID %r", line)
454 c08d76f5 Michael Hanselmann
      self._dd_pid = int(line)
455 c08d76f5 Michael Hanselmann
      forward_line = None
456 c08d76f5 Michael Hanselmann
457 f9323011 Michael Hanselmann
    elif prog == PROG_EXP_SIZE:
458 f9323011 Michael Hanselmann
      logging.debug("Received predicted size %r", line)
459 f9323011 Michael Hanselmann
      forward_line = None
460 f9323011 Michael Hanselmann
461 f9323011 Michael Hanselmann
      if line:
462 f9323011 Michael Hanselmann
        try:
463 f9323011 Michael Hanselmann
          exp_size = utils.BytesToMebibyte(int(line))
464 f9323011 Michael Hanselmann
        except (ValueError, TypeError), err:
465 f9323011 Michael Hanselmann
          logging.error("Failed to convert predicted size %r to number: %s",
466 f9323011 Michael Hanselmann
                        line, err)
467 f9323011 Michael Hanselmann
          exp_size = None
468 f9323011 Michael Hanselmann
      else:
469 f9323011 Michael Hanselmann
        exp_size = None
470 f9323011 Michael Hanselmann
471 f9323011 Michael Hanselmann
      self._exp_size = exp_size
472 f9323011 Michael Hanselmann
473 34c9ee7b Michael Hanselmann
    if forward_line:
474 34c9ee7b Michael Hanselmann
      self._logger.info(forward_line)
475 34c9ee7b Michael Hanselmann
      self._status_file.AddRecentOutput(forward_line)
476 34c9ee7b Michael Hanselmann
477 34c9ee7b Michael Hanselmann
    self._status_file.Update(force_update)
478 34c9ee7b Michael Hanselmann
479 34c9ee7b Michael Hanselmann
  @staticmethod
480 34c9ee7b Michael Hanselmann
  def _ProcessSocatOutput(status_file, level, msg):
481 34c9ee7b Michael Hanselmann
    """Interprets socat log output.
482 34c9ee7b Michael Hanselmann

483 34c9ee7b Michael Hanselmann
    """
484 34c9ee7b Michael Hanselmann
    if level == SOCAT_LOG_NOTICE:
485 34c9ee7b Michael Hanselmann
      if status_file.GetListenPort() is None:
486 34c9ee7b Michael Hanselmann
        # TODO: Maybe implement timeout to not listen forever
487 34c9ee7b Michael Hanselmann
        m = LISTENING_RE.match(msg)
488 34c9ee7b Michael Hanselmann
        if m:
489 34c9ee7b Michael Hanselmann
          (_, port) = _VerifyListening(int(m.group("family")),
490 34c9ee7b Michael Hanselmann
                                       m.group("address"),
491 34c9ee7b Michael Hanselmann
                                       int(m.group("port")))
492 34c9ee7b Michael Hanselmann
493 34c9ee7b Michael Hanselmann
          status_file.SetListenPort(port)
494 34c9ee7b Michael Hanselmann
          return True
495 34c9ee7b Michael Hanselmann
496 34c9ee7b Michael Hanselmann
      if not status_file.GetConnected():
497 34c9ee7b Michael Hanselmann
        m = TRANSFER_LOOP_RE.match(msg)
498 34c9ee7b Michael Hanselmann
        if m:
499 53dbf14c Michael Hanselmann
          logging.debug("Connection established")
500 34c9ee7b Michael Hanselmann
          status_file.SetConnected()
501 34c9ee7b Michael Hanselmann
          return True
502 34c9ee7b Michael Hanselmann
503 34c9ee7b Michael Hanselmann
    return False
504 c08d76f5 Michael Hanselmann
505 c08d76f5 Michael Hanselmann
  def _ProcessDdOutput(self, line):
506 c08d76f5 Michael Hanselmann
    """Interprets a line of dd(1)'s output.
507 c08d76f5 Michael Hanselmann

508 c08d76f5 Michael Hanselmann
    """
509 c08d76f5 Michael Hanselmann
    m = DD_INFO_RE.match(line)
510 c08d76f5 Michael Hanselmann
    if m:
511 c08d76f5 Michael Hanselmann
      seconds = float(m.group("seconds"))
512 c08d76f5 Michael Hanselmann
      mbytes = utils.BytesToMebibyte(int(m.group("bytes")))
513 c08d76f5 Michael Hanselmann
      self._UpdateDdProgress(seconds, mbytes)
514 c08d76f5 Michael Hanselmann
      return (False, True)
515 c08d76f5 Michael Hanselmann
516 c08d76f5 Michael Hanselmann
    m = DD_STDERR_IGNORE.match(line)
517 c08d76f5 Michael Hanselmann
    if m:
518 c08d76f5 Michael Hanselmann
      # Ignore
519 c08d76f5 Michael Hanselmann
      return (False, False)
520 c08d76f5 Michael Hanselmann
521 c08d76f5 Michael Hanselmann
    # Forward line
522 c08d76f5 Michael Hanselmann
    return (True, False)
523 c08d76f5 Michael Hanselmann
524 c08d76f5 Michael Hanselmann
  def _UpdateDdProgress(self, seconds, mbytes):
525 c08d76f5 Michael Hanselmann
    """Updates the internal status variables for dd(1) progress.
526 c08d76f5 Michael Hanselmann

527 c08d76f5 Michael Hanselmann
    @type seconds: float
528 c08d76f5 Michael Hanselmann
    @param seconds: Timestamp of this update
529 c08d76f5 Michael Hanselmann
    @type mbytes: float
530 c08d76f5 Michael Hanselmann
    @param mbytes: Total number of MiB transferred so far
531 c08d76f5 Michael Hanselmann

532 c08d76f5 Michael Hanselmann
    """
533 c08d76f5 Michael Hanselmann
    # Add latest sample
534 c08d76f5 Michael Hanselmann
    self._dd_progress.append((seconds, mbytes))
535 c08d76f5 Michael Hanselmann
536 c08d76f5 Michael Hanselmann
    # Remove old samples
537 c08d76f5 Michael Hanselmann
    del self._dd_progress[:-self._dd_tp_samples]
538 c08d76f5 Michael Hanselmann
539 c08d76f5 Michael Hanselmann
    # Calculate throughput
540 c08d76f5 Michael Hanselmann
    throughput = _CalcThroughput(self._dd_progress)
541 c08d76f5 Michael Hanselmann
542 c08d76f5 Michael Hanselmann
    # Calculate percent and ETA
543 c08d76f5 Michael Hanselmann
    percent = None
544 c08d76f5 Michael Hanselmann
    eta = None
545 c08d76f5 Michael Hanselmann
546 c08d76f5 Michael Hanselmann
    if self._exp_size is not None:
547 c08d76f5 Michael Hanselmann
      if self._exp_size != 0:
548 c08d76f5 Michael Hanselmann
        percent = max(0, min(100, (100.0 * mbytes) / self._exp_size))
549 c08d76f5 Michael Hanselmann
550 c08d76f5 Michael Hanselmann
      if throughput:
551 c08d76f5 Michael Hanselmann
        eta = max(0, float(self._exp_size - mbytes) / throughput)
552 c08d76f5 Michael Hanselmann
553 c08d76f5 Michael Hanselmann
    self._status_file.SetProgress(mbytes, throughput, percent, eta)
554 c08d76f5 Michael Hanselmann
555 c08d76f5 Michael Hanselmann
556 c08d76f5 Michael Hanselmann
def _CalcThroughput(samples):
557 c08d76f5 Michael Hanselmann
  """Calculates the throughput in MiB/second.
558 c08d76f5 Michael Hanselmann

559 c08d76f5 Michael Hanselmann
  @type samples: sequence
560 c08d76f5 Michael Hanselmann
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
561 c08d76f5 Michael Hanselmann
                  tuple
562 c08d76f5 Michael Hanselmann
  @rtype: float or None
563 c08d76f5 Michael Hanselmann
  @return: Throughput in MiB/second
564 c08d76f5 Michael Hanselmann

565 c08d76f5 Michael Hanselmann
  """
566 c08d76f5 Michael Hanselmann
  if len(samples) < 2:
567 c08d76f5 Michael Hanselmann
    # Can't calculate throughput
568 c08d76f5 Michael Hanselmann
    return None
569 c08d76f5 Michael Hanselmann
570 c08d76f5 Michael Hanselmann
  (start_time, start_mbytes) = samples[0]
571 c08d76f5 Michael Hanselmann
  (end_time, end_mbytes) = samples[-1]
572 c08d76f5 Michael Hanselmann
573 c08d76f5 Michael Hanselmann
  return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)