Revision bb44b1ae

b/Makefile.am
22 22
masterddir = $(pkgpythondir)/masterd
23 23
confddir = $(pkgpythondir)/confd
24 24
rapidir = $(pkgpythondir)/rapi
25
impexpddir = $(pkgpythondir)/impexpd
25 26
toolsdir = $(pkglibdir)/tools
26 27
docdir = $(datadir)/doc/$(PACKAGE)
27 28

  
......
40 41
	lib/confd \
41 42
	lib/http \
42 43
	lib/hypervisor \
44
	lib/impexpd \
43 45
	lib/masterd \
44 46
	lib/rapi \
45 47
	man \
......
148 150
	lib/masterd/__init__.py \
149 151
	lib/masterd/instance.py
150 152

  
153
impexpd_PYTHON = \
154
	lib/impexpd/__init__.py
155

  
151 156
docrst = \
152 157
	doc/admin.rst \
153 158
	doc/design-2.0.rst \
......
350 355
	test/ganeti.errors_unittest.py \
351 356
	test/ganeti.hooks_unittest.py \
352 357
	test/ganeti.http_unittest.py \
358
	test/ganeti.impexpd_unittest.py \
353 359
	test/ganeti.locking_unittest.py \
354 360
	test/ganeti.luxi_unittest.py \
355 361
	test/ganeti.masterd.instance_unittest.py \
......
397 403
	$(http_PYTHON) \
398 404
	$(confd_PYTHON) \
399 405
	$(masterd_PYTHON) \
406
	$(impexpd_PYTHON) \
400 407
	$(noinst_PYTHON)
401 408

  
402 409
srclink_files = \
b/daemons/import-export
37 37
import subprocess
38 38
import sys
39 39
import time
40
from cStringIO import StringIO
41 40

  
42 41
from ganeti import constants
43 42
from ganeti import cli
......
45 44
from ganeti import serializer
46 45
from ganeti import objects
47 46
from ganeti import locking
47
from ganeti import impexpd
48 48

  
49 49

  
50 50
#: Used to recognize point at which socat(1) starts to listen on its socket.
......
71 71
  SOCAT_LOG_NOTICE,
72 72
  ])
73 73

  
74
#: Socat buffer size: at most this many bytes are transferred per step
75
SOCAT_BUFSIZE = 1024 * 1024
76

  
77 74
#: How many lines to keep in the status file
78 75
MAX_RECENT_OUTPUT_LINES = 20
79 76

  
......
86 83
#: How long to wait for a connection to be established
87 84
DEFAULT_CONNECT_TIMEOUT = 60
88 85

  
89
# Common options for socat
90
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
91
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
92

  
93 86

  
94 87
# Global variable for options
95 88
options = None
......
291 284
  status_file.Update(force_update)
292 285

  
293 286

  
294
class CommandBuilder(object):
295
  def __init__(self, mode, opts, socat_stderr_fd):
296
    """Initializes this class.
297

  
298
    @param mode: Daemon mode (import or export)
299
    @param opts: Options object
300
    @type socat_stderr_fd: int
301
    @param socat_stderr_fd: File descriptor socat should write its stderr to
302

  
303
    """
304
    self._opts = opts
305
    self._mode = mode
306
    self._socat_stderr_fd = socat_stderr_fd
307

  
308
  @staticmethod
309
  def GetBashCommand(cmd):
310
    """Prepares a command to be run in Bash.
311

  
312
    """
313
    return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
314

  
315
  def _GetSocatCommand(self):
316
    """Returns the socat command.
317

  
318
    """
319
    common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
320
      "key=%s" % self._opts.key,
321
      "cert=%s" % self._opts.cert,
322
      "cafile=%s" % self._opts.ca,
323
      ]
324

  
325
    if self._opts.bind is not None:
326
      common_addr_opts.append("bind=%s" % self._opts.bind)
327

  
328
    if self._mode == constants.IEM_IMPORT:
329
      if self._opts.port is None:
330
        port = 0
331
      else:
332
        port = self._opts.port
333

  
334
      addr1 = [
335
        "OPENSSL-LISTEN:%s" % port,
336
        "reuseaddr",
337

  
338
        # Retry to listen if connection wasn't established successfully, up to
339
        # 100 times a second. Note that this still leaves room for DoS attacks.
340
        "forever",
341
        "intervall=0.01",
342
        ] + common_addr_opts
