Statistics
| Branch: | Tag: | Revision:

root / lib / impexpd / __init__.py @ fbb6b864

History | View | Annotate | Download (14.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Classes and functions for import/export daemon.
23

24
"""
25

    
26
import os
27
import re
28
import socket
29
import logging
30
import signal
31
import errno
32
import time
33
from cStringIO import StringIO
34

    
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import utils
38

    
39

    
40
#: Used to recognize point at which socat(1) starts to listen on its socket.
41
#: The local address is required for the remote peer to connect (in particular
42
#: the port number).
43
LISTENING_RE = re.compile(r"^listening on\s+"
44
                          r"AF=(?P<family>\d+)\s+"
45
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
46

    
47
#: Used to recognize point at which socat(1) is sending data over the wire
48
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
49
                              re.I)
50

    
51
SOCAT_LOG_DEBUG = "D"
52
SOCAT_LOG_INFO = "I"
53
SOCAT_LOG_NOTICE = "N"
54
SOCAT_LOG_WARNING = "W"
55
SOCAT_LOG_ERROR = "E"
56
SOCAT_LOG_FATAL = "F"
57

    
58
SOCAT_LOG_IGNORE = frozenset([
59
  SOCAT_LOG_DEBUG,
60
  SOCAT_LOG_INFO,
61
  SOCAT_LOG_NOTICE,
62
  ])
63

    
64
#: Used to parse GNU dd(1) statistics
65
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
66
                        r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
67

    
68
#: Used to ignore "N+N records in/out" on dd(1)'s stderr
69
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
70

    
71
#: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
72
#: unavailable and SIGUSR1 is used instead)
73
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)
74

    
75
#: Buffer size: at most this many bytes are transferred at once
76
BUFSIZE = 1024 * 1024
77

    
78
# Common options for socat
79
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
80
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
81

    
82
SOCAT_OPTION_MAXLEN = 400
83

    
84
(PROG_OTHER,
85
 PROG_SOCAT,
86
 PROG_DD,
87
 PROG_DD_PID,
88
 PROG_EXP_SIZE) = range(1, 6)
89
PROG_ALL = frozenset([
90
  PROG_OTHER,
91
  PROG_SOCAT,
92
  PROG_DD,
93
  PROG_DD_PID,
94
  PROG_EXP_SIZE,
95
  ])
96

    
97

    
98
class CommandBuilder(object):
99
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
100
    """Initializes this class.
101

102
    @param mode: Daemon mode (import or export)
103
    @param opts: Options object
104
    @type socat_stderr_fd: int
105
    @param socat_stderr_fd: File descriptor socat should write its stderr to
106
    @type dd_stderr_fd: int
107
    @param dd_stderr_fd: File descriptor dd should write its stderr to
108
    @type dd_pid_fd: int
109
    @param dd_pid_fd: File descriptor the child should write dd's PID to
110

111
    """
112
    self._opts = opts
113
    self._mode = mode
114
    self._socat_stderr_fd = socat_stderr_fd
115
    self._dd_stderr_fd = dd_stderr_fd
116
    self._dd_pid_fd = dd_pid_fd
117

    
118
  @staticmethod
119
  def GetBashCommand(cmd):
120
    """Prepares a command to be run in Bash.
121

122
    """
123
    return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
124

    
125
  def _GetSocatCommand(self):
126
    """Returns the socat command.
127

128
    """
129
    common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
130
      "key=%s" % self._opts.key,
131
      "cert=%s" % self._opts.cert,
132
      "cafile=%s" % self._opts.ca,
133
      ]
134

    
135
    if self._opts.bind is not None:
136
      common_addr_opts.append("bind=%s" % self._opts.bind)
137

    
138
    if self._mode == constants.IEM_IMPORT:
139
      if self._opts.port is None:
140
        port = 0
141
      else:
142
        port = self._opts.port
143

    
144
      addr1 = [
145
        "OPENSSL-LISTEN:%s" % port,
146
        "reuseaddr",
147

    
148
        # Retry to listen if connection wasn't established successfully, up to
149
        # 100 times a second. Note that this still leaves room for DoS attacks.
150
        "forever",
151
        "intervall=0.01",
152
        ] + common_addr_opts
153
      addr2 = ["stdout"]
154

    
155
    elif self._mode == constants.IEM_EXPORT:
156
      addr1 = ["stdin"]
157
      addr2 = [
158
        "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
159

    
160
        # How long to wait per connection attempt
161
        "connect-timeout=%s" % self._opts.connect_timeout,
162

    
163
        # Retry a few times before giving up to connect (once per second)
164
        "retry=%s" % self._opts.connect_retries,
165
        "intervall=1",
166
        ] + common_addr_opts
167

    
168
    else:
169
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
170

    
171
    for i in [addr1, addr2]:
172
      for value in i:
173
        if len(value) > SOCAT_OPTION_MAXLEN:
174
          raise errors.GenericError("Socat option longer than %s"
175
                                    " characters: %r" %
176
                                    (SOCAT_OPTION_MAXLEN, value))
177
        if "," in value:
178
          raise errors.GenericError("Comma not allowed in socat option"
179
                                    " value: %r" % value)
180

    
181
    return [
182
      constants.SOCAT_PATH,
183

    
184
      # Log to stderr
185
      "-ls",
186

    
187
      # Log level
188
      "-d", "-d",
189

    
190
      # Buffer size
191
      "-b%s" % BUFSIZE,
192

    
193
      # Unidirectional mode, the first address is only used for reading, and the
194
      # second address is only used for writing
195
      "-u",
196

    
197
      ",".join(addr1), ",".join(addr2)
198
      ]
199

    
200
  def _GetDdCommand(self):
201
    """Returns the command for measuring throughput.
202

203
    """
204
    dd_cmd = StringIO()
205
    # Setting LC_ALL since we want to parse the output and explicitely
206
    # redirecting stdin, as the background process (dd) would have /dev/null as
207
    # stdin otherwise
208
    dd_cmd.write("{ LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
209
                 (BUFSIZE, self._dd_stderr_fd))
210
    # Send PID to daemon
211
    dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
212
    # And wait for dd
213
    dd_cmd.write(" wait $pid;")
214
    dd_cmd.write(" }")
215
    return dd_cmd.getvalue()
216

    
217
  def _GetTransportCommand(self):
218
    """Returns the command for the transport part of the daemon.
219

220
    """
221
    socat_cmd = ("%s 2>&%d" %
222
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
223
                  self._socat_stderr_fd))
224
    dd_cmd = self._GetDdCommand()
225

    
226
    compr = self._opts.compress
227

    
228
    assert compr in constants.IEC_ALL
229

    
230
    parts = []
231

    
232
    if self._mode == constants.IEM_IMPORT:
233
      parts.append(socat_cmd)
234

    
235
      if compr == constants.IEC_GZIP:
236
        parts.append("gunzip -c")
237

    
238
      parts.append(dd_cmd)
239

    
240
    elif self._mode == constants.IEM_EXPORT:
241
      parts.append(dd_cmd)
242

    
243
      if compr == constants.IEC_GZIP:
244
        parts.append("gzip -c")
245

    
246
      parts.append(socat_cmd)
247

    
248
    else:
249
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
250

    
251
    # TODO: Run transport as separate user
252
    # The transport uses its own shell to simplify running it as a separate user
253
    # in the future.
254
    return self.GetBashCommand(" | ".join(parts))
255

    
256
  def GetCommand(self):
257
    """Returns the complete child process command.
258

259
    """
260
    transport_cmd = self._GetTransportCommand()
261

    
262
    buf = StringIO()
263

    
264
    if self._opts.cmd_prefix:
265
      buf.write(self._opts.cmd_prefix)
266
      buf.write(" ")
267

    
268
    buf.write(utils.ShellQuoteArgs(transport_cmd))
269

    
270
    if self._opts.cmd_suffix:
271
      buf.write(" ")
272
      buf.write(self._opts.cmd_suffix)
273

    
274
    return self.GetBashCommand(buf.getvalue())
275

    
276

    
277
def _VerifyListening(family, address, port):
278
  """Verify address given as listening address by socat.
279

280
  """
281
  # TODO: Implement IPv6 support
282
  if family != socket.AF_INET:
283
    raise errors.GenericError("Address family %r not supported" % family)
284

    
285
  try:
286
    packed_address = socket.inet_pton(family, address)
287
  except socket.error:
288
    raise errors.GenericError("Invalid address %r for family %s" %
289
                              (address, family))
290

    
291
  return (socket.inet_ntop(family, packed_address), port)
292

    
293

    
294
class ChildIOProcessor(object):
295
  def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
296
    """Initializes this class.
297

298
    """
299
    self._debug = debug
300
    self._status_file = status_file
301
    self._logger = logger
302

    
303
    self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
304
                           for prog in PROG_ALL])
305

    
306
    self._dd_pid = None
307
    self._dd_ready = False
308
    self._dd_tp_samples = throughput_samples
309
    self._dd_progress = []
310

    
311
    # Expected size of transferred data
312
    self._exp_size = exp_size
313

    
314
  def GetLineSplitter(self, prog):
315
    """Returns the line splitter for a program.
316

317
    """
318
    return self._splitter[prog]
319

    
320
  def FlushAll(self):
321
    """Flushes all line splitters.
322

323
    """
324
    for ls in self._splitter.itervalues():
325
      ls.flush()
326

    
327
  def CloseAll(self):
328
    """Closes all line splitters.
329

330
    """
331
    for ls in self._splitter.itervalues():
332
      ls.close()
333
    self._splitter.clear()
334

    
335
  def NotifyDd(self):
336
    """Tells dd(1) to write statistics.
337

338
    """
339
    if self._dd_pid is None:
340
      # Can't notify
341
      return False
342

    
343
    if not self._dd_ready:
344
      # There's a race condition between starting the program and sending
345
      # signals.  The signal handler is only registered after some time, so we
346
      # have to check whether the program is ready. If it isn't, sending a
347
      # signal will invoke the default handler (and usually abort the program).
348
      if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
349
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
350
        return False
351

    
352
      logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
353
      self._dd_ready = True
354

    
355
    logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
356
    try:
357
      os.kill(self._dd_pid, DD_INFO_SIGNAL)
358
    except EnvironmentError, err:
359
      if err.errno != errno.ESRCH:
360
        raise
361

    
362
      # Process no longer exists
363
      logging.debug("dd exited")
364
      self._dd_pid = None
365

    
366
    return True
367

    
368
  def _ProcessOutput(self, line, prog):
369
    """Takes care of child process output.
370

371
    @type line: string
372
    @param line: Child output line
373
    @type prog: number
374
    @param prog: Program from which the line originates
375

376
    """
377
    force_update = False
378
    forward_line = line
379

    
380
    if prog == PROG_SOCAT:
381
      level = None
382
      parts = line.split(None, 4)
383

    
384
      if len(parts) == 5:
385
        (_, _, _, level, msg) = parts
386

    
387
        force_update = self._ProcessSocatOutput(self._status_file, level, msg)
388

    
389
        if self._debug or (level and level not in SOCAT_LOG_IGNORE):
390
          forward_line = "socat: %s %s" % (level, msg)
391
        else:
392
          forward_line = None
393
      else:
394
        forward_line = "socat: %s" % line
395

    
396
    elif prog == PROG_DD:
397
      (should_forward, force_update) = self._ProcessDdOutput(line)
398

    
399
      if should_forward or self._debug:
400
        forward_line = "dd: %s" % line
401
      else:
402
        forward_line = None
403

    
404
    elif prog == PROG_DD_PID:
405
      if self._dd_pid:
406
        raise RuntimeError("dd PID reported more than once")
407
      logging.debug("Received dd PID %r", line)
408
      self._dd_pid = int(line)
409
      forward_line = None
410

    
411
    elif prog == PROG_EXP_SIZE:
412
      logging.debug("Received predicted size %r", line)
413
      forward_line = None
414

    
415
      if line:
416
        try:
417
          exp_size = utils.BytesToMebibyte(int(line))
418
        except (ValueError, TypeError), err:
419
          logging.error("Failed to convert predicted size %r to number: %s",
420
                        line, err)
421
          exp_size = None
422
      else:
423
        exp_size = None
424

    
425
      self._exp_size = exp_size
426

    
427
    if forward_line:
428
      self._logger.info(forward_line)
429
      self._status_file.AddRecentOutput(forward_line)
430

    
431
    self._status_file.Update(force_update)
432

    
433
  @staticmethod
434
  def _ProcessSocatOutput(status_file, level, msg):
435
    """Interprets socat log output.
436

437
    """
438
    if level == SOCAT_LOG_NOTICE:
439
      if status_file.GetListenPort() is None:
440
        # TODO: Maybe implement timeout to not listen forever
441
        m = LISTENING_RE.match(msg)
442
        if m:
443
          (_, port) = _VerifyListening(int(m.group("family")),
444
                                       m.group("address"),
445
                                       int(m.group("port")))
446

    
447
          status_file.SetListenPort(port)
448
          return True
449

    
450
      if not status_file.GetConnected():
451
        m = TRANSFER_LOOP_RE.match(msg)
452
        if m:
453
          logging.debug("Connection established")
454
          status_file.SetConnected()
455
          return True
456

    
457
    return False
458

    
459
  def _ProcessDdOutput(self, line):
460
    """Interprets a line of dd(1)'s output.
461

462
    """
463
    m = DD_INFO_RE.match(line)
464
    if m:
465
      seconds = float(m.group("seconds"))
466
      mbytes = utils.BytesToMebibyte(int(m.group("bytes")))
467
      self._UpdateDdProgress(seconds, mbytes)
468
      return (False, True)
469

    
470
    m = DD_STDERR_IGNORE.match(line)
471
    if m:
472
      # Ignore
473
      return (False, False)
474

    
475
    # Forward line
476
    return (True, False)
477

    
478
  def _UpdateDdProgress(self, seconds, mbytes):
479
    """Updates the internal status variables for dd(1) progress.
480

481
    @type seconds: float
482
    @param seconds: Timestamp of this update
483
    @type mbytes: float
484
    @param mbytes: Total number of MiB transferred so far
485

486
    """
487
    # Add latest sample
488
    self._dd_progress.append((seconds, mbytes))
489

    
490
    # Remove old samples
491
    del self._dd_progress[:-self._dd_tp_samples]
492

    
493
    # Calculate throughput
494
    throughput = _CalcThroughput(self._dd_progress)
495

    
496
    # Calculate percent and ETA
497
    percent = None
498
    eta = None
499

    
500
    if self._exp_size is not None:
501
      if self._exp_size != 0:
502
        percent = max(0, min(100, (100.0 * mbytes) / self._exp_size))
503

    
504
      if throughput:
505
        eta = max(0, float(self._exp_size - mbytes) / throughput)
506

    
507
    self._status_file.SetProgress(mbytes, throughput, percent, eta)
508

    
509

    
510
def _CalcThroughput(samples):
511
  """Calculates the throughput in MiB/second.
512

513
  @type samples: sequence
514
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
515
                  tuple
516
  @rtype: float or None
517
  @return: Throughput in MiB/second
518

519
  """
520
  if len(samples) < 2:
521
    # Can't calculate throughput
522
    return None
523

    
524
  (start_time, start_mbytes) = samples[0]
525
  (end_time, end_mbytes) = samples[-1]
526

    
527
  return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)