Revision bb44b1ae daemons/import-export
b/daemons/import-export | ||
---|---|---|
37 | 37 |
import subprocess |
38 | 38 |
import sys |
39 | 39 |
import time |
40 |
from cStringIO import StringIO |
|
41 | 40 |
|
42 | 41 |
from ganeti import constants |
43 | 42 |
from ganeti import cli |
... | ... | |
45 | 44 |
from ganeti import serializer |
46 | 45 |
from ganeti import objects |
47 | 46 |
from ganeti import locking |
47 |
from ganeti import impexpd |
|
48 | 48 |
|
49 | 49 |
|
50 | 50 |
#: Used to recognize point at which socat(1) starts to listen on its socket. |
... | ... | |
71 | 71 |
SOCAT_LOG_NOTICE, |
72 | 72 |
]) |
73 | 73 |
|
74 |
#: Socat buffer size: at most this many bytes are transferred per step |
|
75 |
SOCAT_BUFSIZE = 1024 * 1024 |
|
76 |
|
|
77 | 74 |
#: How many lines to keep in the status file |
78 | 75 |
MAX_RECENT_OUTPUT_LINES = 20 |
79 | 76 |
|
... | ... | |
86 | 83 |
#: How long to wait for a connection to be established |
87 | 84 |
DEFAULT_CONNECT_TIMEOUT = 60 |
88 | 85 |
|
89 |
# Common options for socat |
|
90 |
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"] |
|
91 |
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"] |
|
92 |
|
|
93 | 86 |
|
94 | 87 |
# Global variable for options |
95 | 88 |
options = None |
... | ... | |
291 | 284 |
status_file.Update(force_update) |
292 | 285 |
|
293 | 286 |
|
294 |
class CommandBuilder(object): |
|
295 |
def __init__(self, mode, opts, socat_stderr_fd): |
|
296 |
"""Initializes this class. |
|
297 |
|
|
298 |
@param mode: Daemon mode (import or export) |
|
299 |
@param opts: Options object |
|
300 |
@type socat_stderr_fd: int |
|
301 |
@param socat_stderr_fd: File descriptor socat should write its stderr to |
|
302 |
|
|
303 |
""" |
|
304 |
self._opts = opts |
|
305 |
self._mode = mode |
|
306 |
self._socat_stderr_fd = socat_stderr_fd |
|
307 |
|
|
308 |
@staticmethod |
|
309 |
def GetBashCommand(cmd): |
|
310 |
"""Prepares a command to be run in Bash. |
|
311 |
|
|
312 |
""" |
|
313 |
return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd] |
|
314 |
|
|
315 |
def _GetSocatCommand(self): |
|
316 |
"""Returns the socat command. |
|
317 |
|
|
318 |
""" |
|
319 |
common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [ |
|
320 |
"key=%s" % self._opts.key, |
|
321 |
"cert=%s" % self._opts.cert, |
|
322 |
"cafile=%s" % self._opts.ca, |
|
323 |
] |
|
324 |
|
|
325 |
if self._opts.bind is not None: |
|
326 |
common_addr_opts.append("bind=%s" % self._opts.bind) |
|
327 |
|
|
328 |
if self._mode == constants.IEM_IMPORT: |
|
329 |
if self._opts.port is None: |
|
330 |
port = 0 |
|
331 |
else: |
|
332 |
port = self._opts.port |
|
333 |
|
|
334 |
addr1 = [ |
|
335 |
"OPENSSL-LISTEN:%s" % port, |
|
336 |
"reuseaddr", |
|
337 |
|
|
338 |
# Retry to listen if connection wasn't established successfully, up to |
|
339 |
# 100 times a second. Note that this still leaves room for DoS attacks. |
|
340 |
"forever", |
|
341 |
"intervall=0.01", |
|
342 |
] + common_addr_opts |
|
343 |
addr2 = ["stdout"] |
|
344 |
|
|
345 |
elif self._mode == constants.IEM_EXPORT: |
|
346 |
addr1 = ["stdin"] |
|
347 |
addr2 = [ |
|
348 |
"OPENSSL:%s:%s" % (self._opts.host, self._opts.port), |
|
349 |
|
|
350 |
# How long to wait per connection attempt |
|
351 |
"connect-timeout=%s" % self._opts.connect_timeout, |
|
352 |
|
|
353 |
# Retry a few times before giving up to connect (once per second) |
|
354 |
"retry=%s" % self._opts.connect_retries, |
|
355 |
"intervall=1", |
|
356 |
] + common_addr_opts |
|
357 |
|
|
358 |
else: |
|
359 |
raise Error("Invalid mode '%s'" % self._mode) |
|
360 |
|
|
361 |
for i in [addr1, addr2]: |
|
362 |
for value in i: |
|
363 |
if "," in value: |
|
364 |
raise Error("Comma not allowed in socat option value: %r" % value) |
|
365 |
|
|
366 |
return [ |
|
367 |
constants.SOCAT_PATH, |
|
368 |
|
|
369 |
# Log to stderr |
|
370 |
"-ls", |
|
371 |
|
|
372 |
# Log level |
|
373 |
"-d", "-d", |
|
374 |
|
|
375 |
# Buffer size |
|
376 |
"-b%s" % SOCAT_BUFSIZE, |
|
377 |
|
|
378 |
# Unidirectional mode, the first address is only used for reading, and the |
|
379 |
# second address is only used for writing |
|
380 |
"-u", |
|
381 |
|
|
382 |
",".join(addr1), ",".join(addr2) |
|
383 |
] |
|
384 |
|
|
385 |
def _GetTransportCommand(self): |
|
386 |
"""Returns the command for the transport part of the daemon. |
|
387 |
|
|
388 |
""" |
|
389 |
socat_cmd = ("%s 2>&%d" % |
|
390 |
(utils.ShellQuoteArgs(self._GetSocatCommand()), |
|
391 |
self._socat_stderr_fd)) |
|
392 |
|
|
393 |
compr = self._opts.compress |
|
394 |
|
|
395 |
assert compr in constants.IEC_ALL |
|
396 |
|
|
397 |
if self._mode == constants.IEM_IMPORT: |
|
398 |
if compr == constants.IEC_GZIP: |
|
399 |
transport_cmd = "%s | gunzip -c" % socat_cmd |
|
400 |
else: |
|
401 |
transport_cmd = socat_cmd |
|
402 |
elif self._mode == constants.IEM_EXPORT: |
|
403 |
if compr == constants.IEC_GZIP: |
|
404 |
transport_cmd = "gzip -c | %s" % socat_cmd |
|
405 |
else: |
|
406 |
transport_cmd = socat_cmd |
|
407 |
else: |
|
408 |
raise Error("Invalid mode '%s'" % self._mode) |
|
409 |
|
|
410 |
# TODO: Use "dd" to measure processed data (allows to give an ETA) |
|
411 |
|
|
412 |
# TODO: Run transport as separate user |
|
413 |
# The transport uses its own shell to simplify running it as a separate user |
|
414 |
# in the future. |
|
415 |
return self.GetBashCommand(transport_cmd) |
|
416 |
|
|
417 |
def GetCommand(self): |
|
418 |
"""Returns the complete child process command. |
|
419 |
|
|
420 |
""" |
|
421 |
transport_cmd = self._GetTransportCommand() |
|
422 |
|
|
423 |
buf = StringIO() |
|
424 |
|
|
425 |
if self._opts.cmd_prefix: |
|
426 |
buf.write(self._opts.cmd_prefix) |
|
427 |
buf.write(" ") |
|
428 |
|
|
429 |
buf.write(utils.ShellQuoteArgs(transport_cmd)) |
|
430 |
|
|
431 |
if self._opts.cmd_suffix: |
|
432 |
buf.write(" ") |
|
433 |
buf.write(self._opts.cmd_suffix) |
|
434 |
|
|
435 |
return self.GetBashCommand(buf.getvalue()) |
|
436 |
|
|
437 |
|
|
438 | 287 |
def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger, |
439 | 288 |
signal_notify, signal_handler, mode): |
440 | 289 |
"""Handles the child processes' output. |
... | ... | |
673 | 522 |
(socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe() |
674 | 523 |
|
675 | 524 |
# Get child process command |
676 |
cmd = CommandBuilder(mode, options, socat_stderr_write_fd).GetCommand() |
|
525 |
cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd) |
|
526 |
cmd = cmd_builder.GetCommand() |
|
677 | 527 |
|
678 | 528 |
logging.debug("Starting command %r", cmd) |
679 | 529 |
|
Also available in: Unified diff