Statistics
| Branch: | Tag: | Revision:

root / lib / impexpd / __init__.py @ f9323011

History | View | Annotate | Download (14 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Classes and functions for import/export daemon.
23

24
"""
25

    
26
import os
27
import re
28
import socket
29
import logging
30
import signal
31
import errno
32
import time
33
from cStringIO import StringIO
34

    
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import utils
38

    
39

    
40
#: Used to recognize point at which socat(1) starts to listen on its socket.
41
#: The local address is required for the remote peer to connect (in particular
42
#: the port number).
43
LISTENING_RE = re.compile(r"^listening on\s+"
44
                          r"AF=(?P<family>\d+)\s+"
45
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
46

    
47
#: Used to recognize point at which socat(1) is sending data over the wire
48
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
49
                              re.I)
50

    
51
SOCAT_LOG_DEBUG = "D"
52
SOCAT_LOG_INFO = "I"
53
SOCAT_LOG_NOTICE = "N"
54
SOCAT_LOG_WARNING = "W"
55
SOCAT_LOG_ERROR = "E"
56
SOCAT_LOG_FATAL = "F"
57

    
58
SOCAT_LOG_IGNORE = frozenset([
59
  SOCAT_LOG_DEBUG,
60
  SOCAT_LOG_INFO,
61
  SOCAT_LOG_NOTICE,
62
  ])
63

    
64
#: Used to parse GNU dd(1) statistics
65
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
66
                        r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
67

    
68
#: Used to ignore "N+N records in/out" on dd(1)'s stderr
69
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
70

    
71
#: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
72
#: unavailable and SIGUSR1 is used instead)
73
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)
74

    
75
#: Buffer size: at most this many bytes are transferred at once
76
BUFSIZE = 1024 * 1024
77

    
78
# Common options for socat
79
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
80
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
81

    
82
(PROG_OTHER,
83
 PROG_SOCAT,
84
 PROG_DD,
85
 PROG_DD_PID,
86
 PROG_EXP_SIZE) = range(1, 6)
87
PROG_ALL = frozenset([
88
  PROG_OTHER,
89
  PROG_SOCAT,
90
  PROG_DD,
91
  PROG_DD_PID,
92
  PROG_EXP_SIZE,
93
  ])
94

    
95

    
96
class CommandBuilder(object):
97
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
98
    """Initializes this class.
99

100
    @param mode: Daemon mode (import or export)
101
    @param opts: Options object
102
    @type socat_stderr_fd: int
103
    @param socat_stderr_fd: File descriptor socat should write its stderr to
104
    @type dd_stderr_fd: int
105
    @param dd_stderr_fd: File descriptor dd should write its stderr to
106
    @type dd_pid_fd: int
107
    @param dd_pid_fd: File descriptor the child should write dd's PID to
108

109
    """
110
    self._opts = opts
111
    self._mode = mode
112
    self._socat_stderr_fd = socat_stderr_fd
113
    self._dd_stderr_fd = dd_stderr_fd
114
    self._dd_pid_fd = dd_pid_fd
115

    
116
  @staticmethod
117
  def GetBashCommand(cmd):
118
    """Prepares a command to be run in Bash.
119

120
    """
121
    return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
122

    
123
  def _GetSocatCommand(self):
124
    """Returns the socat command.
125

126
    """
127
    common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
128
      "key=%s" % self._opts.key,
129
      "cert=%s" % self._opts.cert,
130
      "cafile=%s" % self._opts.ca,
131
      ]
132

    
133
    if self._opts.bind is not None:
134
      common_addr_opts.append("bind=%s" % self._opts.bind)
135

    
136
    if self._mode == constants.IEM_IMPORT:
137
      if self._opts.port is None:
138
        port = 0
139
      else:
140
        port = self._opts.port
141

    
142
      addr1 = [
143
        "OPENSSL-LISTEN:%s" % port,
144
        "reuseaddr",
145

    
146
        # Retry to listen if connection wasn't established successfully, up to
147
        # 100 times a second. Note that this still leaves room for DoS attacks.
148
        "forever",
149
        "intervall=0.01",
150
        ] + common_addr_opts
151
      addr2 = ["stdout"]
152

    
153
    elif self._mode == constants.IEM_EXPORT:
154
      addr1 = ["stdin"]
155
      addr2 = [
156
        "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
157

    
158
        # How long to wait per connection attempt
159
        "connect-timeout=%s" % self._opts.connect_timeout,
160

    
161
        # Retry a few times before giving up to connect (once per second)
162
        "retry=%s" % self._opts.connect_retries,
163
        "intervall=1",
164
        ] + common_addr_opts
165

    
166
    else:
167
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
168

    
169
    for i in [addr1, addr2]:
170
      for value in i:
171
        if "," in value:
172
          raise errors.GenericError("Comma not allowed in socat option"
173
                                    " value: %r" % value)
174

    
175
    return [
176
      constants.SOCAT_PATH,
177

    
178
      # Log to stderr
179
      "-ls",
180

    
181
      # Log level
182
      "-d", "-d",
183

    
184
      # Buffer size
185
      "-b%s" % BUFSIZE,
186

    
187
      # Unidirectional mode, the first address is only used for reading, and the
188
      # second address is only used for writing
189
      "-u",
190

    
191
      ",".join(addr1), ",".join(addr2)
192
      ]
193

    
194
  def _GetTransportCommand(self):
195
    """Returns the command for the transport part of the daemon.
196

197
    """
198
    socat_cmd = ("%s 2>&%d" %
199
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
200
                  self._socat_stderr_fd))
201

    
202
    dd_cmd = StringIO()
203
    # Setting LC_ALL since we want to parse the output and explicitely
204
    # redirecting stdin, as the background process (dd) would have /dev/null as
205
    # stdin otherwise
206
    dd_cmd.write("{ LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
207
                 (BUFSIZE, self._dd_stderr_fd))
208
    # Send PID to daemon
209
    dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
210
    # And wait for dd
211
    dd_cmd.write(" wait $pid;")
212
    dd_cmd.write(" }")
213

    
214
    compr = self._opts.compress
215

    
216
    assert compr in constants.IEC_ALL
217

    
218
    if self._mode == constants.IEM_IMPORT:
219
      if compr == constants.IEC_GZIP:
220
        transport_cmd = "%s | gunzip -c" % socat_cmd
221
      else:
222
        transport_cmd = socat_cmd
223

    
224
      transport_cmd += " | %s" % dd_cmd.getvalue()
225
    elif self._mode == constants.IEM_EXPORT:
226
      if compr == constants.IEC_GZIP:
227
        transport_cmd = "gzip -c | %s" % socat_cmd
228
      else:
229
        transport_cmd = socat_cmd
230

    
231
      transport_cmd = "%s | %s" % (dd_cmd.getvalue(), transport_cmd)
232
    else:
233
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
234

    
235
    # TODO: Run transport as separate user
236
    # The transport uses its own shell to simplify running it as a separate user
237
    # in the future.
238
    return self.GetBashCommand(transport_cmd)
239

    
240
  def GetCommand(self):
241
    """Returns the complete child process command.
242

243
    """
244
    transport_cmd = self._GetTransportCommand()
245

    
246
    buf = StringIO()
247

    
248
    if self._opts.cmd_prefix:
249
      buf.write(self._opts.cmd_prefix)
250
      buf.write(" ")
251

    
252
    buf.write(utils.ShellQuoteArgs(transport_cmd))
253

    
254
    if self._opts.cmd_suffix:
255
      buf.write(" ")
256
      buf.write(self._opts.cmd_suffix)
257

    
258
    return self.GetBashCommand(buf.getvalue())
259

    
260

    
261
def _VerifyListening(family, address, port):
262
  """Verify address given as listening address by socat.
263

264
  """
265
  # TODO: Implement IPv6 support
266
  if family != socket.AF_INET:
267
    raise errors.GenericError("Address family %r not supported" % family)
268

    
269
  try:
270
    packed_address = socket.inet_pton(family, address)
271
  except socket.error:
272
    raise errors.GenericError("Invalid address %r for family %s" %
273
                              (address, family))
274

    
275
  return (socket.inet_ntop(family, packed_address), port)
276

    
277

    
278
class ChildIOProcessor(object):
279
  def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
280
    """Initializes this class.
281

282
    """
283
    self._debug = debug
284
    self._status_file = status_file
285
    self._logger = logger
286

    
287
    self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
288
                           for prog in PROG_ALL])
289

    
290
    self._dd_pid = None
291
    self._dd_ready = False
292
    self._dd_tp_samples = throughput_samples
293
    self._dd_progress = []
294

    
295
    # Expected size of transferred data
296
    self._exp_size = exp_size
297

    
298
  def GetLineSplitter(self, prog):
299
    """Returns the line splitter for a program.
300

301
    """
302
    return self._splitter[prog]
303

    
304
  def FlushAll(self):
305
    """Flushes all line splitters.
306

307
    """
308
    for ls in self._splitter.itervalues():
309
      ls.flush()
310

    
311
  def CloseAll(self):
312
    """Closes all line splitters.
313

314
    """
315
    for ls in self._splitter.itervalues():
316
      ls.close()
317
    self._splitter.clear()
318

    
319
  def NotifyDd(self):
320
    """Tells dd(1) to write statistics.
321

322
    """
323
    if self._dd_pid is None:
324
      # Can't notify
325
      return False
326

    
327
    if not self._dd_ready:
328
      # There's a race condition between starting the program and sending
329
      # signals.  The signal handler is only registered after some time, so we
330
      # have to check whether the program is ready. If it isn't, sending a
331
      # signal will invoke the default handler (and usually abort the program).
332
      if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
333
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
334
        return False
335

    
336
      logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
337
      self._dd_ready = True
338

    
339
    logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
340
    try:
341
      os.kill(self._dd_pid, DD_INFO_SIGNAL)
342
    except EnvironmentError, err:
343
      if err.errno != errno.ESRCH:
344
        raise
345

    
346
      # Process no longer exists
347
      self._dd_pid = None
348

    
349
    return True
350

    
351
  def _ProcessOutput(self, line, prog):
352
    """Takes care of child process output.
353

354
    @type line: string
355
    @param line: Child output line
356
    @type prog: number
357
    @param prog: Program from which the line originates
358

359
    """
360
    force_update = False
361
    forward_line = line
362

    
363
    if prog == PROG_SOCAT:
364
      level = None
365
      parts = line.split(None, 4)
366

    
367
      if len(parts) == 5:
368
        (_, _, _, level, msg) = parts
369

    
370
        force_update = self._ProcessSocatOutput(self._status_file, level, msg)
371

    
372
        if self._debug or (level and level not in SOCAT_LOG_IGNORE):
373
          forward_line = "socat: %s %s" % (level, msg)
374
        else:
375
          forward_line = None
376
      else:
377
        forward_line = "socat: %s" % line
378

    
379
    elif prog == PROG_DD:
380
      (should_forward, force_update) = self._ProcessDdOutput(line)
381

    
382
      if should_forward or self._debug:
383
        forward_line = "dd: %s" % line
384
      else:
385
        forward_line = None
386

    
387
    elif prog == PROG_DD_PID:
388
      if self._dd_pid:
389
        raise RuntimeError("dd PID reported more than once")
390
      logging.debug("Received dd PID %r", line)
391
      self._dd_pid = int(line)
392
      forward_line = None
393

    
394
    elif prog == PROG_EXP_SIZE:
395
      logging.debug("Received predicted size %r", line)
396
      forward_line = None
397

    
398
      if line:
399
        try:
400
          exp_size = utils.BytesToMebibyte(int(line))
401
        except (ValueError, TypeError), err:
402
          logging.error("Failed to convert predicted size %r to number: %s",
403
                        line, err)
404
          exp_size = None
405
      else:
406
        exp_size = None
407

    
408
      self._exp_size = exp_size
409

    
410
    if forward_line:
411
      self._logger.info(forward_line)
412
      self._status_file.AddRecentOutput(forward_line)
413

    
414
    self._status_file.Update(force_update)
415

    
416
  @staticmethod
417
  def _ProcessSocatOutput(status_file, level, msg):
418
    """Interprets socat log output.
419

420
    """
421
    if level == SOCAT_LOG_NOTICE:
422
      if status_file.GetListenPort() is None:
423
        # TODO: Maybe implement timeout to not listen forever
424
        m = LISTENING_RE.match(msg)
425
        if m:
426
          (_, port) = _VerifyListening(int(m.group("family")),
427
                                       m.group("address"),
428
                                       int(m.group("port")))
429

    
430
          status_file.SetListenPort(port)
431
          return True
432

    
433
      if not status_file.GetConnected():
434
        m = TRANSFER_LOOP_RE.match(msg)
435
        if m:
436
          logging.debug("Connection established")
437
          status_file.SetConnected()
438
          return True
439

    
440
    return False
441

    
442
  def _ProcessDdOutput(self, line):
443
    """Interprets a line of dd(1)'s output.
444

445
    """
446
    m = DD_INFO_RE.match(line)
447
    if m:
448
      seconds = float(m.group("seconds"))
449
      mbytes = utils.BytesToMebibyte(int(m.group("bytes")))
450
      self._UpdateDdProgress(seconds, mbytes)
451
      return (False, True)
452

    
453
    m = DD_STDERR_IGNORE.match(line)
454
    if m:
455
      # Ignore
456
      return (False, False)
457

    
458
    # Forward line
459
    return (True, False)
460

    
461
  def _UpdateDdProgress(self, seconds, mbytes):
462
    """Updates the internal status variables for dd(1) progress.
463

464
    @type seconds: float
465
    @param seconds: Timestamp of this update
466
    @type mbytes: float
467
    @param mbytes: Total number of MiB transferred so far
468

469
    """
470
    # Add latest sample
471
    self._dd_progress.append((seconds, mbytes))
472

    
473
    # Remove old samples
474
    del self._dd_progress[:-self._dd_tp_samples]
475

    
476
    # Calculate throughput
477
    throughput = _CalcThroughput(self._dd_progress)
478

    
479
    # Calculate percent and ETA
480
    percent = None
481
    eta = None
482

    
483
    if self._exp_size is not None:
484
      if self._exp_size != 0:
485
        percent = max(0, min(100, (100.0 * mbytes) / self._exp_size))
486

    
487
      if throughput:
488
        eta = max(0, float(self._exp_size - mbytes) / throughput)
489

    
490
    self._status_file.SetProgress(mbytes, throughput, percent, eta)
491

    
492

    
493
def _CalcThroughput(samples):
494
  """Calculates the throughput in MiB/second.
495

496
  @type samples: sequence
497
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
498
                  tuple
499
  @rtype: float or None
500
  @return: Throughput in MiB/second
501

502
  """
503
  if len(samples) < 2:
504
    # Can't calculate throughput
505
    return None
506

    
507
  (start_time, start_mbytes) = samples[0]
508
  (end_time, end_mbytes) = samples[-1]
509

    
510
  return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)