Revision c08d76f5 daemons/import-export

b/daemons/import-export
35 35
import subprocess
36 36
import sys
37 37
import time
38
import math
38 39

  
39 40
from ganeti import constants
40 41
from ganeti import cli
......
57 58
#: How long to wait for a connection to be established
58 59
DEFAULT_CONNECT_TIMEOUT = 60
59 60

  
61
#: Get dd(1) statistics every few seconds
62
DD_STATISTICS_INTERVAL = 5.0
63

  
64
#: Seconds for throughput calculation
65
DD_THROUGHPUT_INTERVAL = 60.0
66

  
67
#: Number of samples for throughput calculation
68
DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) /
69
                                      DD_STATISTICS_INTERVAL))
70

  
60 71

  
61 72
# Global variable for options
62 73
options = None
......
140 151
    """
141 152
    return self._data.connected
142 153

  
154
  def SetProgress(self, mbytes, throughput, percent, eta):
155
    """Sets how much data has been transferred so far.
156

  
157
    @type mbytes: number
158
    @param mbytes: Transferred amount of data in MiB.
159
    @type throughput: float
160
    @param throughput: MiB/second
161
    @type percent: number
162
    @param percent: Percent processed
163
    @type eta: number
164
    @param eta: Expected number of seconds until done
165

  
166
    """
167
    self._data.progress_mbytes = mbytes
168
    self._data.progress_throughput = throughput
169
    self._data.progress_percent = percent
170
    self._data.progress_eta = eta
171

  
143 172
  def SetExitStatus(self, exit_status, error_message):
144 173
    """Sets the exit status and an error message.
145 174

  
......
177 206
                    mode=0400)
178 207

  
179 208

  
180
def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
209
def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
210
                   dd_pid_read_fd, status_file, child_logger,
181 211
                   signal_notify, signal_handler, mode):
182 212
  """Handles the child processes' output.
183 213

  
......
189 219
  # might buffer data while poll(2) won't mark its file descriptor as
190 220
  # readable again.
191 221
  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
222
  dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
223
  dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
224

  
225
  tp_samples = DD_THROUGHPUT_SAMPLES
192 226

  
193 227
  child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
194
                                           child_logger)
228
                                           child_logger,
229
                                           throughput_samples=tp_samples)
195 230
  try:
196 231
    fdmap = {
197 232
      child.stderr.fileno():
198 233
        (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
199 234
      socat_stderr_read.fileno():
200 235
        (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
236
      dd_pid_read.fileno():
237
        (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
238
      dd_stderr_read.fileno():
239
        (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
201 240
      signal_notify.fileno(): (signal_notify, None),
202 241
      }
203 242

  
......
212 251
      listen_timeout = None
213 252

  
214 253
    exit_timeout = None
254
    dd_stats_timeout = None
215 255

  
216 256
    while True:
217 257
      # Break out of loop if only signal notify FD is left
......
239 279
          logging.info("Child process didn't exit in time")
240 280
          break
241 281

  
282
      if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
283
        notify_status = child_io_proc.NotifyDd()
284
        if notify_status:
285
          # Schedule next notification
286
          dd_stats_timeout = locking.RunningTimeout(DD_STATISTICS_INTERVAL,
287
                                                    True)
288
        else:
289
          # Try again soon (dd isn't ready yet)
290
          dd_stats_timeout = locking.RunningTimeout(1.0, True)
291

  
292
      if dd_stats_timeout:
293
        dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)
294

  
295
        if timeout is None:
296
          timeout = dd_timeout
297
        else:
298
          timeout = min(timeout, dd_timeout)
299

  
242 300
      for fd, event in utils.RetryOnSignal(poller.poll, timeout):
243 301
        if event & (select.POLLIN | event & select.POLLPRI):
244 302
          (from_, to) = fdmap[fd]
......
410 468
      # Pipe to receive socat's stderr output
411 469
      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
412 470

  
471
      # Pipe to receive dd's stderr output
472
      (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()
473

  
474
      # Pipe to receive dd's PID
475
      (dd_pid_read_fd, dd_pid_write_fd) = os.pipe()
476

  
413 477
      # Get child process command
414
      cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd)
478
      cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
479
                                           dd_stderr_write_fd, dd_pid_write_fd)
415 480
      cmd = cmd_builder.GetCommand()
416 481

  
417 482
      logging.debug("Starting command %r", cmd)
418 483

  
419 484
      # Start child process
420
      child = ChildProcess(cmd, [socat_stderr_write_fd])
485
      child = ChildProcess(cmd, [socat_stderr_write_fd, dd_stderr_write_fd,
486
                                 dd_pid_write_fd])
421 487
      try:
422 488
        def _ForwardSignal(signum, _):
423 489
          """Forwards signals to child process.
......
438 504
          try:
439 505
            # Close child's side
440 506
            utils.RetryOnSignal(os.close, socat_stderr_write_fd)
507
            utils.RetryOnSignal(os.close, dd_stderr_write_fd)
508
            utils.RetryOnSignal(os.close, dd_pid_write_fd)
441 509

  
442
            if ProcessChildIO(child, socat_stderr_read_fd, status_file,
443
                              child_logger, signal_wakeup, signal_handler,
444
                              mode):
510
            if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
511
                              dd_pid_read_fd, status_file, child_logger,
512
                              signal_wakeup, signal_handler, mode):
445 513
              # The child closed all its file descriptors and there was no
446 514
              # signal
447 515
              # TODO: Implement timeout instead of waiting indefinitely

Also available in: Unified diff