Statistics
| Branch: | Tag: | Revision:

root / lib / impexpd / __init__.py @ 34c9ee7b

History | View | Annotate | Download (8.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Classes and functions for import/export daemon.
23

24
"""
25

    
26
import re
27
import socket
28
from cStringIO import StringIO
29

    
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import utils
33

    
34

    
35
#: Used to recognize point at which socat(1) starts to listen on its socket.
36
#: The local address is required for the remote peer to connect (in particular
37
#: the port number).
38
LISTENING_RE = re.compile(r"^listening on\s+"
39
                          r"AF=(?P<family>\d+)\s+"
40
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
41

    
42
#: Used to recognize point at which socat(1) is sending data over the wire
43
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
44
                              re.I)
45

    
46
SOCAT_LOG_DEBUG = "D"
47
SOCAT_LOG_INFO = "I"
48
SOCAT_LOG_NOTICE = "N"
49
SOCAT_LOG_WARNING = "W"
50
SOCAT_LOG_ERROR = "E"
51
SOCAT_LOG_FATAL = "F"
52

    
53
SOCAT_LOG_IGNORE = frozenset([
54
  SOCAT_LOG_DEBUG,
55
  SOCAT_LOG_INFO,
56
  SOCAT_LOG_NOTICE,
57
  ])
58

    
59
#: Buffer size: at most this many bytes are transferred at once
60
BUFSIZE = 1024 * 1024
61

    
62
# Common options for socat
63
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
64
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
65

    
66
(PROG_OTHER,
67
 PROG_SOCAT) = range(1, 3)
68
PROG_ALL = frozenset([
69
  PROG_OTHER,
70
  PROG_SOCAT,
71
  ])
72

    
73

    
74
class CommandBuilder(object):
75
  def __init__(self, mode, opts, socat_stderr_fd):
76
    """Initializes this class.
77

78
    @param mode: Daemon mode (import or export)
79
    @param opts: Options object
80
    @type socat_stderr_fd: int
81
    @param socat_stderr_fd: File descriptor socat should write its stderr to
82

83
    """
84
    self._opts = opts
85
    self._mode = mode
86
    self._socat_stderr_fd = socat_stderr_fd
87

    
88
  @staticmethod
89
  def GetBashCommand(cmd):
90
    """Prepares a command to be run in Bash.
91

92
    """
93
    return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
94

    
95
  def _GetSocatCommand(self):
96
    """Returns the socat command.
97

98
    """
99
    common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
100
      "key=%s" % self._opts.key,
101
      "cert=%s" % self._opts.cert,
102
      "cafile=%s" % self._opts.ca,
103
      ]
104

    
105
    if self._opts.bind is not None:
106
      common_addr_opts.append("bind=%s" % self._opts.bind)
107

    
108
    if self._mode == constants.IEM_IMPORT:
109
      if self._opts.port is None:
110
        port = 0
111
      else:
112
        port = self._opts.port
113

    
114
      addr1 = [
115
        "OPENSSL-LISTEN:%s" % port,
116
        "reuseaddr",
117

    
118
        # Retry to listen if connection wasn't established successfully, up to
119
        # 100 times a second. Note that this still leaves room for DoS attacks.
120
        "forever",
121
        "intervall=0.01",
122
        ] + common_addr_opts
123
      addr2 = ["stdout"]
124

    
125
    elif self._mode == constants.IEM_EXPORT:
126
      addr1 = ["stdin"]
127
      addr2 = [
128
        "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
129

    
130
        # How long to wait per connection attempt
131
        "connect-timeout=%s" % self._opts.connect_timeout,
132

    
133
        # Retry a few times before giving up to connect (once per second)
134
        "retry=%s" % self._opts.connect_retries,
135
        "intervall=1",
136
        ] + common_addr_opts
137

    
138
    else:
139
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
140

    
141
    for i in [addr1, addr2]:
142
      for value in i:
143
        if "," in value:
144
          raise errors.GenericError("Comma not allowed in socat option"
145
                                    " value: %r" % value)
146

    
147
    return [
148
      constants.SOCAT_PATH,
149

    
150
      # Log to stderr
151
      "-ls",
152

    
153
      # Log level
154
      "-d", "-d",
155

    
156
      # Buffer size
157
      "-b%s" % BUFSIZE,
158

    
159
      # Unidirectional mode, the first address is only used for reading, and the
160
      # second address is only used for writing
161
      "-u",
162

    
163
      ",".join(addr1), ",".join(addr2)
164
      ]
165

    
166
  def _GetTransportCommand(self):
167
    """Returns the command for the transport part of the daemon.
168

169
    """
170
    socat_cmd = ("%s 2>&%d" %
171
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
172
                  self._socat_stderr_fd))
173

    
174
    compr = self._opts.compress
175

    
176
    assert compr in constants.IEC_ALL
177

    
178
    if self._mode == constants.IEM_IMPORT:
179
      if compr == constants.IEC_GZIP:
180
        transport_cmd = "%s | gunzip -c" % socat_cmd
181
      else:
182
        transport_cmd = socat_cmd
183
    elif self._mode == constants.IEM_EXPORT:
184
      if compr == constants.IEC_GZIP:
185
        transport_cmd = "gzip -c | %s" % socat_cmd
186
      else:
187
        transport_cmd = socat_cmd
188
    else:
189
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
190

    
191
    # TODO: Use "dd" to measure processed data (allows to give an ETA)
192

    
193
    # TODO: Run transport as separate user
194
    # The transport uses its own shell to simplify running it as a separate user
195
    # in the future.
196
    return self.GetBashCommand(transport_cmd)
197

    
198
  def GetCommand(self):
199
    """Returns the complete child process command.
200

201
    """
202
    transport_cmd = self._GetTransportCommand()
203

    
204
    buf = StringIO()
205

    
206
    if self._opts.cmd_prefix:
207
      buf.write(self._opts.cmd_prefix)
208
      buf.write(" ")
209

    
210
    buf.write(utils.ShellQuoteArgs(transport_cmd))
211

    
212
    if self._opts.cmd_suffix:
213
      buf.write(" ")
214
      buf.write(self._opts.cmd_suffix)
215

    
216
    return self.GetBashCommand(buf.getvalue())
217

    
218

    
219
def _VerifyListening(family, address, port):
220
  """Verify address given as listening address by socat.
221

222
  """
223
  # TODO: Implement IPv6 support
224
  if family != socket.AF_INET:
225
    raise errors.GenericError("Address family %r not supported" % family)
226

    
227
  try:
228
    packed_address = socket.inet_pton(family, address)
229
  except socket.error:
230
    raise errors.GenericError("Invalid address %r for family %s" %
231
                              (address, family))
232

    
233
  return (socket.inet_ntop(family, packed_address), port)
234

    
235

    
236
class ChildIOProcessor(object):
237
  def __init__(self, debug, status_file, logger):
238
    """Initializes this class.
239

240
    """
241
    self._debug = debug
242
    self._status_file = status_file
243
    self._logger = logger
244

    
245
    self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
246
                           for prog in PROG_ALL])
247

    
248
  def GetLineSplitter(self, prog):
249
    """Returns the line splitter for a program.
250

251
    """
252
    return self._splitter[prog]
253

    
254
  def FlushAll(self):
255
    """Flushes all line splitters.
256

257
    """
258
    for ls in self._splitter.itervalues():
259
      ls.flush()
260

    
261
  def CloseAll(self):
262
    """Closes all line splitters.
263

264
    """
265
    for ls in self._splitter.itervalues():
266
      ls.close()
267
    self._splitter.clear()
268

    
269
  def _ProcessOutput(self, line, prog):
270
    """Takes care of child process output.
271

272
    @type line: string
273
    @param line: Child output line
274
    @type prog: number
275
    @param prog: Program from which the line originates
276

277
    """
278
    force_update = False
279
    forward_line = line
280

    
281
    if prog == PROG_SOCAT:
282
      level = None
283
      parts = line.split(None, 4)
284

    
285
      if len(parts) == 5:
286
        (_, _, _, level, msg) = parts
287

    
288
        force_update = self._ProcessSocatOutput(self._status_file, level, msg)
289

    
290
        if self._debug or (level and level not in SOCAT_LOG_IGNORE):
291
          forward_line = "socat: %s %s" % (level, msg)
292
        else:
293
          forward_line = None
294
      else:
295
        forward_line = "socat: %s" % line
296

    
297
    if forward_line:
298
      self._logger.info(forward_line)
299
      self._status_file.AddRecentOutput(forward_line)
300

    
301
    self._status_file.Update(force_update)
302

    
303
  @staticmethod
304
  def _ProcessSocatOutput(status_file, level, msg):
305
    """Interprets socat log output.
306

307
    """
308
    if level == SOCAT_LOG_NOTICE:
309
      if status_file.GetListenPort() is None:
310
        # TODO: Maybe implement timeout to not listen forever
311
        m = LISTENING_RE.match(msg)
312
        if m:
313
          (_, port) = _VerifyListening(int(m.group("family")),
314
                                       m.group("address"),
315
                                       int(m.group("port")))
316

    
317
          status_file.SetListenPort(port)
318
          return True
319

    
320
      if not status_file.GetConnected():
321
        m = TRANSFER_LOOP_RE.match(msg)
322
        if m:
323
          status_file.SetConnected()
324
          return True
325

    
326
    return False