Statistics
| Branch: | Tag: | Revision:

root / lib / impexpd / __init__.py @ c08d76f5

History | View | Annotate | Download (13.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Classes and functions for import/export daemon.
23

24
"""
25

    
26
import os
27
import re
28
import socket
29
import logging
30
import signal
31
import errno
32
import time
33
from cStringIO import StringIO
34

    
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import utils
38

    
39

    
40
#: Matches the socat(1) log message announcing that it has started to listen
#: on its socket. The captured address family, address and port describe the
#: local end; the remote peer needs them (the port number in particular) in
#: order to connect.
LISTENING_RE = re.compile(
  r"^listening on\s+"
  r"AF=(?P<family>\d+)\s+"
  r"(?P<address>.+):(?P<port>\d+)$",
  re.IGNORECASE)

#: Matches the socat(1) log message written once data is sent over the wire
TRANSFER_LOOP_RE = re.compile(
  r"^starting data transfer loop with FDs\s+.*$",
  re.IGNORECASE)

# Single-letter severity markers used in socat(1) log lines
SOCAT_LOG_DEBUG = "D"
SOCAT_LOG_INFO = "I"
SOCAT_LOG_NOTICE = "N"
SOCAT_LOG_WARNING = "W"
SOCAT_LOG_ERROR = "E"
SOCAT_LOG_FATAL = "F"

#: Severities whose messages are considered uninteresting
SOCAT_LOG_IGNORE = frozenset((
  SOCAT_LOG_DEBUG,
  SOCAT_LOG_INFO,
  SOCAT_LOG_NOTICE,
  ))

#: Extracts the byte count and elapsed seconds from a GNU dd(1) statistics
#: line
DD_INFO_RE = re.compile(
  r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
  r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$",
  re.IGNORECASE)

#: Matches the "N+N records in/out" lines on dd(1)'s stderr, which are to be
#: ignored
DD_STDERR_IGNORE = re.compile(
  r"^\d+\+\d+\s*records\s+(?:in|out)$",
  re.IGNORECASE)

#: Signal making dd(1) print its statistics; SIGINFO is preferred, with
#: SIGUSR1 as the fallback on platforms where SIGINFO is unavailable
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)

#: Buffer size: upper limit, in bytes, for a single transfer (1 MiB)
BUFSIZE = 1024 ** 2

# Options passed to socat for every connection
SOCAT_TCP_OPTS = [
  "keepalive",
  "keepidle=60",
  "keepintvl=10",
  "keepcnt=5",
  ]
SOCAT_OPENSSL_OPTS = [
  "verify=1",
  "cipher=HIGH",
  "method=TLSv1",
  ]

# Identifiers for the sources of child process output
PROG_OTHER = 1
PROG_SOCAT = 2
PROG_DD = 3
PROG_DD_PID = 4

PROG_ALL = frozenset((
  PROG_OTHER,
  PROG_SOCAT,
  PROG_DD,
  PROG_DD_PID,
  ))
92

    
93

    
94
class CommandBuilder(object):
  """Assembles the command line executed as the daemon's child process.

  """
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
    """Stores the settings needed to build the command.

    @param mode: Daemon mode (import or export)
    @param opts: Options object
    @type socat_stderr_fd: int
    @param socat_stderr_fd: File descriptor receiving socat's stderr
    @type dd_stderr_fd: int
    @param dd_stderr_fd: File descriptor receiving dd's stderr
    @type dd_pid_fd: int
    @param dd_pid_fd: File descriptor to which the child writes dd's PID

    """
    self._mode = mode
    self._opts = opts
    self._socat_stderr_fd = socat_stderr_fd
    self._dd_stderr_fd = dd_stderr_fd
    self._dd_pid_fd = dd_pid_fd

  @staticmethod
  def GetBashCommand(cmd):
    """Wraps a shell command string into a Bash invocation.

    Bash is started with the "errexit" and "pipefail" options enabled.

    @type cmd: string
    @param cmd: Command to be interpreted by Bash
    @rtype: list
    @return: Argument list for running the command

    """
    bash = ["bash", "-o", "errexit", "-o", "pipefail"]
    return bash + ["-c", cmd]

  def _GetSocatCommand(self):
    """Builds the socat invocation.

    @rtype: list
    @return: Argument list, starting with the path to socat
    @raise errors.GenericError: If the mode is invalid or an address option
      contains a comma

    """
    opts = self._opts

    # Options shared by the listening and the connecting OpenSSL address
    shared_opts = list(SOCAT_TCP_OPTS)
    shared_opts.extend(SOCAT_OPENSSL_OPTS)
    shared_opts.extend([
      "key=%s" % opts.key,
      "cert=%s" % opts.cert,
      "cafile=%s" % opts.ca,
      ])

    if opts.bind is not None:
      shared_opts.append("bind=%s" % opts.bind)

    if self._mode == constants.IEM_IMPORT:
      # Port 0 is used when no port was given
      listen_port = opts.port
      if listen_port is None:
        listen_port = 0

      source = [
        "OPENSSL-LISTEN:%s" % listen_port,
        "reuseaddr",

        # Retry to listen if connection wasn't established successfully, up to
        # 100 times a second. Note that this still leaves room for DoS attacks.
        "forever",
        "intervall=0.01",
        ]
      source.extend(shared_opts)
      sink = ["stdout"]

    elif self._mode == constants.IEM_EXPORT:
      source = ["stdin"]
      sink = [
        "OPENSSL:%s:%s" % (opts.host, opts.port),

        # How long to wait per connection attempt
        "connect-timeout=%s" % opts.connect_timeout,

        # Retry a few times before giving up to connect (once per second)
        "retry=%s" % opts.connect_retries,
        "intervall=1",
        ]
      sink.extend(shared_opts)

    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    # The options of each address are joined with commas below, hence a comma
    # within a single value can't be allowed
    for value in source + sink:
      if "," in value:
        raise errors.GenericError("Comma not allowed in socat option"
                                  " value: %r" % value)

    return [
      constants.SOCAT_PATH,

      # Log to stderr
      "-ls",

      # Log level
      "-d", "-d",

      # Buffer size
      "-b%s" % BUFSIZE,

      # Unidirectional mode, the first address is only used for reading, and the
      # second address is only used for writing
      "-u",

      ",".join(source),
      ",".join(sink),
      ]

  def _GetTransportCommand(self):
    """Builds the command for the transport part of the daemon.

    The transport is a shell pipeline made of socat, dd and, if gzip
    compression is selected, gzip/gunzip.

    @rtype: list
    @return: Argument list running the pipeline in Bash
    @raise errors.GenericError: If the mode is invalid

    """
    socat_cmd = ("%s 2>&%d" %
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
                  self._socat_stderr_fd))

    dd_cmd = "".join([
      # LC_ALL is set as the output is going to be parsed. stdin is redirected
      # explicitly, as the background process (dd) would have /dev/null as
      # stdin otherwise.
      ("{ LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
       (BUFSIZE, self._dd_stderr_fd)),
      # Send PID to daemon
      " echo $pid >&%d;" % self._dd_pid_fd,
      # And wait for dd
      " wait $pid;",
      " }",
      ])

    compr = self._opts.compress

    assert compr in constants.IEC_ALL

    if self._mode == constants.IEM_IMPORT:
      # Data arrives through socat, is decompressed if needed and ends in dd
      stages = [socat_cmd]
      if compr == constants.IEC_GZIP:
        stages.append("gunzip -c")
      stages.append(dd_cmd)
    elif self._mode == constants.IEM_EXPORT:
      # Data passes through dd, is compressed if needed and leaves via socat
      stages = [dd_cmd]
      if compr == constants.IEC_GZIP:
        stages.append("gzip -c")
      stages.append(socat_cmd)
    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    # TODO: Run transport as separate user
    # The transport uses its own shell to simplify running it as a separate user
    # in the future.
    return self.GetBashCommand(" | ".join(stages))

  def GetCommand(self):
    """Builds the complete child process command.

    The quoted transport command is surrounded by the configured command
    prefix and suffix (if any), which are inserted as given.

    @rtype: list
    @return: Argument list running the whole command in Bash

    """
    transport_cmd = self._GetTransportCommand()

    parts = []

    if self._opts.cmd_prefix:
      parts.append(self._opts.cmd_prefix)

    parts.append(utils.ShellQuoteArgs(transport_cmd))

    if self._opts.cmd_suffix:
      parts.append(self._opts.cmd_suffix)

    return self.GetBashCommand(" ".join(parts))
257

    
258

    
259
def _VerifyListening(family, address, port):
  """Checks the listening address reported by socat.

  @param family: Address family, compared against C{socket.AF_INET}
  @type address: string
  @param address: Address in textual form
  @param port: Port number, returned as given
  @rtype: tuple
  @return: Tuple of address (converted to its packed form and back to text)
    and port
  @raise errors.GenericError: If the family is not supported or the address
    is invalid for the family

  """
  # TODO: Implement IPv6 support
  if family == socket.AF_INET:
    try:
      packed = socket.inet_pton(family, address)
    except socket.error:
      raise errors.GenericError("Invalid address %r for family %s" %
                                (address, family))

    return (socket.inet_ntop(family, packed), port)

  raise errors.GenericError("Address family %r not supported" % family)
274

    
275

    
276
class ChildIOProcessor(object):
  """Interprets the output of the child processes and tracks dd's progress.

  """
  def __init__(self, debug, status_file, logger, throughput_samples):
    """Sets up one line splitter per output source.

    @param debug: Whether lines which are normally suppressed should be
      forwarded as well
    @param status_file: Status file object receiving all updates
    @param logger: Logger to which forwarded lines are written
    @param throughput_samples: Number of dd progress samples kept for the
      throughput calculation

    """
    self._debug = debug
    self._status_file = status_file
    self._logger = logger

    # Every line splitter passes its program identifier to the callback
    self._splitter = {}
    for prog in PROG_ALL:
      self._splitter[prog] = utils.LineSplitter(self._ProcessOutput, prog)

    self._dd_pid = None
    self._dd_ready = False
    self._dd_tp_samples = throughput_samples
    self._dd_progress = []

    # Expected size of transferred data
    self._exp_size = None

  def GetLineSplitter(self, prog):
    """Looks up the line splitter belonging to a program.

    @param prog: Program identifier
    @return: Line splitter registered for the program

    """
    return self._splitter[prog]

  def FlushAll(self):
    """Calls C{flush} on every line splitter.

    """
    for splitter in self._splitter.values():
      splitter.flush()

  def CloseAll(self):
    """Calls C{close} on every line splitter and forgets all of them.

    """
    for splitter in self._splitter.values():
      splitter.close()
    self._splitter.clear()

  def NotifyDd(self):
    """Asks dd(1) to write its statistics by sending it a signal.

    @rtype: bool
    @return: False if dd's PID is unknown or dd doesn't handle the signal yet,
      True otherwise

    """
    pid = self._dd_pid

    if pid is None:
      # Can't notify
      return False

    if not self._dd_ready:
      # dd registers its signal handler only some time after it was started.
      # A signal arriving before that would invoke the default handler (and
      # usually abort the program), so readiness has to be verified first.
      if not utils.IsProcessHandlingSignal(pid, DD_INFO_SIGNAL):
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
        return False

      logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
      self._dd_ready = True

    logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, pid)
    try:
      os.kill(pid, DD_INFO_SIGNAL)
    except EnvironmentError as err:
      if err.errno == errno.ESRCH:
        # Process no longer exists
        self._dd_pid = None
      else:
        raise

    return True

  def _ProcessOutput(self, line, prog):
    """Handles a single line of child process output.

    @type line: string
    @param line: Child output line
    @type prog: number
    @param prog: Program from which the line originates

    """
    update_now = False
    to_forward = line

    if prog == PROG_SOCAT:
      # The fourth whitespace-separated field is taken as the log level and
      # everything after it as the message
      fields = line.split(None, 4)

      if len(fields) != 5:
        to_forward = "socat: %s" % line
      else:
        (level, msg) = fields[3:]

        update_now = self._ProcessSocatOutput(self._status_file, level, msg)

        if not self._debug and (not level or level in SOCAT_LOG_IGNORE):
          to_forward = None
        else:
          to_forward = "socat: %s %s" % (level, msg)

    elif prog == PROG_DD:
      (wanted, update_now) = self._ProcessDdOutput(line)

      if not (wanted or self._debug):
        to_forward = None
      else:
        to_forward = "dd: %s" % line

    elif prog == PROG_DD_PID:
      if self._dd_pid:
        raise RuntimeError("dd PID reported more than once")
      logging.debug("Received dd PID %r", line)
      self._dd_pid = int(line)
      to_forward = None

    if to_forward:
      self._logger.info(to_forward)
      self._status_file.AddRecentOutput(to_forward)

    self._status_file.Update(update_now)

  @staticmethod
  def _ProcessSocatOutput(status_file, level, msg):
    """Evaluates a socat log message.

    Only messages of level "notice" are looked at; they are used to detect
    the listening port and the start of the data transfer.

    @param status_file: Status file object
    @param level: Log level of the message
    @param msg: Log message
    @rtype: bool
    @return: Whether the status file was changed

    """
    if level != SOCAT_LOG_NOTICE:
      return False

    if status_file.GetListenPort() is None:
      # TODO: Maybe implement timeout to not listen forever
      listening = LISTENING_RE.match(msg)
      if listening:
        family = int(listening.group("family"))
        address = listening.group("address")
        reported_port = int(listening.group("port"))

        port = _VerifyListening(family, address, reported_port)[1]

        status_file.SetListenPort(port)
        return True

    if not status_file.GetConnected():
      if TRANSFER_LOOP_RE.match(msg):
        logging.debug("Connection established")
        status_file.SetConnected()
        return True

    return False

  def _ProcessDdOutput(self, line):
    """Evaluates a line written by dd(1).

    @type line: string
    @param line: Output line
    @rtype: tuple
    @return: Tuple of two booleans; the first tells whether the line should
      be forwarded, the second whether the status update should be forced

    """
    stats = DD_INFO_RE.match(line)
    if stats:
      # Statistics line, record the progress instead of forwarding it
      seconds = float(stats.group("seconds"))
      mbytes = utils.BytesToMebibyte(int(stats.group("bytes")))
      self._UpdateDdProgress(seconds, mbytes)
      return (False, True)

    # Lines matching the ignore pattern are dropped, all others forwarded
    ignorable = DD_STDERR_IGNORE.match(line) is not None

    return (not ignorable, False)

  def _UpdateDdProgress(self, seconds, mbytes):
    """Records a dd(1) progress sample and publishes the derived values.

    @type seconds: float
    @param seconds: Timestamp of this update
    @type mbytes: float
    @param mbytes: Total number of MiB transferred so far

    """
    history = self._dd_progress

    # Add latest sample
    history.append((seconds, mbytes))

    # Remove old samples
    del history[:-self._dd_tp_samples]

    # Calculate throughput
    throughput = _CalcThroughput(history)

    # Percent and ETA can only be calculated if the expected size is known
    percent = None
    eta = None

    expected = self._exp_size

    if expected is not None:
      if expected != 0:
        percent = max(0, min(100, (100.0 * mbytes) / expected))

      if throughput:
        eta = max(0, float(expected - mbytes) / throughput)

    self._status_file.SetProgress(mbytes, throughput, percent, eta)
473

    
474

    
475
def _CalcThroughput(samples):
  """Calculates the throughput in MiB/second.

  Only the first and the last sample are taken into account.

  @type samples: sequence
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
                  tuple
  @rtype: float or None
  @return: Throughput in MiB/second; None if there are fewer than two samples
           or if no time elapsed between the first and the last sample

  """
  if len(samples) < 2:
    # Can't calculate throughput
    return None

  (start_time, start_mbytes) = samples[0]
  (end_time, end_mbytes) = samples[-1]

  elapsed = float(end_time) - start_time

  if elapsed == 0:
    # Both samples carry the same timestamp; there's no meaningful rate and
    # dividing would raise ZeroDivisionError
    return None

  return (float(end_mbytes) - start_mbytes) / elapsed