import/export daemon: Move command building into separate module
[ganeti-local] / lib / impexpd / __init__.py
1 #
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Classes and functions for import/export daemon.
23
24 """
25
26 from cStringIO import StringIO
27
28 from ganeti import constants
29 from ganeti import errors
30 from ganeti import utils
31
32
33 #: Buffer size: at most this many bytes are transferred at once
34 BUFSIZE = 1024 * 1024
35
36 # Common options for socat
37 SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
38 SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
39
40
41 class CommandBuilder(object):
42   def __init__(self, mode, opts, socat_stderr_fd):
43     """Initializes this class.
44
45     @param mode: Daemon mode (import or export)
46     @param opts: Options object
47     @type socat_stderr_fd: int
48     @param socat_stderr_fd: File descriptor socat should write its stderr to
49
50     """
51     self._opts = opts
52     self._mode = mode
53     self._socat_stderr_fd = socat_stderr_fd
54
55   @staticmethod
56   def GetBashCommand(cmd):
57     """Prepares a command to be run in Bash.
58
59     """
60     return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
61
62   def _GetSocatCommand(self):
63     """Returns the socat command.
64
65     """
66     common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
67       "key=%s" % self._opts.key,
68       "cert=%s" % self._opts.cert,
69       "cafile=%s" % self._opts.ca,
70       ]
71
72     if self._opts.bind is not None:
73       common_addr_opts.append("bind=%s" % self._opts.bind)
74
75     if self._mode == constants.IEM_IMPORT:
76       if self._opts.port is None:
77         port = 0
78       else:
79         port = self._opts.port
80
81       addr1 = [
82         "OPENSSL-LISTEN:%s" % port,
83         "reuseaddr",
84
85         # Retry to listen if connection wasn't established successfully, up to
86         # 100 times a second. Note that this still leaves room for DoS attacks.
87         "forever",
88         "intervall=0.01",
89         ] + common_addr_opts
90       addr2 = ["stdout"]
91
92     elif self._mode == constants.IEM_EXPORT:
93       addr1 = ["stdin"]
94       addr2 = [
95         "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
96
97         # How long to wait per connection attempt
98         "connect-timeout=%s" % self._opts.connect_timeout,
99
100         # Retry a few times before giving up to connect (once per second)
101         "retry=%s" % self._opts.connect_retries,
102         "intervall=1",
103         ] + common_addr_opts
104
105     else:
106       raise errors.GenericError("Invalid mode '%s'" % self._mode)
107
108     for i in [addr1, addr2]:
109       for value in i:
110         if "," in value:
111           raise errors.GenericError("Comma not allowed in socat option"
112                                     " value: %r" % value)
113
114     return [
115       constants.SOCAT_PATH,
116
117       # Log to stderr
118       "-ls",
119
120       # Log level
121       "-d", "-d",
122
123       # Buffer size
124       "-b%s" % BUFSIZE,
125
126       # Unidirectional mode, the first address is only used for reading, and the
127       # second address is only used for writing
128       "-u",
129
130       ",".join(addr1), ",".join(addr2)
131       ]
132
133   def _GetTransportCommand(self):
134     """Returns the command for the transport part of the daemon.
135
136     """
137     socat_cmd = ("%s 2>&%d" %
138                  (utils.ShellQuoteArgs(self._GetSocatCommand()),
139                   self._socat_stderr_fd))
140
141     compr = self._opts.compress
142
143     assert compr in constants.IEC_ALL
144
145     if self._mode == constants.IEM_IMPORT:
146       if compr == constants.IEC_GZIP:
147         transport_cmd = "%s | gunzip -c" % socat_cmd
148       else:
149         transport_cmd = socat_cmd
150     elif self._mode == constants.IEM_EXPORT:
151       if compr == constants.IEC_GZIP:
152         transport_cmd = "gzip -c | %s" % socat_cmd
153       else:
154         transport_cmd = socat_cmd
155     else:
156       raise errors.GenericError("Invalid mode '%s'" % self._mode)
157
158     # TODO: Use "dd" to measure processed data (allows to give an ETA)
159
160     # TODO: Run transport as separate user
161     # The transport uses its own shell to simplify running it as a separate user
162     # in the future.
163     return self.GetBashCommand(transport_cmd)
164
165   def GetCommand(self):
166     """Returns the complete child process command.
167
168     """
169     transport_cmd = self._GetTransportCommand()
170
171     buf = StringIO()
172
173     if self._opts.cmd_prefix:
174       buf.write(self._opts.cmd_prefix)
175       buf.write(" ")
176
177     buf.write(utils.ShellQuoteArgs(transport_cmd))
178
179     if self._opts.cmd_suffix:
180       buf.write(" ")
181       buf.write(self._opts.cmd_suffix)
182
183     return self.GetBashCommand(buf.getvalue())