Revision 1e915b86 daemons/import-export
b/daemons/import-export | ||
---|---|---|
291 | 291 |
status_file.Update(force_update) |
292 | 292 |
|
293 | 293 |
|
294 |
def GetBashCommand(cmd): |
|
295 |
"""Prepares a command to be run in Bash. |
|
294 |
class CommandBuilder(object): |
|
295 |
def __init__(self, mode, opts, socat_stderr_fd): |
|
296 |
"""Initializes this class. |
|
296 | 297 |
|
297 |
""" |
|
298 |
return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd] |
|
298 |
@param mode: Daemon mode (import or export) |
|
299 |
@param opts: Options object |
|
300 |
@type socat_stderr_fd: int |
|
301 |
@param socat_stderr_fd: File descriptor socat should write its stderr to |
|
299 | 302 |
|
303 |
""" |
|
304 |
self._opts = opts |
|
305 |
self._mode = mode |
|
306 |
self._socat_stderr_fd = socat_stderr_fd |
|
300 | 307 |
|
301 |
def GetSocatCommand(mode): |
|
302 |
"""Returns the socat command. |
|
308 |
@staticmethod |
|
309 |
def GetBashCommand(cmd): |
|
310 |
"""Prepares a command to be run in Bash. |
|
303 | 311 |
|
304 |
""" |
|
305 |
common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [ |
|
306 |
"key=%s" % options.key, |
|
307 |
"cert=%s" % options.cert, |
|
308 |
"cafile=%s" % options.ca, |
|
309 |
] |
|
310 |
|
|
311 |
if options.bind is not None: |
|
312 |
common_addr_opts.append("bind=%s" % options.bind) |
|
313 |
|
|
314 |
if mode == constants.IEM_IMPORT: |
|
315 |
if options.port is None: |
|
316 |
port = 0 |
|
317 |
else: |
|
318 |
port = options.port |
|
312 |
""" |
|
313 |
return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd] |
|
319 | 314 |
|
320 |
addr1 = [ |
|
321 |
"OPENSSL-LISTEN:%s" % port, |
|
322 |
"reuseaddr", |
|
315 |
def _GetSocatCommand(self): |
|
316 |
"""Returns the socat command. |
|
323 | 317 |
|
324 |
# Retry to listen if connection wasn't established successfully, up to |
|
325 |
# 100 times a second. Note that this still leaves room for DoS attacks. |
|
326 |
"forever", |
|
327 |
"intervall=0.01", |
|
328 |
] + common_addr_opts |
|
329 |
addr2 = ["stdout"] |
|
318 |
""" |
|
319 |
common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [ |
|
320 |
"key=%s" % self._opts.key, |
|
321 |
"cert=%s" % self._opts.cert, |
|
322 |
"cafile=%s" % self._opts.ca, |
|
323 |
] |
|
324 |
|
|
325 |
if self._opts.bind is not None: |
|
326 |
common_addr_opts.append("bind=%s" % self._opts.bind) |
|
327 |
|
|
328 |
if self._mode == constants.IEM_IMPORT: |
|
329 |
if self._opts.port is None: |
|
330 |
port = 0 |
|
331 |
else: |
|
332 |
port = self._opts.port |
|
330 | 333 |
|
331 |
elif mode == constants.IEM_EXPORT: |
|
332 |
addr1 = ["stdin"] |
|
333 |
addr2 = [ |
|
334 |
"OPENSSL:%s:%s" % (options.host, options.port), |
|
334 |
addr1 = [ |
|
335 |
"OPENSSL-LISTEN:%s" % port, |
|
336 |
"reuseaddr", |
|
335 | 337 |
|
336 |
# How long to wait per connection attempt |
|
337 |
"connect-timeout=%s" % options.connect_timeout, |
|
338 |
# Retry to listen if connection wasn't established successfully, up to |
|
339 |
# 100 times a second. Note that this still leaves room for DoS attacks. |
|
340 |
"forever", |
|
341 |
"intervall=0.01", |
|
342 |
] + common_addr_opts |
|
343 |
addr2 = ["stdout"] |
|
338 | 344 |
|
339 |
# Retry a few times before giving up to connect (once per second)
|
|
340 |
"retry=%s" % options.connect_retries,
|
|
341 |
"intervall=1",
|
|
342 |
] + common_addr_opts
|
|
345 |
elif self._mode == constants.IEM_EXPORT:
|
|
346 |
addr1 = ["stdin"]
|
|
347 |
addr2 = [
|
|
348 |
"OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
|
|
343 | 349 |
|
344 |
else:
|
|
345 |
raise Error("Invalid mode")
|
|
350 |
# How long to wait per connection attempt
|
|
351 |
"connect-timeout=%s" % self._opts.connect_timeout,
|
|
346 | 352 |
|
347 |
for i in [addr1, addr2]:
|
|
348 |
for value in i:
|
|
349 |
if "," in value:
|
|
350 |
raise Error("Comma not allowed in socat option value: %r" % value)
|
|
353 |
# Retry a few times before giving up to connect (once per second)
|
|
354 |
"retry=%s" % self._opts.connect_retries,
|
|
355 |
"intervall=1",
|
|
356 |
] + common_addr_opts
|
|
351 | 357 |
|
352 |
return [
|
|
353 |
constants.SOCAT_PATH,
|
|
358 |
else:
|
|
359 |
raise Error("Invalid mode '%s'" % self._mode)
|
|
354 | 360 |
|
355 |
# Log to stderr |
|
356 |
"-ls", |
|
361 |
for i in [addr1, addr2]: |
|
362 |
for value in i: |
|
363 |
if "," in value: |
|
364 |
raise Error("Comma not allowed in socat option value: %r" % value) |
|
357 | 365 |
|
358 |
# Log level
|
|
359 |
"-d", "-d",
|
|
366 |
return [
|
|
367 |
constants.SOCAT_PATH,
|
|
360 | 368 |
|
361 |
# Buffer size
|
|
362 |
"-b%s" % SOCAT_BUFSIZE,
|
|
369 |
# Log to stderr
|
|
370 |
"-ls",
|
|
363 | 371 |
|
364 |
# Unidirectional mode, the first address is only used for reading, and the |
|
365 |
# second address is only used for writing |
|
366 |
"-u", |
|
372 |
# Log level |
|
373 |
"-d", "-d", |
|
367 | 374 |
|
368 |
",".join(addr1), ",".join(addr2)
|
|
369 |
]
|
|
375 |
# Buffer size
|
|
376 |
"-b%s" % SOCAT_BUFSIZE,
|
|
370 | 377 |
|
378 |
# Unidirectional mode, the first address is only used for reading, and the |
|
379 |
# second address is only used for writing |
|
380 |
"-u", |
|
371 | 381 |
|
372 |
def GetTransportCommand(mode, socat_stderr_fd):
|
|
373 |
"""Returns the command for the transport part of the daemon.
|
|
382 |
",".join(addr1), ",".join(addr2)
|
|
383 |
]
|
|
374 | 384 |
|
375 |
@param mode: Daemon mode (import or export) |
|
376 |
@type socat_stderr_fd: int |
|
377 |
@param socat_stderr_fd: File descriptor socat should write its stderr to |
|
385 |
def _GetTransportCommand(self): |
|
386 |
"""Returns the command for the transport part of the daemon. |
|
378 | 387 |
|
379 |
""" |
|
380 |
socat_cmd = ("%s 2>&%d" % |
|
381 |
(utils.ShellQuoteArgs(GetSocatCommand(mode)),
|
|
382 |
socat_stderr_fd)) |
|
388 |
"""
|
|
389 |
socat_cmd = ("%s 2>&%d" %
|
|
390 |
(utils.ShellQuoteArgs(self._GetSocatCommand()),
|
|
391 |
self._socat_stderr_fd))
|
|
383 | 392 |
|
384 |
compr = options.compress
|
|
393 |
compr = self._opts.compress
|
|
385 | 394 |
|
386 |
assert compr in constants.IEC_ALL |
|
395 |
assert compr in constants.IEC_ALL
|
|
387 | 396 |
|
388 |
if mode == constants.IEM_IMPORT: |
|
389 |
if compr == constants.IEC_GZIP: |
|
390 |
transport_cmd = "%s | gunzip -c" % socat_cmd |
|
391 |
else: |
|
392 |
transport_cmd = socat_cmd |
|
393 |
elif mode == constants.IEM_EXPORT: |
|
394 |
if compr == constants.IEC_GZIP: |
|
395 |
transport_cmd = "gzip -c | %s" % socat_cmd |
|
397 |
if self._mode == constants.IEM_IMPORT: |
|
398 |
if compr == constants.IEC_GZIP: |
|
399 |
transport_cmd = "%s | gunzip -c" % socat_cmd |
|
400 |
else: |
|
401 |
transport_cmd = socat_cmd |
|
402 |
elif self._mode == constants.IEM_EXPORT: |
|
403 |
if compr == constants.IEC_GZIP: |
|
404 |
transport_cmd = "gzip -c | %s" % socat_cmd |
|
405 |
else: |
|
406 |
transport_cmd = socat_cmd |
|
396 | 407 |
else: |
397 |
transport_cmd = socat_cmd |
|
398 |
else: |
|
399 |
raise Error("Invalid mode") |
|
408 |
raise Error("Invalid mode '%s'" % self._mode) |
|
400 | 409 |
|
401 |
# TODO: Use "dd" to measure processed data (allows to give an ETA) |
|
410 |
# TODO: Use "dd" to measure processed data (allows to give an ETA)
|
|
402 | 411 |
|
403 |
# TODO: Run transport as separate user |
|
404 |
# The transport uses its own shell to simplify running it as a separate user |
|
405 |
# in the future. |
|
406 |
return GetBashCommand(transport_cmd)
|
|
412 |
# TODO: Run transport as separate user
|
|
413 |
# The transport uses its own shell to simplify running it as a separate user
|
|
414 |
# in the future.
|
|
415 |
return self.GetBashCommand(transport_cmd)
|
|
407 | 416 |
|
417 |
def GetCommand(self): |
|
418 |
"""Returns the complete child process command. |
|
408 | 419 |
|
409 |
def GetCommand(mode, socat_stderr_fd):
|
|
410 |
"""Returns the complete child process command.
|
|
420 |
"""
|
|
421 |
transport_cmd = self._GetTransportCommand()
|
|
411 | 422 |
|
412 |
""" |
|
413 |
buf = StringIO() |
|
423 |
buf = StringIO() |
|
414 | 424 |
|
415 |
if options.cmd_prefix:
|
|
416 |
buf.write(options.cmd_prefix)
|
|
417 |
buf.write(" ") |
|
425 |
if self._opts.cmd_prefix:
|
|
426 |
buf.write(self._opts.cmd_prefix)
|
|
427 |
buf.write(" ")
|
|
418 | 428 |
|
419 |
buf.write(utils.ShellQuoteArgs(GetTransportCommand(mode, socat_stderr_fd)))
|
|
429 |
buf.write(utils.ShellQuoteArgs(transport_cmd))
|
|
420 | 430 |
|
421 |
if options.cmd_suffix:
|
|
422 |
buf.write(" ") |
|
423 |
buf.write(options.cmd_suffix)
|
|
431 |
if self._opts.cmd_suffix:
|
|
432 |
buf.write(" ")
|
|
433 |
buf.write(self._opts.cmd_suffix)
|
|
424 | 434 |
|
425 |
return GetBashCommand(buf.getvalue())
|
|
435 |
return self.GetBashCommand(buf.getvalue())
|
|
426 | 436 |
|
427 | 437 |
|
428 | 438 |
def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger, |
... | ... | |
663 | 673 |
(socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe() |
664 | 674 |
|
665 | 675 |
# Get child process command |
666 |
cmd = GetCommand(mode, socat_stderr_write_fd)
|
|
676 |
cmd = CommandBuilder(mode, options, socat_stderr_write_fd).GetCommand()
|
|
667 | 677 |
|
668 | 678 |
logging.debug("Starting command %r", cmd) |
669 | 679 |
|
Also available in: Unified diff