Statistics
| Branch: | Tag: | Revision:

root / lib / impexpd / __init__.py @ 1d3dfa29

History | View | Annotate | Download (15.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Classes and functions for import/export daemon.
23

24
"""
25

    
26
import os
27
import re
28
import socket
29
import logging
30
import signal
31
import errno
32
import time
33
from cStringIO import StringIO
34

    
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import utils
38

    
39

    
40
#: Matches the socat(1) log message announcing that it started listening on
#: its socket. The remote peer needs the local address (the port number in
#: particular) in order to connect.
LISTENING_RE = re.compile(r"^listening on\s+"
                          r"AF=(?P<family>\d+)\s+"
                          r"(?P<address>.+):(?P<port>\d+)$",
                          re.IGNORECASE)

#: Matches the socat(1) log message emitted once data is sent over the wire
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
                              re.IGNORECASE)

# Single-letter severity markers found in socat(1) log lines
SOCAT_LOG_DEBUG = "D"
SOCAT_LOG_INFO = "I"
SOCAT_LOG_NOTICE = "N"
SOCAT_LOG_WARNING = "W"
SOCAT_LOG_ERROR = "E"
SOCAT_LOG_FATAL = "F"

#: socat(1) severities which are not forwarded unless debugging is enabled
SOCAT_LOG_IGNORE = frozenset((
  SOCAT_LOG_DEBUG,
  SOCAT_LOG_INFO,
  SOCAT_LOG_NOTICE,
  ))

#: Extracts the byte count and the elapsed seconds from GNU dd(1) statistics
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
                        r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$",
                        re.IGNORECASE)

#: Matches the "N+N records in/out" lines on dd(1)'s stderr, which are ignored
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$",
                              re.IGNORECASE)

#: Signal making dd(1) print its statistics (SIGINFO where the platform
#: provides it, SIGUSR1 otherwise)
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)

#: Buffer size: at most this many bytes are transferred at once
BUFSIZE = 1024 ** 2

# Options shared by all socat addresses
SOCAT_TCP_OPTS = [
  "keepalive",
  "keepidle=60",
  "keepintvl=10",
  "keepcnt=5",
  ]
SOCAT_OPENSSL_OPTS = [
  "verify=1",
  "cipher=HIGH",
  "method=TLSv1",
  ]

#: Maximum length accepted for a single socat address option
SOCAT_OPTION_MAXLEN = 400

# Identifiers for the sources of child output handled by the daemon
PROG_OTHER = 1
PROG_SOCAT = 2
PROG_DD = 3
PROG_DD_PID = 4
PROG_EXP_SIZE = 5

#: All known sources of child output
PROG_ALL = frozenset((
  PROG_OTHER,
  PROG_SOCAT,
  PROG_DD,
  PROG_DD_PID,
  PROG_EXP_SIZE,
  ))
96

    
97

    
98
class CommandBuilder(object):
  """Builds the shell commands executed by the import/export daemon.

  """
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
    """Stores the settings needed to build the commands.

    @param mode: Daemon mode (import or export)
    @param opts: Options object
    @type socat_stderr_fd: int
    @param socat_stderr_fd: File descriptor socat should write its stderr to
    @type dd_stderr_fd: int
    @param dd_stderr_fd: File descriptor dd should write its stderr to
    @type dd_pid_fd: int
    @param dd_pid_fd: File descriptor the child should write dd's PID to

    """
    self._mode = mode
    self._opts = opts
    self._socat_stderr_fd = socat_stderr_fd
    self._dd_stderr_fd = dd_stderr_fd
    self._dd_pid_fd = dd_pid_fd

    # A magic value, if given at all, must have the expected format
    magic = self._opts.magic
    assert magic is None or constants.IE_MAGIC_RE.match(magic)

  @staticmethod
  def GetBashCommand(cmd):
    """Wraps shell code into an argument list invoking Bash.

    Bash is started with the "errexit" and "pipefail" options set.

    @param cmd: Shell code to run
    @rtype: list
    @return: Argument list

    """
    bash_opts = ["-o", "errexit", "-o", "pipefail"]
    return ["bash"] + bash_opts + ["-c", cmd]

  def _GetSocatCommand(self):
    """Builds the argument list for socat.

    @rtype: list
    @return: socat program path followed by its arguments
    @raise errors.GenericError: If the mode is unknown or an address option
      is too long or contains a comma

    """
    opts = self._opts

    # Options appended to whichever address is the network side
    shared_opts = list(SOCAT_TCP_OPTS)
    shared_opts.extend(SOCAT_OPENSSL_OPTS)
    shared_opts.extend([
      "key=%s" % opts.key,
      "cert=%s" % opts.cert,
      "cafile=%s" % opts.ca,
      ])

    if opts.bind is not None:
      shared_opts.append("bind=%s" % opts.bind)

    if self._mode == constants.IEM_IMPORT:
      # Without an explicit port, zero is passed to socat
      listen_port = opts.port
      if listen_port is None:
        listen_port = 0

      source = [
        "OPENSSL-LISTEN:%s" % listen_port,
        "reuseaddr",

        # Retry to listen if connection wasn't established successfully, up to
        # 100 times a second. Note that this still leaves room for DoS attacks.
        "forever",
        "intervall=0.01",
        ]
      source.extend(shared_opts)
      sink = ["stdout"]

    elif self._mode == constants.IEM_EXPORT:
      source = ["stdin"]
      sink = [
        "OPENSSL:%s:%s" % (opts.host, opts.port),

        # How long to wait per connection attempt
        "connect-timeout=%s" % opts.connect_timeout,

        # Retry a few times before giving up to connect (once per second)
        "retry=%s" % opts.connect_retries,
        "intervall=1",
        ]
      sink.extend(shared_opts)

    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    # Options are joined with commas below, hence a value must not contain one
    for value in source + sink:
      if len(value) > SOCAT_OPTION_MAXLEN:
        raise errors.GenericError("Socat option longer than %s"
                                  " characters: %r" %
                                  (SOCAT_OPTION_MAXLEN, value))
      if "," in value:
        raise errors.GenericError("Comma not allowed in socat option"
                                  " value: %r" % value)

    argv = [constants.SOCAT_PATH]

    # Log to stderr
    argv.append("-ls")

    # Log level
    argv.extend(["-d", "-d"])

    # Buffer size
    argv.append("-b%s" % BUFSIZE)

    # Unidirectional mode, the first address is only used for reading, and the
    # second address is only used for writing
    argv.append("-u")

    argv.append(",".join(source))
    argv.append(",".join(sink))

    return argv

  def _GetMagicCommand(self):
    """Builds the shell code reading or writing the magic value.

    In import mode the code reads the magic from stdin and fails on a
    mismatch, in export mode it writes the magic to stdout.

    @rtype: string or None
    @return: Shell code, None if no magic value is configured
    @raise errors.GenericError: If the mode is unknown

    """
    if not self._opts.magic:
      return None

    # Prefix to ensure magic isn't interpreted as option to "echo"
    expected = "M=%s" % self._opts.magic

    if self._mode == constants.IEM_IMPORT:
      pieces = [
        "{ ",
        utils.ShellQuoteArgs(["read", "-n", str(len(expected)), "magic"]),
        " && ",
        "if test \"$magic\" != %s; then" % utils.ShellQuote(expected),
        " echo %s >&2;" % utils.ShellQuote("Magic value mismatch"),
        " exit 1;",
        "fi;",
        " }",
        ]
      return "".join(pieces)

    if self._mode == constants.IEM_EXPORT:
      return utils.ShellQuoteArgs(["echo", "-E", "-n", expected])

    raise errors.GenericError("Invalid mode '%s'" % self._mode)

  def _GetDdCommand(self):
    """Builds the shell code running dd(1) for measuring throughput.

    If a magic value is configured, the code handling it runs before dd.

    @rtype: string
    @return: Shell code

    """
    magic_cmd = self._GetMagicCommand()

    # Setting LC_ALL since we want to parse the output and explicitly
    # redirecting stdin, as the background process (dd) would have /dev/null as
    # stdin otherwise
    run_dd = ("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
              (BUFSIZE, self._dd_stderr_fd))
    # Send PID to daemon
    report_pid = " echo $pid >&%d;" % self._dd_pid_fd
    # And wait for dd
    wait_dd = " wait $pid;"

    dd_group = "".join(["{ ", run_dd, report_pid, wait_dd, " }"])

    if not magic_cmd:
      return dd_group

    return "{ %s && %s }" % (magic_cmd, dd_group)

  def _GetTransportCommand(self):
    """Builds the command for the transport part of the daemon.

    The pipeline consists of socat, dd and, if enabled, gzip/gunzip; the
    order depends on the mode.

    @rtype: list
    @return: Argument list running the pipeline in Bash
    @raise errors.GenericError: If the mode is unknown

    """
    quoted_socat = utils.ShellQuoteArgs(self._GetSocatCommand())
    socat_cmd = "%s 2>&%d" % (quoted_socat, self._socat_stderr_fd)
    dd_cmd = self._GetDdCommand()

    compr = self._opts.compress

    assert compr in constants.IEC_ALL

    if self._mode == constants.IEM_IMPORT:
      # Network -> (decompression) -> dd
      pipeline = [socat_cmd]
      if compr == constants.IEC_GZIP:
        pipeline.append("gunzip -c")
      pipeline.append(dd_cmd)

    elif self._mode == constants.IEM_EXPORT:
      # dd -> (compression) -> network
      pipeline = [dd_cmd]
      if compr == constants.IEC_GZIP:
        pipeline.append("gzip -c")
      pipeline.append(socat_cmd)

    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    # TODO: Run transport as separate user
    # The transport uses its own shell to simplify running it as a separate user
    # in the future.
    return self.GetBashCommand(" | ".join(pipeline))

  def GetCommand(self):
    """Builds the complete child process command.

    The quoted transport command is placed between the optional command
    prefix and suffix, which are inserted verbatim.

    @rtype: list
    @return: Argument list running the whole command in Bash

    """
    transport_cmd = self._GetTransportCommand()

    pieces = []

    if self._opts.cmd_prefix:
      pieces.append(self._opts.cmd_prefix)

    pieces.append(utils.ShellQuoteArgs(transport_cmd))

    if self._opts.cmd_suffix:
      pieces.append(self._opts.cmd_suffix)

    return self.GetBashCommand(" ".join(pieces))
320

    
321

    
322
def _VerifyListening(family, address, port):
323
  """Verify address given as listening address by socat.
324

325
  """
326
  # TODO: Implement IPv6 support
327
  if family != socket.AF_INET:
328
    raise errors.GenericError("Address family %r not supported" % family)
329

    
330
  try:
331
    packed_address = socket.inet_pton(family, address)
332
  except socket.error:
333
    raise errors.GenericError("Invalid address %r for family %s" %
334
                              (address, family))
335

    
336
  return (socket.inet_ntop(family, packed_address), port)
337

    
338

    
339
class ChildIOProcessor(object):
  """Interprets the output of the daemon's child processes.

  Output is received line by line, one line splitter per program, and is
  used to update the status file and to forward messages to the logger.

  """
  def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
    """Sets up one line splitter per known program.

    @param debug: Whether to also forward output which is normally dropped
    @param status_file: Status file object updated from the child output
    @param logger: Logger receiving forwarded output lines
    @param throughput_samples: Number of most recent dd(1) samples kept for
      calculating the throughput
    @param exp_size: Expected size of transferred data (used as MiB by the
      progress calculation), None if unknown

    """
    self._debug = debug
    self._status_file = status_file
    self._logger = logger

    # Every program gets its own splitter, all of them calling back into
    # L{_ProcessOutput} with the program identifier
    splitters = {}
    for prog in PROG_ALL:
      splitters[prog] = utils.LineSplitter(self._ProcessOutput, prog)
    self._splitter = splitters

    self._dd_pid = None
    self._dd_ready = False
    self._dd_tp_samples = throughput_samples
    self._dd_progress = []

    # Expected size of transferred data
    self._exp_size = exp_size

  def GetLineSplitter(self, prog):
    """Looks up the line splitter of a program.

    @param prog: Program identifier (one of the C{PROG_*} values)

    """
    return self._splitter[prog]

  def FlushAll(self):
    """Flushes the line splitters of all programs.

    """
    for splitter in self._splitter.values():
      splitter.flush()

  def CloseAll(self):
    """Closes the line splitters of all programs and forgets about them.

    """
    for splitter in self._splitter.values():
      splitter.close()
    self._splitter.clear()

  def NotifyDd(self):
    """Asks dd(1) to write its statistics by sending it a signal.

    @rtype: bool
    @return: False if dd's PID is not known or dd isn't handling the signal
      yet; True after trying to send the signal (also if the process turned
      out to be gone)

    """
    pid = self._dd_pid

    if pid is None:
      # Can't notify
      return False

    if not self._dd_ready:
      # There's a race condition between starting the program and sending
      # signals.  The signal handler is only registered after some time, so we
      # have to check whether the program is ready. If it isn't, sending a
      # signal will invoke the default handler (and usually abort the program).
      if not utils.IsProcessHandlingSignal(pid, DD_INFO_SIGNAL):
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
        return False

      logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
      self._dd_ready = True

    logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, pid)
    try:
      os.kill(pid, DD_INFO_SIGNAL)
    except EnvironmentError as err:
      # Anything but "no such process" is unexpected
      if err.errno != errno.ESRCH:
        raise

      # Process no longer exists
      logging.debug("dd exited")
      self._dd_pid = None

    return True

  def _ProcessOutput(self, line, prog):
    """Handles a single line of child process output.

    Depending on the originating program the line is interpreted, forwarded
    to the logger and the status file's recent output, or dropped. The
    status file is updated in any case.

    @type line: string
    @param line: Child output line
    @type prog: number
    @param prog: Program from which the line originates

    """
    force_update = False
    # Output of programs without special handling is forwarded unchanged
    forward_line = line

    if prog == PROG_SOCAT:
      fields = line.split(None, 4)

      if len(fields) != 5:
        # Not in the expected log format, forward as is
        forward_line = "socat: %s" % line
      else:
        # Only the fourth field (severity) and the remainder are used
        level = fields[3]
        msg = fields[4]

        force_update = self._ProcessSocatOutput(self._status_file, level, msg)

        if not self._debug and (not level or level in SOCAT_LOG_IGNORE):
          forward_line = None
        else:
          forward_line = "socat: %s %s" % (level, msg)

    elif prog == PROG_DD:
      (should_forward, force_update) = self._ProcessDdOutput(line)

      if not (should_forward or self._debug):
        forward_line = None
      else:
        forward_line = "dd: %s" % line

    elif prog == PROG_DD_PID:
      if self._dd_pid:
        raise RuntimeError("dd PID reported more than once")
      logging.debug("Received dd PID %r", line)
      self._dd_pid = int(line)
      forward_line = None

    elif prog == PROG_EXP_SIZE:
      logging.debug("Received predicted size %r", line)
      forward_line = None

      # An empty or unparseable line leaves the expected size unknown
      exp_size = None
      if line:
        try:
          exp_size = utils.BytesToMebibyte(int(line))
        except (ValueError, TypeError) as err:
          logging.error("Failed to convert predicted size %r to number: %s",
                        line, err)

      self._exp_size = exp_size

    if forward_line:
      self._logger.info(forward_line)
      self._status_file.AddRecentOutput(forward_line)

    self._status_file.Update(force_update)

  @staticmethod
  def _ProcessSocatOutput(status_file, level, msg):
    """Interprets a socat log message.

    Only messages of severity "notice" are evaluated; they are used to
    detect the listening port and the established connection.

    @param status_file: Status file object to record the findings in
    @param level: Severity marker of the message
    @param msg: Message text
    @rtype: bool
    @return: Whether the status file was changed

    """
    if level != SOCAT_LOG_NOTICE:
      return False

    if status_file.GetListenPort() is None:
      # TODO: Maybe implement timeout to not listen forever
      listening = LISTENING_RE.match(msg)
      if listening:
        (_, port) = _VerifyListening(int(listening.group("family")),
                                     listening.group("address"),
                                     int(listening.group("port")))

        status_file.SetListenPort(port)
        return True

    if not status_file.GetConnected():
      if TRANSFER_LOOP_RE.match(msg):
        logging.debug("Connection established")
        status_file.SetConnected()
        return True

    return False

  def _ProcessDdOutput(self, line):
    """Interprets a line of dd(1)'s output.

    @param line: Output line
    @rtype: tuple
    @return: Tuple of two booleans; whether the line should be forwarded and
      whether the status file update should be forced

    """
    stats = DD_INFO_RE.match(line)
    if stats:
      # Statistics line, record progress instead of forwarding
      self._UpdateDdProgress(float(stats.group("seconds")),
                             utils.BytesToMebibyte(int(stats.group("bytes"))))
      return (False, True)

    if DD_STDERR_IGNORE.match(line):
      # Ignore
      return (False, False)

    # Forward line
    return (True, False)

  def _UpdateDdProgress(self, seconds, mbytes):
    """Records a dd(1) progress sample and updates the status file.

    @type seconds: float
    @param seconds: Timestamp of this update
    @type mbytes: float
    @param mbytes: Total number of MiB transferred so far

    """
    history = self._dd_progress

    # Add latest sample
    history.append((seconds, mbytes))

    # Remove old samples
    del history[:-self._dd_tp_samples]

    # Calculate throughput
    throughput = _CalcThroughput(history)

    # Percent and ETA can only be calculated if the expected size is known
    percent = None
    eta = None
    expected = self._exp_size

    if expected is not None:
      if expected != 0:
        percent = max(0, min(100, (100.0 * mbytes) / expected))

      if throughput:
        eta = max(0, float(expected - mbytes) / throughput)

    self._status_file.SetProgress(mbytes, throughput, percent, eta)
553

    
554

    
555
def _CalcThroughput(samples):
556
  """Calculates the throughput in MiB/second.
557

558
  @type samples: sequence
559
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
560
                  tuple
561
  @rtype: float or None
562
  @return: Throughput in MiB/second
563

564
  """
565
  if len(samples) < 2:
566
    # Can't calculate throughput
567
    return None
568

    
569
  (start_time, start_mbytes) = samples[0]
570
  (end_time, end_mbytes) = samples[-1]
571

    
572
  return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)