Statistics
| Branch: | Tag: | Revision:

root / lib / impexpd / __init__.py @ 653bc0f1

History | View | Annotate | Download (16.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Classes and functions for import/export daemon.
23

24
"""
25

    
26
import os
27
import re
28
import socket
29
import logging
30
import signal
31
import errno
32
import time
33
from cStringIO import StringIO
34

    
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import utils
38
from ganeti import netutils
39
from ganeti import compat
40

    
41

    
42
#: Used to recognize point at which socat(1) starts to listen on its socket.
43
#: The local address is required for the remote peer to connect (in particular
44
#: the port number).
45
LISTENING_RE = re.compile(r"^listening on\s+"
46
                          r"AF=(?P<family>\d+)\s+"
47
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
48

    
49
#: Used to recognize point at which socat(1) is sending data over the wire
50
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
51
                              re.I)
52

    
53
SOCAT_LOG_DEBUG = "D"
54
SOCAT_LOG_INFO = "I"
55
SOCAT_LOG_NOTICE = "N"
56
SOCAT_LOG_WARNING = "W"
57
SOCAT_LOG_ERROR = "E"
58
SOCAT_LOG_FATAL = "F"
59

    
60
SOCAT_LOG_IGNORE = compat.UniqueFrozenset([
61
  SOCAT_LOG_DEBUG,
62
  SOCAT_LOG_INFO,
63
  SOCAT_LOG_NOTICE,
64
  ])
65

    
66
#: Used to parse GNU dd(1) statistics
67
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
68
                        r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
69

    
70
#: Used to ignore "N+N records in/out" on dd(1)'s stderr
71
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
72

    
73
#: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
74
#: unavailable and SIGUSR1 is used instead)
75
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)
76

    
77
#: Buffer size: at most this many bytes are transferred at once
78
BUFSIZE = 1024 * 1024
79

    
80
# Common options for socat
81
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
82
SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1",
83
                      "cipher=%s" % constants.OPENSSL_CIPHERS]
84

    
85
if constants.SOCAT_USE_COMPRESS:
86
  # Disables all compression in by OpenSSL. Only supported in patched versions
87
  # of socat (as of November 2010). See INSTALL for more information.
88
  SOCAT_OPENSSL_OPTS.append("compress=none")
89

    
90
SOCAT_OPTION_MAXLEN = 400
91

    
92
(PROG_OTHER,
93
 PROG_SOCAT,
94
 PROG_DD,
95
 PROG_DD_PID,
96
 PROG_EXP_SIZE) = range(1, 6)
97

    
98
PROG_ALL = compat.UniqueFrozenset([
99
  PROG_OTHER,
100
  PROG_SOCAT,
101
  PROG_DD,
102
  PROG_DD_PID,
103
  PROG_EXP_SIZE,
104
  ])
105

    
106

    
107
class CommandBuilder(object):
108
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
109
    """Initializes this class.
110

111
    @param mode: Daemon mode (import or export)
112
    @param opts: Options object
113
    @type socat_stderr_fd: int
114
    @param socat_stderr_fd: File descriptor socat should write its stderr to
115
    @type dd_stderr_fd: int
116
    @param dd_stderr_fd: File descriptor dd should write its stderr to
117
    @type dd_pid_fd: int
118
    @param dd_pid_fd: File descriptor the child should write dd's PID to
119

120
    """
121
    self._opts = opts
122
    self._mode = mode
123
    self._socat_stderr_fd = socat_stderr_fd
124
    self._dd_stderr_fd = dd_stderr_fd
125
    self._dd_pid_fd = dd_pid_fd
126

    
127
    assert (self._opts.magic is None or
128
            constants.IE_MAGIC_RE.match(self._opts.magic))
129

    
130
  @staticmethod
131
  def GetBashCommand(cmd):
132
    """Prepares a command to be run in Bash.
133

134
    """
135
    return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
136

    
137
  def _GetSocatCommand(self):
138
    """Returns the socat command.
139

140
    """
141
    common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
142
      "key=%s" % self._opts.key,
143
      "cert=%s" % self._opts.cert,
144
      "cafile=%s" % self._opts.ca,
145
      ]
146

    
147
    if self._opts.bind is not None:
148
      common_addr_opts.append("bind=%s" % self._opts.bind)
149

    
150
    assert not (self._opts.ipv4 and self._opts.ipv6)
151

    
152
    if self._opts.ipv4:
153
      common_addr_opts.append("pf=ipv4")
154
    elif self._opts.ipv6:
155
      common_addr_opts.append("pf=ipv6")
156

    
157
    if self._mode == constants.IEM_IMPORT:
158
      if self._opts.port is None:
159
        port = 0
160
      else:
161
        port = self._opts.port
162

    
163
      addr1 = [
164
        "OPENSSL-LISTEN:%s" % port,
165
        "reuseaddr",
166

    
167
        # Retry to listen if connection wasn't established successfully, up to
168
        # 100 times a second. Note that this still leaves room for DoS attacks.
169
        "forever",
170
        "intervall=0.01",
171
        ] + common_addr_opts
172
      addr2 = ["stdout"]
173

    
174
    elif self._mode == constants.IEM_EXPORT:
175
      if self._opts.host and netutils.IP6Address.IsValid(self._opts.host):
176
        host = "[%s]" % self._opts.host
177
      else:
178
        host = self._opts.host
179

    
180
      addr1 = ["stdin"]
181
      addr2 = [
182
        "OPENSSL:%s:%s" % (host, self._opts.port),
183

    
184
        # How long to wait per connection attempt
185
        "connect-timeout=%s" % self._opts.connect_timeout,
186

    
187
        # Retry a few times before giving up to connect (once per second)
188
        "retry=%s" % self._opts.connect_retries,
189
        "intervall=1",
190
        ] + common_addr_opts
191

    
192
    else:
193
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
194

    
195
    for i in [addr1, addr2]:
196
      for value in i:
197
        if len(value) > SOCAT_OPTION_MAXLEN:
198
          raise errors.GenericError("Socat option longer than %s"
199
                                    " characters: %r" %
200
                                    (SOCAT_OPTION_MAXLEN, value))
201
        if "," in value:
202
          raise errors.GenericError("Comma not allowed in socat option"
203
                                    " value: %r" % value)
204

    
205
    return [
206
      constants.SOCAT_PATH,
207

    
208
      # Log to stderr
209
      "-ls",
210

    
211
      # Log level
212
      "-d", "-d",
213

    
214
      # Buffer size
215
      "-b%s" % BUFSIZE,
216

    
217
      # Unidirectional mode, the first address is only used for reading, and the
218
      # second address is only used for writing
219
      "-u",
220

    
221
      ",".join(addr1), ",".join(addr2),
222
      ]
223

    
224
  def _GetMagicCommand(self):
225
    """Returns the command to read/write the magic value.
226

227
    """
228
    if not self._opts.magic:
229
      return None
230

    
231
    # Prefix to ensure magic isn't interpreted as option to "echo"
232
    magic = "M=%s" % self._opts.magic
233

    
234
    cmd = StringIO()
235

    
236
    if self._mode == constants.IEM_IMPORT:
237
      cmd.write("{ ")
238
      cmd.write(utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"]))
239
      cmd.write(" && ")
240
      cmd.write("if test \"$magic\" != %s; then" % utils.ShellQuote(magic))
241
      cmd.write(" echo %s >&2;" % utils.ShellQuote("Magic value mismatch"))
242
      cmd.write(" exit 1;")
243
      cmd.write("fi;")
244
      cmd.write(" }")
245

    
246
    elif self._mode == constants.IEM_EXPORT:
247
      cmd.write(utils.ShellQuoteArgs(["echo", "-E", "-n", magic]))
248

    
249
    else:
250
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
251

    
252
    return cmd.getvalue()
253

    
254
  def _GetDdCommand(self):
255
    """Returns the command for measuring throughput.
256

257
    """
258
    dd_cmd = StringIO()
259

    
260
    magic_cmd = self._GetMagicCommand()
261
    if magic_cmd:
262
      dd_cmd.write("{ ")
263
      dd_cmd.write(magic_cmd)
264
      dd_cmd.write(" && ")
265

    
266
    dd_cmd.write("{ ")
267
    # Setting LC_ALL since we want to parse the output and explicitly
268
    # redirecting stdin, as the background process (dd) would have
269
    # /dev/null as stdin otherwise
270
    dd_cmd.write("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
271
                 (BUFSIZE, self._dd_stderr_fd))
272
    # Send PID to daemon
273
    dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
274
    # And wait for dd
275
    dd_cmd.write(" wait $pid;")
276
    dd_cmd.write(" }")
277

    
278
    if magic_cmd:
279
      dd_cmd.write(" }")
280

    
281
    return dd_cmd.getvalue()
282

    
283
  def _GetTransportCommand(self):
284
    """Returns the command for the transport part of the daemon.
285

286
    """
287
    socat_cmd = ("%s 2>&%d" %
288
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
289
                  self._socat_stderr_fd))
290
    dd_cmd = self._GetDdCommand()
291

    
292
    compr = self._opts.compress
293

    
294
    assert compr in constants.IEC_ALL
295

    
296
    parts = []
297

    
298
    if self._mode == constants.IEM_IMPORT:
299
      parts.append(socat_cmd)
300

    
301
      if compr == constants.IEC_GZIP:
302
        parts.append("gunzip -c")
303

    
304
      parts.append(dd_cmd)
305

    
306
    elif self._mode == constants.IEM_EXPORT:
307
      parts.append(dd_cmd)
308

    
309
      if compr == constants.IEC_GZIP:
310
        parts.append("gzip -c")
311

    
312
      parts.append(socat_cmd)
313

    
314
    else:
315
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
316

    
317
    # TODO: Run transport as separate user
318
    # The transport uses its own shell to simplify running it as a separate user
319
    # in the future.
320
    return self.GetBashCommand(" | ".join(parts))
321

    
322
  def GetCommand(self):
323
    """Returns the complete child process command.
324

325
    """
326
    transport_cmd = self._GetTransportCommand()
327

    
328
    buf = StringIO()
329

    
330
    if self._opts.cmd_prefix:
331
      buf.write(self._opts.cmd_prefix)
332
      buf.write(" ")
333

    
334
    buf.write(utils.ShellQuoteArgs(transport_cmd))
335

    
336
    if self._opts.cmd_suffix:
337
      buf.write(" ")
338
      buf.write(self._opts.cmd_suffix)
339

    
340
    return self.GetBashCommand(buf.getvalue())
341

    
342

    
343
def _VerifyListening(family, address, port):
344
  """Verify address given as listening address by socat.
345

346
  """
347
  if family not in (socket.AF_INET, socket.AF_INET6):
348
    raise errors.GenericError("Address family %r not supported" % family)
349

    
350
  if (family == socket.AF_INET6 and address.startswith("[") and
351
      address.endswith("]")):
352
    address = address.lstrip("[").rstrip("]")
353

    
354
  try:
355
    packed_address = socket.inet_pton(family, address)
356
  except socket.error:
357
    raise errors.GenericError("Invalid address %r for family %s" %
358
                              (address, family))
359

    
360
  return (socket.inet_ntop(family, packed_address), port)
361

    
362

    
363
class ChildIOProcessor(object):
364
  def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
365
    """Initializes this class.
366

367
    """
368
    self._debug = debug
369
    self._status_file = status_file
370
    self._logger = logger
371

    
372
    self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
373
                           for prog in PROG_ALL])
374

    
375
    self._dd_pid = None
376
    self._dd_ready = False
377
    self._dd_tp_samples = throughput_samples
378
    self._dd_progress = []
379

    
380
    # Expected size of transferred data
381
    self._exp_size = exp_size
382

    
383
  def GetLineSplitter(self, prog):
384
    """Returns the line splitter for a program.
385

386
    """
387
    return self._splitter[prog]
388

    
389
  def FlushAll(self):
390
    """Flushes all line splitters.
391

392
    """
393
    for ls in self._splitter.itervalues():
394
      ls.flush()
395

    
396
  def CloseAll(self):
397
    """Closes all line splitters.
398

399
    """
400
    for ls in self._splitter.itervalues():
401
      ls.close()
402
    self._splitter.clear()
403

    
404
  def NotifyDd(self):
405
    """Tells dd(1) to write statistics.
406

407
    """
408
    if self._dd_pid is None:
409
      # Can't notify
410
      return False
411

    
412
    if not self._dd_ready:
413
      # There's a race condition between starting the program and sending
414
      # signals.  The signal handler is only registered after some time, so we
415
      # have to check whether the program is ready. If it isn't, sending a
416
      # signal will invoke the default handler (and usually abort the program).
417
      if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
418
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
419
        return False
420

    
421
      logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
422
      self._dd_ready = True
423

    
424
    logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
425
    try:
426
      os.kill(self._dd_pid, DD_INFO_SIGNAL)
427
    except EnvironmentError, err:
428
      if err.errno != errno.ESRCH:
429
        raise
430

    
431
      # Process no longer exists
432
      logging.debug("dd exited")
433
      self._dd_pid = None
434

    
435
    return True
436

    
437
  def _ProcessOutput(self, line, prog):
438
    """Takes care of child process output.
439

440
    @type line: string
441
    @param line: Child output line
442
    @type prog: number
443
    @param prog: Program from which the line originates
444

445
    """
446
    force_update = False
447
    forward_line = line
448

    
449
    if prog == PROG_SOCAT:
450
      level = None
451
      parts = line.split(None, 4)
452

    
453
      if len(parts) == 5:
454
        (_, _, _, level, msg) = parts
455

    
456
        force_update = self._ProcessSocatOutput(self._status_file, level, msg)
457

    
458
        if self._debug or (level and level not in SOCAT_LOG_IGNORE):
459
          forward_line = "socat: %s %s" % (level, msg)
460
        else:
461
          forward_line = None
462
      else:
463
        forward_line = "socat: %s" % line
464

    
465
    elif prog == PROG_DD:
466
      (should_forward, force_update) = self._ProcessDdOutput(line)
467

    
468
      if should_forward or self._debug:
469
        forward_line = "dd: %s" % line
470
      else:
471
        forward_line = None
472

    
473
    elif prog == PROG_DD_PID:
474
      if self._dd_pid:
475
        raise RuntimeError("dd PID reported more than once")
476
      logging.debug("Received dd PID %r", line)
477
      self._dd_pid = int(line)
478
      forward_line = None
479

    
480
    elif prog == PROG_EXP_SIZE:
481
      logging.debug("Received predicted size %r", line)
482
      forward_line = None
483

    
484
      if line:
485
        try:
486
          exp_size = utils.BytesToMebibyte(int(line))
487
        except (ValueError, TypeError), err:
488
          logging.error("Failed to convert predicted size %r to number: %s",
489
                        line, err)
490
          exp_size = None
491
      else:
492
        exp_size = None
493

    
494
      self._exp_size = exp_size
495

    
496
    if forward_line:
497
      self._logger.info(forward_line)
498
      self._status_file.AddRecentOutput(forward_line)
499

    
500
    self._status_file.Update(force_update)
501

    
502
  @staticmethod
503
  def _ProcessSocatOutput(status_file, level, msg):
504
    """Interprets socat log output.
505

506
    """
507
    if level == SOCAT_LOG_NOTICE:
508
      if status_file.GetListenPort() is None:
509
        # TODO: Maybe implement timeout to not listen forever
510
        m = LISTENING_RE.match(msg)
511
        if m:
512
          (_, port) = _VerifyListening(int(m.group("family")),
513
                                       m.group("address"),
514
                                       int(m.group("port")))
515

    
516
          status_file.SetListenPort(port)
517
          return True
518

    
519
      if not status_file.GetConnected():
520
        m = TRANSFER_LOOP_RE.match(msg)
521
        if m:
522
          logging.debug("Connection established")
523
          status_file.SetConnected()
524
          return True
525

    
526
    return False
527

    
528
  def _ProcessDdOutput(self, line):
529
    """Interprets a line of dd(1)'s output.
530

531
    """
532
    m = DD_INFO_RE.match(line)
533
    if m:
534
      seconds = float(m.group("seconds"))
535
      mbytes = utils.BytesToMebibyte(int(m.group("bytes")))
536
      self._UpdateDdProgress(seconds, mbytes)
537
      return (False, True)
538

    
539
    m = DD_STDERR_IGNORE.match(line)
540
    if m:
541
      # Ignore
542
      return (False, False)
543

    
544
    # Forward line
545
    return (True, False)
546

    
547
  def _UpdateDdProgress(self, seconds, mbytes):
548
    """Updates the internal status variables for dd(1) progress.
549

550
    @type seconds: float
551
    @param seconds: Timestamp of this update
552
    @type mbytes: float
553
    @param mbytes: Total number of MiB transferred so far
554

555
    """
556
    # Add latest sample
557
    self._dd_progress.append((seconds, mbytes))
558

    
559
    # Remove old samples
560
    del self._dd_progress[:-self._dd_tp_samples]
561

    
562
    # Calculate throughput
563
    throughput = _CalcThroughput(self._dd_progress)
564

    
565
    # Calculate percent and ETA
566
    percent = None
567
    eta = None
568

    
569
    if self._exp_size is not None:
570
      if self._exp_size != 0:
571
        percent = max(0, min(100, (100.0 * mbytes) / self._exp_size))
572

    
573
      if throughput:
574
        eta = max(0, float(self._exp_size - mbytes) / throughput)
575

    
576
    self._status_file.SetProgress(mbytes, throughput, percent, eta)
577

    
578

    
579
def _CalcThroughput(samples):
580
  """Calculates the throughput in MiB/second.
581

582
  @type samples: sequence
583
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
584
                  tuple
585
  @rtype: float or None
586
  @return: Throughput in MiB/second
587

588
  """
589
  if len(samples) < 2:
590
    # Can't calculate throughput
591
    return None
592

    
593
  (start_time, start_mbytes) = samples[0]
594
  (end_time, end_mbytes) = samples[-1]
595

    
596
  return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)