Statistics
| Branch: | Tag: | Revision:

root / lib / impexpd / __init__.py @ 0559f745

History | View | Annotate | Download (14.2 kB)

1 bb44b1ae Michael Hanselmann
#
2 bb44b1ae Michael Hanselmann
#
3 bb44b1ae Michael Hanselmann
4 bb44b1ae Michael Hanselmann
# Copyright (C) 2010 Google Inc.
5 bb44b1ae Michael Hanselmann
#
6 bb44b1ae Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 bb44b1ae Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 bb44b1ae Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 bb44b1ae Michael Hanselmann
# (at your option) any later version.
10 bb44b1ae Michael Hanselmann
#
11 bb44b1ae Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 bb44b1ae Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 bb44b1ae Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 bb44b1ae Michael Hanselmann
# General Public License for more details.
15 bb44b1ae Michael Hanselmann
#
16 bb44b1ae Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 bb44b1ae Michael Hanselmann
# along with this program; if not, write to the Free Software
18 bb44b1ae Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 bb44b1ae Michael Hanselmann
# 02110-1301, USA.
20 bb44b1ae Michael Hanselmann
21 bb44b1ae Michael Hanselmann
22 bb44b1ae Michael Hanselmann
"""Classes and functions for import/export daemon.
23 bb44b1ae Michael Hanselmann

24 bb44b1ae Michael Hanselmann
"""
25 bb44b1ae Michael Hanselmann
26 c08d76f5 Michael Hanselmann
import os
27 34c9ee7b Michael Hanselmann
import re
28 34c9ee7b Michael Hanselmann
import socket
29 53dbf14c Michael Hanselmann
import logging
30 c08d76f5 Michael Hanselmann
import signal
31 c08d76f5 Michael Hanselmann
import errno
32 c08d76f5 Michael Hanselmann
import time
33 bb44b1ae Michael Hanselmann
from cStringIO import StringIO
34 bb44b1ae Michael Hanselmann
35 bb44b1ae Michael Hanselmann
from ganeti import constants
36 bb44b1ae Michael Hanselmann
from ganeti import errors
37 bb44b1ae Michael Hanselmann
from ganeti import utils
38 bb44b1ae Michael Hanselmann
39 bb44b1ae Michael Hanselmann
40 34c9ee7b Michael Hanselmann
#: Used to recognize point at which socat(1) starts to listen on its socket.
41 34c9ee7b Michael Hanselmann
#: The local address is required for the remote peer to connect (in particular
42 34c9ee7b Michael Hanselmann
#: the port number).
43 34c9ee7b Michael Hanselmann
LISTENING_RE = re.compile(r"^listening on\s+"
44 34c9ee7b Michael Hanselmann
                          r"AF=(?P<family>\d+)\s+"
45 34c9ee7b Michael Hanselmann
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
46 34c9ee7b Michael Hanselmann
47 34c9ee7b Michael Hanselmann
#: Used to recognize point at which socat(1) is sending data over the wire
48 34c9ee7b Michael Hanselmann
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
49 34c9ee7b Michael Hanselmann
                              re.I)
50 34c9ee7b Michael Hanselmann
51 34c9ee7b Michael Hanselmann
SOCAT_LOG_DEBUG = "D"
52 34c9ee7b Michael Hanselmann
SOCAT_LOG_INFO = "I"
53 34c9ee7b Michael Hanselmann
SOCAT_LOG_NOTICE = "N"
54 34c9ee7b Michael Hanselmann
SOCAT_LOG_WARNING = "W"
55 34c9ee7b Michael Hanselmann
SOCAT_LOG_ERROR = "E"
56 34c9ee7b Michael Hanselmann
SOCAT_LOG_FATAL = "F"
57 34c9ee7b Michael Hanselmann
58 34c9ee7b Michael Hanselmann
SOCAT_LOG_IGNORE = frozenset([
59 34c9ee7b Michael Hanselmann
  SOCAT_LOG_DEBUG,
60 34c9ee7b Michael Hanselmann
  SOCAT_LOG_INFO,
61 34c9ee7b Michael Hanselmann
  SOCAT_LOG_NOTICE,
62 34c9ee7b Michael Hanselmann
  ])
63 34c9ee7b Michael Hanselmann
64 c08d76f5 Michael Hanselmann
#: Used to parse GNU dd(1) statistics
65 c08d76f5 Michael Hanselmann
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
66 c08d76f5 Michael Hanselmann
                        r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
