Statistics
| Branch: | Tag: | Revision:

root / lib / impexpd / __init__.py @ 0559f745

History | View | Annotate | Download (14.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Classes and functions for import/export daemon.
23

24
"""
25

    
26
import os
27
import re
28
import socket
29
import logging
30
import signal
31
import errno
32
import time
33
from cStringIO import StringIO
34

    
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import utils
38

    
39

    
40
#: Used to recognize point at which socat(1) starts to listen on its socket.
41
#: The local address is required for the remote peer to connect (in particular
42
#: the port number).
43
LISTENING_RE = re.compile(r"^listening on\s+"
44
                          r"AF=(?P<family>\d+)\s+"
45
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
46

    
47
#: Used to recognize point at which socat(1) is sending data over the wire
48
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
49
                              re.I)
50

    
51
SOCAT_LOG_DEBUG = "D"
52
SOCAT_LOG_INFO = "I"
53
SOCAT_LOG_NOTICE = "N"
54
SOCAT_LOG_WARNING = "W"
55
SOCAT_LOG_ERROR = "E"
56
SOCAT_LOG_FATAL = "F"
57

    
58
SOCAT_LOG_IGNORE = frozenset([
59
  SOCAT_LOG_DEBUG,
60
  SOCAT_LOG_INFO,
61
  SOCAT_LOG_NOTICE,
62
  ])
63

    
64
#: Used to parse GNU dd(1) statistics
65
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
66
                        r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
67

    
68
#: Used to ignore "N+N records in/out" on dd(1)'s stderr
69
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
70

    
71
#: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
72
#: unavailable and SIGUSR1 is used instead)
73
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)
74

    
75
#: Buffer size: at most this many bytes are transferred at once
76
BUFSIZE = 1024 * 1024
77

    
78
# Common options for socat
79
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
80
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
81

    
82
SOCAT_OPTION_MAXLEN = 400
83

    
84
(PROG_OTHER,
85
 PROG_SOCAT,
86
 PROG_DD,
87
 PROG_DD_PID,
88
 PROG_EXP_SIZE) = range(1, 6)
89
PROG_ALL = frozenset([
90
  PROG_OTHER,
91
  PROG_SOCAT,
92
  PROG_DD,
93
  PROG_DD_PID,
94
  PROG_EXP_SIZE,
95
  ])
96

    
97

    
98
class CommandBuilder(object):
99
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
100
    """Initializes this class.
101

102
    @param mode: Daemon mode (import or export)
103
    @param opts: Options object
104
    @type socat_stderr_fd: int
105
    @param socat_stderr_fd: File descriptor socat should write its stderr to
106
    @type dd_stderr_fd: int
107
    @param dd_stderr_fd: File descriptor dd should write its stderr to
108
    @type dd_pid_fd: int
109
    @param dd_pid_fd: File descriptor the child should write dd's PID to
110

111
    """
112
    self._opts = opts
113
    self._mode = mode
114
    self._socat_stderr_fd = socat_stderr_fd
115
    self._dd_stderr_fd = dd_stderr_fd
116
    self._dd_pid_fd = dd_pid_fd
117

    
118
  @staticmethod
119
  def GetBashCommand(cmd):
120
    """Prepares a command to be run in Bash.
121

122
    """
123
    return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
124

    
125
  def _GetSocatCommand(self):
126
    """Returns the socat command.
127

128
    """
129
    common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
130
      "key=%s" % self._opts.key,
131
      "cert=%s" % self._opts.cert,
132
      "cafile=%s" % self._opts.ca,
133
      ]
134

    
135
    if self._opts.bind is not None:
136
      common_addr_opts.append("bind=%s" % self._opts.bind)
137

    
138
    if self._mode == constants.IEM_IMPORT:
139
      if self._opts.port is None:
140
        port = 0
141
      else:
142
        port = self._opts.port
143

    
144
      addr1 = [
145
        "OPENSSL-LISTEN:%s" % port,
146
        "reuseaddr",
147

    
148
        # Retry to listen if connection wasn't established successfully, up to
149
        # 100 times a second. Note that this still leaves room for DoS attacks.
150
        "forever",
151
        "intervall=0.01",
152
        ] + common_addr_opts
153
      addr2 = ["stdout"]
154

    
155
    elif self._mode == constants.IEM_EXPORT:
156
      addr1 = ["stdin"]
157
      addr2 = [
158
        "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
159

    
160
        # How long to wait per connection attempt
161
        "connect-timeout=%s" % self._opts.connect_timeout,
162

    
163
        # Retry a few times before giving up to connect (once per second)
164
        "retry=%s" % self._opts.connect_retries,
165
        "intervall=1",
166
        ] + common_addr_opts
167

    
168
    else:
169
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
170

    
171
    for i in [addr1, addr2]:
172
      for value in i:
173
        if len(value) > SOCAT_OPTION_MAXLEN:
174
          raise errors.GenericError("Socat option longer than %s"
175
                                    " characters: %r" %
176
                                    (SOCAT_OPTION_MAXLEN, value))
177
        if "," in value:
178
          raise errors.GenericError("Comma not allowed in socat option"
179
                                    " value: %r" % value)
180

    
181
    return [
182
      constants.SOCAT_PATH,
183

    
184
      # Log to stderr
185
      "-ls",
186

    
187
      # Log level
188
      "-d", "-d",
189

    
190
      # Buffer size
191
      "-b%s" % BUFSIZE,
192

    
193
      # Unidirectional mode, the first address is only used for reading, and the
194
      # second address is only used for writing
195
      "-u",
196

    
197
      ",".join(addr1), ",".join(addr2)
198
      ]
199

    
200
  def _GetTransportCommand(self):
201
    """Returns the command for the transport part of the daemon.
202

203
    """
204
    socat_cmd = ("%s 2>&%d" %
205
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
206
                  self._socat_stderr_fd))
207

    
208
    dd_cmd = StringIO()
209
    # Setting LC_ALL since we want to parse the output and explicitely
210
    # redirecting stdin, as the background process (dd) would have /dev/null as
211
    # stdin otherwise
212
    dd_cmd.write("{ LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
213
                 (BUFSIZE, self._dd_stderr_fd))
214
    # Send PID to daemon
215
    dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
216
    # And wait for dd
217
    dd_cmd.write(" wait $pid;")
218
    dd_cmd.write(" }")
219

    
220
    compr = self._opts.compress
221

    
222
    assert compr in constants.IEC_ALL
223

    
224
    if self._mode == constants.IEM_IMPORT:
225
      if compr == constants.IEC_GZIP:
226
        transport_cmd = "%s | gunzip -c" % socat_cmd
227
      else:
228
        transport_cmd = socat_cmd
229

    
230
      transport_cmd += " | %s" % dd_cmd.getvalue()
231
    elif self._mode == constants.IEM_EXPORT:
232
      if compr == constants.IEC_GZIP:
233
        transport_cmd = "gzip -c | %s" % socat_cmd
234
      else:
235
        transport_cmd = socat_cmd
236

    
237
      transport_cmd = "%s | %s" % (dd_cmd.getvalue(), transport_cmd)
238
    else:
239
      raise errors.GenericError("Invalid mode '%s'" % self._mode)
240

    
241
    # TODO: Run transport as separate user
242
    # The transport uses its own shell to simplify running it as a separate user
243
    # in the future.
244
    return self.GetBashCommand(transport_cmd)
245

    
246
  def GetCommand(self):
247
    """Returns the complete child process command.
248

249
    """
250
    transport_cmd = self._GetTransportCommand()
251

    
252
    buf = StringIO()
253

    
254
    if self._opts.cmd_prefix:
255
      buf.write(self._opts.cmd_prefix)
256
      buf.write(" ")
257

    
258
    buf.write(utils.ShellQuoteArgs(transport_cmd))
259

    
260
    if self._opts.cmd_suffix:
261
      buf.write(" ")
262
      buf.write(self._opts.cmd_suffix)
263

    
264
    return self.GetBashCommand(buf.getvalue())
265

    
266

    
267
def _VerifyListening(family, address, port):
268
  """Verify address given as listening address by socat.
269

270
  """
271
  # TODO: Implement IPv6 support
272
  if family != socket.AF_INET:
273
    raise errors.GenericError("Address family %r not supported" % family)
274

    
275
  try:
276
    packed_address = socket.inet_pton(family, address)
277
  except socket.error:
278
    raise errors.GenericError("Invalid address %r for family %s" %
279
                              (address, family))
280

    
281
  return (socket.inet_ntop(family, packed_address), port)
282

    
283

    
284
class ChildIOProcessor(object):
285
  def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
286
    """Initializes this class.
287

288
    """
289
    self._debug = debug
290
    self._status_file = status_file
291
    self._logger = logger
292

    
293
    self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
294
                           for prog in PROG_ALL])
295

    
296
    self._dd_pid = None
297
    self._dd_ready = False
298
    self._dd_tp_samples = throughput_samples
299
    self._dd_progress = []
300

    
301
    # Expected size of transferred data
302
    self._exp_size = exp_size
303

    
304
  def GetLineSplitter(self, prog):
305
    """Returns the line splitter for a program.
306

307
    """
308
    return self._splitter[prog]
309

    
310
  def FlushAll(self):
311
    """Flushes all line splitters.
312

313
    """
314
    for ls in self._splitter.itervalues():
315
      ls.flush()
316

    
317
  def CloseAll(self):
318
    """Closes all line splitters.
319

320
    """
321
    for ls in self._splitter.itervalues():
322
      ls.close()
323
    self._splitter.clear()
324

    
325
  def NotifyDd(self):
326
    """Tells dd(1) to write statistics.
327

328
    """
329
    if self._dd_pid is None:
330
      # Can't notify
331
      return False
332

    
333
    if not self._dd_ready:
334
      # There's a race condition between starting the program and sending
335
      # signals.  The signal handler is only registered after some time, so we
336
      # have to check whether the program is ready. If it isn't, sending a
337
      # signal will invoke the default handler (and usually abort the program).
338
      if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
339
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
340
        return False
341

    
342
      logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
343
      self._dd_ready = True
344

    
345
    logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
346
    try:
347
      os.kill(self._dd_pid, DD_INFO_SIGNAL)
348
    except EnvironmentError, err:
349
      if err.errno != errno.ESRCH:
350
        raise
351

    
352
      # Process no longer exists
353
      logging.debug("dd exited")
354
      self._dd_pid = None
355

    
356
    return True
357

    
358
  def _ProcessOutput(self, line, prog):
359
    """Takes care of child process output.
360

361
    @type line: string
362
    @param line: Child output line
363
    @type prog: number
364
    @param prog: Program from which the line originates
365

366
    """
367
    force_update = False
368
    forward_line = line
369

    
370
    if prog == PROG_SOCAT:
371
      level = None
372
      parts = line.split(None, 4)
373

    
374
      if len(parts) == 5:
375
        (_, _, _, level, msg) = parts
376

    
377
        force_update = self._ProcessSocatOutput(self._status_file, level, msg)
378

    
379
        if self._debug or (level and level not in SOCAT_LOG_IGNORE):
380
          forward_line = "socat: %s %s" % (level, msg)
381
        else:
382
          forward_line = None
383
      else:
384
        forward_line = "socat: %s" % line
385

    
386
    elif prog == PROG_DD:
387
      (should_forward, force_update) = self._ProcessDdOutput(line)
388

    
389
      if should_forward or self._debug:
390
        forward_line = "dd: %s" % line
391
      else:
392
        forward_line = None
393

    
394
    elif prog == PROG_DD_PID:
395
      if self._dd_pid:
396
        raise RuntimeError("dd PID reported more than once")
397
      logging.debug("Received dd PID %r", line)
398
      self._dd_pid = int(line)
399
      forward_line = None
400

    
401
    elif prog == PROG_EXP_SIZE:
402
      logging.debug("Received predicted size %r", line)
403
      forward_line = None
404

    
405
      if line:
406
        try:
407
          exp_size = utils.BytesToMebibyte(int(line))
408
        except (ValueError, TypeError), err:
409
          logging.error("Failed to convert predicted size %r to number: %s",
410
                        line, err)
411
          exp_size = None
412
      else:
413
        exp_size = None
414

    
415
      self._exp_size = exp_size
416

    
417
    if forward_line:
418
      self._logger.info(forward_line)
419
      self._status_file.AddRecentOutput(forward_line)
420

    
421
    self._status_file.Update(force_update)
422

    
423
  @staticmethod
424
  def _ProcessSocatOutput(status_file, level, msg):
425
    """Interprets socat log output.
426

427
    """
428
    if level == SOCAT_LOG_NOTICE:
429
      if status_file.GetListenPort() is None:
430
        # TODO: Maybe implement timeout to not listen forever
431
        m = LISTENING_RE.match(msg)
432
        if m:
433
          (_, port) = _VerifyListening(int(m.group("family")),
434
                                       m.group("address"),
435
                                       int(m.group("port")))
436

    
437
          status_file.SetListenPort(port)
438
          return True
439

    
440
      if not status_file.GetConnected():
441
        m = TRANSFER_LOOP_RE.match(msg)
442
        if m:
443
          logging.debug("Connection established")
444
          status_file.SetConnected()
445
          return True
446

    
447
    return False
448

    
449
  def _ProcessDdOutput(self, line):
450
    """Interprets a line of dd(1)'s output.
451

452
    """
453
    m = DD_INFO_RE.match(line)
454
    if m:
455
      seconds = float(m.group("seconds"))
456
      mbytes = utils.BytesToMebibyte(int(m.group("bytes")))
457
      self._UpdateDdProgress(seconds, mbytes)
458
      return (False, True)
459

    
460
    m = DD_STDERR_IGNORE.match(line)
461
    if m:
462
      # Ignore
463
      return (False, False)
464

    
465
    # Forward line
466
    return (True, False)
467

    
468
  def _UpdateDdProgress(self, seconds, mbytes):
469
    """Updates the internal status variables for dd(1) progress.
470

471
    @type seconds: float
472
    @param seconds: Timestamp of this update
473
    @type mbytes: float
474
    @param mbytes: Total number of MiB transferred so far
475

476
    """
477
    # Add latest sample
478
    self._dd_progress.append((seconds, mbytes))
479

    
480
    # Remove old samples
481
    del self._dd_progress[:-self._dd_tp_samples]
482

    
483
    # Calculate throughput
484
    throughput = _CalcThroughput(self._dd_progress)
485

    
486
    # Calculate percent and ETA
487
    percent = None
488
    eta = None
489

    
490
    if self._exp_size is not None:
491
      if self._exp_size != 0:
492
        percent = max(0, min(100, (100.0 * mbytes) / self._exp_size))
493

    
494
      if throughput:
495
        eta = max(0, float(self._exp_size - mbytes) / throughput)
496

    
497
    self._status_file.SetProgress(mbytes, throughput, percent, eta)
498

    
499

    
500
def _CalcThroughput(samples):
501
  """Calculates the throughput in MiB/second.
502

503
  @type samples: sequence
504
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
505
                  tuple
506
  @rtype: float or None
507
  @return: Throughput in MiB/second
508

509
  """
510
  if len(samples) < 2:
511
    # Can't calculate throughput
512
    return None
513

    
514
  (start_time, start_mbytes) = samples[0]
515
  (end_time, end_mbytes) = samples[-1]
516

    
517
  return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)