Revision 34c9ee7b lib/impexpd/__init__.py

b/lib/impexpd/__init__.py
23 23

  
24 24
"""
25 25

  
26
import re
27
import socket
26 28
from cStringIO import StringIO
27 29

  
28 30
from ganeti import constants
......
30 32
from ganeti import utils
31 33

  
32 34

  
35
#: Used to recognize point at which socat(1) starts to listen on its socket.
36
#: The local address is required for the remote peer to connect (in particular
37
#: the port number).
38
LISTENING_RE = re.compile(r"^listening on\s+"
39
                          r"AF=(?P<family>\d+)\s+"
40
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
41

  
42
#: Used to recognize point at which socat(1) is sending data over the wire
43
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
44
                              re.I)
45

  
46
SOCAT_LOG_DEBUG = "D"
47
SOCAT_LOG_INFO = "I"
48
SOCAT_LOG_NOTICE = "N"
49
SOCAT_LOG_WARNING = "W"
50
SOCAT_LOG_ERROR = "E"
51
SOCAT_LOG_FATAL = "F"
52

  
53
SOCAT_LOG_IGNORE = frozenset([
54
  SOCAT_LOG_DEBUG,
55
  SOCAT_LOG_INFO,
56
  SOCAT_LOG_NOTICE,
57
  ])
58

  
33 59
#: Buffer size: at most this many bytes are transferred at once
34 60
BUFSIZE = 1024 * 1024
35 61

  
......
37 63
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
38 64
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
39 65

  
66
(PROG_OTHER,
67
 PROG_SOCAT) = range(1, 3)
68
PROG_ALL = frozenset([
69
  PROG_OTHER,
70
  PROG_SOCAT,
71
  ])
72

  
40 73

  
41 74
class CommandBuilder(object):
42 75
  def __init__(self, mode, opts, socat_stderr_fd):
......
181 214
      buf.write(self._opts.cmd_suffix)
182 215

  
183 216
    return self.GetBashCommand(buf.getvalue())
217

  
218

  
219
def _VerifyListening(family, address, port):
220
  """Verify address given as listening address by socat.
221

  
222
  """
223
  # TODO: Implement IPv6 support
224
  if family != socket.AF_INET:
225
    raise errors.GenericError("Address family %r not supported" % family)
226

  
227
  try:
228
    packed_address = socket.inet_pton(family, address)
229
  except socket.error:
230
    raise errors.GenericError("Invalid address %r for family %s" %
231
                              (address, family))
232

  
233
  return (socket.inet_ntop(family, packed_address), port)
234

  
235

  
236
class ChildIOProcessor(object):
237
  def __init__(self, debug, status_file, logger):
238
    """Initializes this class.
239

  
240
    """
241
    self._debug = debug
242
    self._status_file = status_file
243
    self._logger = logger
244

  
245
    self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
246
                           for prog in PROG_ALL])
247

  
248
  def GetLineSplitter(self, prog):
249
    """Returns the line splitter for a program.
250

  
251
    """
252
    return self._splitter[prog]
253

  
254
  def FlushAll(self):
255
    """Flushes all line splitters.
256

  
257
    """
258
    for ls in self._splitter.itervalues():
259
      ls.flush()
260

  
261
  def CloseAll(self):
262
    """Closes all line splitters.
263

  
264
    """
265
    for ls in self._splitter.itervalues():
266
      ls.close()
267
    self._splitter.clear()
268

  
269
  def _ProcessOutput(self, line, prog):
270
    """Takes care of child process output.
271

  
272
    @type line: string
273
    @param line: Child output line
274
    @type prog: number
275
    @param prog: Program from which the line originates
276

  
277
    """
278
    force_update = False
279
    forward_line = line
280

  
281
    if prog == PROG_SOCAT:
282
      level = None
283
      parts = line.split(None, 4)
284

  
285
      if len(parts) == 5:
286
        (_, _, _, level, msg) = parts
287

  
288
        force_update = self._ProcessSocatOutput(self._status_file, level, msg)
289

  
290
        if self._debug or (level and level not in SOCAT_LOG_IGNORE):
291
          forward_line = "socat: %s %s" % (level, msg)
292
        else:
293
          forward_line = None
294
      else:
295
        forward_line = "socat: %s" % line
296

  
297
    if forward_line:
298
      self._logger.info(forward_line)
299
      self._status_file.AddRecentOutput(forward_line)
300

  
301
    self._status_file.Update(force_update)
302

  
303
  @staticmethod
304
  def _ProcessSocatOutput(status_file, level, msg):
305
    """Interprets socat log output.
306

  
307
    """
308
    if level == SOCAT_LOG_NOTICE:
309
      if status_file.GetListenPort() is None:
310
        # TODO: Maybe implement timeout to not listen forever
311
        m = LISTENING_RE.match(msg)
312
        if m:
313
          (_, port) = _VerifyListening(int(m.group("family")),
314
                                       m.group("address"),
315
                                       int(m.group("port")))
316

  
317
          status_file.SetListenPort(port)
318
          return True
319

  
320
      if not status_file.GetConnected():
321
        m = TRANSFER_LOOP_RE.match(msg)
322
        if m:
323
          status_file.SetConnected()
324
          return True
325

  
326
    return False

Also available in: Unified diff