67 c08d76f5 Michael Hanselmann
68 c08d76f5 Michael Hanselmann
#: Used to ignore "N+N records in/out" on dd(1)'s stderr
69 c08d76f5 Michael Hanselmann
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
70 c08d76f5 Michael Hanselmann
71 c08d76f5 Michael Hanselmann
#: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
72 c08d76f5 Michael Hanselmann
#: unavailable and SIGUSR1 is used instead)
73 c08d76f5 Michael Hanselmann
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)
74 c08d76f5 Michael Hanselmann
75 bb44b1ae Michael Hanselmann
#: Buffer size: at most this many bytes are transferred at once
76 bb44b1ae Michael Hanselmann
BUFSIZE = 1024 * 1024
77 bb44b1ae Michael Hanselmann
78 bb44b1ae Michael Hanselmann
# Common options for socat
79 bb44b1ae Michael Hanselmann
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
80 bb44b1ae Michael Hanselmann
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
81 bb44b1ae Michael Hanselmann
82 0559f745 Michael Hanselmann
SOCAT_OPTION_MAXLEN = 400
83 0559f745 Michael Hanselmann
84 34c9ee7b Michael Hanselmann
(PROG_OTHER,
85 c08d76f5 Michael Hanselmann
 PROG_SOCAT,
86 c08d76f5 Michael Hanselmann
 PROG_DD,
87 f9323011 Michael Hanselmann
 PROG_DD_PID,
88 f9323011 Michael Hanselmann
 PROG_EXP_SIZE) = range(1, 6)
89 34c9ee7b Michael Hanselmann
PROG_ALL = frozenset([
90 34c9ee7b Michael Hanselmann
  PROG_OTHER,
91 34c9ee7b Michael Hanselmann
  PROG_SOCAT,
92 c08d76f5 Michael Hanselmann
  PROG_DD,
93 c08d76f5 Michael Hanselmann
  PROG_DD_PID,
94 f9323011 Michael Hanselmann
  PROG_EXP_SIZE,
95 34c9ee7b Michael Hanselmann
  ])
96 34c9ee7b Michael Hanselmann
97 bb44b1ae Michael Hanselmann
98 bb44b1ae Michael Hanselmann
class CommandBuilder(object):
99 c08d76f5 Michael Hanselmann
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
100 bb44b1ae Michael Hanselmann
    """Initializes this class.
101 bb44b1ae Michael Hanselmann

102 bb44b1ae Michael Hanselmann
    @param mode: Daemon mode (import or export)
103 bb44b1ae Michael Hanselmann
    @param opts: Options object
104 bb44b1ae Michael Hanselmann
    @type socat_stderr_fd: int
105 bb44b1ae Michael Hanselmann
    @param socat_stderr_fd: File descriptor socat should write its stderr to
106 c08d76f5 Michael Hanselmann
    @type dd_stderr_fd: int
107 c08d76f5 Michael Hanselmann
    @param dd_stderr_fd: File descriptor dd should write its stderr to
108 c08d76f5 Michael Hanselmann
    @type dd_pid_fd: int
109 c08d76f5 Michael Hanselmann
    @param dd_pid_fd: File descriptor the child should write dd's PID to
110 bb44b1ae Michael Hanselmann

111 bb44b1ae Michael Hanselmann
    """
112 bb44b1ae Michael Hanselmann
    self._opts = opts
113 bb44b1ae Michael Hanselmann
    self._mode = mode
114 bb44b1ae Michael Hanselmann
    self._socat_stderr_fd = socat_stderr_fd
115 c08d76f5 Michael Hanselmann
    self._dd_stderr_fd = dd_stderr_fd
116 c08d76f5 Michael Hanselmann
    self._dd_pid_fd = dd_pid_fd
117 bb44b1ae Michael Hanselmann
118 bb44b1ae Michael Hanselmann
  @staticmethod
119 bb44b1ae Michael Hanselmann
  def GetBashCommand(cmd):
120 bb44b1ae Michael Hanselmann
    """Prepares a command to be run in Bash.
121 bb44b1ae Michael Hanselmann

122 bb44b1ae Michael Hanselmann
    """
123 bb44b1ae Michael Hanselmann
    return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
124 bb44b1ae Michael Hanselmann
125 bb44b1ae Michael Hanselmann
  def _GetSocatCommand(self):
126 bb44b1ae Michael Hanselmann
    """Returns the socat command.
127 bb44b1ae Michael Hanselmann

128 bb44b1ae Michael Hanselmann
    """
129 bb44b1ae Michael Hanselmann
    common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
130 bb44b1ae Michael Hanselmann
      "key=%s" % self._opts.key,
131 bb44b1ae Michael Hanselmann
      "cert=%s" % self._opts.cert,
132 bb44b1ae Michael Hanselmann
      "cafile=%s" % self._opts.ca,
133 bb44b1ae Michael Hanselmann
      ]
