Move ganeti-rapi to ganeti.server.rapi
[ganeti-local] / daemons / import-export
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Import/export daemon.
23
24 """
25
26 # pylint: disable-msg=C0103
27 # C0103: Invalid name import-export
28
29 import errno
30 import logging
31 import optparse
32 import os
33 import select
34 import signal
35 import subprocess
36 import sys
37 import time
38 import math
39
40 from ganeti import constants
41 from ganeti import cli
42 from ganeti import utils
43 from ganeti import errors
44 from ganeti import serializer
45 from ganeti import objects
46 from ganeti import locking
47 from ganeti import impexpd
48 from ganeti import netutils
49
50
51 #: How many lines to keep in the status file
52 MAX_RECENT_OUTPUT_LINES = 20
53
54 #: Don't update status file more than once every 5 seconds (unless forced)
55 MIN_UPDATE_INTERVAL = 5.0
56
57 #: Give child process up to 5 seconds to exit after sending a signal
58 CHILD_LINGER_TIMEOUT = 5.0
59
60 #: How long to wait for a connection to be established
61 DEFAULT_CONNECT_TIMEOUT = 60
62
63 #: Get dd(1) statistics every few seconds
64 DD_STATISTICS_INTERVAL = 5.0
65
66 #: Seconds for throughput calculation
67 DD_THROUGHPUT_INTERVAL = 60.0
68
69 #: Number of samples for throughput calculation
70 DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) /
71                                       DD_STATISTICS_INTERVAL))
72
73
74 # Global variable for options
75 options = None
76
77
78 def SetupLogging():
79   """Configures the logging module.
80
81   """
82   formatter = logging.Formatter("%(asctime)s: %(message)s")
83
84   stderr_handler = logging.StreamHandler()
85   stderr_handler.setFormatter(formatter)
86   stderr_handler.setLevel(logging.NOTSET)
87
88   root_logger = logging.getLogger("")
89   root_logger.addHandler(stderr_handler)
90
91   if options.debug:
92     root_logger.setLevel(logging.NOTSET)
93   elif options.verbose:
94     root_logger.setLevel(logging.INFO)
95   else:
96     root_logger.setLevel(logging.ERROR)
97
98   # Create special logger for child process output
99   child_logger = logging.Logger("child output")
100   child_logger.addHandler(stderr_handler)
101   child_logger.setLevel(logging.NOTSET)
102
103   return child_logger
104
105
106 class StatusFile:
107   """Status file manager.
108
109   """
110   def __init__(self, path):
111     """Initializes class.
112
113     """
114     self._path = path
115     self._data = objects.ImportExportStatus(ctime=time.time(),
116                                             mtime=None,
117                                             recent_output=[])
118
119   def AddRecentOutput(self, line):
120     """Adds a new line of recent output.
121
122     """
123     self._data.recent_output.append(line)
124
125     # Remove old lines
126     del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]
127
128   def SetListenPort(self, port):
129     """Sets the port the daemon is listening on.
130
131     @type port: int
132     @param port: TCP/UDP port
133
134     """
135     assert isinstance(port, (int, long)) and 0 < port < 2**16
136     self._data.listen_port = port
137
138   def GetListenPort(self):
139     """Returns the port the daemon is listening on.
140
141     """
142     return self._data.listen_port
143
144   def SetConnected(self):
145     """Sets the connected flag.
146
147     """
148     self._data.connected = True
149
150   def GetConnected(self):
151     """Determines whether the daemon is connected.
152
153     """
154     return self._data.connected
155
156   def SetProgress(self, mbytes, throughput, percent, eta):
157     """Sets how much data has been transferred so far.
158
159     @type mbytes: number
160     @param mbytes: Transferred amount of data in MiB.
161     @type throughput: float
162     @param throughput: MiB/second
163     @type percent: number
164     @param percent: Percent processed
165     @type eta: number
166     @param eta: Expected number of seconds until done
167
168     """
169     self._data.progress_mbytes = mbytes
170     self._data.progress_throughput = throughput
171     self._data.progress_percent = percent
172     self._data.progress_eta = eta
173
174   def SetExitStatus(self, exit_status, error_message):
175     """Sets the exit status and an error message.
176
177     """
178     # Require error message when status isn't 0
179     assert exit_status == 0 or error_message
180
181     self._data.exit_status = exit_status
182     self._data.error_message = error_message
183
184   def ExitStatusIsSuccess(self):
185     """Returns whether the exit status means "success".
186
187     """
188     return not bool(self._data.error_message)
189
190   def Update(self, force):
191     """Updates the status file.
192
193     @type force: bool
194     @param force: Write status file in any case, not only when minimum interval
195                   is expired
196
197     """
198     if not (force or
199             self._data.mtime is None or
200             time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
201       return
202
203     logging.debug("Updating status file %s", self._path)
204
205     self._data.mtime = time.time()
206     utils.WriteFile(self._path,
207                     data=serializer.DumpJson(self._data.ToDict(), indent=True),
208                     mode=0400)
209
210
211 def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
212                    dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
213                    signal_notify, signal_handler, mode):
214   """Handles the child processes' output.
215
216   """
217   assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
218          "Other signals are not handled in this function"
219
220   # Buffer size 0 is important, otherwise .read() with a specified length
221   # might buffer data while poll(2) won't mark its file descriptor as
222   # readable again.
223   socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
224   dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
225   dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
226   exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)
227
228   tp_samples = DD_THROUGHPUT_SAMPLES
229
230   if options.exp_size == constants.IE_CUSTOM_SIZE:
231     exp_size = None
232   else:
233     exp_size = options.exp_size
234
235   child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
236                                            child_logger, tp_samples,
237                                            exp_size)
238   try:
239     fdmap = {
240       child.stderr.fileno():
241         (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
242       socat_stderr_read.fileno():
243         (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
244       dd_pid_read.fileno():
245         (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
246       dd_stderr_read.fileno():
247         (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
248       exp_size_read.fileno():
249         (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
250       signal_notify.fileno(): (signal_notify, None),
251       }
252
253     poller = select.poll()
254     for fd in fdmap:
255       utils.SetNonblockFlag(fd, True)
256       poller.register(fd, select.POLLIN)
257
258     if options.connect_timeout and mode == constants.IEM_IMPORT:
259       listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
260     else:
261       listen_timeout = None
262
263     exit_timeout = None
264     dd_stats_timeout = None
265
266     while True:
267       # Break out of loop if only signal notify FD is left
268       if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
269         break
270
271       timeout = None
272
273       if listen_timeout and not exit_timeout:
274         if status_file.GetConnected():
275           listen_timeout = None
276         elif listen_timeout.Remaining() < 0:
277           logging.info("Child process didn't establish connection in time")
278           child.Kill(signal.SIGTERM)
279           exit_timeout = \
280             locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
281           # Next block will calculate timeout
282         else:
283           # Not yet connected, check again in a second
284           timeout = 1000
285
286       if exit_timeout:
287         timeout = exit_timeout.Remaining() * 1000
288         if timeout < 0:
289           logging.info("Child process didn't exit in time")
290           break
291
292       if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
293         notify_status = child_io_proc.NotifyDd()
294         if notify_status:
295           # Schedule next notification
296           dd_stats_timeout = locking.RunningTimeout(DD_STATISTICS_INTERVAL,
297                                                     True)
298         else:
299           # Try again soon (dd isn't ready yet)
300           dd_stats_timeout = locking.RunningTimeout(1.0, True)
301
302       if dd_stats_timeout:
303         dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)
304
305         if timeout is None:
306           timeout = dd_timeout
307         else:
308           timeout = min(timeout, dd_timeout)
309
310       for fd, event in utils.RetryOnSignal(poller.poll, timeout):
311         if event & (select.POLLIN | event & select.POLLPRI):
312           (from_, to) = fdmap[fd]
313
314           # Read up to 1 KB of data
315           data = from_.read(1024)
316           if data:
317             if to:
318               to.write(data)
319             elif fd == signal_notify.fileno():
320               # Signal handling
321               if signal_handler.called:
322                 signal_handler.Clear()
323                 if exit_timeout:
324                   logging.info("Child process still has about %0.2f seconds"
325                                " to exit", exit_timeout.Remaining())
326                 else:
327                   logging.info("Giving child process %0.2f seconds to exit",
328                                CHILD_LINGER_TIMEOUT)
329                   exit_timeout = \
330                     locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
331           else:
332             poller.unregister(fd)
333             del fdmap[fd]
334
335         elif event & (select.POLLNVAL | select.POLLHUP |
336                       select.POLLERR):
337           poller.unregister(fd)
338           del fdmap[fd]
339
340       child_io_proc.FlushAll()
341
342     # If there was a timeout calculator, we were waiting for the child to
343     # finish, e.g. due to a signal
344     return not bool(exit_timeout)
345   finally:
346     child_io_proc.CloseAll()
347
348
349 def ParseOptions():
350   """Parses the options passed to the program.
351
352   @return: Arguments to program
353
354   """
355   global options # pylint: disable-msg=W0603
356
357   parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
358                                         (constants.IEM_IMPORT,
359                                          constants.IEM_EXPORT)))
360   parser.add_option(cli.DEBUG_OPT)
361   parser.add_option(cli.VERBOSE_OPT)
362   parser.add_option("--key", dest="key", action="store", type="string",
363                     help="RSA key file")
364   parser.add_option("--cert", dest="cert", action="store", type="string",
365                     help="X509 certificate file")
366   parser.add_option("--ca", dest="ca", action="store", type="string",
367                     help="X509 CA file")
368   parser.add_option("--bind", dest="bind", action="store", type="string",
369                     help="Bind address")
370   parser.add_option("--host", dest="host", action="store", type="string",
371                     help="Remote hostname")
372   parser.add_option("--port", dest="port", action="store", type="int",
373                     help="Remote port")
374   parser.add_option("--connect-retries", dest="connect_retries", action="store",
375                     type="int", default=0,
376                     help=("How many times the connection should be retried"
377                           " (export only)"))
378   parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
379                     type="int", default=DEFAULT_CONNECT_TIMEOUT,
380                     help="Timeout for connection to be established (seconds)")
381   parser.add_option("--compress", dest="compress", action="store",
382                     type="choice", help="Compression method",
383                     metavar="[%s]" % "|".join(constants.IEC_ALL),
384                     choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
385   parser.add_option("--expected-size", dest="exp_size", action="store",
386                     type="string", default=None,
387                     help="Expected import/export size (MiB)")
388   parser.add_option("--magic", dest="magic", action="store",
389                     type="string", default=None, help="Magic string")
390   parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
391                     type="string", help="Command prefix")
392   parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
393                     type="string", help="Command suffix")
394
395   (options, args) = parser.parse_args()
396
397   if len(args) != 2:
398     # Won't return
399     parser.error("Expected exactly two arguments")
400
401   (status_file_path, mode) = args
402
403   if mode not in (constants.IEM_IMPORT,
404                   constants.IEM_EXPORT):
405     # Won't return
406     parser.error("Invalid mode: %s" % mode)
407
408   # Normalize and check parameters
409   if options.host is not None:
410     try:
411       options.host = netutils.Hostname.GetNormalizedName(options.host)
412     except errors.OpPrereqError, err:
413       parser.error("Invalid hostname '%s': %s" % (options.host, err))
414
415   if options.port is not None:
416     options.port = utils.ValidateServiceName(options.port)
417
418   if (options.exp_size is not None and
419       options.exp_size != constants.IE_CUSTOM_SIZE):
420     try:
421       options.exp_size = int(options.exp_size)
422     except (ValueError, TypeError), err:
423       # Won't return
424       parser.error("Invalid value for --expected-size: %s (%s)" %
425                    (options.exp_size, err))
426
427   if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)):
428     parser.error("Magic must match regular expression %s" %
429                  constants.IE_MAGIC_RE.pattern)
430
431   return (status_file_path, mode)
432
433
434 class ChildProcess(subprocess.Popen):
435   def __init__(self, env, cmd, noclose_fds):
436     """Initializes this class.
437
438     """
439     self._noclose_fds = noclose_fds
440
441     # Not using close_fds because doing so would also close the socat stderr
442     # pipe, which we still need.
443     subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
444                               stderr=subprocess.PIPE, stdout=None, stdin=None,
445                               preexec_fn=self._ChildPreexec)
446     self._SetProcessGroup()
447
448   def _ChildPreexec(self):
449     """Called before child executable is execve'd.
450
451     """
452     # Move to separate process group. By sending a signal to its process group
453     # we can kill the child process and all grandchildren.
454     os.setpgid(0, 0)
455
456     # Close almost all file descriptors
457     utils.CloseFDs(noclose_fds=self._noclose_fds)
458
459   def _SetProcessGroup(self):
460     """Sets the child's process group.
461
462     """
463     assert self.pid, "Can't be called in child process"
464
465     # Avoid race condition by setting child's process group (as good as
466     # possible in Python) before sending signals to child. For an
467     # explanation, see preexec function for child.
468     try:
469       os.setpgid(self.pid, self.pid)
470     except EnvironmentError, err:
471       # If the child process was faster we receive EPERM or EACCES
472       if err.errno not in (errno.EPERM, errno.EACCES):
473         raise
474
475   def Kill(self, signum):
476     """Sends signal to child process.
477
478     """
479     logging.info("Sending signal %s to child process", signum)
480     utils.IgnoreProcessNotFound(os.killpg, self.pid, signum)
481
482   def ForceQuit(self):
483     """Ensure child process is no longer running.
484
485     """
486     # Final check if child process is still alive
487     if utils.RetryOnSignal(self.poll) is None:
488       logging.error("Child process still alive, sending SIGKILL")
489       self.Kill(signal.SIGKILL)
490       utils.RetryOnSignal(self.wait)
491
492
493 def main():
494   """Main function.
495
496   """
497   # Option parsing
498   (status_file_path, mode) = ParseOptions()
499
500   # Configure logging
501   child_logger = SetupLogging()
502
503   status_file = StatusFile(status_file_path)
504   try:
505     try:
506       # Pipe to receive socat's stderr output
507       (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
508
509       # Pipe to receive dd's stderr output
510       (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()
511
512       # Pipe to receive dd's PID
513       (dd_pid_read_fd, dd_pid_write_fd) = os.pipe()
514
515       # Pipe to receive size predicted by export script
516       (exp_size_read_fd, exp_size_write_fd) = os.pipe()
517
518       # Get child process command
519       cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
520                                            dd_stderr_write_fd, dd_pid_write_fd)
521       cmd = cmd_builder.GetCommand()
522
523       # Prepare command environment
524       cmd_env = os.environ.copy()
525
526       if options.exp_size == constants.IE_CUSTOM_SIZE:
527         cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)
528
529       logging.debug("Starting command %r", cmd)
530
531       # Start child process
532       child = ChildProcess(cmd_env, cmd,
533                            [socat_stderr_write_fd, dd_stderr_write_fd,
534                             dd_pid_write_fd, exp_size_write_fd])
535       try:
536         def _ForwardSignal(signum, _):
537           """Forwards signals to child process.
538
539           """
540           child.Kill(signum)
541
542         signal_wakeup = utils.SignalWakeupFd()
543         try:
544           # TODO: There is a race condition between starting the child and
545           # handling the signals here. While there might be a way to work around
546           # it by registering the handlers before starting the child and
547           # deferring sent signals until the child is available, doing so can be
548           # complicated.
549           signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
550                                                handler_fn=_ForwardSignal,
551                                                wakeup=signal_wakeup)
552           try:
553             # Close child's side
554             utils.RetryOnSignal(os.close, socat_stderr_write_fd)
555             utils.RetryOnSignal(os.close, dd_stderr_write_fd)
556             utils.RetryOnSignal(os.close, dd_pid_write_fd)
557             utils.RetryOnSignal(os.close, exp_size_write_fd)
558
559             if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
560                               dd_pid_read_fd, exp_size_read_fd,
561                               status_file, child_logger,
562                               signal_wakeup, signal_handler, mode):
563               # The child closed all its file descriptors and there was no
564               # signal
565               # TODO: Implement timeout instead of waiting indefinitely
566               utils.RetryOnSignal(child.wait)
567           finally:
568             signal_handler.Reset()
569         finally:
570           signal_wakeup.Reset()
571       finally:
572         child.ForceQuit()
573
574       if child.returncode == 0:
575         errmsg = None
576       elif child.returncode < 0:
577         errmsg = "Exited due to signal %s" % (-child.returncode, )
578       else:
579         errmsg = "Exited with status %s" % (child.returncode, )
580
581       status_file.SetExitStatus(child.returncode, errmsg)
582     except Exception, err: # pylint: disable-msg=W0703
583       logging.exception("Unhandled error occurred")
584       status_file.SetExitStatus(constants.EXIT_FAILURE,
585                                 "Unhandled error occurred: %s" % (err, ))
586
587     if status_file.ExitStatusIsSuccess():
588       sys.exit(constants.EXIT_SUCCESS)
589
590     sys.exit(constants.EXIT_FAILURE)
591   finally:
592     status_file.Update(True)
593
594
595 if __name__ == "__main__":
596   main()