Retry connection in import-export daemon
[ganeti-local] / daemons / import-export
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Import/export daemon.
23
24 """
25
26 # pylint: disable-msg=C0103
27 # C0103: Invalid name import-export
28
29 import errno
30 import logging
31 import optparse
32 import os
33 import re
34 import select
35 import signal
36 import socket
37 import subprocess
38 import sys
39 import time
40 from cStringIO import StringIO
41
42 from ganeti import constants
43 from ganeti import cli
44 from ganeti import utils
45 from ganeti import serializer
46 from ganeti import objects
47 from ganeti import locking
48
49
50 #: Used to recognize point at which socat(1) starts to listen on its socket.
51 #: The local address is required for the remote peer to connect (in particular
52 #: the port number).
53 LISTENING_RE = re.compile(r"^listening on\s+"
54                           r"AF=(?P<family>\d+)\s+"
55                           r"(?P<address>.+):(?P<port>\d+)$", re.I)
56
57 #: Used to recognize point at which socat(1) is sending data over the wire
58 TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
59                               re.I)
60
61 SOCAT_LOG_DEBUG = "D"
62 SOCAT_LOG_INFO = "I"
63 SOCAT_LOG_NOTICE = "N"
64 SOCAT_LOG_WARNING = "W"
65 SOCAT_LOG_ERROR = "E"
66 SOCAT_LOG_FATAL = "F"
67
68 SOCAT_LOG_IGNORE = frozenset([
69   SOCAT_LOG_DEBUG,
70   SOCAT_LOG_INFO,
71   SOCAT_LOG_NOTICE,
72   ])
73
74 #: Socat buffer size: at most this many bytes are transferred per step
75 SOCAT_BUFSIZE = 1024 * 1024
76
77 #: How many lines to keep in the status file
78 MAX_RECENT_OUTPUT_LINES = 20
79
80 #: Don't update status file more than once every 5 seconds (unless forced)
81 MIN_UPDATE_INTERVAL = 5.0
82
83 #: Give child process up to 5 seconds to exit after sending a signal
84 CHILD_LINGER_TIMEOUT = 5.0
85
86 #: How long to wait for a connection to be established
87 DEFAULT_CONNECT_TIMEOUT = 60
88
89 # Common options for socat
90 SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
91 SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
92
93
94 # Global variable for options
95 options = None
96
97
98 class Error(Exception):
99   """Generic exception"""
100
101
102 def SetupLogging():
103   """Configures the logging module.
104
105   """
106   formatter = logging.Formatter("%(asctime)s: %(message)s")
107
108   stderr_handler = logging.StreamHandler()
109   stderr_handler.setFormatter(formatter)
110   stderr_handler.setLevel(logging.NOTSET)
111
112   root_logger = logging.getLogger("")
113   root_logger.addHandler(stderr_handler)
114
115   if options.debug:
116     root_logger.setLevel(logging.NOTSET)
117   elif options.verbose:
118     root_logger.setLevel(logging.INFO)
119   else:
120     root_logger.setLevel(logging.ERROR)
121
122   # Create special logger for child process output
123   child_logger = logging.Logger("child output")
124   child_logger.addHandler(stderr_handler)
125   child_logger.setLevel(logging.NOTSET)
126
127   return child_logger
128
129
130 def _VerifyListening(family, address, port):
131   """Verify address given as listening address by socat.
132
133   """
134   # TODO: Implement IPv6 support
135   if family != socket.AF_INET:
136     raise Error("Address family %r not supported" % family)
137
138   try:
139     packed_address = socket.inet_pton(family, address)
140   except socket.error:
141     raise Error("Invalid address %r for family %s" % (address, family))
142
143   return (socket.inet_ntop(family, packed_address), port)
144
145
146 class StatusFile:
147   """Status file manager.
148
149   """
150   def __init__(self, path):
151     """Initializes class.
152
153     """
154     self._path = path
155     self._data = objects.ImportExportStatus(ctime=time.time(),
156                                             mtime=None,
157                                             recent_output=[])
158
159   def AddRecentOutput(self, line):
160     """Adds a new line of recent output.
161
162     """
163     self._data.recent_output.append(line)
164
165     # Remove old lines
166     del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]
167
168   def SetListenPort(self, port):
169     """Sets the port the daemon is listening on.
170
171     @type port: int
172     @param port: TCP/UDP port
173
174     """
175     assert isinstance(port, (int, long)) and 0 < port < 2**16
176     self._data.listen_port = port
177
178   def GetListenPort(self):
179     """Returns the port the daemon is listening on.
180
181     """
182     return self._data.listen_port
183
184   def SetConnected(self):
185     """Sets the connected flag.
186
187     """
188     self._data.connected = True
189
190   def GetConnected(self):
191     """Determines whether the daemon is connected.
192
193     """
194     return self._data.connected
195
196   def SetExitStatus(self, exit_status, error_message):
197     """Sets the exit status and an error message.
198
199     """
200     # Require error message when status isn't 0
201     assert exit_status == 0 or error_message
202
203     self._data.exit_status = exit_status
204     self._data.error_message = error_message
205
206   def ExitStatusIsSuccess(self):
207     """Returns whether the exit status means "success".
208
209     """
210     return not bool(self._data.error_message)
211
212   def Update(self, force):
213     """Updates the status file.
214
215     @type force: bool
216     @param force: Write status file in any case, not only when minimum interval
217                   is expired
218
219     """
220     if not (force or
221             self._data.mtime is None or
222             time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
223       return
224
225     logging.debug("Updating status file %s", self._path)
226
227     self._data.mtime = time.time()
228     utils.WriteFile(self._path,
229                     data=serializer.DumpJson(self._data.ToDict(), indent=True),
230                     mode=0400)
231
232
233 def _ProcessSocatOutput(status_file, level, msg):
234   """Interprets socat log output.
235
236   """
237   if level == SOCAT_LOG_NOTICE:
238     if status_file.GetListenPort() is None:
239       # TODO: Maybe implement timeout to not listen forever
240       m = LISTENING_RE.match(msg)
241       if m:
242         (_, port) = _VerifyListening(int(m.group("family")), m.group("address"),
243                                      int(m.group("port")))
244
245         status_file.SetListenPort(port)
246         return True
247
248     if not status_file.GetConnected():
249       m = TRANSFER_LOOP_RE.match(msg)
250       if m:
251         status_file.SetConnected()
252         return True
253
254   return False
255
256
257 def ProcessOutput(line, status_file, logger, socat):
258   """Takes care of child process output.
259
260   @param status_file: Status file manager
261   @param logger: Child output logger
262   @type socat: bool
263   @param socat: Whether it's a socat output line
264   @type line: string
265   @param line: Child output line
266
267   """
268   force_update = False
269   forward_line = line
270
271   if socat:
272     level = None
273     parts = line.split(None, 4)
274
275     if len(parts) == 5:
276       (_, _, _, level, msg) = parts
277
278       force_update = _ProcessSocatOutput(status_file, level, msg)
279
280       if options.debug or (level and level not in SOCAT_LOG_IGNORE):
281         forward_line = "socat: %s %s" % (level, msg)
282       else:
283         forward_line = None
284     else:
285       forward_line = "socat: %s" % line
286
287   if forward_line:
288     logger.info(forward_line)
289     status_file.AddRecentOutput(forward_line)
290
291   status_file.Update(force_update)
292
293
294 def GetBashCommand(cmd):
295   """Prepares a command to be run in Bash.
296
297   """
298   return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
299
300
301 def GetSocatCommand(mode):
302   """Returns the socat command.
303
304   """
305   common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
306     "key=%s" % options.key,
307     "cert=%s" % options.cert,
308     "cafile=%s" % options.ca,
309     ]
310
311   if options.bind is not None:
312     common_addr_opts.append("bind=%s" % options.bind)
313
314   if mode == constants.IEM_IMPORT:
315     if options.port is None:
316       port = 0
317     else:
318       port = options.port
319
320     addr1 = [
321       "OPENSSL-LISTEN:%s" % port,
322       "reuseaddr",
323
324       # Retry to listen if connection wasn't established successfully, up to
325       # 100 times a second. Note that this still leaves room for DoS attacks.
326       "forever",
327       "intervall=0.01",
328       ] + common_addr_opts
329     addr2 = ["stdout"]
330
331   elif mode == constants.IEM_EXPORT:
332     addr1 = ["stdin"]
333     addr2 = [
334       "OPENSSL:%s:%s" % (options.host, options.port),
335
336       # How long to wait per connection attempt
337       "connect-timeout=%s" % options.connect_timeout,
338
339       # Retry a few times before giving up to connect (once per second)
340       "retry=%s" % options.connect_retries,
341       "intervall=1",
342       ] + common_addr_opts
343
344   else:
345     raise Error("Invalid mode")
346
347   for i in [addr1, addr2]:
348     for value in i:
349       if "," in value:
350         raise Error("Comma not allowed in socat option value: %r" % value)
351
352   return [
353     constants.SOCAT_PATH,
354
355     # Log to stderr
356     "-ls",
357
358     # Log level
359     "-d", "-d",
360
361     # Buffer size
362     "-b%s" % SOCAT_BUFSIZE,
363
364     # Unidirectional mode, the first address is only used for reading, and the
365     # second address is only used for writing
366     "-u",
367
368     ",".join(addr1), ",".join(addr2)
369     ]
370
371
372 def GetTransportCommand(mode, socat_stderr_fd):
373   """Returns the command for the transport part of the daemon.
374
375   @param mode: Daemon mode (import or export)
376   @type socat_stderr_fd: int
377   @param socat_stderr_fd: File descriptor socat should write its stderr to
378
379   """
380   socat_cmd = ("%s 2>&%d" %
381                (utils.ShellQuoteArgs(GetSocatCommand(mode)),
382                 socat_stderr_fd))
383
384   # TODO: Make compression configurable
385
386   if mode == constants.IEM_IMPORT:
387     transport_cmd = "%s | gunzip -c" % socat_cmd
388   elif mode == constants.IEM_EXPORT:
389     transport_cmd = "gzip -c | %s" % socat_cmd
390   else:
391     raise Error("Invalid mode")
392
393   # TODO: Use "dd" to measure processed data (allows to give an ETA)
394
395   # TODO: Run transport as separate user
396   # The transport uses its own shell to simplify running it as a separate user
397   # in the future.
398   return GetBashCommand(transport_cmd)
399
400
401 def GetCommand(mode, socat_stderr_fd):
402   """Returns the complete child process command.
403
404   """
405   buf = StringIO()
406
407   if options.cmd_prefix:
408     buf.write(options.cmd_prefix)
409     buf.write(" ")
410
411   buf.write(utils.ShellQuoteArgs(GetTransportCommand(mode, socat_stderr_fd)))
412
413   if options.cmd_suffix:
414     buf.write(" ")
415     buf.write(options.cmd_suffix)
416
417   return GetBashCommand(buf.getvalue())
418
419
420 def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
421                    signal_notify, signal_handler, mode):
422   """Handles the child processes' output.
423
424   """
425   assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
426          "Other signals are not handled in this function"
427
428   # Buffer size 0 is important, otherwise .read() with a specified length
429   # might buffer data while poll(2) won't mark its file descriptor as
430   # readable again.
431   socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
432
433   script_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
434                                            child_logger, False)
435   try:
436     socat_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
437                                             child_logger, True)
438     try:
439       fdmap = {
440         child.stderr.fileno(): (child.stderr, script_stderr_lines),
441         socat_stderr_read.fileno(): (socat_stderr_read, socat_stderr_lines),
442         signal_notify.fileno(): (signal_notify, None),
443         }
444
445       poller = select.poll()
446       for fd in fdmap:
447         utils.SetNonblockFlag(fd, True)
448         poller.register(fd, select.POLLIN)
449
450       if options.connect_timeout and mode == constants.IEM_IMPORT:
451         listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
452       else:
453         listen_timeout = None
454
455       exit_timeout = None
456
457       while True:
458         # Break out of loop if only signal notify FD is left
459         if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
460           break
461
462         timeout = None
463
464         if listen_timeout and not exit_timeout:
465           if status_file.GetConnected():
466             listen_timeout = None
467           elif listen_timeout.Remaining() < 0:
468             logging.info("Child process didn't establish connection in time")
469             child.Kill(signal.SIGTERM)
470             exit_timeout = \
471               locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
472             # Next block will calculate timeout
473           else:
474             # Not yet connected, check again in a second
475             timeout = 1000
476
477         if exit_timeout:
478           timeout = exit_timeout.Remaining() * 1000
479           if timeout < 0:
480             logging.info("Child process didn't exit in time")
481             break
482
483         for fd, event in utils.RetryOnSignal(poller.poll, timeout):
484           if event & (select.POLLIN | event & select.POLLPRI):
485             (from_, to) = fdmap[fd]
486
487             # Read up to 1 KB of data
488             data = from_.read(1024)
489             if data:
490               if to:
491                 to.write(data)
492               elif fd == signal_notify.fileno():
493                 # Signal handling
494                 if signal_handler.called:
495                   signal_handler.Clear()
496                   if exit_timeout:
497                     logging.info("Child process still has about %0.2f seconds"
498                                  " to exit", exit_timeout.Remaining())
499                   else:
500                     logging.info("Giving child process %0.2f seconds to exit",
501                                  CHILD_LINGER_TIMEOUT)
502                     exit_timeout = \
503                       locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
504             else:
505               poller.unregister(fd)
506               del fdmap[fd]
507
508           elif event & (select.POLLNVAL | select.POLLHUP |
509                         select.POLLERR):
510             poller.unregister(fd)
511             del fdmap[fd]
512
513         script_stderr_lines.flush()
514         socat_stderr_lines.flush()
515
516       # If there was a timeout calculator, we were waiting for the child to
517       # finish, e.g. due to a signal
518       return not bool(exit_timeout)
519     finally:
520       socat_stderr_lines.close()
521   finally:
522     script_stderr_lines.close()
523
524
525 def ParseOptions():
526   """Parses the options passed to the program.
527
528   @return: Arguments to program
529
530   """
531   global options # pylint: disable-msg=W0603
532
533   parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
534                                         (constants.IEM_IMPORT,
535                                          constants.IEM_EXPORT)))
536   parser.add_option(cli.DEBUG_OPT)
537   parser.add_option(cli.VERBOSE_OPT)
538   parser.add_option("--key", dest="key", action="store", type="string",
539                     help="RSA key file")
540   parser.add_option("--cert", dest="cert", action="store", type="string",
541                     help="X509 certificate file")
542   parser.add_option("--ca", dest="ca", action="store", type="string",
543                     help="X509 CA file")
544   parser.add_option("--bind", dest="bind", action="store", type="string",
545                     help="Bind address")
546   parser.add_option("--host", dest="host", action="store", type="string",
547                     help="Remote hostname")
548   parser.add_option("--port", dest="port", action="store", type="int",
549                     help="Remote port")
550   parser.add_option("--connect-retries", dest="connect_retries", action="store",
551                     type="int", default=0,
552                     help=("How many times the connection should be retried"
553                           " (export only)"))
554   parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
555                     type="int", default=DEFAULT_CONNECT_TIMEOUT,
556                     help="Timeout for connection to be established (seconds)")
557   parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
558                     type="string", help="Command prefix")
559   parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
560                     type="string", help="Command suffix")
561
562   (options, args) = parser.parse_args()
563
564   if len(args) != 2:
565     # Won't return
566     parser.error("Expected exactly two arguments")
567
568   (status_file_path, mode) = args
569
570   if mode not in (constants.IEM_IMPORT,
571                   constants.IEM_EXPORT):
572     # Won't return
573     parser.error("Invalid mode: %s" % mode)
574
575   return (status_file_path, mode)
576
577
578 class ChildProcess(subprocess.Popen):
579   def __init__(self, cmd, noclose_fds):
580     """Initializes this class.
581
582     """
583     self._noclose_fds = noclose_fds
584
585     # Not using close_fds because doing so would also close the socat stderr
586     # pipe, which we still need.
587     subprocess.Popen.__init__(self, cmd, shell=False, close_fds=False,
588                               stderr=subprocess.PIPE, stdout=None, stdin=None,
589                               preexec_fn=self._ChildPreexec)
590     self._SetProcessGroup()
591
592   def _ChildPreexec(self):
593     """Called before child executable is execve'd.
594
595     """
596     # Move to separate process group. By sending a signal to its process group
597     # we can kill the child process and all grandchildren.
598     os.setpgid(0, 0)
599
600     # Close almost all file descriptors
601     utils.CloseFDs(noclose_fds=self._noclose_fds)
602
603   def _SetProcessGroup(self):
604     """Sets the child's process group.
605
606     """
607     assert self.pid, "Can't be called in child process"
608
609     # Avoid race condition by setting child's process group (as good as
610     # possible in Python) before sending signals to child. For an
611     # explanation, see preexec function for child.
612     try:
613       os.setpgid(self.pid, self.pid)
614     except EnvironmentError, err:
615       # If the child process was faster we receive EPERM or EACCES
616       if err.errno not in (errno.EPERM, errno.EACCES):
617         raise
618
619   def Kill(self, signum):
620     """Sends signal to child process.
621
622     """
623     logging.info("Sending signal %s to child process", signum)
624     os.killpg(self.pid, signum)
625
626   def ForceQuit(self):
627     """Ensure child process is no longer running.
628
629     """
630     # Final check if child process is still alive
631     if utils.RetryOnSignal(self.poll) is None:
632       logging.error("Child process still alive, sending SIGKILL")
633       self.Kill(signal.SIGKILL)
634       utils.RetryOnSignal(self.wait)
635
636
637 def main():
638   """Main function.
639
640   """
641   # Option parsing
642   (status_file_path, mode) = ParseOptions()
643
644   # Configure logging
645   child_logger = SetupLogging()
646
647   status_file = StatusFile(status_file_path)
648   try:
649     try:
650       # Pipe to receive socat's stderr output
651       (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
652
653       # Get child process command
654       cmd = GetCommand(mode, socat_stderr_write_fd)
655
656       logging.debug("Starting command %r", cmd)
657
658       # Start child process
659       child = ChildProcess(cmd, [socat_stderr_write_fd])
660       try:
661         def _ForwardSignal(signum, _):
662           """Forwards signals to child process.
663
664           """
665           child.Kill(signum)
666
667         signal_wakeup = utils.SignalWakeupFd()
668         try:
669           # TODO: There is a race condition between starting the child and
670           # handling the signals here. While there might be a way to work around
671           # it by registering the handlers before starting the child and
672           # deferring sent signals until the child is available, doing so can be
673           # complicated.
674           signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
675                                                handler_fn=_ForwardSignal,
676                                                wakeup=signal_wakeup)
677           try:
678             # Close child's side
679             utils.RetryOnSignal(os.close, socat_stderr_write_fd)
680
681             if ProcessChildIO(child, socat_stderr_read_fd, status_file,
682                               child_logger, signal_wakeup, signal_handler,
683                               mode):
684               # The child closed all its file descriptors and there was no
685               # signal
686               # TODO: Implement timeout instead of waiting indefinitely
687               utils.RetryOnSignal(child.wait)
688           finally:
689             signal_handler.Reset()
690         finally:
691           signal_wakeup.Reset()
692       finally:
693         child.ForceQuit()
694
695       if child.returncode == 0:
696         errmsg = None
697       elif child.returncode < 0:
698         errmsg = "Exited due to signal %s" % (-child.returncode, )
699       else:
700         errmsg = "Exited with status %s" % (child.returncode, )
701
702       status_file.SetExitStatus(child.returncode, errmsg)
703     except Exception, err: # pylint: disable-msg=W0703
704       logging.exception("Unhandled error occurred")
705       status_file.SetExitStatus(constants.EXIT_FAILURE,
706                                 "Unhandled error occurred: %s" % (err, ))
707
708     if status_file.ExitStatusIsSuccess():
709       sys.exit(constants.EXIT_SUCCESS)
710
711     sys.exit(constants.EXIT_FAILURE)
712   finally:
713     status_file.Update(True)
714
715
716 if __name__ == "__main__":
717   main()