343
      addr2 = ["stdout"]
344

  
345
    elif self._mode == constants.IEM_EXPORT:
346
      addr1 = ["stdin"]
347
      addr2 = [
348
        "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
349

  
350
        # How long to wait per connection attempt
351
        "connect-timeout=%s" % self._opts.connect_timeout,
352

  
353
        # Retry a few times before giving up to connect (once per second)
354
        "retry=%s" % self._opts.connect_retries,
355
        "intervall=1",
356
        ] + common_addr_opts
357

  
358
    else:
359
      raise Error("Invalid mode '%s'" % self._mode)
360

  
361
    for i in [addr1, addr2]:
362
      for value in i:
363
        if "," in value:
364
          raise Error("Comma not allowed in socat option value: %r" % value)
365

  
366
    return [
367
      constants.SOCAT_PATH,
368

  
369
      # Log to stderr
370
      "-ls",
371

  
372
      # Log level
373
      "-d", "-d",
374

  
375
      # Buffer size
376
      "-b%s" % SOCAT_BUFSIZE,
377

  
378
      # Unidirectional mode, the first address is only used for reading, and the
379
      # second address is only used for writing
380
      "-u",
381

  
382
      ",".join(addr1), ",".join(addr2)
383
      ]
384

  
385
  def _GetTransportCommand(self):
386
    """Returns the command for the transport part of the daemon.
387

  
388
    """
389
    socat_cmd = ("%s 2>&%d" %
390
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
391
                  self._socat_stderr_fd))
392

  
393
    compr = self._opts.compress
394

  
395
    assert compr in constants.IEC_ALL
396

  
397
    if self._mode == constants.IEM_IMPORT:
398
      if compr == constants.IEC_GZIP:
399
        transport_cmd = "%s | gunzip -c" % socat_cmd
400
      else:
401
        transport_cmd = socat_cmd
402
    elif self._mode == constants.IEM_EXPORT:
403
      if compr == constants.IEC_GZIP:
404
        transport_cmd = "gzip -c | %s" % socat_cmd
405
      else:
406
        transport_cmd = socat_cmd
407
    else:
408
      raise Error("Invalid mode '%s'" % self._mode)
409

  
410
    # TODO: Use "dd" to measure processed data (allows to give an ETA)
411

  
412
    # TODO: Run transport as separate user
413
    # The transport uses its own shell to simplify running it as a separate user
414
    # in the future.
415
    return self.GetBashCommand(transport_cmd)
416

  
417
  def GetCommand(self):
418
    """Returns the complete child process command.
419

  
420
    """
421
    transport_cmd = self._GetTransportCommand()
422

  
423
    buf = StringIO()
424

  
425
    if self._opts.cmd_prefix:
426
      buf.write(self._opts.cmd_prefix)
427
      buf.write(" ")
428

  
429
    buf.write(utils.ShellQuoteArgs(transport_cmd))
430

  
431
    if self._opts.cmd_suffix:
432
      buf.write(" ")
433
      buf.write(self._opts.cmd_suffix)
434

  
435
    return self.GetBashCommand(buf.getvalue())
436

  
437

  
438 287
def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
439 288
                   signal_notify, signal_handler, mode):
440 289
  """Handles the child processes' output.
......
673 522
      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
674 523

  
675 524
      # Get child process command
676
      cmd = CommandBuilder(mode, options, socat_stderr_write_fd).GetCommand()
525
      cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd)
526
      cmd = cmd_builder.GetCommand()
677 527

  
678 528
      logging.debug("Starting command %r", cmd)
679 529

  
b/lib/impexpd/__init__.py
1
#
2
#
3

  
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Classes and functions for import/export daemon.
23

  
24
"""
25

  
26
from cStringIO import StringIO
27

  
28
from ganeti import constants
29
from ganeti import errors
30
from ganeti import utils
31

  
32

  
33
#: Buffer size: at most this many bytes are transferred at once
34
BUFSIZE = 1024 * 1024
35

  
36
# Common options for socat
37
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
38
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
39

  
40

  
41
class CommandBuilder(object):
42
  def __init__(self, mode, opts, socat_stderr_fd):
