Revision c08d76f5
b/daemons/import-export | ||
---|---|---|
35 | 35 |
import subprocess |
36 | 36 |
import sys |
37 | 37 |
import time |
38 |
import math |
|
38 | 39 |
|
39 | 40 |
from ganeti import constants |
40 | 41 |
from ganeti import cli |
... | ... | |
57 | 58 |
#: How long to wait for a connection to be established |
58 | 59 |
DEFAULT_CONNECT_TIMEOUT = 60 |
59 | 60 |
|
61 |
#: Get dd(1) statistics every few seconds |
|
62 |
DD_STATISTICS_INTERVAL = 5.0 |
|
63 |
|
|
64 |
#: Seconds for throughput calculation |
|
65 |
DD_THROUGHPUT_INTERVAL = 60.0 |
|
66 |
|
|
67 |
#: Number of samples for throughput calculation |
|
68 |
DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) / |
|
69 |
DD_STATISTICS_INTERVAL)) |
|
70 |
|
|
60 | 71 |
|
61 | 72 |
# Global variable for options |
62 | 73 |
options = None |
... | ... | |
140 | 151 |
""" |
141 | 152 |
return self._data.connected |
142 | 153 |
|
154 |
def SetProgress(self, mbytes, throughput, percent, eta):
  """Records the current transfer progress in the status data.

  @type mbytes: number
  @param mbytes: Amount of data transferred so far, in MiB
  @type throughput: float
  @param throughput: Current throughput in MiB/second
  @type percent: number
  @param percent: Percentage of the transfer already processed
  @type eta: number
  @param eta: Expected number of seconds until the transfer is done

  """
  data = self._data

  # All four values are stored verbatim; nothing is validated or derived here
  data.progress_mbytes = mbytes
  data.progress_throughput = throughput
  data.progress_percent = percent
  data.progress_eta = eta
|
171 |
|
|
143 | 172 |
def SetExitStatus(self, exit_status, error_message): |
144 | 173 |
"""Sets the exit status and an error message. |
145 | 174 |
|
... | ... | |
177 | 206 |
mode=0400) |
178 | 207 |
|
179 | 208 |
|
180 |
def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger, |
|
209 |
def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, |
|
210 |
dd_pid_read_fd, status_file, child_logger, |
|
181 | 211 |
signal_notify, signal_handler, mode): |
182 | 212 |
"""Handles the child processes' output. |
183 | 213 |
|
... | ... | |
189 | 219 |
# might buffer data while poll(2) won't mark its file descriptor as |
190 | 220 |
# readable again. |
191 | 221 |
socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0) |
222 |
dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0) |
|
223 |
dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0) |
|
224 |
|
|
225 |
tp_samples = DD_THROUGHPUT_SAMPLES |
|
192 | 226 |
|
193 | 227 |
child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file, |
194 |
child_logger) |
|
228 |
child_logger, |
|
229 |
throughput_samples=tp_samples) |
|
195 | 230 |
try: |
196 | 231 |
fdmap = { |
197 | 232 |
child.stderr.fileno(): |
198 | 233 |
(child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)), |
199 | 234 |
socat_stderr_read.fileno(): |
200 | 235 |
(socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)), |
236 |
dd_pid_read.fileno(): |
|
237 |
(dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)), |
|
238 |
dd_stderr_read.fileno(): |
|
239 |
(dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)), |
|
201 | 240 |
signal_notify.fileno(): (signal_notify, None), |
202 | 241 |
} |
203 | 242 |
|
... | ... | |
212 | 251 |
listen_timeout = None |
213 | 252 |
|
214 | 253 |
exit_timeout = None |
254 |
dd_stats_timeout = None |
|
215 | 255 |
|
216 | 256 |
while True: |
217 | 257 |
# Break out of loop if only signal notify FD is left |
... | ... | |
239 | 279 |
logging.info("Child process didn't exit in time") |
240 | 280 |
break |
241 | 281 |
|
282 |
if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0: |
|
283 |
notify_status = child_io_proc.NotifyDd() |
|
284 |
if notify_status: |
|
285 |
# Schedule next notification |
|
286 |
dd_stats_timeout = locking.RunningTimeout(DD_STATISTICS_INTERVAL, |
|
287 |
True) |
|
288 |
else: |
|
289 |
# Try again soon (dd isn't ready yet) |
|
290 |
dd_stats_timeout = locking.RunningTimeout(1.0, True) |
|
291 |
|
|
292 |
if dd_stats_timeout: |
|
293 |
dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000) |
|
294 |
|
|
295 |
if timeout is None: |
|
296 |
timeout = dd_timeout |
|
297 |
else: |
|
298 |
timeout = min(timeout, dd_timeout) |
|
299 |
|
|
242 | 300 |
for fd, event in utils.RetryOnSignal(poller.poll, timeout): |
243 | 301 |
if event & (select.POLLIN | event & select.POLLPRI): |
244 | 302 |
(from_, to) = fdmap[fd] |
... | ... | |
410 | 468 |
# Pipe to receive socat's stderr output |
411 | 469 |
(socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe() |
412 | 470 |
|
471 |
# Pipe to receive dd's stderr output |
|
472 |
(dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe() |
|
473 |
|
|
474 |
# Pipe to receive dd's PID |
|
475 |
(dd_pid_read_fd, dd_pid_write_fd) = os.pipe() |
|
476 |
|
|
413 | 477 |
# Get child process command |
414 |
cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd) |
|
478 |
cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd, |
|
479 |
dd_stderr_write_fd, dd_pid_write_fd) |
|
415 | 480 |
cmd = cmd_builder.GetCommand() |
416 | 481 |
|
417 | 482 |
logging.debug("Starting command %r", cmd) |
418 | 483 |
|
419 | 484 |
# Start child process |
420 |
child = ChildProcess(cmd, [socat_stderr_write_fd]) |
|
485 |
child = ChildProcess(cmd, [socat_stderr_write_fd, dd_stderr_write_fd, |
|
486 |
dd_pid_write_fd]) |
|
421 | 487 |
try: |
422 | 488 |
def _ForwardSignal(signum, _): |
423 | 489 |
"""Forwards signals to child process. |
... | ... | |
438 | 504 |
try: |
439 | 505 |
# Close child's side |
440 | 506 |
utils.RetryOnSignal(os.close, socat_stderr_write_fd) |
507 |
utils.RetryOnSignal(os.close, dd_stderr_write_fd) |
|
508 |
utils.RetryOnSignal(os.close, dd_pid_write_fd) |
|
441 | 509 |
|
442 |
if ProcessChildIO(child, socat_stderr_read_fd, status_file,
|
|
443 |
child_logger, signal_wakeup, signal_handler,
|
|
444 |
mode): |
|
510 |
if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
|
|
511 |
dd_pid_read_fd, status_file, child_logger,
|
|
512 |
signal_wakeup, signal_handler, mode):
|
|
445 | 513 |
# The child closed all its file descriptors and there was no |
446 | 514 |
# signal |
447 | 515 |
# TODO: Implement timeout instead of waiting indefinitely |
b/lib/impexpd/__init__.py | ||
---|---|---|
23 | 23 |
|
24 | 24 |
""" |
25 | 25 |
|
26 |
import os |
|
26 | 27 |
import re |
27 | 28 |
import socket |
28 | 29 |
import logging |
30 |
import signal |
|
31 |
import errno |
|
32 |
import time |
|
29 | 33 |
from cStringIO import StringIO |
30 | 34 |
|
31 | 35 |
from ganeti import constants |
... | ... | |
57 | 61 |
SOCAT_LOG_NOTICE, |
58 | 62 |
]) |
59 | 63 |
|
64 |
#: Used to parse GNU dd(1) statistics |
|
65 |
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*" |
|
66 |
r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I) |
|
67 |
|
|
68 |
#: Used to ignore "N+N records in/out" on dd(1)'s stderr |
|
69 |
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I) |
|
70 |
|
|
71 |
#: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is |
|
72 |
#: unavailable and SIGUSR1 is used instead) |
|
73 |
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1) |
|
74 |
|
|
60 | 75 |
#: Buffer size: at most this many bytes are transferred at once |
61 | 76 |
BUFSIZE = 1024 * 1024 |
62 | 77 |
|
... | ... | |
65 | 80 |
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"] |
66 | 81 |
|
67 | 82 |
(PROG_OTHER, |
68 |
PROG_SOCAT) = range(1, 3) |
|
83 |
PROG_SOCAT, |
|
84 |
PROG_DD, |
|
85 |
PROG_DD_PID) = range(1, 5) |
|
69 | 86 |
PROG_ALL = frozenset([ |
70 | 87 |
PROG_OTHER, |
71 | 88 |
PROG_SOCAT, |
89 |
PROG_DD, |
|
90 |
PROG_DD_PID, |
|
72 | 91 |
]) |
73 | 92 |
|
74 | 93 |
|
75 | 94 |
class CommandBuilder(object): |
76 |
def __init__(self, mode, opts, socat_stderr_fd): |
|
95 |
def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
|
|
77 | 96 |
"""Initializes this class. |
78 | 97 |
|
79 | 98 |
@param mode: Daemon mode (import or export) |
80 | 99 |
@param opts: Options object |
81 | 100 |
@type socat_stderr_fd: int |
82 | 101 |
@param socat_stderr_fd: File descriptor socat should write its stderr to |
102 |
@type dd_stderr_fd: int |
|
103 |
@param dd_stderr_fd: File descriptor dd should write its stderr to |
|
104 |
@type dd_pid_fd: int |
|
105 |
@param dd_pid_fd: File descriptor the child should write dd's PID to |
|
83 | 106 |
|
84 | 107 |
""" |
85 | 108 |
self._opts = opts |
86 | 109 |
self._mode = mode |
87 | 110 |
self._socat_stderr_fd = socat_stderr_fd |
111 |
self._dd_stderr_fd = dd_stderr_fd |
|
112 |
self._dd_pid_fd = dd_pid_fd |
|
88 | 113 |
|
89 | 114 |
@staticmethod |
90 | 115 |
def GetBashCommand(cmd): |
... | ... | |
172 | 197 |
(utils.ShellQuoteArgs(self._GetSocatCommand()), |
173 | 198 |
self._socat_stderr_fd)) |
174 | 199 |
|
200 |
dd_cmd = StringIO() |
|
201 |
# Setting LC_ALL since we want to parse the output and explicitly |
|
202 |
# redirecting stdin, as the background process (dd) would have /dev/null as |
|
203 |
# stdin otherwise |
|
204 |
dd_cmd.write("{ LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" % |
|
205 |
(BUFSIZE, self._dd_stderr_fd)) |
|
206 |
# Send PID to daemon |
|
207 |
dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd) |
|
208 |
# And wait for dd |
|
209 |
dd_cmd.write(" wait $pid;") |
|
210 |
dd_cmd.write(" }") |
|
211 |
|
|
175 | 212 |
compr = self._opts.compress |
176 | 213 |
|
177 | 214 |
assert compr in constants.IEC_ALL |
... | ... | |
181 | 218 |
transport_cmd = "%s | gunzip -c" % socat_cmd |
182 | 219 |
else: |
183 | 220 |
transport_cmd = socat_cmd |
221 |
|
|
222 |
transport_cmd += " | %s" % dd_cmd.getvalue() |
|
184 | 223 |
elif self._mode == constants.IEM_EXPORT: |
185 | 224 |
if compr == constants.IEC_GZIP: |
186 | 225 |
transport_cmd = "gzip -c | %s" % socat_cmd |
187 | 226 |
else: |
188 | 227 |
transport_cmd = socat_cmd |
228 |
|
|
229 |
transport_cmd = "%s | %s" % (dd_cmd.getvalue(), transport_cmd) |
|
189 | 230 |
else: |
190 | 231 |
raise errors.GenericError("Invalid mode '%s'" % self._mode) |
191 | 232 |
|
192 |
# TODO: Use "dd" to measure processed data (allows to give an ETA) |
|
193 |
|
|
194 | 233 |
# TODO: Run transport as separate user |
195 | 234 |
# The transport uses its own shell to simplify running it as a separate user |
196 | 235 |
# in the future. |
... | ... | |
235 | 274 |
|
236 | 275 |
|
237 | 276 |
class ChildIOProcessor(object): |
238 |
def __init__(self, debug, status_file, logger): |
|
277 |
def __init__(self, debug, status_file, logger, throughput_samples):
|
|
239 | 278 |
"""Initializes this class. |
240 | 279 |
|
241 | 280 |
""" |
... | ... | |
246 | 285 |
self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog)) |
247 | 286 |
for prog in PROG_ALL]) |
248 | 287 |
|
288 |
self._dd_pid = None |
|
289 |
self._dd_ready = False |
|
290 |
self._dd_tp_samples = throughput_samples |
|
291 |
self._dd_progress = [] |
|
292 |
|
|
293 |
# Expected size of transferred data |
|
294 |
self._exp_size = None |
|
295 |
|
|
249 | 296 |
def GetLineSplitter(self, prog): |
250 | 297 |
"""Returns the line splitter for a program. |
251 | 298 |
|
... | ... | |
267 | 314 |
ls.close() |
268 | 315 |
self._splitter.clear() |
269 | 316 |
|
317 |
def NotifyDd(self): |
|
318 |
"""Tells dd(1) to write statistics. |
|
319 |
|
|
320 |
""" |
|
321 |
if self._dd_pid is None: |
|
322 |
# Can't notify |
|
323 |
return False |
|
324 |
|
|
325 |
if not self._dd_ready: |
|
326 |
# There's a race condition between starting the program and sending |
|
327 |
# signals. The signal handler is only registered after some time, so we |
|
328 |
# have to check whether the program is ready. If it isn't, sending a |
|
329 |
# signal will invoke the default handler (and usually abort the program). |
|
330 |
if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL): |
|
331 |
logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL) |
|
332 |
return False |
|
333 |
|
|
334 |
logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL) |
|
335 |
self._dd_ready = True |
|
336 |
|
|
337 |
logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid) |
|
338 |
try: |
|
339 |
os.kill(self._dd_pid, DD_INFO_SIGNAL) |
|
340 |
except EnvironmentError, err: |
|
341 |
if err.errno != errno.ESRCH: |
|
342 |
raise |
|
343 |
|
|
344 |
# Process no longer exists |
|
345 |
self._dd_pid = None |
|
346 |
|
|
347 |
return True |
|
348 |
|
|
270 | 349 |
def _ProcessOutput(self, line, prog): |
271 | 350 |
"""Takes care of child process output. |
272 | 351 |
|
... | ... | |
295 | 374 |
else: |
296 | 375 |
forward_line = "socat: %s" % line |
297 | 376 |
|
377 |
elif prog == PROG_DD: |
|
378 |
(should_forward, force_update) = self._ProcessDdOutput(line) |
|
379 |
|
|
380 |
if should_forward or self._debug: |
|
381 |
forward_line = "dd: %s" % line |
|
382 |
else: |
|
383 |
forward_line = None |
|
384 |
|
|
385 |
elif prog == PROG_DD_PID: |
|
386 |
if self._dd_pid: |
|
387 |
raise RuntimeError("dd PID reported more than once") |
|
388 |
logging.debug("Received dd PID %r", line) |
|
389 |
self._dd_pid = int(line) |
|
390 |
forward_line = None |
|
391 |
|
|
298 | 392 |
if forward_line: |
299 | 393 |
self._logger.info(forward_line) |
300 | 394 |
self._status_file.AddRecentOutput(forward_line) |
... | ... | |
326 | 420 |
return True |
327 | 421 |
|
328 | 422 |
return False |
423 |
|
|
424 |
def _ProcessDdOutput(self, line):
  """Classifies one line written by dd(1) to its stderr.

  Statistics lines are parsed and used to update the progress information,
  "N+N records in/out" lines are dropped and everything else is marked for
  forwarding.

  @type line: string
  @param line: Single line of dd(1) output
  @rtype: tuple
  @return: Two booleans; the first is True only for lines which are neither
    statistics nor record counters, the second is True only for statistics
    lines

  """
  stats = DD_INFO_RE.match(line)
  if stats is not None:
    # Statistics line: extract elapsed time and amount of data
    elapsed = float(stats.group("seconds"))
    transferred = utils.BytesToMebibyte(int(stats.group("bytes")))
    self._UpdateDdProgress(elapsed, transferred)
    return (False, True)

  if DD_STDERR_IGNORE.match(line) is not None:
    # Record counters carry no additional information, ignore them
    return (False, False)

  # Anything else is unexpected output and should be forwarded
  return (True, False)
|
442 |
|
|
443 |
def _UpdateDdProgress(self, seconds, mbytes):
  """Stores a new dd(1) progress sample and publishes the derived values.

  The sample is appended to the internal list, the list is trimmed to the
  configured number of samples and throughput, percentage and ETA are
  passed on to the status file.

  @type seconds: float
  @param seconds: Timestamp of this update
  @type mbytes: float
  @param mbytes: Total number of MiB transferred so far

  """
  history = self._dd_progress

  # Record the newest sample and keep only the most recent ones
  history.append((seconds, mbytes))
  del history[:-self._dd_tp_samples]

  throughput = _CalcThroughput(history)

  # Percentage and ETA can only be given if the expected size is known
  percent = None
  eta = None
  expected = self._exp_size

  if expected is not None:
    if expected != 0:
      # Clamp to the range 0..100
      percent = max(0, min(100, (100.0 * mbytes) / expected))

    if throughput:
      # Never report a negative remaining time
      eta = max(0, float(expected - mbytes) / throughput)

  self._status_file.SetProgress(mbytes, throughput, percent, eta)
|
473 |
|
|
474 |
|
|
475 |
def _CalcThroughput(samples):
  """Calculates the throughput in MiB/second.

  Only the first and the last sample are used; the result is the average
  rate between those two points.

  @type samples: sequence
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
    tuple
  @rtype: float or None
  @return: Throughput in MiB/second, or None if it can't be calculated (fewer
    than two samples or no time elapsed between first and last sample)

  """
  if len(samples) < 2:
    # Can't calculate throughput
    return None

  (start_time, start_mbytes) = samples[0]
  (end_time, end_mbytes) = samples[-1]

  elapsed = float(end_time) - start_time
  if elapsed <= 0:
    # Two samples carrying the same timestamp would otherwise lead to a
    # division by zero; without forward progress in time there's no
    # meaningful rate
    return None

  return (float(end_mbytes) - start_mbytes) / elapsed
b/lib/objects.py | ||
---|---|---|
1017 | 1017 |
"recent_output", |
1018 | 1018 |
"listen_port", |
1019 | 1019 |
"connected", |
1020 |
"progress_mbytes", |
|
1021 |
"progress_throughput", |
|
1022 |
"progress_eta", |
|
1023 |
"progress_percent", |
|
1020 | 1024 |
"exit_status", |
1021 | 1025 |
"error_message", |
1022 | 1026 |
] + _TIMESTAMPS |
b/test/ganeti.impexpd_unittest.py | ||
---|---|---|
76 | 76 |
cmd_prefix=cmd_prefix, |
77 | 77 |
cmd_suffix=cmd_suffix) |
78 | 78 |
|
79 |
builder = impexpd.CommandBuilder(mode, opts, 1) |
|
79 |
builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
|
|
80 | 80 |
|
81 | 81 |
# Check complete command |
82 | 82 |
cmd = builder.GetCommand() |
... | ... | |
108 | 108 |
ca="/some/path/with,a/,comma") |
109 | 109 |
|
110 | 110 |
for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]: |
111 |
builder = impexpd.CommandBuilder(mode, opts, 1) |
|
111 |
builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
|
|
112 | 112 |
self.assertRaises(errors.GenericError, builder.GetCommand) |
113 | 113 |
|
114 | 114 |
def testModeError(self): |
... | ... | |
117 | 117 |
assert mode not in [constants.IEM_IMPORT, constants.IEM_EXPORT] |
118 | 118 |
|
119 | 119 |
opts = CmdBuilderConfig(host="localhost", port=1234) |
120 |
builder = impexpd.CommandBuilder(mode, opts, 1) |
|
120 |
builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
|
|
121 | 121 |
self.assertRaises(errors.GenericError, builder.GetCommand) |
122 | 122 |
|
123 | 123 |
|
124 |
class TestCalcThroughput(unittest.TestCase):
  """Tests for the throughput calculation of the import/export daemon."""

  def test(self):
    calc = impexpd._CalcThroughput

    # Fewer than two samples: no throughput available
    for samples in ([], [(0, 0)]):
      self.assertEqual(calc(samples), None)

    # Two samples: 100 MiB within 10 seconds
    two_samples = [(0.0, 0.0), (10.0, 100.0)]
    self.assertAlmostEqual(calc(two_samples), 10.0, 3)

    # Three samples: only the first and the last one matter
    three_samples = [(5.0, 7.0), (10.0, 100.0), (16.0, 181.0)]
    self.assertAlmostEqual(calc(three_samples), 15.818, 3)
|
141 |
|
|
142 |
|
|
124 | 143 |
if __name__ == "__main__": |
125 | 144 |
testutils.GanetiTestProgram() |
Also available in: Unified diff