# Source listing: lib/impexpd/__init__.py @ 560cbec1

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Classes and functions for import/export daemon.

"""
25

    
26
import os
27
import re
28
import socket
29
import logging
30
import signal
31
import errno
32
import time
33
from cStringIO import StringIO
34

    
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import utils
38

    
39

    
40
#: Used to recognize point at which socat(1) starts to listen on its socket.
#: The local address is required for the remote peer to connect (in particular
#: the port number).
LISTENING_RE = re.compile(r"^listening on\s+"
                          r"AF=(?P<family>\d+)\s+"
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)

#: Used to recognize point at which socat(1) is sending data over the wire
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
                              re.I)

#: Single-letter severity codes as found in socat(1) log lines
SOCAT_LOG_DEBUG = "D"
SOCAT_LOG_INFO = "I"
SOCAT_LOG_NOTICE = "N"
SOCAT_LOG_WARNING = "W"
SOCAT_LOG_ERROR = "E"
SOCAT_LOG_FATAL = "F"

#: socat(1) severities whose lines are only forwarded in debug mode
SOCAT_LOG_IGNORE = frozenset([
  SOCAT_LOG_DEBUG,
  SOCAT_LOG_INFO,
  SOCAT_LOG_NOTICE,
  ])

#: Used to parse GNU dd(1) statistics
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
                        r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)

#: Used to ignore "N+N records in/out" on dd(1)'s stderr
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)

#: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
#: unavailable and SIGUSR1 is used instead)
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)

#: Buffer size: at most this many bytes are transferred at once
BUFSIZE = 1024 * 1024

# Common options for socat
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]

#: Identifiers for the program (or data channel) a line of child output
#: originates from
(PROG_OTHER,
 PROG_SOCAT,
 PROG_DD,
 PROG_DD_PID,
 PROG_EXP_SIZE) = range(1, 6)

#: All known output sources
PROG_ALL = frozenset([
  PROG_OTHER,
  PROG_SOCAT,
  PROG_DD,
  PROG_DD_PID,
  PROG_EXP_SIZE,
  ])
94

    
95

    
96
class CommandBuilder(object):
  """Builds the shell command line for the daemon's child process.

  The resulting command combines socat(1) for the network transport, dd(1)
  for the data stream (and its statistics) and, depending on the options,
  gzip/gunzip for compression.

  """
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
    """Initializes this class.

    @param mode: Daemon mode (import or export)
    @param opts: Options object
    @type socat_stderr_fd: int
    @param socat_stderr_fd: File descriptor socat should write its stderr to
    @type dd_stderr_fd: int
    @param dd_stderr_fd: File descriptor dd should write its stderr to
    @type dd_pid_fd: int
    @param dd_pid_fd: File descriptor the child should write dd's PID to

    """
    self._opts = opts
    self._mode = mode
    self._socat_stderr_fd = socat_stderr_fd
    self._dd_stderr_fd = dd_stderr_fd
    self._dd_pid_fd = dd_pid_fd

  @staticmethod
  def GetBashCommand(cmd):
    """Prepares a command to be run in Bash.

    @type cmd: string
    @param cmd: Shell code to be passed to C{bash -c}
    @rtype: list
    @return: Argument vector running C{cmd} with the C{errexit} and
             C{pipefail} shell options enabled

    """
    return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]

  def _GetSocatCommand(self):
    """Returns the socat command.

    @rtype: list
    @return: Argument vector for socat(1)
    @raise errors.GenericError: If the mode is neither import nor export, or
                                if an address option contains a comma

    """
    common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
      "key=%s" % self._opts.key,
      "cert=%s" % self._opts.cert,
      "cafile=%s" % self._opts.ca,
      ]

    if self._opts.bind is not None:
      common_addr_opts.append("bind=%s" % self._opts.bind)

    if self._mode == constants.IEM_IMPORT:
      # Port 0 is used when no port was requested
      if self._opts.port is None:
        port = 0
      else:
        port = self._opts.port

      addr1 = [
        "OPENSSL-LISTEN:%s" % port,
        "reuseaddr",

        # Retry to listen if connection wasn't established successfully, up to
        # 100 times a second. Note that this still leaves room for DoS attacks.
        "forever",
        "intervall=0.01",
        ] + common_addr_opts
      addr2 = ["stdout"]

    elif self._mode == constants.IEM_EXPORT:
      addr1 = ["stdin"]
      addr2 = [
        "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),

        # How long to wait per connection attempt
        "connect-timeout=%s" % self._opts.connect_timeout,

        # Retry a few times before giving up to connect (once per second)
        "retry=%s" % self._opts.connect_retries,
        "intervall=1",
        ] + common_addr_opts

    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    # The options of one address are joined using commas below, hence a comma
    # inside a value would be interpreted as an option separator
    for value in addr1 + addr2:
      if "," in value:
        raise errors.GenericError("Comma not allowed in socat option"
                                  " value: %r" % value)

    return [
      constants.SOCAT_PATH,

      # Log to stderr
      "-ls",

      # Log level
      "-d", "-d",

      # Buffer size
      "-b%s" % BUFSIZE,

      # Unidirectional mode, the first address is only used for reading, and the
      # second address is only used for writing
      "-u",

      ",".join(addr1), ",".join(addr2)
      ]

  def _GetTransportCommand(self):
    """Returns the command for the transport part of the daemon.

    @rtype: list
    @return: Bash argument vector running the socat/dd (and optionally
             gzip/gunzip) pipeline
    @raise errors.GenericError: If the mode is neither import nor export

    """
    socat_cmd = ("%s 2>&%d" %
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
                  self._socat_stderr_fd))

    dd_cmd = "".join([
      # Setting LC_ALL since we want to parse the output and explicitly
      # redirecting stdin, as the background process (dd) would have /dev/null
      # as stdin otherwise
      ("{ LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
       (BUFSIZE, self._dd_stderr_fd)),
      # Send PID to daemon
      " echo $pid >&%d;" % self._dd_pid_fd,
      # And wait for dd
      " wait $pid;",
      " }",
      ])

    compr = self._opts.compress

    # Internal consistency check
    assert compr in constants.IEC_ALL

    if self._mode == constants.IEM_IMPORT:
      # Network -> (decompression) -> dd
      if compr == constants.IEC_GZIP:
        transport_cmd = "%s | gunzip -c" % socat_cmd
      else:
        transport_cmd = socat_cmd

      transport_cmd += " | %s" % dd_cmd
    elif self._mode == constants.IEM_EXPORT:
      # dd -> (compression) -> network
      if compr == constants.IEC_GZIP:
        transport_cmd = "gzip -c | %s" % socat_cmd
      else:
        transport_cmd = socat_cmd

      transport_cmd = "%s | %s" % (dd_cmd, transport_cmd)
    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    # TODO: Run transport as separate user
    # The transport uses its own shell to simplify running it as a separate user
    # in the future.
    return self.GetBashCommand(transport_cmd)

  def GetCommand(self):
    """Returns the complete child process command.

    The quoted transport command is placed between the optional command
    prefix and suffix from the options; both are inserted verbatim, i.e.
    they are shell code and are not quoted.

    @rtype: list
    @return: Bash argument vector for the child process

    """
    transport_cmd = self._GetTransportCommand()

    parts = []

    if self._opts.cmd_prefix:
      parts.append(self._opts.cmd_prefix)

    parts.append(utils.ShellQuoteArgs(transport_cmd))

    if self._opts.cmd_suffix:
      parts.append(self._opts.cmd_suffix)

    return self.GetBashCommand(" ".join(parts))
259

    
260

    
261
def _VerifyListening(family, address, port):
  """Verify address given as listening address by socat.

  @type family: int
  @param family: Address family; only C{socket.AF_INET} is accepted
  @type address: string
  @param address: Address in textual form
  @param port: Port number, returned unchanged
  @rtype: tuple
  @return: Address (converted to packed form and back to text) and port
  @raise errors.GenericError: If the address family is not supported or the
                              address can't be parsed

  """
  # TODO: Implement IPv6 support
  if family != socket.AF_INET:
    raise errors.GenericError("Address family %r not supported" % family)

  try:
    packed_address = socket.inet_pton(family, address)
  except socket.error:
    raise errors.GenericError("Invalid address %r for family %s" %
                              (address, family))

  return (socket.inet_ntop(family, packed_address), port)
276

    
277

    
278
class ChildIOProcessor(object):
  """Interprets the output of the daemon's child process.

  Every source of output (see C{PROG_ALL}) gets its own line splitter. Each
  complete line is passed to L{_ProcessOutput}, which updates the status file
  and forwards selected lines to the logger.

  """
  def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
    """Initializes this class.

    @param debug: Whether to forward lines which are otherwise suppressed
    @param status_file: Status file object receiving listen port, connection
                        state, progress and recently forwarded output
    @param logger: Logger receiving forwarded lines
    @param throughput_samples: Number of most recent dd(1) samples kept for
                               calculating the throughput
    @param exp_size: Expected size of transferred data (compared against
                     MiB values, so presumably in MiB) or None if unknown

    """
    self._debug = debug
    self._status_file = status_file
    self._logger = logger

    self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
                           for prog in PROG_ALL])

    self._dd_pid = None
    self._dd_ready = False
    self._dd_tp_samples = throughput_samples
    self._dd_progress = []

    # Expected size of transferred data
    self._exp_size = exp_size

  def GetLineSplitter(self, prog):
    """Returns the line splitter for a program.

    @param prog: Program identifier (one of C{PROG_ALL})

    """
    return self._splitter[prog]

  def FlushAll(self):
    """Flushes all line splitters.

    """
    for ls in self._splitter.values():
      ls.flush()

  def CloseAll(self):
    """Closes all line splitters.

    Afterwards no line splitter is registered anymore.

    """
    for ls in self._splitter.values():
      ls.close()
    self._splitter.clear()

  def NotifyDd(self):
    """Tells dd(1) to write statistics.

    @rtype: bool
    @return: False if dd's PID isn't known or dd isn't handling the signal
             yet; True once sending the signal was attempted (also if the
             process turned out to be gone)

    """
    if self._dd_pid is None:
      # Can't notify
      return False

    if not self._dd_ready:
      # There's a race condition between starting the program and sending
      # signals.  The signal handler is only registered after some time, so we
      # have to check whether the program is ready. If it isn't, sending a
      # signal will invoke the default handler (and usually abort the program).
      if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
        return False

      logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
      self._dd_ready = True

    logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
    try:
      os.kill(self._dd_pid, DD_INFO_SIGNAL)
    except EnvironmentError as err:
      if err.errno != errno.ESRCH:
        raise

      # Process no longer exists
      logging.debug("dd exited")
      self._dd_pid = None

    return True

  def _ProcessOutput(self, line, prog):
    """Takes care of child process output.

    @type line: string
    @param line: Child output line
    @type prog: number
    @param prog: Program from which the line originates
    @raise RuntimeError: If dd's PID is reported more than once

    """
    force_update = False
    forward_line = line

    if prog == PROG_SOCAT:
      level = None
      # The severity is the fourth whitespace-separated field, the remainder
      # of the line is the message
      parts = line.split(None, 4)

      if len(parts) == 5:
        (_, _, _, level, msg) = parts

        force_update = self._ProcessSocatOutput(self._status_file, level, msg)

        if self._debug or (level and level not in SOCAT_LOG_IGNORE):
          forward_line = "socat: %s %s" % (level, msg)
        else:
          forward_line = None
      else:
        # Unknown format, forward the whole line
        forward_line = "socat: %s" % line

    elif prog == PROG_DD:
      (should_forward, force_update) = self._ProcessDdOutput(line)

      if should_forward or self._debug:
        forward_line = "dd: %s" % line
      else:
        forward_line = None

    elif prog == PROG_DD_PID:
      if self._dd_pid:
        raise RuntimeError("dd PID reported more than once")
      logging.debug("Received dd PID %r", line)
      self._dd_pid = int(line)
      forward_line = None

    elif prog == PROG_EXP_SIZE:
      logging.debug("Received predicted size %r", line)
      forward_line = None

      if line:
        try:
          exp_size = utils.BytesToMebibyte(int(line))
        except (ValueError, TypeError) as err:
          logging.error("Failed to convert predicted size %r to number: %s",
                        line, err)
          exp_size = None
      else:
        exp_size = None

      self._exp_size = exp_size

    if forward_line:
      self._logger.info(forward_line)
      self._status_file.AddRecentOutput(forward_line)

    self._status_file.Update(force_update)

  @staticmethod
  def _ProcessSocatOutput(status_file, level, msg):
    """Interprets socat log output.

    @param status_file: Status file object
    @param level: Severity of the log message
    @param msg: Log message
    @rtype: bool
    @return: Whether the listening port or the connection state was recorded
             in the status file

    """
    if level == SOCAT_LOG_NOTICE:
      if status_file.GetListenPort() is None:
        # TODO: Maybe implement timeout to not listen forever
        m = LISTENING_RE.match(msg)
        if m:
          (_, port) = _VerifyListening(int(m.group("family")),
                                       m.group("address"),
                                       int(m.group("port")))

          status_file.SetListenPort(port)
          return True

      if not status_file.GetConnected():
        m = TRANSFER_LOOP_RE.match(msg)
        if m:
          logging.debug("Connection established")
          status_file.SetConnected()
          return True

    return False

  def _ProcessDdOutput(self, line):
    """Interprets a line of dd(1)'s output.

    @type line: string
    @param line: Line written by dd(1)
    @rtype: tuple
    @return: Two booleans; whether the line should be forwarded and whether
             progress information was updated

    """
    m = DD_INFO_RE.match(line)
    if m:
      seconds = float(m.group("seconds"))
      mbytes = utils.BytesToMebibyte(int(m.group("bytes")))
      self._UpdateDdProgress(seconds, mbytes)
      return (False, True)

    m = DD_STDERR_IGNORE.match(line)
    if m:
      # Ignore
      return (False, False)

    # Forward line
    return (True, False)

  def _UpdateDdProgress(self, seconds, mbytes):
    """Updates the internal status variables for dd(1) progress.

    @type seconds: float
    @param seconds: Timestamp of this update
    @type mbytes: float
    @param mbytes: Total number of MiB transferred so far

    """
    # Add latest sample
    self._dd_progress.append((seconds, mbytes))

    # Remove old samples
    del self._dd_progress[:-self._dd_tp_samples]

    # Calculate throughput
    throughput = _CalcThroughput(self._dd_progress)

    # Calculate percent and ETA
    percent = None
    eta = None

    if self._exp_size is not None:
      if self._exp_size != 0:
        # Clamped as more data than expected can be transferred
        percent = max(0, min(100, (100.0 * mbytes) / self._exp_size))

      if throughput:
        eta = max(0, float(self._exp_size - mbytes) / throughput)

    self._status_file.SetProgress(mbytes, throughput, percent, eta)
492

    
493

    
494
def _CalcThroughput(samples):
  """Calculates the throughput in MiB/second.

  Only the first and the last sample are taken into account.

  @type samples: sequence
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
                  tuple
  @rtype: float or None
  @return: Throughput in MiB/second; None if there are fewer than two samples
           or if no time passed between the first and the last sample

  """
  if len(samples) < 2:
    # Can't calculate throughput
    return None

  (start_time, start_mbytes) = samples[0]
  (end_time, end_mbytes) = samples[-1]

  elapsed = float(end_time) - start_time
  if elapsed <= 0:
    # Samples with identical timestamps would cause a division by zero; a
    # negative time span can't yield a meaningful throughput either
    return None

  return (float(end_mbytes) - start_mbytes) / elapsed