134 bb44b1ae Michael Hanselmann
135 bb44b1ae Michael Hanselmann
    if self._opts.bind is not None:
136 bb44b1ae Michael Hanselmann
      common_addr_opts.append("bind=%s" % self._opts.bind)
137 bb44b1ae Michael Hanselmann
138 bb44b1ae Michael Hanselmann
    if self._mode == constants.IEM_IMPORT:
139 bb44b1ae Michael Hanselmann
      if self._opts.port is None:
140 bb44b1ae Michael Hanselmann
        port = 0
141 bb44b1ae Michael Hanselmann
      else:
142 bb44b1ae Michael Hanselmann
        port = self._opts.port
143 bb44b1ae Michael Hanselmann
144 bb44b1ae Michael Hanselmann
      addr1 = [
145 bb44b1ae Michael Hanselmann
        "OPENSSL-LISTEN:%s" % port,
146 bb44b1ae Michael Hanselmann
        "reuseaddr",
147 bb44b1ae Michael Hanselmann
148 bb44b1ae Michael Hanselmann
        # Retry to listen if connection wasn't established successfully, up to
149 bb44b1ae Michael Hanselmann
        # 100 times a second. Note that this still leaves room for DoS attacks.
150 bb44b1ae Michael Hanselmann
        "forever",
151 bb44b1ae Michael Hanselmann
        "intervall=0.01",
152 bb44b1ae Michael Hanselmann
        ] + common_addr_opts
153 bb44b1ae Michael Hanselmann
      addr2 = ["stdout"]
154 bb44b1ae Michael Hanselmann
155 bb44b1ae Michael Hanselmann
    elif self._mode == constants.IEM_EXPORT:
156 bb44b1ae Michael Hanselmann
      addr1 = ["stdin"]
157 bb44b1ae Michael Hanselmann
      addr2 = [
158 bb44b1ae Michael Hanselmann
        "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
159 bb44b1ae Michael Hanselmann
160 bb44b1ae Michael Hanselmann
        # How long to wait per connection attempt
161 bb44b1ae Michael Hanselmann
        "connect-timeout=%s" % self._opts.connect_timeout,
162 bb44b1ae Michael Hanselmann
163 bb44b1ae Michael Hanselmann
        # Retry a few times before giving up to connect (once per second)
164 bb44b1ae Michael Hanselmann
        "retry=%s" % self._opts.connect_retries,
165 bb44b1ae Michael Hanselmann
        "intervall=1",
166 bb44b1ae Michael Hanselmann
        ] + common_addr_opts
167 bb44b1ae Michael Hanselmann
168 bb44b1ae Michael Hanselmann
    else:
169 bb44b1ae Michael Hanselmann
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
170 bb44b1ae Michael Hanselmann
171 bb44b1ae Michael Hanselmann
    for i in [addr1, addr2]:
172 bb44b1ae Michael Hanselmann
      for value in i:
173 0559f745 Michael Hanselmann
        if len(value) > SOCAT_OPTION_MAXLEN:
174 0559f745 Michael Hanselmann
          raise errors.GenericError("Socat option longer than %s"
175 0559f745 Michael Hanselmann
                                    " characters: %r" %
176 0559f745 Michael Hanselmann
                                    (SOCAT_OPTION_MAXLEN, value))
177 bb44b1ae Michael Hanselmann
        if "," in value:
178 bb44b1ae Michael Hanselmann
          raise errors.GenericError("Comma not allowed in socat option"
179 bb44b1ae Michael Hanselmann
                                    " value: %r" % value)
180 bb44b1ae Michael Hanselmann
181 bb44b1ae Michael Hanselmann
    return [
182 bb44b1ae Michael Hanselmann
      constants.SOCAT_PATH,
183 bb44b1ae Michael Hanselmann
184 bb44b1ae Michael Hanselmann
      # Log to stderr
185 bb44b1ae Michael Hanselmann
      "-ls",
186 bb44b1ae Michael Hanselmann
187 bb44b1ae Michael Hanselmann
      # Log level
188 bb44b1ae Michael Hanselmann
      "-d", "-d",
189 bb44b1ae Michael Hanselmann
190 bb44b1ae Michael Hanselmann
      # Buffer size
191 bb44b1ae Michael Hanselmann
      "-b%s" % BUFSIZE,
192 bb44b1ae Michael Hanselmann
193 bb44b1ae Michael Hanselmann
      # Unidirectional mode, the first address is only used for reading, and the
194 bb44b1ae Michael Hanselmann
      # second address is only used for writing
195 bb44b1ae Michael Hanselmann
      "-u",
196 bb44b1ae Michael Hanselmann
197 bb44b1ae Michael Hanselmann
      ",".join(addr1), ",".join(addr2)
198 bb44b1ae Michael Hanselmann
      ]
