Revision c08d76f5 lib/impexpd/__init__.py

b/lib/impexpd/__init__.py
23 23

  
24 24
"""
25 25

  
26
import os
26 27
import re
27 28
import socket
28 29
import logging
30
import signal
31
import errno
32
import time
29 33
from cStringIO import StringIO
30 34

  
31 35
from ganeti import constants
......
57 61
  SOCAT_LOG_NOTICE,
58 62
  ])
59 63

  
64
#: Used to parse GNU dd(1) statistics
65
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
66
                        r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
67

  
68
#: Used to ignore "N+N records in/out" on dd(1)'s stderr
69
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
70

  
71
#: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
72
#: unavailable and SIGUSR1 is used instead)
73
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)
74

  
60 75
#: Buffer size: at most this many bytes are transferred at once
61 76
BUFSIZE = 1024 * 1024
62 77

  
......
65 80
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
66 81

  
67 82
(PROG_OTHER,
68
 PROG_SOCAT) = range(1, 3)
83
 PROG_SOCAT,
84
 PROG_DD,
85
 PROG_DD_PID) = range(1, 5)
69 86
PROG_ALL = frozenset([
70 87
  PROG_OTHER,
71 88
  PROG_SOCAT,
89
  PROG_DD,
90
  PROG_DD_PID,
72 91
  ])
73 92

  
74 93

  
75 94
class CommandBuilder(object):
76
  def __init__(self, mode, opts, socat_stderr_fd):
95
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
77 96
    """Initializes this class.
78 97

  
79 98
    @param mode: Daemon mode (import or export)
80 99
    @param opts: Options object
81 100
    @type socat_stderr_fd: int
82 101
    @param socat_stderr_fd: File descriptor socat should write its stderr to
102
    @type dd_stderr_fd: int
103
    @param dd_stderr_fd: File descriptor dd should write its stderr to
104
    @type dd_pid_fd: int
105
    @param dd_pid_fd: File descriptor the child should write dd's PID to
83 106

  
84 107
    """
85 108
    self._opts = opts
86 109
    self._mode = mode
87 110
    self._socat_stderr_fd = socat_stderr_fd
111
    self._dd_stderr_fd = dd_stderr_fd
112
    self._dd_pid_fd = dd_pid_fd
88 113

  
89 114
  @staticmethod
90 115
  def GetBashCommand(cmd):
......
172 197
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
173 198
                  self._socat_stderr_fd))
174 199

  
200
    dd_cmd = StringIO()
201
    # Setting LC_ALL since we want to parse the output and explicitely
202
    # redirecting stdin, as the background process (dd) would have /dev/null as
203
    # stdin otherwise
204
    dd_cmd.write("{ LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
205
                 (BUFSIZE, self._dd_stderr_fd))
206
    # Send PID to daemon
207
    dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
208
    # And wait for dd
209
    dd_cmd.write(" wait $pid;")
210
    dd_cmd.write(" }")
211

  
175 212
    compr = self._opts.compress
176 213

  
177 214
    assert compr in constants.IEC_ALL
......
181 218
        transport_cmd = "%s | gunzip -c" % socat_cmd
182 219
      else:
183 220
        transport_cmd = socat_cmd
221

  
222
      transport_cmd += " | %s" % dd_cmd.getvalue()
184 223
    elif self._mode == constants.IEM_EXPORT:
185 224
      if compr == constants.IEC_GZIP:
186 225
        transport_cmd = "gzip -c | %s" % socat_cmd
187 226
      else:
188 227
        transport_cmd = socat_cmd
228

  
229
      transport_cmd = "%s | %s" % (dd_cmd.getvalue(), transport_cmd)
189 230
    else:
190 231
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
191 232

  
192
    # TODO: Use "dd" to measure processed data (allows to give an ETA)
193

  
194 233
    # TODO: Run transport as separate user
195 234
    # The transport uses its own shell to simplify running it as a separate user
196 235
    # in the future.
......
235 274

  
236 275

  
237 276
class ChildIOProcessor(object):
238
  def __init__(self, debug, status_file, logger):
277
  def __init__(self, debug, status_file, logger, throughput_samples):
239 278
    """Initializes this class.
