Statistics
| Branch: | Tag: | Revision:

root / lib / impexpd / __init__.py @ 84a12e40

History | View | Annotate | Download (15.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Classes and functions for import/export daemon.
23

24
"""
25

    
26
import os
27
import re
28
import socket
29
import logging
30
import signal
31
import errno
32
import time
33
from cStringIO import StringIO
34

    
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import utils
38

    
39

    
40
#: Used to recognize point at which socat(1) starts to listen on its socket.
#: The local address is required for the remote peer to connect (in particular
#: the port number). Groups: "family" (numeric address family), "address" and
#: "port".
LISTENING_RE = re.compile(r"^listening on\s+"
                          r"AF=(?P<family>\d+)\s+"
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)

#: Used to recognize point at which socat(1) is sending data over the wire
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
                              re.I)

# Severity codes compared against the level field of socat(1) log lines
SOCAT_LOG_DEBUG = "D"
SOCAT_LOG_INFO = "I"
SOCAT_LOG_NOTICE = "N"
SOCAT_LOG_WARNING = "W"
SOCAT_LOG_ERROR = "E"
SOCAT_LOG_FATAL = "F"

#: socat(1) log levels which are not forwarded unless debugging is enabled
SOCAT_LOG_IGNORE = frozenset([
  SOCAT_LOG_DEBUG,
  SOCAT_LOG_INFO,
  SOCAT_LOG_NOTICE,
  ])

#: Used to parse GNU dd(1) statistics. Groups: "bytes" (integer) and "seconds"
#: (anything float() accepts). dd formats the elapsed time with "%g", hence
#: very short durations are printed in exponent notation (e.g. "3.5e-05 s");
#: both "s" and "second(s)" are accepted as unit.
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
                        r"(?P<seconds>[\d.]+(?:e[+-]?\d+)?)\s*"
                        r"s(?:|econds?),.*$", re.I)

#: Used to ignore "N+N records in/out" on dd(1)'s stderr
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)

#: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
#: unavailable and SIGUSR1 is used instead)
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)

#: Buffer size: at most this many bytes are transferred at once
BUFSIZE = 1024 * 1024
77

    
78
# Common options for socat

#: TCP keepalive options added to every socat network address
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]

#: OpenSSL options added to every socat network address
SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1",
                      "cipher=%s" % constants.OPENSSL_CIPHERS]

#: Maximum length of a single socat address option (longer ones are rejected
#: when the socat command is built)
SOCAT_OPTION_MAXLEN = 400
84

    
85
# Identifiers for the sources of child output; one line splitter is kept per
# identifier and the identifier is passed along with every line received
(PROG_OTHER,
 PROG_SOCAT,
 PROG_DD,
 PROG_DD_PID,
 PROG_EXP_SIZE) = range(1, 6)

#: All known output source identifiers
PROG_ALL = frozenset([
  PROG_OTHER,
  PROG_SOCAT,
  PROG_DD,
  PROG_DD_PID,
  PROG_EXP_SIZE,
  ])
97

    
98

    
99
class CommandBuilder(object):
  """Builds the command line executed as child of the import/export daemon.

  The command is a Bash pipeline made of socat(1), an optional
  gzip(1)/gunzip(1) stage and dd(1); their order depends on the daemon mode.

  """
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
    """Initializes this class.

    @param mode: Daemon mode (import or export)
    @param opts: Options object
    @type socat_stderr_fd: int
    @param socat_stderr_fd: File descriptor socat should write its stderr to
    @type dd_stderr_fd: int
    @param dd_stderr_fd: File descriptor dd should write its stderr to
    @type dd_pid_fd: int
    @param dd_pid_fd: File descriptor the child should write dd's PID to

    """
    self._opts = opts
    self._mode = mode
    self._socat_stderr_fd = socat_stderr_fd
    self._dd_stderr_fd = dd_stderr_fd
    self._dd_pid_fd = dd_pid_fd

    # Sanity check only; the magic value is embedded into shell code later on
    assert (self._opts.magic is None or
            constants.IE_MAGIC_RE.match(self._opts.magic))

  @staticmethod
  def GetBashCommand(cmd):
    """Prepares a command to be run in Bash.

    @type cmd: string
    @param cmd: Shell code to run
    @rtype: list
    @return: Argument list invoking Bash with "errexit" and "pipefail" enabled

    """
    return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]

  def _GetSocatCommand(self):
    """Returns the socat command.

    @rtype: list
    @return: Path to socat followed by its arguments
    @raise errors.GenericError: If the mode is invalid or if an address option
      is too long or contains a comma

    """
    common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
      "key=%s" % self._opts.key,
      "cert=%s" % self._opts.cert,
      "cafile=%s" % self._opts.ca,
      ]

    if self._opts.bind is not None:
      common_addr_opts.append("bind=%s" % self._opts.bind)

    if self._mode == constants.IEM_IMPORT:
      # Without a configured port, port 0 is passed to socat
      if self._opts.port is None:
        port = 0
      else:
        port = self._opts.port

      addr1 = [
        "OPENSSL-LISTEN:%s" % port,
        "reuseaddr",

        # Retry to listen if connection wasn't established successfully, up to
        # 100 times a second. Note that this still leaves room for DoS attacks.
        "forever",
        "intervall=0.01",
        ] + common_addr_opts
      addr2 = ["stdout"]

    elif self._mode == constants.IEM_EXPORT:
      addr1 = ["stdin"]
      addr2 = [
        "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),

        # How long to wait per connection attempt
        "connect-timeout=%s" % self._opts.connect_timeout,

        # Retry a few times before giving up to connect (once per second)
        "retry=%s" % self._opts.connect_retries,
        "intervall=1",
        ] + common_addr_opts

    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    # The options of an address are joined using commas below, hence a comma
    # inside a value would be split into separate options
    for addr in [addr1, addr2]:
      for value in addr:
        if len(value) > SOCAT_OPTION_MAXLEN:
          raise errors.GenericError("Socat option longer than %s"
                                    " characters: %r" %
                                    (SOCAT_OPTION_MAXLEN, value))
        if "," in value:
          raise errors.GenericError("Comma not allowed in socat option"
                                    " value: %r" % value)

    return [
      constants.SOCAT_PATH,

      # Log to stderr
      "-ls",

      # Log level
      "-d", "-d",

      # Buffer size
      "-b%s" % BUFSIZE,

      # Unidirectional mode, the first address is only used for reading, and the
      # second address is only used for writing
      "-u",

      ",".join(addr1), ",".join(addr2)
      ]

  def _GetMagicCommand(self):
    """Returns the command to read/write the magic value.

    @rtype: string or None
    @return: Shell code verifying (import) or writing (export) the magic
      value; None if no magic value is configured
    @raise errors.GenericError: If the mode is invalid

    """
    if not self._opts.magic:
      return None

    # Prefix to ensure magic isn't interpreted as option to "echo"
    magic = "M=%s" % self._opts.magic

    if self._mode == constants.IEM_IMPORT:
      # Read exactly as many characters as the expected value is long and
      # compare them with it
      return "".join([
        "{ ",
        utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"]),
        " && ",
        "if test \"$magic\" != %s; then" % utils.ShellQuote(magic),
        " echo %s >&2;" % utils.ShellQuote("Magic value mismatch"),
        " exit 1;",
        "fi;",
        " }",
        ])

    elif self._mode == constants.IEM_EXPORT:
      return utils.ShellQuoteArgs(["echo", "-E", "-n", magic])

    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

  def _GetDdCommand(self):
    """Returns the command for measuring throughput.

    @rtype: string
    @return: Shell code running dd(1) in the background, reporting its PID on
      the PID file descriptor and waiting for it; preceded by the magic
      command if a magic value is configured

    """
    magic_cmd = self._GetMagicCommand()

    parts = []

    if magic_cmd:
      parts.extend(["{ ", magic_cmd, " && "])

    parts.append("{ ")
    # Setting LC_ALL since we want to parse the output and explicitly
    # redirecting stdin, as the background process (dd) would have /dev/null as
    # stdin otherwise
    parts.append("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
                 (BUFSIZE, self._dd_stderr_fd))
    # Send PID to daemon
    parts.append(" echo $pid >&%d;" % self._dd_pid_fd)
    # And wait for dd
    parts.append(" wait $pid;")
    parts.append(" }")

    if magic_cmd:
      parts.append(" }")

    return "".join(parts)

  def _GetTransportCommand(self):
    """Returns the command for the transport part of the daemon.

    @rtype: list
    @return: Bash argument list running the transport pipeline
    @raise errors.GenericError: If the mode is invalid

    """
    socat_cmd = ("%s 2>&%d" %
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
                  self._socat_stderr_fd))
    dd_cmd = self._GetDdCommand()

    compr = self._opts.compress

    assert compr in constants.IEC_ALL

    if self._mode == constants.IEM_IMPORT:
      # Network -> (decompression) -> dd
      parts = [socat_cmd]

      if compr == constants.IEC_GZIP:
        parts.append("gunzip -c")

      parts.append(dd_cmd)

    elif self._mode == constants.IEM_EXPORT:
      # dd -> (compression) -> network
      parts = [dd_cmd]

      if compr == constants.IEC_GZIP:
        parts.append("gzip -c")

      parts.append(socat_cmd)

    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    # TODO: Run transport as separate user
    # The transport uses its own shell to simplify running it as a separate user
    # in the future.
    return self.GetBashCommand(" | ".join(parts))

  def GetCommand(self):
    """Returns the complete child process command.

    The configured command prefix and suffix are inserted verbatim (without
    quoting) around the quoted transport command.

    @rtype: list
    @return: Bash argument list for the whole child process

    """
    parts = []

    if self._opts.cmd_prefix:
      parts.append(self._opts.cmd_prefix)

    parts.append(utils.ShellQuoteArgs(self._GetTransportCommand()))

    if self._opts.cmd_suffix:
      parts.append(self._opts.cmd_suffix)

    return self.GetBashCommand(" ".join(parts))
321

    
322

    
323
def _VerifyListening(family, address, port):
  """Verify address given as listening address by socat.

  IPv4 and IPv6 addresses are accepted. An IPv6 address may be enclosed in
  square brackets (the form socat is expected to print it in); the brackets
  are removed before the address is parsed.

  @type family: int
  @param family: Address family number
  @type address: string
  @param address: Textual address
  @param port: Port number, returned unchanged
  @rtype: tuple
  @return: Normalized textual address and port
  @raise errors.GenericError: If the address family is not supported or the
    address is invalid for the family

  """
  if family not in (socket.AF_INET, socket.AF_INET6):
    raise errors.GenericError("Address family %r not supported" % family)

  if (family == socket.AF_INET6 and
      address.startswith("[") and address.endswith("]")):
    address = address[1:-1]

  # Converting to the packed form and back validates and normalizes the address
  try:
    packed_address = socket.inet_pton(family, address)
  except socket.error:
    raise errors.GenericError("Invalid address %r for family %s" %
                              (address, family))

  return (socket.inet_ntop(family, packed_address), port)
338

    
339

    
340
class ChildIOProcessor(object):
  """Interprets the output of the import/export daemon's child processes.

  One line splitter is kept per output source (see C{PROG_ALL}). Every
  complete line is interpreted depending on its source, forwarded to the
  logger where appropriate and reflected in the status file.

  """
  def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
    """Initializes this class.

    @param debug: Whether to also forward output which is otherwise dropped
    @param status_file: Status file object receiving listen port, connection
      state, progress and recent output
    @param logger: Logger used for forwarded child output
    @type throughput_samples: int
    @param throughput_samples: Number of most recent dd(1) samples kept for
      calculating the throughput
    @param exp_size: Expected size of transferred data (compared with dd's
      progress in MiB) or None if unknown

    """
    self._debug = debug
    self._status_file = status_file
    self._logger = logger

    self._splitter = dict((prog, utils.LineSplitter(self._ProcessOutput, prog))
                          for prog in PROG_ALL)

    self._dd_pid = None
    self._dd_ready = False
    self._dd_tp_samples = throughput_samples
    self._dd_progress = []

    # Expected size of transferred data
    self._exp_size = exp_size

  def GetLineSplitter(self, prog):
    """Returns the line splitter for a program.

    @param prog: Output source identifier (one of C{PROG_ALL})

    """
    return self._splitter[prog]

  def FlushAll(self):
    """Flushes all line splitters.

    """
    for ls in self._splitter.values():
      ls.flush()

  def CloseAll(self):
    """Closes all line splitters.

    Afterwards no line splitter is available anymore.

    """
    for ls in self._splitter.values():
      ls.close()
    self._splitter.clear()

  def NotifyDd(self):
    """Tells dd(1) to write statistics.

    @rtype: bool
    @return: False if dd's PID is not (or no longer) known or if dd is not
      yet handling the signal; True otherwise (including the case where dd
      turned out to have exited)

    """
    if self._dd_pid is None:
      # Can't notify
      return False

    if not self._dd_ready:
      # There's a race condition between starting the program and sending
      # signals.  The signal handler is only registered after some time, so we
      # have to check whether the program is ready. If it isn't, sending a
      # signal will invoke the default handler (and usually abort the program).
      if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
        return False

      logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
      self._dd_ready = True

    logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
    try:
      os.kill(self._dd_pid, DD_INFO_SIGNAL)
    except EnvironmentError as err:
      if err.errno != errno.ESRCH:
        raise

      # Process no longer exists
      logging.debug("dd exited")
      self._dd_pid = None

    return True

  def _ProcessOutput(self, line, prog):
    """Takes care of child process output.

    @type line: string
    @param line: Child output line
    @type prog: number
    @param prog: Program from which the line originates
    @raise RuntimeError: If dd's PID is reported more than once

    """
    force_update = False
    forward_line = line

    if prog == PROG_SOCAT:
      level = None
      # Lines with five whitespace-separated fields are treated as socat log
      # messages: three leading fields which are ignored, the log level and
      # the message itself
      parts = line.split(None, 4)

      if len(parts) == 5:
        (_, _, _, level, msg) = parts

        force_update = self._ProcessSocatOutput(self._status_file, level, msg)

        if self._debug or (level and level not in SOCAT_LOG_IGNORE):
          forward_line = "socat: %s %s" % (level, msg)
        else:
          forward_line = None
      else:
        forward_line = "socat: %s" % line

    elif prog == PROG_DD:
      (should_forward, force_update) = self._ProcessDdOutput(line)

      if should_forward or self._debug:
        forward_line = "dd: %s" % line
      else:
        forward_line = None

    elif prog == PROG_DD_PID:
      if self._dd_pid:
        raise RuntimeError("dd PID reported more than once")
      logging.debug("Received dd PID %r", line)
      self._dd_pid = int(line)
      forward_line = None

    elif prog == PROG_EXP_SIZE:
      logging.debug("Received predicted size %r", line)
      forward_line = None

      # An empty or unparsable line resets the expected size to "unknown"
      if line:
        try:
          exp_size = utils.BytesToMebibyte(int(line))
        except (ValueError, TypeError) as err:
          logging.error("Failed to convert predicted size %r to number: %s",
                        line, err)
          exp_size = None
      else:
        exp_size = None

      self._exp_size = exp_size

    if forward_line:
      self._logger.info(forward_line)
      self._status_file.AddRecentOutput(forward_line)

    self._status_file.Update(force_update)

  @staticmethod
  def _ProcessSocatOutput(status_file, level, msg):
    """Interprets socat log output.

    @param status_file: Status file object to update
    @type level: string
    @param level: Log level of the message
    @type msg: string
    @param msg: Log message
    @rtype: bool
    @return: Whether the listen port or the connection state was updated

    """
    if level == SOCAT_LOG_NOTICE:
      if status_file.GetListenPort() is None:
        # TODO: Maybe implement timeout to not listen forever
        m = LISTENING_RE.match(msg)
        if m:
          (_, port) = _VerifyListening(int(m.group("family")),
                                       m.group("address"),
                                       int(m.group("port")))

          status_file.SetListenPort(port)
          return True

      if not status_file.GetConnected():
        m = TRANSFER_LOOP_RE.match(msg)
        if m:
          logging.debug("Connection established")
          status_file.SetConnected()
          return True

    return False

  def _ProcessDdOutput(self, line):
    """Interprets a line of dd(1)'s output.

    @type line: string
    @param line: Line written by dd(1)
    @rtype: tuple
    @return: Two booleans; whether the line should be forwarded and whether
      the status file update should be forced

    """
    m = DD_INFO_RE.match(line)
    if m:
      seconds = float(m.group("seconds"))
      mbytes = utils.BytesToMebibyte(int(m.group("bytes")))
      self._UpdateDdProgress(seconds, mbytes)
      return (False, True)

    m = DD_STDERR_IGNORE.match(line)
    if m:
      # Ignore
      return (False, False)

    # Forward line
    return (True, False)

  def _UpdateDdProgress(self, seconds, mbytes):
    """Updates the internal status variables for dd(1) progress.

    @type seconds: float
    @param seconds: Timestamp of this update
    @type mbytes: float
    @param mbytes: Total number of MiB transferred so far

    """
    # Add latest sample
    self._dd_progress.append((seconds, mbytes))

    # Remove old samples. The number of surplus samples is computed
    # explicitly, as "del samples[:-keep]" would delete nothing for a sample
    # count of zero (-0 == 0).
    surplus = len(self._dd_progress) - max(0, self._dd_tp_samples)
    if surplus > 0:
      del self._dd_progress[:surplus]

    # Calculate throughput
    throughput = _CalcThroughput(self._dd_progress)

    # Calculate percent and ETA
    percent = None
    eta = None

    if self._exp_size is not None:
      if self._exp_size != 0:
        percent = max(0, min(100, (100.0 * mbytes) / self._exp_size))

      if throughput:
        eta = max(0, float(self._exp_size - mbytes) / throughput)

    self._status_file.SetProgress(mbytes, throughput, percent, eta)
554

    
555

    
556
def _CalcThroughput(samples):
  """Calculates the throughput in MiB/second.

  Only the first and the last sample are taken into account.

  @type samples: sequence
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
                  tuple
  @rtype: float or None
  @return: Throughput in MiB/second; None if there are fewer than two samples
           or if no time elapsed between the first and the last sample

  """
  if len(samples) < 2:
    # Can't calculate throughput
    return None

  (start_time, start_mbytes) = samples[0]
  (end_time, end_mbytes) = samples[-1]

  elapsed = float(end_time) - start_time
  if elapsed <= 0:
    # Samples carry the same timestamp (or are out of order); dividing would
    # fail or yield a meaningless value
    return None

  return (float(end_mbytes) - start_mbytes) / elapsed