Revision c08d76f5 daemons/import-export
b/daemons/import-export | ||
---|---|---|
35 | 35 |
import subprocess |
36 | 36 |
import sys |
37 | 37 |
import time |
38 |
import math |
|
38 | 39 |
|
39 | 40 |
from ganeti import constants |
40 | 41 |
from ganeti import cli |
... | ... | |
57 | 58 |
#: How long to wait for a connection to be established |
58 | 59 |
DEFAULT_CONNECT_TIMEOUT = 60 |
59 | 60 |
|
61 |
#: Get dd(1) statistics every few seconds |
|
62 |
DD_STATISTICS_INTERVAL = 5.0 |
|
63 |
|
|
64 |
#: Seconds for throughput calculation |
|
65 |
DD_THROUGHPUT_INTERVAL = 60.0 |
|
66 |
|
|
67 |
#: Number of samples for throughput calculation |
|
68 |
DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) / |
|
69 |
DD_STATISTICS_INTERVAL)) |
|
70 |
|
|
60 | 71 |
|
61 | 72 |
# Global variable for options |
62 | 73 |
options = None |
... | ... | |
140 | 151 |
""" |
141 | 152 |
return self._data.connected |
142 | 153 |
|
154 |
def SetProgress(self, mbytes, throughput, percent, eta): |
|
155 |
"""Sets how much data has been transferred so far. |
|
156 |
|
|
157 |
@type mbytes: number |
|
158 |
@param mbytes: Transferred amount of data in MiB. |
|
159 |
@type throughput: float |
|
160 |
@param throughput: MiB/second |
|
161 |
@type percent: number |
|
162 |
@param percent: Percent processed |
|
163 |
@type eta: number |
|
164 |
@param eta: Expected number of seconds until done |
|
165 |
|
|
166 |
""" |
|
167 |
self._data.progress_mbytes = mbytes |
|
168 |
self._data.progress_throughput = throughput |
|
169 |
self._data.progress_percent = percent |
|
170 |
self._data.progress_eta = eta |
|
171 |
|
|
143 | 172 |
def SetExitStatus(self, exit_status, error_message): |
144 | 173 |
"""Sets the exit status and an error message. |
145 | 174 |
|
... | ... | |
177 | 206 |
mode=0400) |
178 | 207 |
|
179 | 208 |
|
180 |
def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger, |
|
209 |
def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, |
|
210 |
dd_pid_read_fd, status_file, child_logger, |
|
181 | 211 |
signal_notify, signal_handler, mode): |
182 | 212 |
"""Handles the child processes' output. |
183 | 213 |
|
... | ... | |
189 | 219 |
# might buffer data while poll(2) won't mark its file descriptor as |
190 | 220 |
# readable again. |
191 | 221 |
socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0) |
222 |
dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0) |
|
223 |
dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0) |
|
224 |
|
|
225 |
tp_samples = DD_THROUGHPUT_SAMPLES |
|
192 | 226 |
|
193 | 227 |
child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file, |
194 |
child_logger) |
|
228 |
child_logger, |
|
229 |
throughput_samples=tp_samples) |
|
195 | 230 |
try: |
196 | 231 |
fdmap = { |
197 | 232 |
child.stderr.fileno(): |
198 | 233 |
(child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)), |
199 | 234 |
socat_stderr_read.fileno(): |
200 | 235 |
(socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)), |
236 |
dd_pid_read.fileno(): |
|
237 |
(dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)), |
|
238 |
dd_stderr_read.fileno(): |
|
239 |
(dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)), |
|
201 | 240 |
signal_notify.fileno(): (signal_notify, None), |
202 | 241 |
} |
203 | 242 |
|
... | ... | |
212 | 251 |
listen_timeout = None |
213 | 252 |
|
214 | 253 |
exit_timeout = None |
254 |
dd_stats_timeout = None |
|
215 | 255 |
|
216 | 256 |
while True: |
217 | 257 |
# Break out of loop if only signal notify FD is left |
... | ... | |
239 | 279 |
logging.info("Child process didn't exit in time") |
240 | 280 |
break |
241 | 281 |
|
282 |
if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0: |
|
283 |
notify_status = child_io_proc.NotifyDd() |
|
284 |
if notify_status: |
|
285 |
# Schedule next notification |
|
286 |
dd_stats_timeout = locking.RunningTimeout(DD_STATISTICS_INTERVAL, |
|
287 |
True) |
|
288 |
else: |
|
289 |
# Try again soon (dd isn't ready yet) |
|
290 |
dd_stats_timeout = locking.RunningTimeout(1.0, True) |
|
291 |
|
|
292 |
if dd_stats_timeout: |
|
293 |
dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000) |
|
294 |
|
|
295 |
if timeout is None: |
|
296 |
timeout = dd_timeout |
|
297 |
else: |
|
298 |
timeout = min(timeout, dd_timeout) |
|
299 |
|
|
242 | 300 |
for fd, event in utils.RetryOnSignal(poller.poll, timeout): |
243 | 301 |
if event & (select.POLLIN | event & select.POLLPRI): |
244 | 302 |
(from_, to) = fdmap[fd] |
... | ... | |
410 | 468 |
# Pipe to receive socat's stderr output |
411 | 469 |
(socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe() |
412 | 470 |
|
471 |
# Pipe to receive dd's stderr output |
|
472 |
(dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe() |
|
473 |
|
|
474 |
# Pipe to receive dd's PID |
|
475 |
(dd_pid_read_fd, dd_pid_write_fd) = os.pipe() |
|
476 |
|
|
413 | 477 |
# Get child process command |
414 |
cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd) |
|
478 |
cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd, |
|
479 |
dd_stderr_write_fd, dd_pid_write_fd) |
|
415 | 480 |
cmd = cmd_builder.GetCommand() |
416 | 481 |
|
417 | 482 |
logging.debug("Starting command %r", cmd) |
418 | 483 |
|
419 | 484 |
# Start child process |
420 |
child = ChildProcess(cmd, [socat_stderr_write_fd]) |
|
485 |
child = ChildProcess(cmd, [socat_stderr_write_fd, dd_stderr_write_fd, |
|
486 |
dd_pid_write_fd]) |
|
421 | 487 |
try: |
422 | 488 |
def _ForwardSignal(signum, _): |
423 | 489 |
"""Forwards signals to child process. |
... | ... | |
438 | 504 |
try: |
439 | 505 |
# Close child's side |
440 | 506 |
utils.RetryOnSignal(os.close, socat_stderr_write_fd) |
507 |
utils.RetryOnSignal(os.close, dd_stderr_write_fd) |
|
508 |
utils.RetryOnSignal(os.close, dd_pid_write_fd) |
|
441 | 509 |
|
442 |
if ProcessChildIO(child, socat_stderr_read_fd, status_file,
|
|
443 |
child_logger, signal_wakeup, signal_handler,
|
|
444 |
mode): |
|
510 |
if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
|
|
511 |
dd_pid_read_fd, status_file, child_logger,
|
|
512 |
signal_wakeup, signal_handler, mode):
|
|
445 | 513 |
# The child closed all its file descriptors and there was no |
446 | 514 |
# signal |
447 | 515 |
# TODO: Implement timeout instead of waiting indefinitely |
Also available in: Unified diff