Revision 34c9ee7b

b/daemons/import-export
30 30
import logging
31 31
import optparse
32 32
import os
33
import re
34 33
import select
35 34
import signal
36
import socket
37 35
import subprocess
38 36
import sys
39 37
import time
......
47 45
from ganeti import impexpd
48 46

  
49 47

  
50
#: Used to recognize point at which socat(1) starts to listen on its socket.
51
#: The local address is required for the remote peer to connect (in particular
52
#: the port number).
53
LISTENING_RE = re.compile(r"^listening on\s+"
54
                          r"AF=(?P<family>\d+)\s+"
55
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
56

  
57
#: Used to recognize point at which socat(1) is sending data over the wire
58
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
59
                              re.I)
60

  
61
SOCAT_LOG_DEBUG = "D"
62
SOCAT_LOG_INFO = "I"
63
SOCAT_LOG_NOTICE = "N"
64
SOCAT_LOG_WARNING = "W"
65
SOCAT_LOG_ERROR = "E"
66
SOCAT_LOG_FATAL = "F"
67

  
68
SOCAT_LOG_IGNORE = frozenset([
69
  SOCAT_LOG_DEBUG,
70
  SOCAT_LOG_INFO,
71
  SOCAT_LOG_NOTICE,
72
  ])
73

  
74 48
#: How many lines to keep in the status file
75 49
MAX_RECENT_OUTPUT_LINES = 20
76 50

  
......
88 62
options = None
89 63

  
90 64

  
91
class Error(Exception):
92
  """Generic exception"""
93

  
94

  
95 65
def SetupLogging():
96 66
  """Configures the logging module.
97 67

  
......
120 90
  return child_logger
121 91

  
122 92

  
123
def _VerifyListening(family, address, port):
124
  """Verify address given as listening address by socat.
125

  
126
  """
127
  # TODO: Implement IPv6 support
128
  if family != socket.AF_INET:
129
    raise Error("Address family %r not supported" % family)
130

  
131
  try:
132
    packed_address = socket.inet_pton(family, address)
133
  except socket.error:
134
    raise Error("Invalid address %r for family %s" % (address, family))
135

  
136
  return (socket.inet_ntop(family, packed_address), port)
137

  
138

  
139 93
class StatusFile:
140 94
  """Status file manager.
141 95

  
......
223 177
                    mode=0400)
224 178

  
225 179

  
226
def _ProcessSocatOutput(status_file, level, msg):
227
  """Interprets socat log output.
228

  
229
  """
230
  if level == SOCAT_LOG_NOTICE:
231
    if status_file.GetListenPort() is None:
232
      # TODO: Maybe implement timeout to not listen forever
233
      m = LISTENING_RE.match(msg)
234
      if m:
235
        (_, port) = _VerifyListening(int(m.group("family")), m.group("address"),
236
                                     int(m.group("port")))
237

  
238
        status_file.SetListenPort(port)
239
        return True
240

  
241
    if not status_file.GetConnected():
242
      m = TRANSFER_LOOP_RE.match(msg)
243
      if m:
244
        status_file.SetConnected()
245
        return True
246

  
247
  return False
248

  
249

  
250
def ProcessOutput(line, status_file, logger, socat):
251
  """Takes care of child process output.
252

  
253
  @param status_file: Status file manager
254
  @param logger: Child output logger
255
  @type socat: bool
256
  @param socat: Whether it's a socat output line
257
  @type line: string
258
  @param line: Child output line
259

  
260
  """
261
  force_update = False
262
  forward_line = line
263

  
264
  if socat:
265
    level = None
266
    parts = line.split(None, 4)
267

  
268
    if len(parts) == 5:
269
      (_, _, _, level, msg) = parts
270

  
271
      force_update = _ProcessSocatOutput(status_file, level, msg)
272

  
273
      if options.debug or (level and level not in SOCAT_LOG_IGNORE):
274
        forward_line = "socat: %s %s" % (level, msg)
275
      else:
276
        forward_line = None
277
    else:
278
      forward_line = "socat: %s" % line
279

  
280
  if forward_line:
281
    logger.info(forward_line)
282
    status_file.AddRecentOutput(forward_line)
283

  
284
  status_file.Update(force_update)
285

  
286

  
287 180
def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
288 181
                   signal_notify, signal_handler, mode):
