import/export daemon: Record amount of data transferred
[ganeti-local] / lib / impexpd / __init__.py
1 #
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Classes and functions for import/export daemon.
23
24 """
25
26 import os
27 import re
28 import socket
29 import logging
30 import signal
31 import errno
32 import time
33 from cStringIO import StringIO
34
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import utils
38
39
40 #: Used to recognize point at which socat(1) starts to listen on its socket.
41 #: The local address is required for the remote peer to connect (in particular
42 #: the port number).
43 LISTENING_RE = re.compile(r"^listening on\s+"
44                           r"AF=(?P<family>\d+)\s+"
45                           r"(?P<address>.+):(?P<port>\d+)$", re.I)
46
47 #: Used to recognize point at which socat(1) is sending data over the wire
48 TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
49                               re.I)
50
51 SOCAT_LOG_DEBUG = "D"
52 SOCAT_LOG_INFO = "I"
53 SOCAT_LOG_NOTICE = "N"
54 SOCAT_LOG_WARNING = "W"
55 SOCAT_LOG_ERROR = "E"
56 SOCAT_LOG_FATAL = "F"
57
58 SOCAT_LOG_IGNORE = frozenset([
59   SOCAT_LOG_DEBUG,
60   SOCAT_LOG_INFO,
61   SOCAT_LOG_NOTICE,
62   ])
63
64 #: Used to parse GNU dd(1) statistics
65 DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
66                         r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
67
68 #: Used to ignore "N+N records in/out" on dd(1)'s stderr
69 DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
70
71 #: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
72 #: unavailable and SIGUSR1 is used instead)
73 DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)
74
75 #: Buffer size: at most this many bytes are transferred at once
76 BUFSIZE = 1024 * 1024
77
78 # Common options for socat
79 SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
80 SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
81
82 (PROG_OTHER,
83  PROG_SOCAT,
84  PROG_DD,
85  PROG_DD_PID) = range(1, 5)
86 PROG_ALL = frozenset([
87   PROG_OTHER,
88   PROG_SOCAT,
89   PROG_DD,
90   PROG_DD_PID,
91   ])
92
93
94 class CommandBuilder(object):
95   def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
96     """Initializes this class.
97
98     @param mode: Daemon mode (import or export)
99     @param opts: Options object
100     @type socat_stderr_fd: int
101     @param socat_stderr_fd: File descriptor socat should write its stderr to
102     @type dd_stderr_fd: int
103     @param dd_stderr_fd: File descriptor dd should write its stderr to
104     @type dd_pid_fd: int
105     @param dd_pid_fd: File descriptor the child should write dd's PID to
106
107     """
108     self._opts = opts
109     self._mode = mode
110     self._socat_stderr_fd = socat_stderr_fd
111     self._dd_stderr_fd = dd_stderr_fd
112     self._dd_pid_fd = dd_pid_fd
113
114   @staticmethod
115   def GetBashCommand(cmd):
116     """Prepares a command to be run in Bash.
117
118     """
119     return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
120
121   def _GetSocatCommand(self):
122     """Returns the socat command.
123
124     """
125     common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
126       "key=%s" % self._opts.key,
127       "cert=%s" % self._opts.cert,
128       "cafile=%s" % self._opts.ca,
129       ]
130
131     if self._opts.bind is not None:
132       common_addr_opts.append("bind=%s" % self._opts.bind)
133
134     if self._mode == constants.IEM_IMPORT:
135       if self._opts.port is None:
136         port = 0
137       else:
138         port = self._opts.port
139
140       addr1 = [
141         "OPENSSL-LISTEN:%s" % port,
142         "reuseaddr",
143
144         # Retry to listen if connection wasn't established successfully, up to
145         # 100 times a second. Note that this still leaves room for DoS attacks.
146         "forever",
147         "intervall=0.01",
148         ] + common_addr_opts
149       addr2 = ["stdout"]
150
151     elif self._mode == constants.IEM_EXPORT:
152       addr1 = ["stdin"]
153       addr2 = [
154         "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
155
156         # How long to wait per connection attempt
157         "connect-timeout=%s" % self._opts.connect_timeout,
158
159         # Retry a few times before giving up to connect (once per second)
160         "retry=%s" % self._opts.connect_retries,
161         "intervall=1",
162         ] + common_addr_opts
163
164     else:
165       raise errors.GenericError("Invalid mode '%s'" % self._mode)
166
167     for i in [addr1, addr2]:
168       for value in i:
169         if "," in value:
170           raise errors.GenericError("Comma not allowed in socat option"
171                                     " value: %r" % value)
172
173     return [
174       constants.SOCAT_PATH,
175
176       # Log to stderr
177       "-ls",
178
179       # Log level
180       "-d", "-d",
181
182       # Buffer size
183       "-b%s" % BUFSIZE,
184
185       # Unidirectional mode, the first address is only used for reading, and the
186       # second address is only used for writing
187       "-u",
188
189       ",".join(addr1), ",".join(addr2)
190       ]
191
192   def _GetTransportCommand(self):
193     """Returns the command for the transport part of the daemon.
194
195     """
196     socat_cmd = ("%s 2>&%d" %
197                  (utils.ShellQuoteArgs(self._GetSocatCommand()),
198                   self._socat_stderr_fd))
199
200     dd_cmd = StringIO()
201     # Setting LC_ALL since we want to parse the output and explicitely
202     # redirecting stdin, as the background process (dd) would have /dev/null as
203     # stdin otherwise
204     dd_cmd.write("{ LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
205                  (BUFSIZE, self._dd_stderr_fd))
206     # Send PID to daemon
207     dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
208     # And wait for dd
209     dd_cmd.write(" wait $pid;")
210     dd_cmd.write(" }")
211
212     compr = self._opts.compress
213
214     assert compr in constants.IEC_ALL
215
216     if self._mode == constants.IEM_IMPORT:
217       if compr == constants.IEC_GZIP:
218         transport_cmd = "%s | gunzip -c" % socat_cmd
219       else:
220         transport_cmd = socat_cmd
221
222       transport_cmd += " | %s" % dd_cmd.getvalue()
223     elif self._mode == constants.IEM_EXPORT:
224       if compr == constants.IEC_GZIP:
225         transport_cmd = "gzip -c | %s" % socat_cmd
226       else:
227         transport_cmd = socat_cmd
228
229       transport_cmd = "%s | %s" % (dd_cmd.getvalue(), transport_cmd)
230     else:
231       raise errors.GenericError("Invalid mode '%s'" % self._mode)
232
233     # TODO: Run transport as separate user
234     # The transport uses its own shell to simplify running it as a separate user
235     # in the future.
236     return self.GetBashCommand(transport_cmd)
237
238   def GetCommand(self):
239     """Returns the complete child process command.
240
241     """
242     transport_cmd = self._GetTransportCommand()
243
244     buf = StringIO()
245
246     if self._opts.cmd_prefix:
247       buf.write(self._opts.cmd_prefix)
248       buf.write(" ")
249
250     buf.write(utils.ShellQuoteArgs(transport_cmd))
251
252     if self._opts.cmd_suffix:
253       buf.write(" ")
254       buf.write(self._opts.cmd_suffix)
255
256     return self.GetBashCommand(buf.getvalue())
257
258
259 def _VerifyListening(family, address, port):
260   """Verify address given as listening address by socat.
261
262   """
263   # TODO: Implement IPv6 support
264   if family != socket.AF_INET:
265     raise errors.GenericError("Address family %r not supported" % family)
266
267   try:
268     packed_address = socket.inet_pton(family, address)
269   except socket.error:
270     raise errors.GenericError("Invalid address %r for family %s" %
271                               (address, family))
272
273   return (socket.inet_ntop(family, packed_address), port)
274
275
276 class ChildIOProcessor(object):
277   def __init__(self, debug, status_file, logger, throughput_samples):
278     """Initializes this class.
279
280     """
281     self._debug = debug
282     self._status_file = status_file
283     self._logger = logger
284
285     self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
286                            for prog in PROG_ALL])
287
288     self._dd_pid = None
289     self._dd_ready = False
290     self._dd_tp_samples = throughput_samples
291     self._dd_progress = []
292
293     # Expected size of transferred data
294     self._exp_size = None
295
296   def GetLineSplitter(self, prog):
297     """Returns the line splitter for a program.
298
299     """
300     return self._splitter[prog]
301
302   def FlushAll(self):
303     """Flushes all line splitters.
304
305     """
306     for ls in self._splitter.itervalues():
307       ls.flush()
308
309   def CloseAll(self):
310     """Closes all line splitters.
311
312     """
313     for ls in self._splitter.itervalues():
314       ls.close()
315     self._splitter.clear()
316
317   def NotifyDd(self):
318     """Tells dd(1) to write statistics.
319
320     """
321     if self._dd_pid is None:
322       # Can't notify
323       return False
324
325     if not self._dd_ready:
326       # There's a race condition between starting the program and sending
327       # signals.  The signal handler is only registered after some time, so we
328       # have to check whether the program is ready. If it isn't, sending a
329       # signal will invoke the default handler (and usually abort the program).
330       if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
331         logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
332         return False
333
334       logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
335       self._dd_ready = True
336
337     logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
338     try:
339       os.kill(self._dd_pid, DD_INFO_SIGNAL)
340     except EnvironmentError, err:
341       if err.errno != errno.ESRCH:
342         raise
343
344       # Process no longer exists
345       self._dd_pid = None
346
347     return True
348
349   def _ProcessOutput(self, line, prog):
350     """Takes care of child process output.
351
352     @type line: string
353     @param line: Child output line
354     @type prog: number
355     @param prog: Program from which the line originates
356
357     """
358     force_update = False
359     forward_line = line
360
361     if prog == PROG_SOCAT:
362       level = None
363       parts = line.split(None, 4)
364
365       if len(parts) == 5:
366         (_, _, _, level, msg) = parts
367
368         force_update = self._ProcessSocatOutput(self._status_file, level, msg)
369
370         if self._debug or (level and level not in SOCAT_LOG_IGNORE):
371           forward_line = "socat: %s %s" % (level, msg)
372         else:
373           forward_line = None
374       else:
375         forward_line = "socat: %s" % line
376
377     elif prog == PROG_DD:
378       (should_forward, force_update) = self._ProcessDdOutput(line)
379
380       if should_forward or self._debug:
381         forward_line = "dd: %s" % line
382       else:
383         forward_line = None
384
385     elif prog == PROG_DD_PID:
386       if self._dd_pid:
387         raise RuntimeError("dd PID reported more than once")
388       logging.debug("Received dd PID %r", line)
389       self._dd_pid = int(line)
390       forward_line = None
391
392     if forward_line:
393       self._logger.info(forward_line)
394       self._status_file.AddRecentOutput(forward_line)
395
396     self._status_file.Update(force_update)
397
398   @staticmethod
399   def _ProcessSocatOutput(status_file, level, msg):
400     """Interprets socat log output.
401
402     """
403     if level == SOCAT_LOG_NOTICE:
404       if status_file.GetListenPort() is None:
405         # TODO: Maybe implement timeout to not listen forever
406         m = LISTENING_RE.match(msg)
407         if m:
408           (_, port) = _VerifyListening(int(m.group("family")),
409                                        m.group("address"),
410                                        int(m.group("port")))
411
412           status_file.SetListenPort(port)
413           return True
414
415       if not status_file.GetConnected():
416         m = TRANSFER_LOOP_RE.match(msg)
417         if m:
418           logging.debug("Connection established")
419           status_file.SetConnected()
420           return True
421
422     return False
423
424   def _ProcessDdOutput(self, line):
425     """Interprets a line of dd(1)'s output.
426
427     """
428     m = DD_INFO_RE.match(line)
429     if m:
430       seconds = float(m.group("seconds"))
431       mbytes = utils.BytesToMebibyte(int(m.group("bytes")))
432       self._UpdateDdProgress(seconds, mbytes)
433       return (False, True)
434
435     m = DD_STDERR_IGNORE.match(line)
436     if m:
437       # Ignore
438       return (False, False)
439
440     # Forward line
441     return (True, False)
442
443   def _UpdateDdProgress(self, seconds, mbytes):
444     """Updates the internal status variables for dd(1) progress.
445
446     @type seconds: float
447     @param seconds: Timestamp of this update
448     @type mbytes: float
449     @param mbytes: Total number of MiB transferred so far
450
451     """
452     # Add latest sample
453     self._dd_progress.append((seconds, mbytes))
454
455     # Remove old samples
456     del self._dd_progress[:-self._dd_tp_samples]
457
458     # Calculate throughput
459     throughput = _CalcThroughput(self._dd_progress)
460
461     # Calculate percent and ETA
462     percent = None
463     eta = None
464
465     if self._exp_size is not None:
466       if self._exp_size != 0:
467         percent = max(0, min(100, (100.0 * mbytes) / self._exp_size))
468
469       if throughput:
470         eta = max(0, float(self._exp_size - mbytes) / throughput)
471
472     self._status_file.SetProgress(mbytes, throughput, percent, eta)
473
474
475 def _CalcThroughput(samples):
476   """Calculates the throughput in MiB/second.
477
478   @type samples: sequence
479   @param samples: List of samples, each consisting of a (timestamp, mbytes)
480                   tuple
481   @rtype: float or None
482   @return: Throughput in MiB/second
483
484   """
485   if len(samples) < 2:
486     # Can't calculate throughput
487     return None
488
489   (start_time, start_mbytes) = samples[0]
490   (end_time, end_mbytes) = samples[-1]
491
492   return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)