199 bb44b1ae Michael Hanselmann
200 bb44b1ae Michael Hanselmann
  def _GetTransportCommand(self):
201 bb44b1ae Michael Hanselmann
    """Returns the command for the transport part of the daemon.
202 bb44b1ae Michael Hanselmann

203 bb44b1ae Michael Hanselmann
    """
204 bb44b1ae Michael Hanselmann
    socat_cmd = ("%s 2>&%d" %
205 bb44b1ae Michael Hanselmann
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
206 bb44b1ae Michael Hanselmann
                  self._socat_stderr_fd))
207 bb44b1ae Michael Hanselmann
208 c08d76f5 Michael Hanselmann
    dd_cmd = StringIO()
209 c08d76f5 Michael Hanselmann
    # Setting LC_ALL since we want to parse the output and explicitely
210 c08d76f5 Michael Hanselmann
    # redirecting stdin, as the background process (dd) would have /dev/null as
211 c08d76f5 Michael Hanselmann
    # stdin otherwise
212 c08d76f5 Michael Hanselmann
    dd_cmd.write("{ LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
213 c08d76f5 Michael Hanselmann
                 (BUFSIZE, self._dd_stderr_fd))
214 c08d76f5 Michael Hanselmann
    # Send PID to daemon
215 c08d76f5 Michael Hanselmann
    dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
216 c08d76f5 Michael Hanselmann
    # And wait for dd
217 c08d76f5 Michael Hanselmann
    dd_cmd.write(" wait $pid;")
218 c08d76f5 Michael Hanselmann
    dd_cmd.write(" }")
219 c08d76f5 Michael Hanselmann
220 bb44b1ae Michael Hanselmann
    compr = self._opts.compress
221 bb44b1ae Michael Hanselmann
222 bb44b1ae Michael Hanselmann
    assert compr in constants.IEC_ALL
223 bb44b1ae Michael Hanselmann
224 bb44b1ae Michael Hanselmann
    if self._mode == constants.IEM_IMPORT:
225 bb44b1ae Michael Hanselmann
      if compr == constants.IEC_GZIP:
226 bb44b1ae Michael Hanselmann
        transport_cmd = "%s | gunzip -c" % socat_cmd
227 bb44b1ae Michael Hanselmann
      else:
228 bb44b1ae Michael Hanselmann
        transport_cmd = socat_cmd
229 c08d76f5 Michael Hanselmann
230 c08d76f5 Michael Hanselmann
      transport_cmd += " | %s" % dd_cmd.getvalue()
231 bb44b1ae Michael Hanselmann
    elif self._mode == constants.IEM_EXPORT:
232 bb44b1ae Michael Hanselmann
      if compr == constants.IEC_GZIP:
233 bb44b1ae Michael Hanselmann
        transport_cmd = "gzip -c | %s" % socat_cmd
234 bb44b1ae Michael Hanselmann
      else:
235 bb44b1ae Michael Hanselmann
        transport_cmd = socat_cmd
236 c08d76f5 Michael Hanselmann
237 c08d76f5 Michael Hanselmann
      transport_cmd = "%s | %s" % (dd_cmd.getvalue(), transport_cmd)
238 bb44b1ae Michael Hanselmann
    else:
239 bb44b1ae Michael Hanselmann
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
240 bb44b1ae Michael Hanselmann
241 bb44b1ae Michael Hanselmann
    # TODO: Run transport as separate user
242 bb44b1ae Michael Hanselmann
    # The transport uses its own shell to simplify running it as a separate user
243 bb44b1ae Michael Hanselmann
    # in the future.
244 bb44b1ae Michael Hanselmann
    return self.GetBashCommand(transport_cmd)
245 bb44b1ae Michael Hanselmann
246 bb44b1ae Michael Hanselmann
  def GetCommand(self):
247 bb44b1ae Michael Hanselmann
    """Returns the complete child process command.
248 bb44b1ae Michael Hanselmann

249 bb44b1ae Michael Hanselmann
    """
250 bb44b1ae Michael Hanselmann
    transport_cmd = self._GetTransportCommand()
251 bb44b1ae Michael Hanselmann
252 bb44b1ae Michael Hanselmann
    buf = StringIO()
253 bb44b1ae Michael Hanselmann
254 bb44b1ae Michael Hanselmann
    if self._opts.cmd_prefix:
255 bb44b1ae Michael Hanselmann
      buf.write(self._opts.cmd_prefix)
256 bb44b1ae Michael Hanselmann
      buf.write(" ")