289 182
  """Handles the child processes' output.
......
297 190
  # readable again.
298 191
  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
299 192

  
300
  script_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
301
                                           child_logger, False)
193
  child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
194
                                           child_logger)
302 195
  try:
303
    socat_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
304
                                            child_logger, True)
305
    try:
306
      fdmap = {
307
        child.stderr.fileno(): (child.stderr, script_stderr_lines),
308
        socat_stderr_read.fileno(): (socat_stderr_read, socat_stderr_lines),
309
        signal_notify.fileno(): (signal_notify, None),
310
        }
311

  
312
      poller = select.poll()
313
      for fd in fdmap:
314
        utils.SetNonblockFlag(fd, True)
315
        poller.register(fd, select.POLLIN)
316

  
317
      if options.connect_timeout and mode == constants.IEM_IMPORT:
318
        listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
319
      else:
320
        listen_timeout = None
321

  
322
      exit_timeout = None
323

  
324
      while True:
325
        # Break out of loop if only signal notify FD is left
326
        if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
196
    fdmap = {
197
      child.stderr.fileno():
198
        (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
199
      socat_stderr_read.fileno():
200
        (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
201
      signal_notify.fileno(): (signal_notify, None),
202
      }
203

  
204
    poller = select.poll()
205
    for fd in fdmap:
206
      utils.SetNonblockFlag(fd, True)
207
      poller.register(fd, select.POLLIN)
208

  
209
    if options.connect_timeout and mode == constants.IEM_IMPORT:
210
      listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
211
    else:
212
      listen_timeout = None
213

  
214
    exit_timeout = None
215

  
216
    while True:
217
      # Break out of loop if only signal notify FD is left
218
      if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
219
        break
220

  
221
      timeout = None
222

  
223
      if listen_timeout and not exit_timeout:
224
        if status_file.GetConnected():
225
          listen_timeout = None
226
        elif listen_timeout.Remaining() < 0:
227
          logging.info("Child process didn't establish connection in time")
228
          child.Kill(signal.SIGTERM)
229
          exit_timeout = \
230
            locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
231
          # Next block will calculate timeout
232
        else:
233
          # Not yet connected, check again in a second
234
          timeout = 1000
235

  
236
      if exit_timeout:
237
        timeout = exit_timeout.Remaining() * 1000
238
        if timeout < 0:
239
          logging.info("Child process didn't exit in time")
327 240
          break
328 241

  
329
        timeout = None
330

  
331
        if listen_timeout and not exit_timeout:
332
          if status_file.GetConnected():
333
            listen_timeout = None
334
          elif listen_timeout.Remaining() < 0:
335
            logging.info("Child process didn't establish connection in time")
336
            child.Kill(signal.SIGTERM)
337
            exit_timeout = \
338
              locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
339
            # Next block will calculate timeout
242
      for fd, event in utils.RetryOnSignal(poller.poll, timeout):
243
        if event & (select.POLLIN | event & select.POLLPRI):
244
          (from_, to) = fdmap[fd]
245

  
246
          # Read up to 1 KB of data
247
          data = from_.read(1024)
248
          if data:
249
            if to:
250
              to.write(data)
251
            elif fd == signal_notify.fileno():
252
              # Signal handling
253
              if signal_handler.called:
254
                signal_handler.Clear()
255
                if exit_timeout:
256
                  logging.info("Child process still has about %0.2f seconds"
257
                               " to exit", exit_timeout.Remaining())
258
                else:
259
                  logging.info("Giving child process %0.2f seconds to exit",
260
                               CHILD_LINGER_TIMEOUT)
261
                  exit_timeout = \
262
                    locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
340 263
          else:
341
            # Not yet connected, check again in a second
342
            timeout = 1000
343

  
344
        if exit_timeout:
345
          timeout = exit_timeout.Remaining() * 1000
346
          if timeout < 0:
347
            logging.info("Child process didn't exit in time")
348
            break
349

  
350
        for fd, event in utils.RetryOnSignal(poller.poll, timeout):
351
          if event & (select.POLLIN | event & select.POLLPRI):
352
            (from_, to) = fdmap[fd]
353

  
354
            # Read up to 1 KB of data
355
            data = from_.read(1024)
356
            if data:
357
              if to:
358
                to.write(data)
359
              elif fd == signal_notify.fileno():
360
                # Signal handling
361
                if signal_handler.called:
362
                  signal_handler.Clear()
363
                  if exit_timeout:
364
                    logging.info("Child process still has about %0.2f seconds"
365
                                 " to exit", exit_timeout.Remaining())
366
                  else:
367
                    logging.info("Giving child process %0.2f seconds to exit",
368
                                 CHILD_LINGER_TIMEOUT)
369
                    exit_timeout = \
370
                      locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
371
            else:
372
              poller.unregister(fd)
373
              del fdmap[fd]
374

  
375
          elif event & (select.POLLNVAL | select.POLLHUP |
376
                        select.POLLERR):
377 264
            poller.unregister(fd)
378 265
            del fdmap[fd]
379 266

  
380
        script_stderr_lines.flush()
381
        socat_stderr_lines.flush()
267
        elif event & (select.POLLNVAL | select.POLLHUP |
268
                      select.POLLERR):
269
          poller.unregister(fd)
270
          del fdmap[fd]
271

  
272
      child_io_proc.FlushAll()
382 273

  
383
      # If there was a timeout calculator, we were waiting for the child to
384
      # finish, e.g. due to a signal
385
      return not bool(exit_timeout)
386
    finally:
387
      socat_stderr_lines.close()
274
    # If there was a timeout calculator, we were waiting for the child to
275
    # finish, e.g. due to a signal
276
    return not bool(exit_timeout)
388 277
  finally:
389
    script_stderr_lines.close()
278
    child_io_proc.CloseAll()
390 279

  
391 280

  
392 281
def ParseOptions():
b/lib/impexpd/__init__.py
23 23

  
24 24
"""
25 25

  
26
import re
27
import socket
26 28
from cStringIO import StringIO
27 29

  
28 30
from ganeti import constants
......
30 32
from ganeti import utils
31 33

  
32 34

  
35
#: Used to recognize point at which socat(1) starts to listen on its socket.
36
#: The local address is required for the remote peer to connect (in particular
37
#: the port number).
38
LISTENING_RE = re.compile(r"^listening on\s+"
39
                          r"AF=(?P<family>\d+)\s+"
40
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
41

  
42
#: Used to recognize point at which socat(1) is sending data over the wire
43
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
44
                              re.I)