43
    """Initializes this class.
44

  
45
    @param mode: Daemon mode (import or export)
46
    @param opts: Options object
47
    @type socat_stderr_fd: int
48
    @param socat_stderr_fd: File descriptor socat should write its stderr to
49

  
50
    """
51
    self._opts = opts
52
    self._mode = mode
53
    self._socat_stderr_fd = socat_stderr_fd
54

  
55
  @staticmethod
56
  def GetBashCommand(cmd):
57
    """Prepares a command to be run in Bash.
58

  
59
    """
60
    return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
61

  
62
  def _GetSocatCommand(self):
63
    """Returns the socat command.
64

  
65
    """
66
    common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
67
      "key=%s" % self._opts.key,
68
      "cert=%s" % self._opts.cert,
69
      "cafile=%s" % self._opts.ca,
70
      ]
71

  
72
    if self._opts.bind is not None:
73
      common_addr_opts.append("bind=%s" % self._opts.bind)
74

  
75
    if self._mode == constants.IEM_IMPORT:
76
      if self._opts.port is None:
77
        port = 0
78
      else:
79
        port = self._opts.port
80

  
81
      addr1 = [
82
        "OPENSSL-LISTEN:%s" % port,
83
        "reuseaddr",
84

  
85
        # Retry to listen if connection wasn't established successfully, up to
86
        # 100 times a second. Note that this still leaves room for DoS attacks.
87
        "forever",
88
        "intervall=0.01",
89
        ] + common_addr_opts
90
      addr2 = ["stdout"]
91

  
92
    elif self._mode == constants.IEM_EXPORT:
93
      addr1 = ["stdin"]
94
      addr2 = [
95
        "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
96

  
97
        # How long to wait per connection attempt
98
        "connect-timeout=%s" % self._opts.connect_timeout,
99

  
100
        # Retry a few times before giving up to connect (once per second)
101
        "retry=%s" % self._opts.connect_retries,
102
        "intervall=1",
103
        ] + common_addr_opts
104

  
105
    else:
106
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
107

  
108
    for i in [addr1, addr2]:
109
      for value in i:
110
        if "," in value:
111
          raise errors.GenericError("Comma not allowed in socat option"
112
                                    " value: %r" % value)
113

  
114
    return [
115
      constants.SOCAT_PATH,
116

  
117
      # Log to stderr
118
      "-ls",
119

  
120
      # Log level
121
      "-d", "-d",
122

  
123
      # Buffer size
124
      "-b%s" % BUFSIZE,
125

  
126
      # Unidirectional mode, the first address is only used for reading, and the
127
      # second address is only used for writing
128
      "-u",
129

  
130
      ",".join(addr1), ",".join(addr2)
131
      ]
132

  
133
  def _GetTransportCommand(self):
134
    """Returns the command for the transport part of the daemon.
135

  
136
    """
137
    socat_cmd = ("%s 2>&%d" %
138
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
139
                  self._socat_stderr_fd))
140

  
141
    compr = self._opts.compress
142

  
143
    assert compr in constants.IEC_ALL
144

  
145
    if self._mode == constants.IEM_IMPORT:
146
      if compr == constants.IEC_GZIP:
147
        transport_cmd = "%s | gunzip -c" % socat_cmd
148
      else:
149
        transport_cmd = socat_cmd
150
    elif self._mode == constants.IEM_EXPORT:
151
      if compr == constants.IEC_GZIP:
152
        transport_cmd = "gzip -c | %s" % socat_cmd
153
      else:
154
        transport_cmd = socat_cmd
155
    else:
156
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
157

  
158
    # TODO: Use "dd" to measure processed data (allows to give an ETA)
159

  
160
    # TODO: Run transport as separate user
161
    # The transport uses its own shell to simplify running it as a separate user
162
    # in the future.
163
    return self.GetBashCommand(transport_cmd)
164

  
165
  def GetCommand(self):
166
    """Returns the complete child process command.
167

  
168
    """
169
    transport_cmd = self._GetTransportCommand()
170

  
171
    buf = StringIO()
172

  
173
    if self._opts.cmd_prefix:
174
      buf.write(self._opts.cmd_prefix)
175
      buf.write(" ")
176

  
177
    buf.write(utils.ShellQuoteArgs(transport_cmd))