257 bb44b1ae Michael Hanselmann
258 bb44b1ae Michael Hanselmann
    buf.write(utils.ShellQuoteArgs(transport_cmd))
259 bb44b1ae Michael Hanselmann
260 bb44b1ae Michael Hanselmann
    if self._opts.cmd_suffix:
261 bb44b1ae Michael Hanselmann
      buf.write(" ")
262 bb44b1ae Michael Hanselmann
      buf.write(self._opts.cmd_suffix)
263 bb44b1ae Michael Hanselmann
264 bb44b1ae Michael Hanselmann
    return self.GetBashCommand(buf.getvalue())
265 34c9ee7b Michael Hanselmann
266 34c9ee7b Michael Hanselmann
267 34c9ee7b Michael Hanselmann
def _VerifyListening(family, address, port):
268 34c9ee7b Michael Hanselmann
  """Verify address given as listening address by socat.
269 34c9ee7b Michael Hanselmann

270 34c9ee7b Michael Hanselmann
  """
271 34c9ee7b Michael Hanselmann
  # TODO: Implement IPv6 support
272 34c9ee7b Michael Hanselmann
  if family != socket.AF_INET:
273 34c9ee7b Michael Hanselmann
    raise errors.GenericError("Address family %r not supported" % family)
274 34c9ee7b Michael Hanselmann
275 34c9ee7b Michael Hanselmann
  try:
276 34c9ee7b Michael Hanselmann
    packed_address = socket.inet_pton(family, address)
277 34c9ee7b Michael Hanselmann
  except socket.error:
278 34c9ee7b Michael Hanselmann
    raise errors.GenericError("Invalid address %r for family %s" %
279 34c9ee7b Michael Hanselmann
                              (address, family))
280 34c9ee7b Michael Hanselmann
281 34c9ee7b Michael Hanselmann
  return (socket.inet_ntop(family, packed_address), port)
282 34c9ee7b Michael Hanselmann
283 34c9ee7b Michael Hanselmann
284 34c9ee7b Michael Hanselmann
class ChildIOProcessor(object):
285 f9323011 Michael Hanselmann
  def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
286 34c9ee7b Michael Hanselmann
    """Initializes this class.
287 34c9ee7b Michael Hanselmann

288 34c9ee7b Michael Hanselmann
    """
289 34c9ee7b Michael Hanselmann
    self._debug = debug
290 34c9ee7b Michael Hanselmann
    self._status_file = status_file
291 34c9ee7b Michael Hanselmann
    self._logger = logger
292 34c9ee7b Michael Hanselmann
293 34c9ee7b Michael Hanselmann
    self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
294 34c9ee7b Michael Hanselmann
                           for prog in PROG_ALL])
295 34c9ee7b Michael Hanselmann
296 c08d76f5 Michael Hanselmann
    self._dd_pid = None
297 c08d76f5 Michael Hanselmann
    self._dd_ready = False
298 c08d76f5 Michael Hanselmann
    self._dd_tp_samples = throughput_samples
299 c08d76f5 Michael Hanselmann
    self._dd_progress = []
300 c08d76f5 Michael Hanselmann
301 c08d76f5 Michael Hanselmann
    # Expected size of transferred data
302 f9323011 Michael Hanselmann
    self._exp_size = exp_size
303 c08d76f5 Michael Hanselmann
304 34c9ee7b Michael Hanselmann
  def GetLineSplitter(self, prog):
305 34c9ee7b Michael Hanselmann
    """Returns the line splitter for a program.
306 34c9ee7b Michael Hanselmann

307 34c9ee7b Michael Hanselmann
    """
308 34c9ee7b Michael Hanselmann
    return self._splitter[prog]
309 34c9ee7b Michael Hanselmann
310 34c9ee7b Michael Hanselmann
  def FlushAll(self):
311 34c9ee7b Michael Hanselmann
    """Flushes all line splitters.
312 34c9ee7b Michael Hanselmann

313 34c9ee7b Michael Hanselmann
    """
314 34c9ee7b Michael Hanselmann
    for ls in self._splitter.itervalues():
315 34c9ee7b Michael Hanselmann
      ls.flush()
316 34c9ee7b Michael Hanselmann
317 34c9ee7b Michael Hanselmann
  def CloseAll(self):
318 34c9ee7b Michael Hanselmann
    """Closes all line splitters.
319 34c9ee7b Michael Hanselmann

320 34c9ee7b Michael Hanselmann
    """
321 34c9ee7b Michael Hanselmann
    for ls in self._splitter.itervalues():
322 34c9ee7b Michael Hanselmann
      ls.close()
323 34c9ee7b Michael Hanselmann
    self._splitter.clear()