45

  
46
SOCAT_LOG_DEBUG = "D"
47
SOCAT_LOG_INFO = "I"
48
SOCAT_LOG_NOTICE = "N"
49
SOCAT_LOG_WARNING = "W"
50
SOCAT_LOG_ERROR = "E"
51
SOCAT_LOG_FATAL = "F"
52

  
53
SOCAT_LOG_IGNORE = frozenset([
54
  SOCAT_LOG_DEBUG,
55
  SOCAT_LOG_INFO,
56
  SOCAT_LOG_NOTICE,
57
  ])
58

  
33 59
#: Buffer size: at most this many bytes are transferred at once
34 60
BUFSIZE = 1024 * 1024
35 61

  
......
37 63
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
38 64
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
39 65

  
66
(PROG_OTHER,
67
 PROG_SOCAT) = range(1, 3)
68
PROG_ALL = frozenset([
69
  PROG_OTHER,
70
  PROG_SOCAT,
71
  ])
72

  
40 73

  
41 74
class CommandBuilder(object):
42 75
  def __init__(self, mode, opts, socat_stderr_fd):
......
181 214
      buf.write(self._opts.cmd_suffix)
182 215

  
183 216
    return self.GetBashCommand(buf.getvalue())
217

  
218

  
219
def _VerifyListening(family, address, port):
220
  """Verify address given as listening address by socat.
221

  
222
  """
223
  # TODO: Implement IPv6 support
224
  if family != socket.AF_INET:
225
    raise errors.GenericError("Address family %r not supported" % family)
226

  
227
  try:
228
    packed_address = socket.inet_pton(family, address)
229
  except socket.error:
230
    raise errors.GenericError("Invalid address %r for family %s" %
231
                              (address, family))
232

  
233
  return (socket.inet_ntop(family, packed_address), port)
234

  
235

  
236
class ChildIOProcessor(object):
237
  def __init__(self, debug, status_file, logger):
238
    """Initializes this class.
239

  
240
    """
241
    self._debug = debug
242
    self._status_file = status_file
243
    self._logger = logger
244

  
245
    self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
246
                           for prog in PROG_ALL])
247

  
248
  def GetLineSplitter(self, prog):
249
    """Returns the line splitter for a program.
250

  
251
    """
252
    return self._splitter[prog]
253

  
254
  def FlushAll(self):
255
    """Flushes all line splitters.
256

  
257
    """
258
    for ls in self._splitter.itervalues():
259
      ls.flush()
260

  
261
  def CloseAll(self):
262
    """Closes all line splitters.
263

  
264
    """
265
    for ls in self._splitter.itervalues():
266
      ls.close()
267
    self._splitter.clear()
268

  
269
  def _ProcessOutput(self, line, prog):
270
    """Takes care of child process output.
271

  
272
    @type line: string
273
    @param line: Child output line
274
    @type prog: number
275
    @param prog: Program from which the line originates
276

  
277
    """
278
    force_update = False
279
    forward_line = line
280

  
281
    if prog == PROG_SOCAT:
282
      level = None
283
      parts = line.split(None, 4)
284

  
285
      if len(parts) == 5:
286
        (_, _, _, level, msg) = parts
287

  
288
        force_update = self._ProcessSocatOutput(self._status_file, level, msg)
289

  
290
        if self._debug or (level and level not in SOCAT_LOG_IGNORE):
291
          forward_line = "socat: %s %s" % (level, msg)
292
        else:
293
          forward_line = None
294
      else:
295
        forward_line = "socat: %s" % line
296

  
297
    if forward_line:
298
      self._logger.info(forward_line)
299
      self._status_file.AddRecentOutput(forward_line)
300

  
301
    self._status_file.Update(force_update)
302

  
303
  @staticmethod
304
  def _ProcessSocatOutput(status_file, level, msg):
305
    """Interprets socat log output.
306

  
307
    """
308
    if level == SOCAT_LOG_NOTICE:
309
      if status_file.GetListenPort() is None:
310
        # TODO: Maybe implement timeout to not listen forever
311
        m = LISTENING_RE.match(msg)
312
        if m:
313
          (_, port) = _VerifyListening(int(m.group("family")),
314
                                       m.group("address"),
315
                                       int(m.group("port")))
316

  
317
          status_file.SetListenPort(port)
318
          return True
319

  
320
      if not status_file.GetConnected():
321
        m = TRANSFER_LOOP_RE.match(msg)
322
        if m:
323
          status_file.SetConnected()
324
          return True
325

  
326
    return False

Also available in: Unified diff