Revision c08d76f5 lib/impexpd/__init__.py
b/lib/impexpd/__init__.py | ||
---|---|---|
23 | 23 |
|
24 | 24 |
""" |
25 | 25 |
|
26 |
import os |
|
26 | 27 |
import re |
27 | 28 |
import socket |
28 | 29 |
import logging |
30 |
import signal |
|
31 |
import errno |
|
32 |
import time |
|
29 | 33 |
from cStringIO import StringIO |
30 | 34 |
|
31 | 35 |
from ganeti import constants |
... | ... | |
57 | 61 |
SOCAT_LOG_NOTICE, |
58 | 62 |
]) |
59 | 63 |
|
64 |
#: Used to parse GNU dd(1) statistics |
|
65 |
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*" |
|
66 |
r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I) |
|
67 |
|
|
68 |
#: Used to ignore "N+N records in/out" on dd(1)'s stderr |
|
69 |
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I) |
|
70 |
|
|
71 |
#: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is |
|
72 |
#: unavailable and SIGUSR1 is used instead) |
|
73 |
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1) |
|
74 |
|
|
60 | 75 |
#: Buffer size: at most this many bytes are transferred at once |
61 | 76 |
BUFSIZE = 1024 * 1024 |
62 | 77 |
|
... | ... | |
65 | 80 |
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"] |
66 | 81 |
|
67 | 82 |
(PROG_OTHER, |
68 |
PROG_SOCAT) = range(1, 3) |
|
83 |
PROG_SOCAT, |
|
84 |
PROG_DD, |
|
85 |
PROG_DD_PID) = range(1, 5) |
|
69 | 86 |
PROG_ALL = frozenset([ |
70 | 87 |
PROG_OTHER, |
71 | 88 |
PROG_SOCAT, |
89 |
PROG_DD, |
|
90 |
PROG_DD_PID, |
|
72 | 91 |
]) |
73 | 92 |
|
74 | 93 |
|
75 | 94 |
class CommandBuilder(object): |
76 |
def __init__(self, mode, opts, socat_stderr_fd): |
|
95 |
def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
|
|
77 | 96 |
"""Initializes this class. |
78 | 97 |
|
79 | 98 |
@param mode: Daemon mode (import or export) |
80 | 99 |
@param opts: Options object |
81 | 100 |
@type socat_stderr_fd: int |
82 | 101 |
@param socat_stderr_fd: File descriptor socat should write its stderr to |
102 |
@type dd_stderr_fd: int |
|
103 |
@param dd_stderr_fd: File descriptor dd should write its stderr to |
|
104 |
@type dd_pid_fd: int |
|
105 |
@param dd_pid_fd: File descriptor the child should write dd's PID to |
|
83 | 106 |
|
84 | 107 |
""" |
85 | 108 |
self._opts = opts |
86 | 109 |
self._mode = mode |
87 | 110 |
self._socat_stderr_fd = socat_stderr_fd |
111 |
self._dd_stderr_fd = dd_stderr_fd |
|
112 |
self._dd_pid_fd = dd_pid_fd |
|
88 | 113 |
|
89 | 114 |
@staticmethod |
90 | 115 |
def GetBashCommand(cmd): |
... | ... | |
172 | 197 |
(utils.ShellQuoteArgs(self._GetSocatCommand()), |
173 | 198 |
self._socat_stderr_fd)) |
174 | 199 |
|
200 |
dd_cmd = StringIO() |
|
201 |
# Setting LC_ALL since we want to parse the output and explicitely |
|
202 |
# redirecting stdin, as the background process (dd) would have /dev/null as |
|
203 |
# stdin otherwise |
|
204 |
dd_cmd.write("{ LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" % |
|
205 |
(BUFSIZE, self._dd_stderr_fd)) |
|
206 |
# Send PID to daemon |
|
207 |
dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd) |
|
208 |
# And wait for dd |
|
209 |
dd_cmd.write(" wait $pid;") |
|
210 |
dd_cmd.write(" }") |
|
211 |
|
|
175 | 212 |
compr = self._opts.compress |
176 | 213 |
|
177 | 214 |
assert compr in constants.IEC_ALL |
... | ... | |
181 | 218 |
transport_cmd = "%s | gunzip -c" % socat_cmd |
182 | 219 |
else: |
183 | 220 |
transport_cmd = socat_cmd |
221 |
|
|
222 |
transport_cmd += " | %s" % dd_cmd.getvalue() |
|
184 | 223 |
elif self._mode == constants.IEM_EXPORT: |
185 | 224 |
if compr == constants.IEC_GZIP: |
186 | 225 |
transport_cmd = "gzip -c | %s" % socat_cmd |
187 | 226 |
else: |
188 | 227 |
transport_cmd = socat_cmd |
228 |
|
|
229 |
transport_cmd = "%s | %s" % (dd_cmd.getvalue(), transport_cmd) |
|
189 | 230 |
else: |
190 | 231 |
raise errors.GenericError("Invalid mode '%s'" % self._mode) |
191 | 232 |
|
192 |
# TODO: Use "dd" to measure processed data (allows to give an ETA) |
|
193 |
|
|
194 | 233 |
# TODO: Run transport as separate user |
195 | 234 |
# The transport uses its own shell to simplify running it as a separate user |
196 | 235 |
# in the future. |
... | ... | |
235 | 274 |
|
236 | 275 |
|
237 | 276 |
class ChildIOProcessor(object): |
238 |
def __init__(self, debug, status_file, logger): |
|
277 |
def __init__(self, debug, status_file, logger, throughput_samples):
|
|
239 | 278 |
"""Initializes this class. |
240 | 279 |
|
241 | 280 |
""" |
... | ... | |
246 | 285 |
self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog)) |
247 | 286 |
for prog in PROG_ALL]) |
248 | 287 |
|
288 |
self._dd_pid = None |
|
289 |
self._dd_ready = False |
|
290 |
self._dd_tp_samples = throughput_samples |
|
291 |
self._dd_progress = [] |
|
292 |
|
|
293 |
# Expected size of transferred data |
|
294 |
self._exp_size = None |
|
295 |
|
|
249 | 296 |
def GetLineSplitter(self, prog): |
250 | 297 |
"""Returns the line splitter for a program. |
251 | 298 |
|
... | ... | |
267 | 314 |
ls.close() |
268 | 315 |
self._splitter.clear() |
269 | 316 |
|
317 |
def NotifyDd(self): |
|
318 |
"""Tells dd(1) to write statistics. |
|
319 |
|
|
320 |
""" |
|
321 |
if self._dd_pid is None: |
|
322 |
# Can't notify |
|
323 |
return False |
|
324 |
|
|
325 |
if not self._dd_ready: |
|
326 |
# There's a race condition between starting the program and sending |
|
327 |
# signals. The signal handler is only registered after some time, so we |
|
328 |
# have to check whether the program is ready. If it isn't, sending a |
|
329 |
# signal will invoke the default handler (and usually abort the program). |
|
330 |
if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL): |
|
331 |
logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL) |
|
332 |
return False |
|
333 |
|
|
334 |
logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL) |
|
335 |
self._dd_ready = True |
|
336 |
|
|
337 |
logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid) |
|
338 |
try: |
|
339 |
os.kill(self._dd_pid, DD_INFO_SIGNAL) |
|
340 |
except EnvironmentError, err: |
|
341 |
if err.errno != errno.ESRCH: |
|
342 |
raise |
|
343 |
|
|
344 |
# Process no longer exists |
|
345 |
self._dd_pid = None |
|
346 |
|
|
347 |
return True |
|
348 |
|
|
270 | 349 |
def _ProcessOutput(self, line, prog): |
271 | 350 |
"""Takes care of child process output. |
272 | 351 |
|
... | ... | |
295 | 374 |
else: |
296 | 375 |
forward_line = "socat: %s" % line |
297 | 376 |
|
377 |
elif prog == PROG_DD: |
|
378 |
(should_forward, force_update) = self._ProcessDdOutput(line) |
|
379 |
|
|
380 |
if should_forward or self._debug: |
|
381 |
forward_line = "dd: %s" % line |
|
382 |
else: |
|
383 |
forward_line = None |
|
384 |
|
|
385 |
elif prog == PROG_DD_PID: |
|
386 |
if self._dd_pid: |
|
387 |
raise RuntimeError("dd PID reported more than once") |
|
388 |
logging.debug("Received dd PID %r", line) |
|
389 |
self._dd_pid = int(line) |
|
390 |
forward_line = None |
|
391 |
|
|
298 | 392 |
if forward_line: |
299 | 393 |
self._logger.info(forward_line) |
300 | 394 |
self._status_file.AddRecentOutput(forward_line) |
... | ... | |
326 | 420 |
return True |
327 | 421 |
|
328 | 422 |
return False |
423 |
|
|
424 |
def _ProcessDdOutput(self, line): |
|
425 |
"""Interprets a line of dd(1)'s output. |
|
426 |
|
|
427 |
""" |
|
428 |
m = DD_INFO_RE.match(line) |
|
429 |
if m: |
|
430 |
seconds = float(m.group("seconds")) |
|
431 |
mbytes = utils.BytesToMebibyte(int(m.group("bytes"))) |
|
432 |
self._UpdateDdProgress(seconds, mbytes) |
|
433 |
return (False, True) |
|
434 |
|
|
435 |
m = DD_STDERR_IGNORE.match(line) |
|
436 |
if m: |
|
437 |
# Ignore |
|
438 |
return (False, False) |
|
439 |
|
|
440 |
# Forward line |
|
441 |
return (True, False) |
|
442 |
|
|
443 |
def _UpdateDdProgress(self, seconds, mbytes): |
|
444 |
"""Updates the internal status variables for dd(1) progress. |
|
445 |
|
|
446 |
@type seconds: float |
|
447 |
@param seconds: Timestamp of this update |
|
448 |
@type mbytes: float |
|
449 |
@param mbytes: Total number of MiB transferred so far |
|
450 |
|
|
451 |
""" |
|
452 |
# Add latest sample |
|
453 |
self._dd_progress.append((seconds, mbytes)) |
|
454 |
|
|
455 |
# Remove old samples |
|
456 |
del self._dd_progress[:-self._dd_tp_samples] |
|
457 |
|
|
458 |
# Calculate throughput |
|
459 |
throughput = _CalcThroughput(self._dd_progress) |
|
460 |
|
|
461 |
# Calculate percent and ETA |
|
462 |
percent = None |
|
463 |
eta = None |
|
464 |
|
|
465 |
if self._exp_size is not None: |
|
466 |
if self._exp_size != 0: |
|
467 |
percent = max(0, min(100, (100.0 * mbytes) / self._exp_size)) |
|
468 |
|
|
469 |
if throughput: |
|
470 |
eta = max(0, float(self._exp_size - mbytes) / throughput) |
|
471 |
|
|
472 |
self._status_file.SetProgress(mbytes, throughput, percent, eta) |
|
473 |
|
|
474 |
|
|
475 |
def _CalcThroughput(samples): |
|
476 |
"""Calculates the throughput in MiB/second. |
|
477 |
|
|
478 |
@type samples: sequence |
|
479 |
@param samples: List of samples, each consisting of a (timestamp, mbytes) |
|
480 |
tuple |
|
481 |
@rtype: float or None |
|
482 |
@return: Throughput in MiB/second |
|
483 |
|
|
484 |
""" |
|
485 |
if len(samples) < 2: |
|
486 |
# Can't calculate throughput |
|
487 |
return None |
|
488 |
|
|
489 |
(start_time, start_mbytes) = samples[0] |
|
490 |
(end_time, end_mbytes) = samples[-1] |
|
491 |
|
|
492 |
return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time) |
Also available in: Unified diff