Statistics
| Branch: | Tag: | Revision:

root / lib / impexpd / __init__.py @ 2ed0e208

History | View | Annotate | Download (16.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Classes and functions for import/export daemon.
23

24
"""
25

    
26
import os
27
import re
28
import socket
29
import logging
30
import signal
31
import errno
32
import time
33
from cStringIO import StringIO
34

    
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import utils
38
from ganeti import netutils
39

    
40

    
41
#: Used to recognize point at which socat(1) starts to listen on its socket.
42
#: The local address is required for the remote peer to connect (in particular
43
#: the port number).
44
LISTENING_RE = re.compile(r"^listening on\s+"
45
                          r"AF=(?P<family>\d+)\s+"
46
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
47

    
48
#: Used to recognize point at which socat(1) is sending data over the wire
49
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
50
                              re.I)
51

    
52
SOCAT_LOG_DEBUG = "D"
53
SOCAT_LOG_INFO = "I"
54
SOCAT_LOG_NOTICE = "N"
55
SOCAT_LOG_WARNING = "W"
56
SOCAT_LOG_ERROR = "E"
57
SOCAT_LOG_FATAL = "F"
58

    
59
SOCAT_LOG_IGNORE = frozenset([
60
  SOCAT_LOG_DEBUG,
61
  SOCAT_LOG_INFO,
62
  SOCAT_LOG_NOTICE,
63
  ])
64

    
65
#: Used to parse GNU dd(1) statistics
66
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
67
                        r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
68

    
69
#: Used to ignore "N+N records in/out" on dd(1)'s stderr
70
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
71

    
72
#: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
73
#: unavailable and SIGUSR1 is used instead)
74
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)
75

    
76
#: Buffer size: at most this many bytes are transferred at once
77
BUFSIZE = 1024 * 1024
78

    
79
# Common options for socat
80
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
81
SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1",
82
                      "cipher=%s" % constants.OPENSSL_CIPHERS]
83

    
84
if constants.SOCAT_USE_COMPRESS:
85
  # Disables all compression in by OpenSSL. Only supported in patched versions
86
  # of socat (as of November 2010). See INSTALL for more information.
87
  SOCAT_OPENSSL_OPTS.append("compress=none")
88

    
89
SOCAT_OPTION_MAXLEN = 400
90

    
91
(PROG_OTHER,
92
 PROG_SOCAT,
93
 PROG_DD,
94
 PROG_DD_PID,
95
 PROG_EXP_SIZE) = range(1, 6)
96
PROG_ALL = frozenset([
97
  PROG_OTHER,
98
  PROG_SOCAT,
99
  PROG_DD,
100
  PROG_DD_PID,
101
  PROG_EXP_SIZE,
102
  ])
103

    
104

    
105
class CommandBuilder(object):
106
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
107
    """Initializes this class.
108

109
    @param mode: Daemon mode (import or export)
110
    @param opts: Options object
111
    @type socat_stderr_fd: int
112
    @param socat_stderr_fd: File descriptor socat should write its stderr to
113
    @type dd_stderr_fd: int
114
    @param dd_stderr_fd: File descriptor dd should write its stderr to
115
    @type dd_pid_fd: int
116
    @param dd_pid_fd: File descriptor the child should write dd's PID to
117

118
    """
119
    self._opts = opts
120
    self._mode = mode
121
    self._socat_stderr_fd = socat_stderr_fd
122
    self._dd_stderr_fd = dd_stderr_fd
123
    self._dd_pid_fd = dd_pid_fd
124

    
125
    assert (self._opts.magic is None or
126
            constants.IE_MAGIC_RE.match(self._opts.magic))
127

    
128
  @staticmethod
129
  def GetBashCommand(cmd):
130
    """Prepares a command to be run in Bash.
131

132
    """
133
    return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
134

    
135
  def _GetSocatCommand(self):
136
    """Returns the socat command.
137

138
    """
139
    common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
140
      "key=%s" % self._opts.key,
141
      "cert=%s" % self._opts.cert,
142
      "cafile=%s" % self._opts.ca,
143
      ]
144

    
145
    if self._opts.bind is not None:
146
      common_addr_opts.append("bind=%s" % self._opts.bind)
147

    
148
    assert not (self._opts.ipv4 and self._opts.ipv6)
149

    
150
    if self._opts.ipv4:
151
      common_addr_opts.append("pf=ipv4")
152
    elif self._opts.ipv6:
153
      common_addr_opts.append("pf=ipv6")
154

    
155
    if self._mode == constants.IEM_IMPORT:
156
      if self._opts.port is None:
157
        port = 0
158
      else:
159
        port = self._opts.port
160

    
161
      addr1 = [
162
        "OPENSSL-LISTEN:%s" % port,
163
        "reuseaddr",
164

    
165
        # Retry to listen if connection wasn't established successfully, up to
166
        # 100 times a second. Note that this still leaves room for DoS attacks.
167
        "forever",
168
        "intervall=0.01",
169
        ] + common_addr_opts
170
      addr2 = ["stdout"]
171

    
172
    elif self._mode == constants.IEM_EXPORT:
173
      if self._opts.host and netutils.IP6Address.IsValid(self._opts.host):
174
        host = "[%s]" % self._opts.host
175
      else:
176
        host = self._opts.host
177

    
178
      addr1 = ["stdin"]
179
      addr2 = [
180
        "OPENSSL:%s:%s" % (host, self._opts.port),
181

    
182
        # How long to wait per connection attempt
183
        "connect-timeout=%s" % self._opts.connect_timeout,
184

    
185
        # Retry a few times before giving up to connect (once per second)
186
        "retry=%s" % self._opts.connect_retries,
187
        "intervall=1",
188
        ] + common_addr_opts
189

    
190
    else:
191
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
192

    
193
    for i in [addr1, addr2]:
194
      for value in i:
195
        if len(value) > SOCAT_OPTION_MAXLEN:
196
          raise errors.GenericError("Socat option longer than %s"
197
                                    " characters: %r" %
198
                                    (SOCAT_OPTION_MAXLEN, value))
199
        if "," in value:
200
          raise errors.GenericError("Comma not allowed in socat option"
201
                                    " value: %r" % value)
202

    
203
    return [
204
      constants.SOCAT_PATH,
205

    
206
      # Log to stderr
207
      "-ls",
208

    
209
      # Log level
210
      "-d", "-d",
211

    
212
      # Buffer size
213
      "-b%s" % BUFSIZE,
214

    
215
      # Unidirectional mode, the first address is only used for reading, and the
216
      # second address is only used for writing
217
      "-u",
218

    
219
      ",".join(addr1), ",".join(addr2)
220
      ]
221

    
222
  def _GetMagicCommand(self):
223
    """Returns the command to read/write the magic value.
224

225
    """
226
    if not self._opts.magic:
227
      return None
228

    
229
    # Prefix to ensure magic isn't interpreted as option to "echo"
230
    magic = "M=%s" % self._opts.magic
231

    
232
    cmd = StringIO()
233

    
234
    if self._mode == constants.IEM_IMPORT:
235
      cmd.write("{ ")
236
      cmd.write(utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"]))
237
      cmd.write(" && ")
238
      cmd.write("if test \"$magic\" != %s; then" % utils.ShellQuote(magic))
239
      cmd.write(" echo %s >&2;" % utils.ShellQuote("Magic value mismatch"))
240
      cmd.write(" exit 1;")
241
      cmd.write("fi;")
242
      cmd.write(" }")
243

    
244
    elif self._mode == constants.IEM_EXPORT:
245
      cmd.write(utils.ShellQuoteArgs(["echo", "-E", "-n", magic]))
246

    
247
    else:
248
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
249

    
250
    return cmd.getvalue()
251

    
252
  def _GetDdCommand(self):
253
    """Returns the command for measuring throughput.
254

255
    """
256
    dd_cmd = StringIO()
257

    
258
    magic_cmd = self._GetMagicCommand()
259
    if magic_cmd:
260
      dd_cmd.write("{ ")
261
      dd_cmd.write(magic_cmd)
262
      dd_cmd.write(" && ")
263

    
264
    dd_cmd.write("{ ")
265
    # Setting LC_ALL since we want to parse the output and explicitly
266
    # redirecting stdin, as the background process (dd) would have
267
    # /dev/null as stdin otherwise
268
    dd_cmd.write("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
269
                 (BUFSIZE, self._dd_stderr_fd))
270
    # Send PID to daemon
271
    dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
272
    # And wait for dd
273
    dd_cmd.write(" wait $pid;")
274
    dd_cmd.write(" }")
275

    
276
    if magic_cmd:
277
      dd_cmd.write(" }")
278

    
279
    return dd_cmd.getvalue()
280

    
281
  def _GetTransportCommand(self):
282
    """Returns the command for the transport part of the daemon.
283

284
    """
285
    socat_cmd = ("%s 2>&%d" %
286
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
287
                  self._socat_stderr_fd))
288
    dd_cmd = self._GetDdCommand()
289

    
290
    compr = self._opts.compress
291

    
292
    assert compr in constants.IEC_ALL
293

    
294
    parts = []
295

    
296
    if self._mode == constants.IEM_IMPORT:
297
      parts.append(socat_cmd)
298

    
299
      if compr == constants.IEC_GZIP:
300
        parts.append("gunzip -c")
301

    
302
      parts.append(dd_cmd)
303

    
304
    elif self._mode == constants.IEM_EXPORT:
305
      parts.append(dd_cmd)
306

    
307
      if compr == constants.IEC_GZIP:
308
        parts.append("gzip -c")
309

    
310
      parts.append(socat_cmd)
311

    
312
    else:
313
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
314

    
315
    # TODO: Run transport as separate user
316
    # The transport uses its own shell to simplify running it as a separate user
317
    # in the future.
318
    return self.GetBashCommand(" | ".join(parts))
319

    
320
  def GetCommand(self):
321
    """Returns the complete child process command.
322

323
    """
324
    transport_cmd = self._GetTransportCommand()
325

    
326
    buf = StringIO()
327

    
328
    if self._opts.cmd_prefix:
329
      buf.write(self._opts.cmd_prefix)
330
      buf.write(" ")
331

    
332
    buf.write(utils.ShellQuoteArgs(transport_cmd))
333

    
334
    if self._opts.cmd_suffix:
335
      buf.write(" ")
336
      buf.write(self._opts.cmd_suffix)
337

    
338
    return self.GetBashCommand(buf.getvalue())
339

    
340

    
341
def _VerifyListening(family, address, port):
342
  """Verify address given as listening address by socat.
343

344
  """
345
  if family not in (socket.AF_INET, socket.AF_INET6):
346
    raise errors.GenericError("Address family %r not supported" % family)
347

    
348
  if (family == socket.AF_INET6 and address.startswith("[") and
349
      address.endswith("]")):
350
    address = address.lstrip("[").rstrip("]")
351

    
352
  try:
353
    packed_address = socket.inet_pton(family, address)
354
  except socket.error:
355
    raise errors.GenericError("Invalid address %r for family %s" %
356
                              (address, family))
357

    
358
  return (socket.inet_ntop(family, packed_address), port)
359

    
360

    
361
class ChildIOProcessor(object):
362
  def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
363
    """Initializes this class.
364

365
    """
366
    self._debug = debug
367
    self._status_file = status_file
368
    self._logger = logger
369

    
370
    self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
371
                           for prog in PROG_ALL])
372

    
373
    self._dd_pid = None
374
    self._dd_ready = False
375
    self._dd_tp_samples = throughput_samples
376
    self._dd_progress = []
377

    
378
    # Expected size of transferred data
379
    self._exp_size = exp_size
380

    
381
  def GetLineSplitter(self, prog):
382
    """Returns the line splitter for a program.
383

384
    """
385
    return self._splitter[prog]
386

    
387
  def FlushAll(self):
388
    """Flushes all line splitters.
389

390
    """
391
    for ls in self._splitter.itervalues():
392
      ls.flush()
393

    
394
  def CloseAll(self):
395
    """Closes all line splitters.
396

397
    """
398
    for ls in self._splitter.itervalues():
399
      ls.close()
400
    self._splitter.clear()
401

    
402
  def NotifyDd(self):
403
    """Tells dd(1) to write statistics.
404

405
    """
406
    if self._dd_pid is None:
407
      # Can't notify
408
      return False
409

    
410
    if not self._dd_ready:
411
      # There's a race condition between starting the program and sending
412
      # signals.  The signal handler is only registered after some time, so we
413
      # have to check whether the program is ready. If it isn't, sending a
414
      # signal will invoke the default handler (and usually abort the program).
415
      if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
416
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
417
        return False
418

    
419
      logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
420
      self._dd_ready = True
421

    
422
    logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
423
    try:
424
      os.kill(self._dd_pid, DD_INFO_SIGNAL)
425
    except EnvironmentError, err:
426
      if err.errno != errno.ESRCH:
427
        raise
428

    
429
      # Process no longer exists
430
      logging.debug("dd exited")
431
      self._dd_pid = None
432

    
433
    return True
434

    
435
  def _ProcessOutput(self, line, prog):
436
    """Takes care of child process output.
437

438
    @type line: string
439
    @param line: Child output line
440
    @type prog: number
441
    @param prog: Program from which the line originates
442

443
    """
444
    force_update = False
445
    forward_line = line
446

    
447
    if prog == PROG_SOCAT:
448
      level = None
449
      parts = line.split(None, 4)
450

    
451
      if len(parts) == 5:
452
        (_, _, _, level, msg) = parts
453

    
454
        force_update = self._ProcessSocatOutput(self._status_file, level, msg)
455

    
456
        if self._debug or (level and level not in SOCAT_LOG_IGNORE):
457
          forward_line = "socat: %s %s" % (level, msg)
458
        else:
459
          forward_line = None
460
      else:
461
        forward_line = "socat: %s" % line
462

    
463
    elif prog == PROG_DD:
464
      (should_forward, force_update) = self._ProcessDdOutput(line)
465

    
466
      if should_forward or self._debug:
467
        forward_line = "dd: %s" % line
468
      else:
469
        forward_line = None
470

    
471
    elif prog == PROG_DD_PID:
472
      if self._dd_pid:
473
        raise RuntimeError("dd PID reported more than once")
474
      logging.debug("Received dd PID %r", line)
475
      self._dd_pid = int(line)
476
      forward_line = None
477

    
478
    elif prog == PROG_EXP_SIZE:
479
      logging.debug("Received predicted size %r", line)
480
      forward_line = None
481

    
482
      if line:
483
        try:
484
          exp_size = utils.BytesToMebibyte(int(line))
485
        except (ValueError, TypeError), err:
486
          logging.error("Failed to convert predicted size %r to number: %s",
487
                        line, err)
488
          exp_size = None
489
      else:
490
        exp_size = None
491

    
492
      self._exp_size = exp_size
493

    
494
    if forward_line:
495
      self._logger.info(forward_line)
496
      self._status_file.AddRecentOutput(forward_line)
497

    
498
    self._status_file.Update(force_update)
499

    
500
  @staticmethod
501
  def _ProcessSocatOutput(status_file, level, msg):
502
    """Interprets socat log output.
503

504
    """
505
    if level == SOCAT_LOG_NOTICE:
506
      if status_file.GetListenPort() is None:
507
        # TODO: Maybe implement timeout to not listen forever
508
        m = LISTENING_RE.match(msg)
509
        if m:
510
          (_, port) = _VerifyListening(int(m.group("family")),
511
                                       m.group("address"),
512
                                       int(m.group("port")))
513

    
514
          status_file.SetListenPort(port)
515
          return True
516

    
517
      if not status_file.GetConnected():
518
        m = TRANSFER_LOOP_RE.match(msg)
519
        if m:
520
          logging.debug("Connection established")
521
          status_file.SetConnected()
522
          return True
523

    
524
    return False
525

    
526
  def _ProcessDdOutput(self, line):
527
    """Interprets a line of dd(1)'s output.
528

529
    """
530
    m = DD_INFO_RE.match(line)
531
    if m:
532
      seconds = float(m.group("seconds"))
533
      mbytes = utils.BytesToMebibyte(int(m.group("bytes")))
534
      self._UpdateDdProgress(seconds, mbytes)
535
      return (False, True)
536

    
537
    m = DD_STDERR_IGNORE.match(line)
538
    if m:
539
      # Ignore
540
      return (False, False)
541

    
542
    # Forward line
543
    return (True, False)
544

    
545
  def _UpdateDdProgress(self, seconds, mbytes):
546
    """Updates the internal status variables for dd(1) progress.
547

548
    @type seconds: float
549
    @param seconds: Timestamp of this update
550
    @type mbytes: float
551
    @param mbytes: Total number of MiB transferred so far
552

553
    """
554
    # Add latest sample
555
    self._dd_progress.append((seconds, mbytes))
556

    
557
    # Remove old samples
558
    del self._dd_progress[:-self._dd_tp_samples]
559

    
560
    # Calculate throughput
561
    throughput = _CalcThroughput(self._dd_progress)
562

    
563
    # Calculate percent and ETA
564
    percent = None
565
    eta = None
566

    
567
    if self._exp_size is not None:
568
      if self._exp_size != 0:
569
        percent = max(0, min(100, (100.0 * mbytes) / self._exp_size))
570

    
571
      if throughput:
572
        eta = max(0, float(self._exp_size - mbytes) / throughput)
573

    
574
    self._status_file.SetProgress(mbytes, throughput, percent, eta)
575

    
576

    
577
def _CalcThroughput(samples):
578
  """Calculates the throughput in MiB/second.
579

580
  @type samples: sequence
581
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
582
                  tuple
583
  @rtype: float or None
584
  @return: Throughput in MiB/second
585

586
  """
587
  if len(samples) < 2:
588
    # Can't calculate throughput
589
    return None
590

    
591
  (start_time, start_mbytes) = samples[0]
592
  (end_time, end_mbytes) = samples[-1]
593

    
594
  return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)