import/export daemon: Move command building into separate module
[ganeti-local] / daemons / import-export
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Import/export daemon.
23
24 """
25
26 # pylint: disable-msg=C0103
27 # C0103: Invalid name import-export
28
29 import errno
30 import logging
31 import optparse
32 import os
33 import re
34 import select
35 import signal
36 import socket
37 import subprocess
38 import sys
39 import time
40
41 from ganeti import constants
42 from ganeti import cli
43 from ganeti import utils
44 from ganeti import serializer
45 from ganeti import objects
46 from ganeti import locking
47 from ganeti import impexpd
48
49
50 #: Used to recognize point at which socat(1) starts to listen on its socket.
51 #: The local address is required for the remote peer to connect (in particular
52 #: the port number).
53 LISTENING_RE = re.compile(r"^listening on\s+"
54                           r"AF=(?P<family>\d+)\s+"
55                           r"(?P<address>.+):(?P<port>\d+)$", re.I)
56
57 #: Used to recognize point at which socat(1) is sending data over the wire
58 TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
59                               re.I)
60
61 SOCAT_LOG_DEBUG = "D"
62 SOCAT_LOG_INFO = "I"
63 SOCAT_LOG_NOTICE = "N"
64 SOCAT_LOG_WARNING = "W"
65 SOCAT_LOG_ERROR = "E"
66 SOCAT_LOG_FATAL = "F"
67
68 SOCAT_LOG_IGNORE = frozenset([
69   SOCAT_LOG_DEBUG,
70   SOCAT_LOG_INFO,
71   SOCAT_LOG_NOTICE,
72   ])
73
74 #: How many lines to keep in the status file
75 MAX_RECENT_OUTPUT_LINES = 20
76
77 #: Don't update status file more than once every 5 seconds (unless forced)
78 MIN_UPDATE_INTERVAL = 5.0
79
80 #: Give child process up to 5 seconds to exit after sending a signal
81 CHILD_LINGER_TIMEOUT = 5.0
82
83 #: How long to wait for a connection to be established
84 DEFAULT_CONNECT_TIMEOUT = 60
85
86
87 # Global variable for options
88 options = None
89
90
91 class Error(Exception):
92   """Generic exception"""
93
94
95 def SetupLogging():
96   """Configures the logging module.
97
98   """
99   formatter = logging.Formatter("%(asctime)s: %(message)s")
100
101   stderr_handler = logging.StreamHandler()
102   stderr_handler.setFormatter(formatter)
103   stderr_handler.setLevel(logging.NOTSET)
104
105   root_logger = logging.getLogger("")
106   root_logger.addHandler(stderr_handler)
107
108   if options.debug:
109     root_logger.setLevel(logging.NOTSET)
110   elif options.verbose:
111     root_logger.setLevel(logging.INFO)
112   else:
113     root_logger.setLevel(logging.ERROR)
114
115   # Create special logger for child process output
116   child_logger = logging.Logger("child output")
117   child_logger.addHandler(stderr_handler)
118   child_logger.setLevel(logging.NOTSET)
119
120   return child_logger
121
122
123 def _VerifyListening(family, address, port):
124   """Verify address given as listening address by socat.
125
126   """
127   # TODO: Implement IPv6 support
128   if family != socket.AF_INET:
129     raise Error("Address family %r not supported" % family)
130
131   try:
132     packed_address = socket.inet_pton(family, address)
133   except socket.error:
134     raise Error("Invalid address %r for family %s" % (address, family))
135
136   return (socket.inet_ntop(family, packed_address), port)
137
138
139 class StatusFile:
140   """Status file manager.
141
142   """
143   def __init__(self, path):
144     """Initializes class.
145
146     """
147     self._path = path
148     self._data = objects.ImportExportStatus(ctime=time.time(),
149                                             mtime=None,
150                                             recent_output=[])
151
152   def AddRecentOutput(self, line):
153     """Adds a new line of recent output.
154
155     """
156     self._data.recent_output.append(line)
157
158     # Remove old lines
159     del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]
160
161   def SetListenPort(self, port):
162     """Sets the port the daemon is listening on.
163
164     @type port: int
165     @param port: TCP/UDP port
166
167     """
168     assert isinstance(port, (int, long)) and 0 < port < 2**16
169     self._data.listen_port = port
170
171   def GetListenPort(self):
172     """Returns the port the daemon is listening on.
173
174     """
175     return self._data.listen_port
176
177   def SetConnected(self):
178     """Sets the connected flag.
179
180     """
181     self._data.connected = True
182
183   def GetConnected(self):
184     """Determines whether the daemon is connected.
185
186     """
187     return self._data.connected
188
189   def SetExitStatus(self, exit_status, error_message):
190     """Sets the exit status and an error message.
191
192     """
193     # Require error message when status isn't 0
194     assert exit_status == 0 or error_message
195
196     self._data.exit_status = exit_status
197     self._data.error_message = error_message
198
199   def ExitStatusIsSuccess(self):
200     """Returns whether the exit status means "success".
201
202     """
203     return not bool(self._data.error_message)
204
205   def Update(self, force):
206     """Updates the status file.
207
208     @type force: bool
209     @param force: Write status file in any case, not only when minimum interval
210                   is expired
211
212     """
213     if not (force or
214             self._data.mtime is None or
215             time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
216       return
217
218     logging.debug("Updating status file %s", self._path)
219
220     self._data.mtime = time.time()
221     utils.WriteFile(self._path,
222                     data=serializer.DumpJson(self._data.ToDict(), indent=True),
223                     mode=0400)
224
225
226 def _ProcessSocatOutput(status_file, level, msg):
227   """Interprets socat log output.
228
229   """
230   if level == SOCAT_LOG_NOTICE:
231     if status_file.GetListenPort() is None:
232       # TODO: Maybe implement timeout to not listen forever
233       m = LISTENING_RE.match(msg)
234       if m:
235         (_, port) = _VerifyListening(int(m.group("family")), m.group("address"),
236                                      int(m.group("port")))
237
238         status_file.SetListenPort(port)
239         return True
240
241     if not status_file.GetConnected():
242       m = TRANSFER_LOOP_RE.match(msg)
243       if m:
244         status_file.SetConnected()
245         return True
246
247   return False
248
249
250 def ProcessOutput(line, status_file, logger, socat):
251   """Takes care of child process output.
252
253   @param status_file: Status file manager
254   @param logger: Child output logger
255   @type socat: bool
256   @param socat: Whether it's a socat output line
257   @type line: string
258   @param line: Child output line
259
260   """
261   force_update = False
262   forward_line = line
263
264   if socat:
265     level = None
266     parts = line.split(None, 4)
267
268     if len(parts) == 5:
269       (_, _, _, level, msg) = parts
270
271       force_update = _ProcessSocatOutput(status_file, level, msg)
272
273       if options.debug or (level and level not in SOCAT_LOG_IGNORE):
274         forward_line = "socat: %s %s" % (level, msg)
275       else:
276         forward_line = None
277     else:
278       forward_line = "socat: %s" % line
279
280   if forward_line:
281     logger.info(forward_line)
282     status_file.AddRecentOutput(forward_line)
283
284   status_file.Update(force_update)
285
286
287 def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
288                    signal_notify, signal_handler, mode):
289   """Handles the child processes' output.
290
291   """
292   assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
293          "Other signals are not handled in this function"
294
295   # Buffer size 0 is important, otherwise .read() with a specified length
296   # might buffer data while poll(2) won't mark its file descriptor as
297   # readable again.
298   socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
299
300   script_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
301                                            child_logger, False)
302   try:
303     socat_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
304                                             child_logger, True)
305     try:
306       fdmap = {
307         child.stderr.fileno(): (child.stderr, script_stderr_lines),
308         socat_stderr_read.fileno(): (socat_stderr_read, socat_stderr_lines),
309         signal_notify.fileno(): (signal_notify, None),
310         }
311
312       poller = select.poll()
313       for fd in fdmap:
314         utils.SetNonblockFlag(fd, True)
315         poller.register(fd, select.POLLIN)
316
317       if options.connect_timeout and mode == constants.IEM_IMPORT:
318         listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
319       else:
320         listen_timeout = None
321
322       exit_timeout = None
323
324       while True:
325         # Break out of loop if only signal notify FD is left
326         if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
327           break
328
329         timeout = None
330
331         if listen_timeout and not exit_timeout:
332           if status_file.GetConnected():
333             listen_timeout = None
334           elif listen_timeout.Remaining() < 0:
335             logging.info("Child process didn't establish connection in time")
336             child.Kill(signal.SIGTERM)
337             exit_timeout = \
338               locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
339             # Next block will calculate timeout
340           else:
341             # Not yet connected, check again in a second
342             timeout = 1000
343
344         if exit_timeout:
345           timeout = exit_timeout.Remaining() * 1000
346           if timeout < 0:
347             logging.info("Child process didn't exit in time")
348             break
349
350         for fd, event in utils.RetryOnSignal(poller.poll, timeout):
351           if event & (select.POLLIN | event & select.POLLPRI):
352             (from_, to) = fdmap[fd]
353
354             # Read up to 1 KB of data
355             data = from_.read(1024)
356             if data:
357               if to:
358                 to.write(data)
359               elif fd == signal_notify.fileno():
360                 # Signal handling
361                 if signal_handler.called:
362                   signal_handler.Clear()
363                   if exit_timeout:
364                     logging.info("Child process still has about %0.2f seconds"
365                                  " to exit", exit_timeout.Remaining())
366                   else:
367                     logging.info("Giving child process %0.2f seconds to exit",
368                                  CHILD_LINGER_TIMEOUT)
369                     exit_timeout = \
370                       locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
371             else:
372               poller.unregister(fd)
373               del fdmap[fd]
374
375           elif event & (select.POLLNVAL | select.POLLHUP |
376                         select.POLLERR):
377             poller.unregister(fd)
378             del fdmap[fd]
379
380         script_stderr_lines.flush()
381         socat_stderr_lines.flush()
382
383       # If there was a timeout calculator, we were waiting for the child to
384       # finish, e.g. due to a signal
385       return not bool(exit_timeout)
386     finally:
387       socat_stderr_lines.close()
388   finally:
389     script_stderr_lines.close()
390
391
392 def ParseOptions():
393   """Parses the options passed to the program.
394
395   @return: Arguments to program
396
397   """
398   global options # pylint: disable-msg=W0603
399
400   parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
401                                         (constants.IEM_IMPORT,
402                                          constants.IEM_EXPORT)))
403   parser.add_option(cli.DEBUG_OPT)
404   parser.add_option(cli.VERBOSE_OPT)
405   parser.add_option("--key", dest="key", action="store", type="string",
406                     help="RSA key file")
407   parser.add_option("--cert", dest="cert", action="store", type="string",
408                     help="X509 certificate file")
409   parser.add_option("--ca", dest="ca", action="store", type="string",
410                     help="X509 CA file")
411   parser.add_option("--bind", dest="bind", action="store", type="string",
412                     help="Bind address")
413   parser.add_option("--host", dest="host", action="store", type="string",
414                     help="Remote hostname")
415   parser.add_option("--port", dest="port", action="store", type="int",
416                     help="Remote port")
417   parser.add_option("--connect-retries", dest="connect_retries", action="store",
418                     type="int", default=0,
419                     help=("How many times the connection should be retried"
420                           " (export only)"))
421   parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
422                     type="int", default=DEFAULT_CONNECT_TIMEOUT,
423                     help="Timeout for connection to be established (seconds)")
424   parser.add_option("--compress", dest="compress", action="store",
425                     type="choice", help="Compression method",
426                     metavar="[%s]" % "|".join(constants.IEC_ALL),
427                     choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
428   parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
429                     type="string", help="Command prefix")
430   parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
431                     type="string", help="Command suffix")
432
433   (options, args) = parser.parse_args()
434
435   if len(args) != 2:
436     # Won't return
437     parser.error("Expected exactly two arguments")
438
439   (status_file_path, mode) = args
440
441   if mode not in (constants.IEM_IMPORT,
442                   constants.IEM_EXPORT):
443     # Won't return
444     parser.error("Invalid mode: %s" % mode)
445
446   return (status_file_path, mode)
447
448
449 class ChildProcess(subprocess.Popen):
450   def __init__(self, cmd, noclose_fds):
451     """Initializes this class.
452
453     """
454     self._noclose_fds = noclose_fds
455
456     # Not using close_fds because doing so would also close the socat stderr
457     # pipe, which we still need.
458     subprocess.Popen.__init__(self, cmd, shell=False, close_fds=False,
459                               stderr=subprocess.PIPE, stdout=None, stdin=None,
460                               preexec_fn=self._ChildPreexec)
461     self._SetProcessGroup()
462
463   def _ChildPreexec(self):
464     """Called before child executable is execve'd.
465
466     """
467     # Move to separate process group. By sending a signal to its process group
468     # we can kill the child process and all grandchildren.
469     os.setpgid(0, 0)
470
471     # Close almost all file descriptors
472     utils.CloseFDs(noclose_fds=self._noclose_fds)
473
474   def _SetProcessGroup(self):
475     """Sets the child's process group.
476
477     """
478     assert self.pid, "Can't be called in child process"
479
480     # Avoid race condition by setting child's process group (as good as
481     # possible in Python) before sending signals to child. For an
482     # explanation, see preexec function for child.
483     try:
484       os.setpgid(self.pid, self.pid)
485     except EnvironmentError, err:
486       # If the child process was faster we receive EPERM or EACCES
487       if err.errno not in (errno.EPERM, errno.EACCES):
488         raise
489
490   def Kill(self, signum):
491     """Sends signal to child process.
492
493     """
494     logging.info("Sending signal %s to child process", signum)
495     os.killpg(self.pid, signum)
496
497   def ForceQuit(self):
498     """Ensure child process is no longer running.
499
500     """
501     # Final check if child process is still alive
502     if utils.RetryOnSignal(self.poll) is None:
503       logging.error("Child process still alive, sending SIGKILL")
504       self.Kill(signal.SIGKILL)
505       utils.RetryOnSignal(self.wait)
506
507
508 def main():
509   """Main function.
510
511   """
512   # Option parsing
513   (status_file_path, mode) = ParseOptions()
514
515   # Configure logging
516   child_logger = SetupLogging()
517
518   status_file = StatusFile(status_file_path)
519   try:
520     try:
521       # Pipe to receive socat's stderr output
522       (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
523
524       # Get child process command
525       cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd)
526       cmd = cmd_builder.GetCommand()
527
528       logging.debug("Starting command %r", cmd)
529
530       # Start child process
531       child = ChildProcess(cmd, [socat_stderr_write_fd])
532       try:
533         def _ForwardSignal(signum, _):
534           """Forwards signals to child process.
535
536           """
537           child.Kill(signum)
538
539         signal_wakeup = utils.SignalWakeupFd()
540         try:
541           # TODO: There is a race condition between starting the child and
542           # handling the signals here. While there might be a way to work around
543           # it by registering the handlers before starting the child and
544           # deferring sent signals until the child is available, doing so can be
545           # complicated.
546           signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
547                                                handler_fn=_ForwardSignal,
548                                                wakeup=signal_wakeup)
549           try:
550             # Close child's side
551             utils.RetryOnSignal(os.close, socat_stderr_write_fd)
552
553             if ProcessChildIO(child, socat_stderr_read_fd, status_file,
554                               child_logger, signal_wakeup, signal_handler,
555                               mode):
556               # The child closed all its file descriptors and there was no
557               # signal
558               # TODO: Implement timeout instead of waiting indefinitely
559               utils.RetryOnSignal(child.wait)
560           finally:
561             signal_handler.Reset()
562         finally:
563           signal_wakeup.Reset()
564       finally:
565         child.ForceQuit()
566
567       if child.returncode == 0:
568         errmsg = None
569       elif child.returncode < 0:
570         errmsg = "Exited due to signal %s" % (-child.returncode, )
571       else:
572         errmsg = "Exited with status %s" % (child.returncode, )
573
574       status_file.SetExitStatus(child.returncode, errmsg)
575     except Exception, err: # pylint: disable-msg=W0703
576       logging.exception("Unhandled error occurred")
577       status_file.SetExitStatus(constants.EXIT_FAILURE,
578                                 "Unhandled error occurred: %s" % (err, ))
579
580     if status_file.ExitStatusIsSuccess():
581       sys.exit(constants.EXIT_SUCCESS)
582
583     sys.exit(constants.EXIT_FAILURE)
584   finally:
585     status_file.Update(True)
586
587
588 if __name__ == "__main__":
589   main()