Statistics
| Branch: | Tag: | Revision:

root / lib / impexpd / __init__.py @ 53dbf14c

History | View | Annotate | Download (8.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Classes and functions for import/export daemon.
23

24
"""
25

    
26
import re
27
import socket
28
import logging
29
from cStringIO import StringIO
30

    
31
from ganeti import constants
32
from ganeti import errors
33
from ganeti import utils
34

    
35

    
36
#: Used to recognize point at which socat(1) starts to listen on its socket.
37
#: The local address is required for the remote peer to connect (in particular
38
#: the port number).
39
LISTENING_RE = re.compile(r"^listening on\s+"
40
                          r"AF=(?P<family>\d+)\s+"
41
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
42

    
43
#: Used to recognize point at which socat(1) is sending data over the wire
44
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
45
                              re.I)
46

    
47
SOCAT_LOG_DEBUG = "D"
48
SOCAT_LOG_INFO = "I"
49
SOCAT_LOG_NOTICE = "N"
50
SOCAT_LOG_WARNING = "W"
51
SOCAT_LOG_ERROR = "E"
52
SOCAT_LOG_FATAL = "F"
53

    
54
SOCAT_LOG_IGNORE = frozenset([
55
  SOCAT_LOG_DEBUG,
56
  SOCAT_LOG_INFO,
57
  SOCAT_LOG_NOTICE,
58
  ])
59

    
60
#: Buffer size: at most this many bytes are transferred at once
61
BUFSIZE = 1024 * 1024
62

    
63
# Common options for socat
64
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
65
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
66

    
67
(PROG_OTHER,
68
 PROG_SOCAT) = range(1, 3)
69
PROG_ALL = frozenset([
70
  PROG_OTHER,
71
  PROG_SOCAT,
72
  ])
73

    
74

    
75
class CommandBuilder(object):
76
  def __init__(self, mode, opts, socat_stderr_fd):
77
    """Initializes this class.
78

79
    @param mode: Daemon mode (import or export)
80
    @param opts: Options object
81
    @type socat_stderr_fd: int
82
    @param socat_stderr_fd: File descriptor socat should write its stderr to
83

84
    """
85
    self._opts = opts
86
    self._mode = mode
87
    self._socat_stderr_fd = socat_stderr_fd
88

    
89
  @staticmethod
90
  def GetBashCommand(cmd):
91
    """Prepares a command to be run in Bash.
92

93
    """
94
    return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
95

    
96
  def _GetSocatCommand(self):
97
    """Returns the socat command.
98

99
    """
100
    common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
101
      "key=%s" % self._opts.key,
102
      "cert=%s" % self._opts.cert,
103
      "cafile=%s" % self._opts.ca,
104
      ]
105

    
106
    if self._opts.bind is not None:
107
      common_addr_opts.append("bind=%s" % self._opts.bind)
108

    
109
    if self._mode == constants.IEM_IMPORT:
110
      if self._opts.port is None:
111
        port = 0
112
      else:
113
        port = self._opts.port
114

    
115
      addr1 = [
116
        "OPENSSL-LISTEN:%s" % port,
117
        "reuseaddr",
118

    
119
        # Retry to listen if connection wasn't established successfully, up to
120
        # 100 times a second. Note that this still leaves room for DoS attacks.
121
        "forever",
122
        "intervall=0.01",
123
        ] + common_addr_opts
124
      addr2 = ["stdout"]
125

    
126
    elif self._mode == constants.IEM_EXPORT:
127
      addr1 = ["stdin"]
128
      addr2 = [
129
        "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
130

    
131
        # How long to wait per connection attempt
132
        "connect-timeout=%s" % self._opts.connect_timeout,
133

    
134
        # Retry a few times before giving up to connect (once per second)
135
        "retry=%s" % self._opts.connect_retries,
136
        "intervall=1",
137
        ] + common_addr_opts
138

    
139
    else:
140
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
141

    
142
    for i in [addr1, addr2]:
143
      for value in i:
144
        if "," in value:
145
          raise errors.GenericError("Comma not allowed in socat option"
146
                                    " value: %r" % value)
147

    
148
    return [
149
      constants.SOCAT_PATH,
150

    
151
      # Log to stderr
152
      "-ls",
153

    
154
      # Log level
155
      "-d", "-d",
156

    
157
      # Buffer size
158
      "-b%s" % BUFSIZE,
159

    
160
      # Unidirectional mode, the first address is only used for reading, and the
161
      # second address is only used for writing
162
      "-u",
163

    
164
      ",".join(addr1), ",".join(addr2)
165
      ]
166

    
167
  def _GetTransportCommand(self):
168
    """Returns the command for the transport part of the daemon.
169

170
    """
171
    socat_cmd = ("%s 2>&%d" %
172
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
173
                  self._socat_stderr_fd))
174

    
175
    compr = self._opts.compress
176

    
177
    assert compr in constants.IEC_ALL
178

    
179
    if self._mode == constants.IEM_IMPORT:
180
      if compr == constants.IEC_GZIP:
181
        transport_cmd = "%s | gunzip -c" % socat_cmd
182
      else:
183
        transport_cmd = socat_cmd
184
    elif self._mode == constants.IEM_EXPORT:
185
      if compr == constants.IEC_GZIP:
186
        transport_cmd = "gzip -c | %s" % socat_cmd
187
      else:
188
        transport_cmd = socat_cmd
189
    else:
190
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
191

    
192
    # TODO: Use "dd" to measure processed data (allows to give an ETA)
193

    
194
    # TODO: Run transport as separate user
195
    # The transport uses its own shell to simplify running it as a separate user
196
    # in the future.
197
    return self.GetBashCommand(transport_cmd)
198

    
199
  def GetCommand(self):
200
    """Returns the complete child process command.
201

202
    """
203
    transport_cmd = self._GetTransportCommand()
204

    
205
    buf = StringIO()
206

    
207
    if self._opts.cmd_prefix:
208
      buf.write(self._opts.cmd_prefix)
209
      buf.write(" ")
210

    
211
    buf.write(utils.ShellQuoteArgs(transport_cmd))
212

    
213
    if self._opts.cmd_suffix:
214
      buf.write(" ")
215
      buf.write(self._opts.cmd_suffix)
216

    
217
    return self.GetBashCommand(buf.getvalue())
218

    
219

    
220
def _VerifyListening(family, address, port):
221
  """Verify address given as listening address by socat.
222

223
  """
224
  # TODO: Implement IPv6 support
225
  if family != socket.AF_INET:
226
    raise errors.GenericError("Address family %r not supported" % family)
227

    
228
  try:
229
    packed_address = socket.inet_pton(family, address)
230
  except socket.error:
231
    raise errors.GenericError("Invalid address %r for family %s" %
232
                              (address, family))
233

    
234
  return (socket.inet_ntop(family, packed_address), port)
235

    
236

    
237
class ChildIOProcessor(object):
238
  def __init__(self, debug, status_file, logger):
239
    """Initializes this class.
240

241
    """
242
    self._debug = debug
243
    self._status_file = status_file
244
    self._logger = logger
245

    
246
    self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
247
                           for prog in PROG_ALL])
248

    
249
  def GetLineSplitter(self, prog):
250
    """Returns the line splitter for a program.
251

252
    """
253
    return self._splitter[prog]
254

    
255
  def FlushAll(self):
256
    """Flushes all line splitters.
257

258
    """
259
    for ls in self._splitter.itervalues():
260
      ls.flush()
261

    
262
  def CloseAll(self):
263
    """Closes all line splitters.
264

265
    """
266
    for ls in self._splitter.itervalues():
267
      ls.close()
268
    self._splitter.clear()
269

    
270
  def _ProcessOutput(self, line, prog):
271
    """Takes care of child process output.
272

273
    @type line: string
274
    @param line: Child output line
275
    @type prog: number
276
    @param prog: Program from which the line originates
277

278
    """
279
    force_update = False
280
    forward_line = line
281

    
282
    if prog == PROG_SOCAT:
283
      level = None
284
      parts = line.split(None, 4)
285

    
286
      if len(parts) == 5:
287
        (_, _, _, level, msg) = parts
288

    
289
        force_update = self._ProcessSocatOutput(self._status_file, level, msg)
290

    
291
        if self._debug or (level and level not in SOCAT_LOG_IGNORE):
292
          forward_line = "socat: %s %s" % (level, msg)
293
        else:
294
          forward_line = None
295
      else:
296
        forward_line = "socat: %s" % line
297

    
298
    if forward_line:
299
      self._logger.info(forward_line)
300
      self._status_file.AddRecentOutput(forward_line)
301

    
302
    self._status_file.Update(force_update)
303

    
304
  @staticmethod
305
  def _ProcessSocatOutput(status_file, level, msg):
306
    """Interprets socat log output.
307

308
    """
309
    if level == SOCAT_LOG_NOTICE:
310
      if status_file.GetListenPort() is None:
311
        # TODO: Maybe implement timeout to not listen forever
312
        m = LISTENING_RE.match(msg)
313
        if m:
314
          (_, port) = _VerifyListening(int(m.group("family")),
315
                                       m.group("address"),
316
                                       int(m.group("port")))
317

    
318
          status_file.SetListenPort(port)
319
          return True
320

    
321
      if not status_file.GetConnected():
322
        m = TRANSFER_LOOP_RE.match(msg)
323
        if m:
324
          logging.debug("Connection established")
325
          status_file.SetConnected()
326
          return True
327

    
328
    return False