178

  
179
    if self._opts.cmd_suffix:
180
      buf.write(" ")
181
      buf.write(self._opts.cmd_suffix)
182

  
183
    return self.GetBashCommand(buf.getvalue())
b/test/ganeti.impexpd_unittest.py
1
#!/usr/bin/python
2
#
3

  
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Script for testing ganeti.impexpd"""
23

  
24
import os
25
import sys
26
import re
27
import unittest
28

  
29
from ganeti import constants
30
from ganeti import objects
31
from ganeti import compat
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import impexpd
35

  
36
import testutils
37

  
38

  
39
class CmdBuilderConfig(objects.ConfigObject):
40
  __slots__ = [
41
    "bind",
42
    "key",
43
    "cert",
44
    "ca",
45
    "host",
46
    "port",
47
    "compress",
48
    "connect_timeout",
49
    "connect_retries",
50
    "cmd_prefix",
51
    "cmd_suffix",
52
    ]
53

  
54

  
55
def CheckCmdWord(cmd, word):
56
  wre = re.compile(r"\b%s\b" % re.escape(word))
57
  return compat.any(wre.search(i) for i in cmd)
58

  
59

  
60
class TestCommandBuilder(unittest.TestCase):
61
  def test(self):
62
    for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
63
      if mode == constants.IEM_IMPORT:
64
        comprcmd = "gunzip"
65
      elif mode == constants.IEM_EXPORT:
66
        comprcmd = "gzip"
67

  
68
      for compress in [constants.IEC_NONE, constants.IEC_GZIP]:
69
        for host in ["localhost", "1.2.3.4", "192.0.2.99"]:
70
          for port in [0, 1, 1234, 7856, 45452]:
71
            for cmd_prefix in [None, "PrefixCommandGoesHere|",
72
                               "dd if=/dev/hda bs=1048576 |"]:
73
              for cmd_suffix in [None, "< /some/file/name",
74
                                 "| dd of=/dev/null"]:
75
                opts = CmdBuilderConfig(host=host, port=port, compress=compress,
76
                                        cmd_prefix=cmd_prefix,
77
                                        cmd_suffix=cmd_suffix)
78

  
79
                builder = impexpd.CommandBuilder(mode, opts, 1)
80

  
81
                # Check complete command
82
                cmd = builder.GetCommand()
83
                self.assert_(isinstance(cmd, list))
84

  
85
                if compress == constants.IEC_GZIP:
86
                  self.assert_(CheckCmdWord(cmd, comprcmd))
87

  
88
                if cmd_prefix is not None:
89
                  self.assert_(cmd_prefix in i for i in cmd)
90

  
91
                if cmd_suffix is not None:
92
                  self.assert_(cmd_suffix in i for i in cmd)
93

  
94
                # Check socat command
95
                socat_cmd = builder._GetSocatCommand()
96

  
97
                if mode == constants.IEM_IMPORT:
98
                  ssl_addr = socat_cmd[-2].split(",")
99
                  self.assert_(("OPENSSL-LISTEN:%s" % port) in ssl_addr)
100
                elif mode == constants.IEM_EXPORT:
101
                  ssl_addr = socat_cmd[-1].split(",")
102
                  self.assert_(("OPENSSL:%s:%s" % (host, port)) in ssl_addr)
103

  
104
                self.assert_("verify=1" in ssl_addr)
105

  
106
  def testCommaError(self):
107
    opts = CmdBuilderConfig(host="localhost", port=1234,
108
                            ca="/some/path/with,a/,comma")
109

  
110
    for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
111
      builder = impexpd.CommandBuilder(mode, opts, 1)
112
      self.assertRaises(errors.GenericError, builder.GetCommand)
113

  
114
  def testModeError(self):
115
    mode = "foobarbaz"
116

  
117
    assert mode not in [constants.IEM_IMPORT, constants.IEM_EXPORT]
118

  
119
    opts = CmdBuilderConfig(host="localhost", port=1234)
120
    builder = impexpd.CommandBuilder(mode, opts, 1)
121
    self.assertRaises(errors.GenericError, builder.GetCommand)
122

  
123

  
124
if __name__ == "__main__":
125
  testutils.GanetiTestProgram()

Also available in: Unified diff