324 34c9ee7b Michael Hanselmann
325 c08d76f5 Michael Hanselmann
  def NotifyDd(self):
326 c08d76f5 Michael Hanselmann
    """Tells dd(1) to write statistics.
327 c08d76f5 Michael Hanselmann

328 c08d76f5 Michael Hanselmann
    """
329 c08d76f5 Michael Hanselmann
    if self._dd_pid is None:
330 c08d76f5 Michael Hanselmann
      # Can't notify
331 c08d76f5 Michael Hanselmann
      return False
332 c08d76f5 Michael Hanselmann
333 c08d76f5 Michael Hanselmann
    if not self._dd_ready:
334 c08d76f5 Michael Hanselmann
      # There's a race condition between starting the program and sending
335 c08d76f5 Michael Hanselmann
      # signals.  The signal handler is only registered after some time, so we
336 c08d76f5 Michael Hanselmann
      # have to check whether the program is ready. If it isn't, sending a
337 c08d76f5 Michael Hanselmann
      # signal will invoke the default handler (and usually abort the program).
338 c08d76f5 Michael Hanselmann
      if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
339 c08d76f5 Michael Hanselmann
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
340 c08d76f5 Michael Hanselmann
        return False
341 c08d76f5 Michael Hanselmann
342 c08d76f5 Michael Hanselmann
      logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
343 c08d76f5 Michael Hanselmann
      self._dd_ready = True
344 c08d76f5 Michael Hanselmann
345 c08d76f5 Michael Hanselmann
    logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
346 c08d76f5 Michael Hanselmann
    try:
347 c08d76f5 Michael Hanselmann
      os.kill(self._dd_pid, DD_INFO_SIGNAL)
348 c08d76f5 Michael Hanselmann
    except EnvironmentError, err:
349 c08d76f5 Michael Hanselmann
      if err.errno != errno.ESRCH:
350 c08d76f5 Michael Hanselmann
        raise
351 c08d76f5 Michael Hanselmann
352 c08d76f5 Michael Hanselmann
      # Process no longer exists
353 560cbec1 Michael Hanselmann
      logging.debug("dd exited")
354 c08d76f5 Michael Hanselmann
      self._dd_pid = None
355 c08d76f5 Michael Hanselmann
356 c08d76f5 Michael Hanselmann
    return True
357 c08d76f5 Michael Hanselmann
358 34c9ee7b Michael Hanselmann
  def _ProcessOutput(self, line, prog):
359 34c9ee7b Michael Hanselmann
    """Takes care of child process output.
360 34c9ee7b Michael Hanselmann

361 34c9ee7b Michael Hanselmann
    @type line: string
362 34c9ee7b Michael Hanselmann
    @param line: Child output line
363 34c9ee7b Michael Hanselmann
    @type prog: number
364 34c9ee7b Michael Hanselmann
    @param prog: Program from which the line originates
365 34c9ee7b Michael Hanselmann

366 34c9ee7b Michael Hanselmann
    """
367 34c9ee7b Michael Hanselmann
    force_update = False
368 34c9ee7b Michael Hanselmann
    forward_line = line
369 34c9ee7b Michael Hanselmann
370 34c9ee7b Michael Hanselmann
    if prog == PROG_SOCAT:
371 34c9ee7b Michael Hanselmann
      level = None
372 34c9ee7b Michael Hanselmann
      parts = line.split(None, 4)
373 34c9ee7b Michael Hanselmann
374 34c9ee7b Michael Hanselmann
      if len(parts) == 5:
375 34c9ee7b Michael Hanselmann
        (_, _, _, level, msg) = parts
376 34c9ee7b Michael Hanselmann
377 34c9ee7b Michael Hanselmann
        force_update = self._ProcessSocatOutput(self._status_file, level, msg)
378 34c9ee7b Michael Hanselmann
379 34c9ee7b Michael Hanselmann
        if self._debug or (level and level not in SOCAT_LOG_IGNORE):
380 34c9ee7b Michael Hanselmann
          forward_line = "socat: %s %s" % (level, msg)
381 34c9ee7b Michael Hanselmann
        else:
382 34c9ee7b Michael Hanselmann
          forward_line = None
383 34c9ee7b Michael Hanselmann
      else:
384 34c9ee7b Michael Hanselmann
        forward_line = "socat: %s" % line
385 34c9ee7b Michael Hanselmann
386 c08d76f5 Michael Hanselmann
    elif prog == PROG_DD:
387 c08d76f5 Michael Hanselmann
      (should_forward, force_update) = self._ProcessDdOutput(line)
388 c08d76f5 Michael Hanselmann
389 c08d76f5 Michael Hanselmann
      if should_forward or self._debug:
