Statistics
| Branch: | Tag: | Revision:

root / lib / impexpd / __init__.py @ e90739d6

History | View | Annotate | Download (15.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Classes and functions for import/export daemon.
23

24
"""
25

    
26
import os
27
import re
28
import socket
29
import logging
30
import signal
31
import errno
32
import time
33
from cStringIO import StringIO
34

    
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import utils
38

    
39

    
40
#: Matches the socat(1) log message announcing that it started listening on
#: its socket. The local address (in particular the port number) is needed
#: for the remote peer to connect.
LISTENING_RE = re.compile(
  r"^listening on\s+"
  r"AF=(?P<family>\d+)\s+"
  r"(?P<address>.+):(?P<port>\d+)$",
  re.IGNORECASE)

#: Matches the socat(1) log message written once data is sent over the wire
TRANSFER_LOOP_RE = re.compile(
  r"^starting data transfer loop with FDs\s+.*$",
  re.IGNORECASE)

# Severity codes as they appear in socat(1) log lines
SOCAT_LOG_DEBUG = "D"
SOCAT_LOG_INFO = "I"
SOCAT_LOG_NOTICE = "N"
SOCAT_LOG_WARNING = "W"
SOCAT_LOG_ERROR = "E"
SOCAT_LOG_FATAL = "F"

#: Severities whose messages are considered uninteresting
SOCAT_LOG_IGNORE = frozenset((
  SOCAT_LOG_DEBUG,
  SOCAT_LOG_INFO,
  SOCAT_LOG_NOTICE,
  ))

#: Extracts the byte count and elapsed seconds from GNU dd(1) statistics
DD_INFO_RE = re.compile(
  r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
  r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$",
  re.IGNORECASE)

#: Matches the "N+N records in/out" lines on dd(1)'s stderr, which are ignored
DD_STDERR_IGNORE = re.compile(
  r"^\d+\+\d+\s*records\s+(?:in|out)$",
  re.IGNORECASE)

#: Signal making dd(1) print its statistics; SIGUSR1 is the fallback on
#: platforms where SIGINFO is unavailable
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)

#: Buffer size: upper limit for the number of bytes transferred at once
BUFSIZE = 1024 * 1024
77

    
78
# Address options shared by all socat invocations
SOCAT_TCP_OPTS = [
  "keepalive",
  "keepidle=60",
  "keepintvl=10",
  "keepcnt=5",
  ]
SOCAT_OPENSSL_OPTS = [
  "verify=1",
  "method=TLSv1",
  "cipher=%s" % constants.OPENSSL_CIPHERS,
  ]

if constants.SOCAT_USE_COMPRESS:
  # Disables all compression done by OpenSSL. Only supported in patched
  # versions of socat (as of November 2010). See INSTALL for more information.
  SOCAT_OPENSSL_OPTS += ["compress=none"]

#: Maximum number of characters allowed in a single socat option
SOCAT_OPTION_MAXLEN = 400
89

    
90
# Identifiers for the sources of child output handled by the daemon
PROG_OTHER = 1
PROG_SOCAT = 2
PROG_DD = 3
PROG_DD_PID = 4
PROG_EXP_SIZE = 5

#: All known output sources
PROG_ALL = frozenset((
  PROG_OTHER,
  PROG_SOCAT,
  PROG_DD,
  PROG_DD_PID,
  PROG_EXP_SIZE,
  ))
102

    
103

    
104
class CommandBuilder(object):
  """Assembles the shell commands run by the import/export daemon.

  """
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
    """Initializes this class.

    @param mode: Daemon mode (import or export)
    @param opts: Options object
    @type socat_stderr_fd: int
    @param socat_stderr_fd: File descriptor socat should write its stderr to
    @type dd_stderr_fd: int
    @param dd_stderr_fd: File descriptor dd should write its stderr to
    @type dd_pid_fd: int
    @param dd_pid_fd: File descriptor the child should write dd's PID to

    """
    self._mode = mode
    self._opts = opts
    self._dd_pid_fd = dd_pid_fd
    self._dd_stderr_fd = dd_stderr_fd
    self._socat_stderr_fd = socat_stderr_fd

    # A magic value, if given at all, must have the expected format
    assert (self._opts.magic is None or
            constants.IE_MAGIC_RE.match(self._opts.magic))

  @staticmethod
  def GetBashCommand(cmd):
    """Wraps a command line so that it is executed by Bash.

    The shell is started with the "errexit" and "pipefail" options.

    @param cmd: Command line passed to "bash -c"
    @rtype: list
    @return: Argument list

    """
    bash_opts = ["-o", "errexit", "-o", "pipefail"]
    return ["bash"] + bash_opts + ["-c", cmd]

  def _GetSocatCommand(self):
    """Builds the argument list for socat.

    @rtype: list
    @return: socat executable followed by its arguments
    @raise errors.GenericError: If the mode is unknown or an address option is
      too long or contains a comma

    """
    opts = self._opts

    shared_opts = list(SOCAT_TCP_OPTS)
    shared_opts.extend(SOCAT_OPENSSL_OPTS)
    shared_opts.extend([
      "key=%s" % opts.key,
      "cert=%s" % opts.cert,
      "cafile=%s" % opts.ca,
      ])

    if opts.bind is not None:
      shared_opts.append("bind=%s" % opts.bind)

    if self._mode == constants.IEM_IMPORT:
      # Without an explicit port, 0 is passed to socat
      listen_port = opts.port
      if listen_port is None:
        listen_port = 0

      source = [
        "OPENSSL-LISTEN:%s" % listen_port,
        "reuseaddr",

        # Retry to listen if connection wasn't established successfully, up to
        # 100 times a second. Note that this still leaves room for DoS attacks.
        "forever",
        "intervall=0.01",
        ]
      source.extend(shared_opts)
      sink = ["stdout"]

    elif self._mode == constants.IEM_EXPORT:
      source = ["stdin"]
      sink = [
        "OPENSSL:%s:%s" % (opts.host, opts.port),

        # How long to wait per connection attempt
        "connect-timeout=%s" % opts.connect_timeout,

        # Retry a few times before giving up to connect (once per second)
        "retry=%s" % opts.connect_retries,
        "intervall=1",
        ]
      sink.extend(shared_opts)

    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    # Options are joined using commas below, hence none of them may contain one
    for option in source + sink:
      if len(option) > SOCAT_OPTION_MAXLEN:
        raise errors.GenericError("Socat option longer than %s"
                                  " characters: %r" %
                                  (SOCAT_OPTION_MAXLEN, option))
      if "," in option:
        raise errors.GenericError("Comma not allowed in socat option"
                                  " value: %r" % option)

    return [
      constants.SOCAT_PATH,

      # Log to stderr
      "-ls",

      # Log level
      "-d", "-d",

      # Buffer size
      "-b%s" % BUFSIZE,

      # Unidirectional mode, the first address is only used for reading, and the
      # second address is only used for writing
      "-u",

      ",".join(source),
      ",".join(sink),
      ]

  def _GetMagicCommand(self):
    """Builds the shell fragment reading or writing the magic value.

    @rtype: string or None
    @return: Shell fragment; None if no magic value is configured
    @raise errors.GenericError: If the mode is unknown

    """
    if not self._opts.magic:
      return None

    # Prefix to ensure magic isn't interpreted as option to "echo"
    magic = "M=%s" % self._opts.magic

    if self._mode == constants.IEM_IMPORT:
      # Read as many characters as the magic is long and compare them
      return "".join([
        "{ ",
        utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"]),
        " && ",
        "if test \"$magic\" != %s; then" % utils.ShellQuote(magic),
        " echo %s >&2;" % utils.ShellQuote("Magic value mismatch"),
        " exit 1;",
        "fi;",
        " }",
        ])

    if self._mode == constants.IEM_EXPORT:
      return utils.ShellQuoteArgs(["echo", "-E", "-n", magic])

    raise errors.GenericError("Invalid mode '%s'" % self._mode)

  def _GetDdCommand(self):
    """Builds the shell fragment running dd for measuring throughput.

    @rtype: string
    @return: Shell fragment, including the magic command if there is one

    """
    magic_cmd = self._GetMagicCommand()

    pieces = []

    if magic_cmd:
      pieces.extend(["{ ", magic_cmd, " && "])

    pieces.append("{ ")
    # Setting LC_ALL since we want to parse the output and explicitly
    # redirecting stdin, as the background process (dd) would have /dev/null as
    # stdin otherwise
    pieces.append("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
                  (BUFSIZE, self._dd_stderr_fd))
    # Send PID to daemon
    pieces.append(" echo $pid >&%d;" % self._dd_pid_fd)
    # And wait for dd
    pieces.append(" wait $pid;")
    pieces.append(" }")

    if magic_cmd:
      pieces.append(" }")

    return "".join(pieces)

  def _GetTransportCommand(self):
    """Builds the command for the transport part of the daemon.

    @rtype: list
    @return: Bash argument list running the socat/dd pipeline
    @raise errors.GenericError: If the mode is unknown

    """
    socat_cmd = ("%s 2>&%d" %
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
                  self._socat_stderr_fd))
    dd_cmd = self._GetDdCommand()

    compr = self._opts.compress

    assert compr in constants.IEC_ALL

    if self._mode == constants.IEM_IMPORT:
      # Network -> (decompression) -> dd
      pipeline = [socat_cmd, dd_cmd]
      if compr == constants.IEC_GZIP:
        pipeline.insert(1, "gunzip -c")

    elif self._mode == constants.IEM_EXPORT:
      # dd -> (compression) -> network
      pipeline = [dd_cmd, socat_cmd]
      if compr == constants.IEC_GZIP:
        pipeline.insert(1, "gzip -c")

    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    # TODO: Run transport as separate user
    # The transport uses its own shell to simplify running it as a separate user
    # in the future.
    return self.GetBashCommand(" | ".join(pipeline))

  def GetCommand(self):
    """Builds the complete child process command.

    The quoted transport command is surrounded by the configured command
    prefix and suffix, if any.

    @rtype: list
    @return: Bash argument list

    """
    transport_cmd = self._GetTransportCommand()

    pieces = []

    if self._opts.cmd_prefix:
      pieces.append(self._opts.cmd_prefix)

    pieces.append(utils.ShellQuoteArgs(transport_cmd))

    if self._opts.cmd_suffix:
      pieces.append(self._opts.cmd_suffix)

    return self.GetBashCommand(" ".join(pieces))
326

    
327

    
328
def _VerifyListening(family, address, port):
329
  """Verify address given as listening address by socat.
330

331
  """
332
  # TODO: Implement IPv6 support
333
  if family != socket.AF_INET:
334
    raise errors.GenericError("Address family %r not supported" % family)
335

    
336
  try:
337
    packed_address = socket.inet_pton(family, address)
338
  except socket.error:
339
    raise errors.GenericError("Invalid address %r for family %s" %
340
                              (address, family))
341

    
342
  return (socket.inet_ntop(family, packed_address), port)
343

    
344

    
345
class ChildIOProcessor(object):
  """Interprets the output of the daemon's child processes.

  """
  def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
    """Initializes this class.

    @param debug: Whether to also forward output which is otherwise dropped
    @param status_file: Status file object receiving updates
    @param logger: Logger receiving forwarded child output
    @param throughput_samples: Number of dd(1) samples kept for calculating
      the throughput
    @param exp_size: Expected size of transferred data (None if unknown)

    """
    self._debug = debug
    self._status_file = status_file
    self._logger = logger

    # One line splitter per output source, all feeding L{_ProcessOutput}
    splitters = {}
    for prog in PROG_ALL:
      splitters[prog] = utils.LineSplitter(self._ProcessOutput, prog)
    self._splitter = splitters

    self._dd_pid = None
    self._dd_ready = False
    self._dd_tp_samples = throughput_samples
    self._dd_progress = []

    # Expected size of transferred data
    self._exp_size = exp_size

  def GetLineSplitter(self, prog):
    """Returns the line splitter for a program.

    @param prog: Program identifier (one of the C{PROG_*} values)

    """
    return self._splitter[prog]

  def FlushAll(self):
    """Flushes all line splitters.

    """
    for splitter in self._splitter.values():
      splitter.flush()

  def CloseAll(self):
    """Closes all line splitters and forgets about them.

    """
    for splitter in self._splitter.values():
      splitter.close()
    self._splitter.clear()

  def NotifyDd(self):
    """Tells dd(1) to write statistics.

    @rtype: bool
    @return: False if dd's PID is unknown or dd isn't handling the signal yet,
      True otherwise

    """
    pid = self._dd_pid

    if pid is None:
      # Can't notify
      return False

    if not self._dd_ready:
      # There's a race condition between starting the program and sending
      # signals.  The signal handler is only registered after some time, so we
      # have to check whether the program is ready. If it isn't, sending a
      # signal will invoke the default handler (and usually abort the program).
      if not utils.IsProcessHandlingSignal(pid, DD_INFO_SIGNAL):
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
        return False

      logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
      self._dd_ready = True

    logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, pid)
    try:
      os.kill(pid, DD_INFO_SIGNAL)
    except EnvironmentError as err:
      if err.errno != errno.ESRCH:
        raise

      # Process no longer exists
      logging.debug("dd exited")
      self._dd_pid = None

    return True

  def _ProcessOutput(self, line, prog):
    """Takes care of child process output.

    @type line: string
    @param line: Child output line
    @type prog: number
    @param prog: Program from which the line originates

    """
    status_file = self._status_file

    # Lines from sources not handled below are forwarded unchanged
    forward_line = line
    force_update = False

    if prog == PROG_SOCAT:
      fields = line.split(None, 4)

      if len(fields) != 5:
        # Unexpected format, forward as is
        forward_line = "socat: %s" % line
      else:
        # The first three fields are not used
        (level, msg) = fields[3:]

        force_update = self._ProcessSocatOutput(status_file, level, msg)

        if not self._debug and (not level or level in SOCAT_LOG_IGNORE):
          forward_line = None
        else:
          forward_line = "socat: %s %s" % (level, msg)

    elif prog == PROG_DD:
      (should_forward, force_update) = self._ProcessDdOutput(line)

      if not (should_forward or self._debug):
        forward_line = None
      else:
        forward_line = "dd: %s" % line

    elif prog == PROG_DD_PID:
      if self._dd_pid:
        raise RuntimeError("dd PID reported more than once")
      logging.debug("Received dd PID %r", line)
      self._dd_pid = int(line)
      forward_line = None

    elif prog == PROG_EXP_SIZE:
      logging.debug("Received predicted size %r", line)
      forward_line = None

      # An empty or unparseable line means the size is unknown
      exp_size = None
      if line:
        try:
          exp_size = utils.BytesToMebibyte(int(line))
        except (ValueError, TypeError) as err:
          logging.error("Failed to convert predicted size %r to number: %s",
                        line, err)

      self._exp_size = exp_size

    if forward_line:
      self._logger.info(forward_line)
      status_file.AddRecentOutput(forward_line)

    status_file.Update(force_update)

  @staticmethod
  def _ProcessSocatOutput(status_file, level, msg):
    """Interprets socat log output.

    @param status_file: Status file object
    @param level: Log level of the message
    @param msg: Log message
    @rtype: bool
    @return: Whether the listening port or the connection state was recorded
      in the status file

    """
    if level != SOCAT_LOG_NOTICE:
      return False

    if status_file.GetListenPort() is None:
      # TODO: Maybe implement timeout to not listen forever
      listening = LISTENING_RE.match(msg)
      if listening:
        (_, port) = _VerifyListening(int(listening.group("family")),
                                     listening.group("address"),
                                     int(listening.group("port")))

        status_file.SetListenPort(port)
        return True

    if not status_file.GetConnected():
      if TRANSFER_LOOP_RE.match(msg):
        logging.debug("Connection established")
        status_file.SetConnected()
        return True

    return False

  def _ProcessDdOutput(self, line):
    """Interprets a line of dd(1)'s output.

    @param line: Output line
    @rtype: tuple
    @return: Two booleans: whether the line should be forwarded and whether
      a status update should be forced

    """
    stats = DD_INFO_RE.match(line)
    if stats:
      seconds = float(stats.group("seconds"))
      mbytes = utils.BytesToMebibyte(int(stats.group("bytes")))
      self._UpdateDdProgress(seconds, mbytes)
      return (False, True)

    # Anything but the "records in/out" lines is forwarded
    should_forward = DD_STDERR_IGNORE.match(line) is None

    return (should_forward, False)

  def _UpdateDdProgress(self, seconds, mbytes):
    """Updates the internal status variables for dd(1) progress.

    @type seconds: float
    @param seconds: Timestamp of this update
    @type mbytes: float
    @param mbytes: Total number of MiB transferred so far

    """
    history = self._dd_progress

    # Add latest sample and remove old ones
    history.append((seconds, mbytes))
    del history[:-self._dd_tp_samples]

    # Calculate throughput
    throughput = _CalcThroughput(history)

    # Calculate percent and ETA, both require the expected size
    percent = None
    eta = None

    expected = self._exp_size

    if expected is not None:
      if expected != 0:
        percent = max(0, min(100, (100.0 * mbytes) / expected))

      if throughput:
        eta = max(0, float(expected - mbytes) / throughput)

    self._status_file.SetProgress(mbytes, throughput, percent, eta)
559

    
560

    
561
def _CalcThroughput(samples):
562
  """Calculates the throughput in MiB/second.
563

564
  @type samples: sequence
565
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
566
                  tuple
567
  @rtype: float or None
568
  @return: Throughput in MiB/second
569

570
  """
571
  if len(samples) < 2:
572
    # Can't calculate throughput
573
    return None
574

    
575
  (start_time, start_mbytes) = samples[0]
576
  (end_time, end_mbytes) = samples[-1]
577

    
578
  return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)