240 279

  
241 280
    """
......
246 285
    self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
247 286
                           for prog in PROG_ALL])
248 287

  
288
    self._dd_pid = None
289
    self._dd_ready = False
290
    self._dd_tp_samples = throughput_samples
291
    self._dd_progress = []
292

  
293
    # Expected size of transferred data
294
    self._exp_size = None
295

  
249 296
  def GetLineSplitter(self, prog):
250 297
    """Returns the line splitter for a program.
251 298

  
......
267 314
      ls.close()
268 315
    self._splitter.clear()
269 316

  
317
  def NotifyDd(self):
318
    """Tells dd(1) to write statistics.
319

  
320
    """
321
    if self._dd_pid is None:
322
      # Can't notify
323
      return False
324

  
325
    if not self._dd_ready:
326
      # There's a race condition between starting the program and sending
327
      # signals.  The signal handler is only registered after some time, so we
328
      # have to check whether the program is ready. If it isn't, sending a
329
      # signal will invoke the default handler (and usually abort the program).
330
      if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
331
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
332
        return False
333

  
334
      logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
335
      self._dd_ready = True
336

  
337
    logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
338
    try:
339
      os.kill(self._dd_pid, DD_INFO_SIGNAL)
340
    except EnvironmentError, err:
341
      if err.errno != errno.ESRCH:
342
        raise
343

  
344
      # Process no longer exists
345
      self._dd_pid = None
346

  
347
    return True
348

  
270 349
  def _ProcessOutput(self, line, prog):
271 350
    """Takes care of child process output.
272 351

  
......
295 374
      else:
296 375
        forward_line = "socat: %s" % line
297 376

  
377
    elif prog == PROG_DD:
378
      (should_forward, force_update) = self._ProcessDdOutput(line)
379

  
380
      if should_forward or self._debug:
381
        forward_line = "dd: %s" % line
382
      else:
383
        forward_line = None
384

  
385
    elif prog == PROG_DD_PID:
386
      if self._dd_pid:
387
        raise RuntimeError("dd PID reported more than once")
388
      logging.debug("Received dd PID %r", line)
389
      self._dd_pid = int(line)
390
      forward_line = None
391

  
298 392
    if forward_line:
299 393
      self._logger.info(forward_line)
300 394
      self._status_file.AddRecentOutput(forward_line)
......
326 420
          return True
327 421

  
328 422
    return False
423

  
424
  def _ProcessDdOutput(self, line):
425
    """Interprets a line of dd(1)'s output.
426

  
427
    """
428
    m = DD_INFO_RE.match(line)
429
    if m:
430
      seconds = float(m.group("seconds"))
431
      mbytes = utils.BytesToMebibyte(int(m.group("bytes")))
432
      self._UpdateDdProgress(seconds, mbytes)
433
      return (False, True)
434

  
435
    m = DD_STDERR_IGNORE.match(line)
436
    if m:
437
      # Ignore
438
      return (False, False)
439

  
440
    # Forward line
441
    return (True, False)
442

  
443
  def _UpdateDdProgress(self, seconds, mbytes):
444
    """Updates the internal status variables for dd(1) progress.
445

  
446
    @type seconds: float
447
    @param seconds: Timestamp of this update
448
    @type mbytes: float
449
    @param mbytes: Total number of MiB transferred so far
450

  
451
    """
452
    # Add latest sample
453
    self._dd_progress.append((seconds, mbytes))
454

  
455
    # Remove old samples
456
    del self._dd_progress[:-self._dd_tp_samples]
457

  
458
    # Calculate throughput
459
    throughput = _CalcThroughput(self._dd_progress)
460

  
461
    # Calculate percent and ETA
462
    percent = None
463
    eta = None
464

  
465
    if self._exp_size is not None:
466
      if self._exp_size != 0:
467
        percent = max(0, min(100, (100.0 * mbytes) / self._exp_size))
468

  
469
      if throughput:
470
        eta = max(0, float(self._exp_size - mbytes) / throughput)
471

  
472
    self._status_file.SetProgress(mbytes, throughput, percent, eta)
473

  
474

  
475
def _CalcThroughput(samples):
476
  """Calculates the throughput in MiB/second.
477

  
478
  @type samples: sequence
479
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
480
                  tuple
481
  @rtype: float or None
482
  @return: Throughput in MiB/second
483

  
484
  """
485
  if len(samples) < 2:
486
    # Can't calculate throughput
487
    return None
488

  
489
  (start_time, start_mbytes) = samples[0]
490
  (end_time, end_mbytes) = samples[-1]
491

  
492
  return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)

Also available in: Unified diff