390 c08d76f5 Michael Hanselmann
        forward_line = "dd: %s" % line
391 c08d76f5 Michael Hanselmann
      else:
392 c08d76f5 Michael Hanselmann
        forward_line = None
393 c08d76f5 Michael Hanselmann
394 c08d76f5 Michael Hanselmann
    elif prog == PROG_DD_PID:
395 c08d76f5 Michael Hanselmann
      if self._dd_pid:
396 c08d76f5 Michael Hanselmann
        raise RuntimeError("dd PID reported more than once")
397 c08d76f5 Michael Hanselmann
      logging.debug("Received dd PID %r", line)
398 c08d76f5 Michael Hanselmann
      self._dd_pid = int(line)
399 c08d76f5 Michael Hanselmann
      forward_line = None
400 c08d76f5 Michael Hanselmann
401 f9323011 Michael Hanselmann
    elif prog == PROG_EXP_SIZE:
402 f9323011 Michael Hanselmann
      logging.debug("Received predicted size %r", line)
403 f9323011 Michael Hanselmann
      forward_line = None
404 f9323011 Michael Hanselmann
405 f9323011 Michael Hanselmann
      if line:
406 f9323011 Michael Hanselmann
        try:
407 f9323011 Michael Hanselmann
          exp_size = utils.BytesToMebibyte(int(line))
408 f9323011 Michael Hanselmann
        except (ValueError, TypeError), err:
409 f9323011 Michael Hanselmann
          logging.error("Failed to convert predicted size %r to number: %s",
410 f9323011 Michael Hanselmann
                        line, err)
411 f9323011 Michael Hanselmann
          exp_size = None
412 f9323011 Michael Hanselmann
      else:
413 f9323011 Michael Hanselmann
        exp_size = None
414 f9323011 Michael Hanselmann
415 f9323011 Michael Hanselmann
      self._exp_size = exp_size
416 f9323011 Michael Hanselmann
417 34c9ee7b Michael Hanselmann
    if forward_line:
418 34c9ee7b Michael Hanselmann
      self._logger.info(forward_line)
419 34c9ee7b Michael Hanselmann
      self._status_file.AddRecentOutput(forward_line)
420 34c9ee7b Michael Hanselmann
421 34c9ee7b Michael Hanselmann
    self._status_file.Update(force_update)
422 34c9ee7b Michael Hanselmann
423 34c9ee7b Michael Hanselmann
  @staticmethod
424 34c9ee7b Michael Hanselmann
  def _ProcessSocatOutput(status_file, level, msg):
425 34c9ee7b Michael Hanselmann
    """Interprets socat log output.
426 34c9ee7b Michael Hanselmann

427 34c9ee7b Michael Hanselmann
    """
428 34c9ee7b Michael Hanselmann
    if level == SOCAT_LOG_NOTICE:
429 34c9ee7b Michael Hanselmann
      if status_file.GetListenPort() is None:
430 34c9ee7b Michael Hanselmann
        # TODO: Maybe implement timeout to not listen forever
431 34c9ee7b Michael Hanselmann
        m = LISTENING_RE.match(msg)
432 34c9ee7b Michael Hanselmann
        if m:
433 34c9ee7b Michael Hanselmann
          (_, port) = _VerifyListening(int(m.group("family")),
434 34c9ee7b Michael Hanselmann
                                       m.group("address"),
435 34c9ee7b Michael Hanselmann
                                       int(m.group("port")))
436 34c9ee7b Michael Hanselmann
437 34c9ee7b Michael Hanselmann
          status_file.SetListenPort(port)
438 34c9ee7b Michael Hanselmann
          return True
439 34c9ee7b Michael Hanselmann
440 34c9ee7b Michael Hanselmann
      if not status_file.GetConnected():
441 34c9ee7b Michael Hanselmann
        m = TRANSFER_LOOP_RE.match(msg)
442 34c9ee7b Michael Hanselmann
        if m:
443 53dbf14c Michael Hanselmann
          logging.debug("Connection established")
444 34c9ee7b Michael Hanselmann
          status_file.SetConnected()
445 34c9ee7b Michael Hanselmann
          return True
446 34c9ee7b Michael Hanselmann
447 34c9ee7b Michael Hanselmann
    return False
448 c08d76f5 Michael Hanselmann
449 c08d76f5 Michael Hanselmann
  def _ProcessDdOutput(self, line):
450 c08d76f5 Michael Hanselmann
    """Interprets a line of dd(1)'s output.
451 c08d76f5 Michael Hanselmann

452 c08d76f5 Michael Hanselmann
    """
453 c08d76f5 Michael Hanselmann
    m = DD_INFO_RE.match(line)
