d137b57e97836ae47d63c6a81437c008dbc2bc28
[ganeti-local] / daemons / import-export
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Import/export daemon.
23
24 """
25
26 # pylint: disable-msg=C0103
27 # C0103: Invalid name import-export
28
29 import errno
30 import logging
31 import optparse
32 import os
33 import select
34 import signal
35 import subprocess
36 import sys
37 import time
38 import math
39
40 from ganeti import constants
41 from ganeti import cli
42 from ganeti import utils
43 from ganeti import errors
44 from ganeti import serializer
45 from ganeti import objects
46 from ganeti import impexpd
47 from ganeti import netutils
48
49
50 #: How many lines to keep in the status file
51 MAX_RECENT_OUTPUT_LINES = 20
52
53 #: Don't update status file more than once every 5 seconds (unless forced)
54 MIN_UPDATE_INTERVAL = 5.0
55
56 #: Give child process up to 5 seconds to exit after sending a signal
57 CHILD_LINGER_TIMEOUT = 5.0
58
59 #: How long to wait for a connection to be established
60 DEFAULT_CONNECT_TIMEOUT = 60
61
62 #: Get dd(1) statistics every few seconds
63 DD_STATISTICS_INTERVAL = 5.0
64
65 #: Seconds for throughput calculation
66 DD_THROUGHPUT_INTERVAL = 60.0
67
68 #: Number of samples for throughput calculation
69 DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) /
70                                       DD_STATISTICS_INTERVAL))
71
72
73 # Global variable for options
74 options = None
75
76
77 def SetupLogging():
78   """Configures the logging module.
79
80   """
81   formatter = logging.Formatter("%(asctime)s: %(message)s")
82
83   stderr_handler = logging.StreamHandler()
84   stderr_handler.setFormatter(formatter)
85   stderr_handler.setLevel(logging.NOTSET)
86
87   root_logger = logging.getLogger("")
88   root_logger.addHandler(stderr_handler)
89
90   if options.debug:
91     root_logger.setLevel(logging.NOTSET)
92   elif options.verbose:
93     root_logger.setLevel(logging.INFO)
94   else:
95     root_logger.setLevel(logging.ERROR)
96
97   # Create special logger for child process output
98   child_logger = logging.Logger("child output")
99   child_logger.addHandler(stderr_handler)
100   child_logger.setLevel(logging.NOTSET)
101
102   return child_logger
103
104
105 class StatusFile:
106   """Status file manager.
107
108   """
109   def __init__(self, path):
110     """Initializes class.
111
112     """
113     self._path = path
114     self._data = objects.ImportExportStatus(ctime=time.time(),
115                                             mtime=None,
116                                             recent_output=[])
117
118   def AddRecentOutput(self, line):
119     """Adds a new line of recent output.
120
121     """
122     self._data.recent_output.append(line)
123
124     # Remove old lines
125     del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]
126
127   def SetListenPort(self, port):
128     """Sets the port the daemon is listening on.
129
130     @type port: int
131     @param port: TCP/UDP port
132
133     """
134     assert isinstance(port, (int, long)) and 0 < port < 2**16
135     self._data.listen_port = port
136
137   def GetListenPort(self):
138     """Returns the port the daemon is listening on.
139
140     """
141     return self._data.listen_port
142
143   def SetConnected(self):
144     """Sets the connected flag.
145
146     """
147     self._data.connected = True
148
149   def GetConnected(self):
150     """Determines whether the daemon is connected.
151
152     """
153     return self._data.connected
154
155   def SetProgress(self, mbytes, throughput, percent, eta):
156     """Sets how much data has been transferred so far.
157
158     @type mbytes: number
159     @param mbytes: Transferred amount of data in MiB.
160     @type throughput: float
161     @param throughput: MiB/second
162     @type percent: number
163     @param percent: Percent processed
164     @type eta: number
165     @param eta: Expected number of seconds until done
166
167     """
168     self._data.progress_mbytes = mbytes
169     self._data.progress_throughput = throughput
170     self._data.progress_percent = percent
171     self._data.progress_eta = eta
172
173   def SetExitStatus(self, exit_status, error_message):
174     """Sets the exit status and an error message.
175
176     """
177     # Require error message when status isn't 0
178     assert exit_status == 0 or error_message
179
180     self._data.exit_status = exit_status
181     self._data.error_message = error_message
182
183   def ExitStatusIsSuccess(self):
184     """Returns whether the exit status means "success".
185
186     """
187     return not bool(self._data.error_message)
188
189   def Update(self, force):
190     """Updates the status file.
191
192     @type force: bool
193     @param force: Write status file in any case, not only when minimum interval
194                   is expired
195
196     """
197     if not (force or
198             self._data.mtime is None or
199             time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
200       return
201
202     logging.debug("Updating status file %s", self._path)
203
204     self._data.mtime = time.time()
205     utils.WriteFile(self._path,
206                     data=serializer.DumpJson(self._data.ToDict(), indent=True),
207                     mode=0400)
208
209
210 def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
211                    dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
212                    signal_notify, signal_handler, mode):
213   """Handles the child processes' output.
214
215   """
216   assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
217          "Other signals are not handled in this function"
218
219   # Buffer size 0 is important, otherwise .read() with a specified length
220   # might buffer data while poll(2) won't mark its file descriptor as
221   # readable again.
222   socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
223   dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
224   dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
225   exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)
226
227   tp_samples = DD_THROUGHPUT_SAMPLES
228
229   if options.exp_size == constants.IE_CUSTOM_SIZE:
230     exp_size = None
231   else:
232     exp_size = options.exp_size
233
234   child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
235                                            child_logger, tp_samples,
236                                            exp_size)
237   try:
238     fdmap = {
239       child.stderr.fileno():
240         (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
241       socat_stderr_read.fileno():
242         (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
243       dd_pid_read.fileno():
244         (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
245       dd_stderr_read.fileno():
246         (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
247       exp_size_read.fileno():
248         (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
249       signal_notify.fileno(): (signal_notify, None),
250       }
251
252     poller = select.poll()
253     for fd in fdmap:
254       utils.SetNonblockFlag(fd, True)
255       poller.register(fd, select.POLLIN)
256
257     if options.connect_timeout and mode == constants.IEM_IMPORT:
258       listen_timeout = utils.RunningTimeout(options.connect_timeout, True)
259     else:
260       listen_timeout = None
261
262     exit_timeout = None
263     dd_stats_timeout = None
264
265     while True:
266       # Break out of loop if only signal notify FD is left
267       if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
268         break
269
270       timeout = None
271
272       if listen_timeout and not exit_timeout:
273         if status_file.GetConnected():
274           listen_timeout = None
275         elif listen_timeout.Remaining() < 0:
276           logging.info("Child process didn't establish connection in time")
277           child.Kill(signal.SIGTERM)
278           exit_timeout = \
279             utils.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
280           # Next block will calculate timeout
281         else:
282           # Not yet connected, check again in a second
283           timeout = 1000
284
285       if exit_timeout:
286         timeout = exit_timeout.Remaining() * 1000
287         if timeout < 0:
288           logging.info("Child process didn't exit in time")
289           break
290
291       if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
292         notify_status = child_io_proc.NotifyDd()
293         if notify_status:
294           # Schedule next notification
295           dd_stats_timeout = utils.RunningTimeout(DD_STATISTICS_INTERVAL, True)
296         else:
297           # Try again soon (dd isn't ready yet)
298           dd_stats_timeout = utils.RunningTimeout(1.0, True)
299
300       if dd_stats_timeout:
301         dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)
302
303         if timeout is None:
304           timeout = dd_timeout
305         else:
306           timeout = min(timeout, dd_timeout)
307
308       for fd, event in utils.RetryOnSignal(poller.poll, timeout):
309         if event & (select.POLLIN | event & select.POLLPRI):
310           (from_, to) = fdmap[fd]
311
312           # Read up to 1 KB of data
313           data = from_.read(1024)
314           if data:
315             if to:
316               to.write(data)
317             elif fd == signal_notify.fileno():
318               # Signal handling
319               if signal_handler.called:
320                 signal_handler.Clear()
321                 if exit_timeout:
322                   logging.info("Child process still has about %0.2f seconds"
323                                " to exit", exit_timeout.Remaining())
324                 else:
325                   logging.info("Giving child process %0.2f seconds to exit",
326                                CHILD_LINGER_TIMEOUT)
327                   exit_timeout = \
328                     utils.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
329           else:
330             poller.unregister(fd)
331             del fdmap[fd]
332
333         elif event & (select.POLLNVAL | select.POLLHUP |
334                       select.POLLERR):
335           poller.unregister(fd)
336           del fdmap[fd]
337
338       child_io_proc.FlushAll()
339
340     # If there was a timeout calculator, we were waiting for the child to
341     # finish, e.g. due to a signal
342     return not bool(exit_timeout)
343   finally:
344     child_io_proc.CloseAll()
345
346
347 def ParseOptions():
348   """Parses the options passed to the program.
349
350   @return: Arguments to program
351
352   """
353   global options # pylint: disable-msg=W0603
354
355   parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
356                                         (constants.IEM_IMPORT,
357                                          constants.IEM_EXPORT)))
358   parser.add_option(cli.DEBUG_OPT)
359   parser.add_option(cli.VERBOSE_OPT)
360   parser.add_option("--key", dest="key", action="store", type="string",
361                     help="RSA key file")
362   parser.add_option("--cert", dest="cert", action="store", type="string",
363                     help="X509 certificate file")
364   parser.add_option("--ca", dest="ca", action="store", type="string",
365                     help="X509 CA file")
366   parser.add_option("--bind", dest="bind", action="store", type="string",
367                     help="Bind address")
368   parser.add_option("--host", dest="host", action="store", type="string",
369                     help="Remote hostname")
370   parser.add_option("--port", dest="port", action="store", type="int",
371                     help="Remote port")
372   parser.add_option("--connect-retries", dest="connect_retries", action="store",
373                     type="int", default=0,
374                     help=("How many times the connection should be retried"
375                           " (export only)"))
376   parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
377                     type="int", default=DEFAULT_CONNECT_TIMEOUT,
378                     help="Timeout for connection to be established (seconds)")
379   parser.add_option("--compress", dest="compress", action="store",
380                     type="choice", help="Compression method",
381                     metavar="[%s]" % "|".join(constants.IEC_ALL),
382                     choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
383   parser.add_option("--expected-size", dest="exp_size", action="store",
384                     type="string", default=None,
385                     help="Expected import/export size (MiB)")
386   parser.add_option("--magic", dest="magic", action="store",
387                     type="string", default=None, help="Magic string")
388   parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
389                     type="string", help="Command prefix")
390   parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
391                     type="string", help="Command suffix")
392
393   (options, args) = parser.parse_args()
394
395   if len(args) != 2:
396     # Won't return
397     parser.error("Expected exactly two arguments")
398
399   (status_file_path, mode) = args
400
401   if mode not in (constants.IEM_IMPORT,
402                   constants.IEM_EXPORT):
403     # Won't return
404     parser.error("Invalid mode: %s" % mode)
405
406   # Normalize and check parameters
407   if options.host is not None:
408     try:
409       options.host = netutils.Hostname.GetNormalizedName(options.host)
410     except errors.OpPrereqError, err:
411       parser.error("Invalid hostname '%s': %s" % (options.host, err))
412
413   if options.port is not None:
414     options.port = utils.ValidateServiceName(options.port)
415
416   if (options.exp_size is not None and
417       options.exp_size != constants.IE_CUSTOM_SIZE):
418     try:
419       options.exp_size = int(options.exp_size)
420     except (ValueError, TypeError), err:
421       # Won't return
422       parser.error("Invalid value for --expected-size: %s (%s)" %
423                    (options.exp_size, err))
424
425   if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)):
426     parser.error("Magic must match regular expression %s" %
427                  constants.IE_MAGIC_RE.pattern)
428
429   return (status_file_path, mode)
430
431
432 class ChildProcess(subprocess.Popen):
433   def __init__(self, env, cmd, noclose_fds):
434     """Initializes this class.
435
436     """
437     self._noclose_fds = noclose_fds
438
439     # Not using close_fds because doing so would also close the socat stderr
440     # pipe, which we still need.
441     subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
442                               stderr=subprocess.PIPE, stdout=None, stdin=None,
443                               preexec_fn=self._ChildPreexec)
444     self._SetProcessGroup()
445
446   def _ChildPreexec(self):
447     """Called before child executable is execve'd.
448
449     """
450     # Move to separate process group. By sending a signal to its process group
451     # we can kill the child process and all grandchildren.
452     os.setpgid(0, 0)
453
454     # Close almost all file descriptors
455     utils.CloseFDs(noclose_fds=self._noclose_fds)
456
457   def _SetProcessGroup(self):
458     """Sets the child's process group.
459
460     """
461     assert self.pid, "Can't be called in child process"
462
463     # Avoid race condition by setting child's process group (as good as
464     # possible in Python) before sending signals to child. For an
465     # explanation, see preexec function for child.
466     try:
467       os.setpgid(self.pid, self.pid)
468     except EnvironmentError, err:
469       # If the child process was faster we receive EPERM or EACCES
470       if err.errno not in (errno.EPERM, errno.EACCES):
471         raise
472
473   def Kill(self, signum):
474     """Sends signal to child process.
475
476     """
477     logging.info("Sending signal %s to child process", signum)
478     utils.IgnoreProcessNotFound(os.killpg, self.pid, signum)
479
480   def ForceQuit(self):
481     """Ensure child process is no longer running.
482
483     """
484     # Final check if child process is still alive
485     if utils.RetryOnSignal(self.poll) is None:
486       logging.error("Child process still alive, sending SIGKILL")
487       self.Kill(signal.SIGKILL)
488       utils.RetryOnSignal(self.wait)
489
490
491 def main():
492   """Main function.
493
494   """
495   # Option parsing
496   (status_file_path, mode) = ParseOptions()
497
498   # Configure logging
499   child_logger = SetupLogging()
500
501   status_file = StatusFile(status_file_path)
502   try:
503     try:
504       # Pipe to receive socat's stderr output
505       (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
506
507       # Pipe to receive dd's stderr output
508       (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()
509
510       # Pipe to receive dd's PID
511       (dd_pid_read_fd, dd_pid_write_fd) = os.pipe()
512
513       # Pipe to receive size predicted by export script
514       (exp_size_read_fd, exp_size_write_fd) = os.pipe()
515
516       # Get child process command
517       cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
518                                            dd_stderr_write_fd, dd_pid_write_fd)
519       cmd = cmd_builder.GetCommand()
520
521       # Prepare command environment
522       cmd_env = os.environ.copy()
523
524       if options.exp_size == constants.IE_CUSTOM_SIZE:
525         cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)
526
527       logging.debug("Starting command %r", cmd)
528
529       # Start child process
530       child = ChildProcess(cmd_env, cmd,
531                            [socat_stderr_write_fd, dd_stderr_write_fd,
532                             dd_pid_write_fd, exp_size_write_fd])
533       try:
534         def _ForwardSignal(signum, _):
535           """Forwards signals to child process.
536
537           """
538           child.Kill(signum)
539
540         signal_wakeup = utils.SignalWakeupFd()
541         try:
542           # TODO: There is a race condition between starting the child and
543           # handling the signals here. While there might be a way to work around
544           # it by registering the handlers before starting the child and
545           # deferring sent signals until the child is available, doing so can be
546           # complicated.
547           signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
548                                                handler_fn=_ForwardSignal,
549                                                wakeup=signal_wakeup)
550           try:
551             # Close child's side
552             utils.RetryOnSignal(os.close, socat_stderr_write_fd)
553             utils.RetryOnSignal(os.close, dd_stderr_write_fd)
554             utils.RetryOnSignal(os.close, dd_pid_write_fd)
555             utils.RetryOnSignal(os.close, exp_size_write_fd)
556
557             if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
558                               dd_pid_read_fd, exp_size_read_fd,
559                               status_file, child_logger,
560                               signal_wakeup, signal_handler, mode):
561               # The child closed all its file descriptors and there was no
562               # signal
563               # TODO: Implement timeout instead of waiting indefinitely
564               utils.RetryOnSignal(child.wait)
565           finally:
566             signal_handler.Reset()
567         finally:
568           signal_wakeup.Reset()
569       finally:
570         child.ForceQuit()
571
572       if child.returncode == 0:
573         errmsg = None
574       elif child.returncode < 0:
575         errmsg = "Exited due to signal %s" % (-child.returncode, )
576       else:
577         errmsg = "Exited with status %s" % (child.returncode, )
578
579       status_file.SetExitStatus(child.returncode, errmsg)
580     except Exception, err: # pylint: disable-msg=W0703
581       logging.exception("Unhandled error occurred")
582       status_file.SetExitStatus(constants.EXIT_FAILURE,
583                                 "Unhandled error occurred: %s" % (err, ))
584
585     if status_file.ExitStatusIsSuccess():
586       sys.exit(constants.EXIT_SUCCESS)
587
588     sys.exit(constants.EXIT_FAILURE)
589   finally:
590     status_file.Update(True)
591
592
593 if __name__ == "__main__":
594   main()