Revision bb44b1ae daemons/import-export

b/daemons/import-export
37 37
import subprocess
38 38
import sys
39 39
import time
40
from cStringIO import StringIO
41 40

  
42 41
from ganeti import constants
43 42
from ganeti import cli
......
45 44
from ganeti import serializer
46 45
from ganeti import objects
47 46
from ganeti import locking
47
from ganeti import impexpd
48 48

  
49 49

  
50 50
#: Used to recognize point at which socat(1) starts to listen on its socket.
......
71 71
  SOCAT_LOG_NOTICE,
72 72
  ])
73 73

  
74
#: Socat buffer size: at most this many bytes are transferred per step
75
SOCAT_BUFSIZE = 1024 * 1024
76

  
77 74
#: How many lines to keep in the status file
78 75
MAX_RECENT_OUTPUT_LINES = 20
79 76

  
......
86 83
#: How long to wait for a connection to be established
87 84
DEFAULT_CONNECT_TIMEOUT = 60
88 85

  
89
# Common options for socat
90
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
91
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
92

  
93 86

  
94 87
# Global variable for options
95 88
options = None
......
291 284
  status_file.Update(force_update)
292 285

  
293 286

  
294
class CommandBuilder(object):
295
  def __init__(self, mode, opts, socat_stderr_fd):
296
    """Initializes this class.
297

  
298
    @param mode: Daemon mode (import or export)
299
    @param opts: Options object
300
    @type socat_stderr_fd: int
301
    @param socat_stderr_fd: File descriptor socat should write its stderr to
302

  
303
    """
304
    self._opts = opts
305
    self._mode = mode
306
    self._socat_stderr_fd = socat_stderr_fd
307

  
308
  @staticmethod
309
  def GetBashCommand(cmd):
310
    """Prepares a command to be run in Bash.
311

  
312
    """
313
    return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
314

  
315
  def _GetSocatCommand(self):
316
    """Returns the socat command.
317

  
318
    """
319
    common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
320
      "key=%s" % self._opts.key,
321
      "cert=%s" % self._opts.cert,
322
      "cafile=%s" % self._opts.ca,
323
      ]
324

  
325
    if self._opts.bind is not None:
326
      common_addr_opts.append("bind=%s" % self._opts.bind)
327

  
328
    if self._mode == constants.IEM_IMPORT:
329
      if self._opts.port is None:
330
        port = 0
331
      else:
332
        port = self._opts.port
333

  
334
      addr1 = [
335
        "OPENSSL-LISTEN:%s" % port,
336
        "reuseaddr",
337

  
338
        # Retry to listen if connection wasn't established successfully, up to
339
        # 100 times a second. Note that this still leaves room for DoS attacks.
340
        "forever",
341
        "intervall=0.01",
342
        ] + common_addr_opts
343
      addr2 = ["stdout"]
344

  
345
    elif self._mode == constants.IEM_EXPORT:
346
      addr1 = ["stdin"]
347
      addr2 = [
348
        "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
349

  
350
        # How long to wait per connection attempt
351
        "connect-timeout=%s" % self._opts.connect_timeout,
352

  
353
        # Retry a few times before giving up to connect (once per second)
354
        "retry=%s" % self._opts.connect_retries,
355
        "intervall=1",
356
        ] + common_addr_opts
357

  
358
    else:
359
      raise Error("Invalid mode '%s'" % self._mode)
360

  
361
    for i in [addr1, addr2]:
362
      for value in i:
363
        if "," in value:
364
          raise Error("Comma not allowed in socat option value: %r" % value)
365

  
366
    return [
367
      constants.SOCAT_PATH,
368

  
369
      # Log to stderr
370
      "-ls",
371

  
372
      # Log level
373
      "-d", "-d",
374

  
375
      # Buffer size
376
      "-b%s" % SOCAT_BUFSIZE,
377

  
378
      # Unidirectional mode, the first address is only used for reading, and the
379
      # second address is only used for writing
380
      "-u",
381

  
382
      ",".join(addr1), ",".join(addr2)
383
      ]
384

  
385
  def _GetTransportCommand(self):
386
    """Returns the command for the transport part of the daemon.
387

  
388
    """
389
    socat_cmd = ("%s 2>&%d" %
390
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
391
                  self._socat_stderr_fd))
392

  
393
    compr = self._opts.compress
394

  
395
    assert compr in constants.IEC_ALL
396

  
397
    if self._mode == constants.IEM_IMPORT:
398
      if compr == constants.IEC_GZIP:
399
        transport_cmd = "%s | gunzip -c" % socat_cmd
400
      else:
401
        transport_cmd = socat_cmd
402
    elif self._mode == constants.IEM_EXPORT:
403
      if compr == constants.IEC_GZIP:
404
        transport_cmd = "gzip -c | %s" % socat_cmd
405
      else:
406
        transport_cmd = socat_cmd
407
    else:
408
      raise Error("Invalid mode '%s'" % self._mode)
409

  
410
    # TODO: Use "dd" to measure processed data (allows to give an ETA)
411

  
412
    # TODO: Run transport as separate user
413
    # The transport uses its own shell to simplify running it as a separate user
414
    # in the future.
415
    return self.GetBashCommand(transport_cmd)
416

  
417
  def GetCommand(self):
418
    """Returns the complete child process command.
419

  
420
    """
421
    transport_cmd = self._GetTransportCommand()
422

  
423
    buf = StringIO()
424

  
425
    if self._opts.cmd_prefix:
426
      buf.write(self._opts.cmd_prefix)
427
      buf.write(" ")
428

  
429
    buf.write(utils.ShellQuoteArgs(transport_cmd))
430

  
431
    if self._opts.cmd_suffix:
432
      buf.write(" ")
433
      buf.write(self._opts.cmd_suffix)
434

  
435
    return self.GetBashCommand(buf.getvalue())
436

  
437

  
438 287
def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
439 288
                   signal_notify, signal_handler, mode):
440 289
  """Handles the child processes' output.
......
673 522
      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
674 523

  
675 524
      # Get child process command
676
      cmd = CommandBuilder(mode, options, socat_stderr_write_fd).GetCommand()
525
      cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd)
526
      cmd = cmd_builder.GetCommand()
677 527

  
678 528
      logging.debug("Starting command %r", cmd)
679 529

  

Also available in: Unified diff