454 c08d76f5 Michael Hanselmann
    if m:
455 c08d76f5 Michael Hanselmann
      seconds = float(m.group("seconds"))
456 c08d76f5 Michael Hanselmann
      mbytes = utils.BytesToMebibyte(int(m.group("bytes")))
457 c08d76f5 Michael Hanselmann
      self._UpdateDdProgress(seconds, mbytes)
458 c08d76f5 Michael Hanselmann
      return (False, True)
459 c08d76f5 Michael Hanselmann
460 c08d76f5 Michael Hanselmann
    m = DD_STDERR_IGNORE.match(line)
461 c08d76f5 Michael Hanselmann
    if m:
462 c08d76f5 Michael Hanselmann
      # Ignore
463 c08d76f5 Michael Hanselmann
      return (False, False)
464 c08d76f5 Michael Hanselmann
465 c08d76f5 Michael Hanselmann
    # Forward line
466 c08d76f5 Michael Hanselmann
    return (True, False)
467 c08d76f5 Michael Hanselmann
468 c08d76f5 Michael Hanselmann
  def _UpdateDdProgress(self, seconds, mbytes):
469 c08d76f5 Michael Hanselmann
    """Updates the internal status variables for dd(1) progress.
470 c08d76f5 Michael Hanselmann

471 c08d76f5 Michael Hanselmann
    @type seconds: float
472 c08d76f5 Michael Hanselmann
    @param seconds: Timestamp of this update
473 c08d76f5 Michael Hanselmann
    @type mbytes: float
474 c08d76f5 Michael Hanselmann
    @param mbytes: Total number of MiB transferred so far
475 c08d76f5 Michael Hanselmann

476 c08d76f5 Michael Hanselmann
    """
477 c08d76f5 Michael Hanselmann
    # Add latest sample
478 c08d76f5 Michael Hanselmann
    self._dd_progress.append((seconds, mbytes))
479 c08d76f5 Michael Hanselmann
480 c08d76f5 Michael Hanselmann
    # Remove old samples
481 c08d76f5 Michael Hanselmann
    del self._dd_progress[:-self._dd_tp_samples]
482 c08d76f5 Michael Hanselmann
483 c08d76f5 Michael Hanselmann
    # Calculate throughput
484 c08d76f5 Michael Hanselmann
    throughput = _CalcThroughput(self._dd_progress)
485 c08d76f5 Michael Hanselmann
486 c08d76f5 Michael Hanselmann
    # Calculate percent and ETA
487 c08d76f5 Michael Hanselmann
    percent = None
488 c08d76f5 Michael Hanselmann
    eta = None
489 c08d76f5 Michael Hanselmann
490 c08d76f5 Michael Hanselmann
    if self._exp_size is not None:
491 c08d76f5 Michael Hanselmann
      if self._exp_size != 0:
492 c08d76f5 Michael Hanselmann
        percent = max(0, min(100, (100.0 * mbytes) / self._exp_size))
493 c08d76f5 Michael Hanselmann
494 c08d76f5 Michael Hanselmann
      if throughput:
495 c08d76f5 Michael Hanselmann
        eta = max(0, float(self._exp_size - mbytes) / throughput)
496 c08d76f5 Michael Hanselmann
497 c08d76f5 Michael Hanselmann
    self._status_file.SetProgress(mbytes, throughput, percent, eta)
498 c08d76f5 Michael Hanselmann
499 c08d76f5 Michael Hanselmann
500 c08d76f5 Michael Hanselmann
def _CalcThroughput(samples):
501 c08d76f5 Michael Hanselmann
  """Calculates the throughput in MiB/second.
502 c08d76f5 Michael Hanselmann

503 c08d76f5 Michael Hanselmann
  @type samples: sequence
504 c08d76f5 Michael Hanselmann
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
505 c08d76f5 Michael Hanselmann
                  tuple
506 c08d76f5 Michael Hanselmann
  @rtype: float or None
507 c08d76f5 Michael Hanselmann
  @return: Throughput in MiB/second
508 c08d76f5 Michael Hanselmann

509 c08d76f5 Michael Hanselmann
  """
510 c08d76f5 Michael Hanselmann
  if len(samples) < 2:
511 c08d76f5 Michael Hanselmann
    # Can't calculate throughput
512 c08d76f5 Michael Hanselmann
    return None
513 c08d76f5 Michael Hanselmann
514 c08d76f5 Michael Hanselmann
  (start_time, start_mbytes) = samples[0]
515 c08d76f5 Michael Hanselmann
  (end_time, end_mbytes) = samples[-1]
516 c08d76f5 Michael Hanselmann
517 c08d76f5 Michael Hanselmann
  return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)