Expose bulk parameter for GetJobs in RAPI client
[ganeti-local] / daemons / import-export
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Import/export daemon.
23
24 """
25
26 # pylint: disable=C0103
27 # C0103: Invalid name import-export
28
29 import errno
30 import logging
31 import optparse
32 import os
33 import select
34 import signal
35 import subprocess
36 import sys
37 import time
38 import math
39
40 from ganeti import constants
41 from ganeti import cli
42 from ganeti import utils
43 from ganeti import errors
44 from ganeti import serializer
45 from ganeti import objects
46 from ganeti import impexpd
47 from ganeti import netutils
48
49
50 #: How many lines to keep in the status file
51 MAX_RECENT_OUTPUT_LINES = 20
52
53 #: Don't update status file more than once every 5 seconds (unless forced)
54 MIN_UPDATE_INTERVAL = 5.0
55
56 #: How long to wait for a connection to be established
57 DEFAULT_CONNECT_TIMEOUT = 60
58
59 #: Get dd(1) statistics every few seconds
60 DD_STATISTICS_INTERVAL = 5.0
61
62 #: Seconds for throughput calculation
63 DD_THROUGHPUT_INTERVAL = 60.0
64
65 #: Number of samples for throughput calculation
66 DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) /
67                                       DD_STATISTICS_INTERVAL))
68
69
70 # Global variable for options
71 options = None
72
73
74 def SetupLogging():
75   """Configures the logging module.
76
77   """
78   formatter = logging.Formatter("%(asctime)s: %(message)s")
79
80   stderr_handler = logging.StreamHandler()
81   stderr_handler.setFormatter(formatter)
82   stderr_handler.setLevel(logging.NOTSET)
83
84   root_logger = logging.getLogger("")
85   root_logger.addHandler(stderr_handler)
86
87   if options.debug:
88     root_logger.setLevel(logging.NOTSET)
89   elif options.verbose:
90     root_logger.setLevel(logging.INFO)
91   else:
92     root_logger.setLevel(logging.ERROR)
93
94   # Create special logger for child process output
95   child_logger = logging.Logger("child output")
96   child_logger.addHandler(stderr_handler)
97   child_logger.setLevel(logging.NOTSET)
98
99   return child_logger
100
101
102 class StatusFile:
103   """Status file manager.
104
105   """
106   def __init__(self, path):
107     """Initializes class.
108
109     """
110     self._path = path
111     self._data = objects.ImportExportStatus(ctime=time.time(),
112                                             mtime=None,
113                                             recent_output=[])
114
115   def AddRecentOutput(self, line):
116     """Adds a new line of recent output.
117
118     """
119     self._data.recent_output.append(line)
120
121     # Remove old lines
122     del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]
123
124   def SetListenPort(self, port):
125     """Sets the port the daemon is listening on.
126
127     @type port: int
128     @param port: TCP/UDP port
129
130     """
131     assert isinstance(port, (int, long)) and 0 < port < (2 ** 16)
132     self._data.listen_port = port
133
134   def GetListenPort(self):
135     """Returns the port the daemon is listening on.
136
137     """
138     return self._data.listen_port
139
140   def SetConnected(self):
141     """Sets the connected flag.
142
143     """
144     self._data.connected = True
145
146   def GetConnected(self):
147     """Determines whether the daemon is connected.
148
149     """
150     return self._data.connected
151
152   def SetProgress(self, mbytes, throughput, percent, eta):
153     """Sets how much data has been transferred so far.
154
155     @type mbytes: number
156     @param mbytes: Transferred amount of data in MiB.
157     @type throughput: float
158     @param throughput: MiB/second
159     @type percent: number
160     @param percent: Percent processed
161     @type eta: number
162     @param eta: Expected number of seconds until done
163
164     """
165     self._data.progress_mbytes = mbytes
166     self._data.progress_throughput = throughput
167     self._data.progress_percent = percent
168     self._data.progress_eta = eta
169
170   def SetExitStatus(self, exit_status, error_message):
171     """Sets the exit status and an error message.
172
173     """
174     # Require error message when status isn't 0
175     assert exit_status == 0 or error_message
176
177     self._data.exit_status = exit_status
178     self._data.error_message = error_message
179
180   def ExitStatusIsSuccess(self):
181     """Returns whether the exit status means "success".
182
183     """
184     return not bool(self._data.error_message)
185
186   def Update(self, force):
187     """Updates the status file.
188
189     @type force: bool
190     @param force: Write status file in any case, not only when minimum interval
191                   is expired
192
193     """
194     if not (force or
195             self._data.mtime is None or
196             time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
197       return
198
199     logging.debug("Updating status file %s", self._path)
200
201     self._data.mtime = time.time()
202     utils.WriteFile(self._path,
203                     data=serializer.DumpJson(self._data.ToDict()),
204                     mode=0400)
205
206
207 def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
208                    dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
209                    signal_notify, signal_handler, mode):
210   """Handles the child processes' output.
211
212   """
213   assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
214          "Other signals are not handled in this function"
215
216   # Buffer size 0 is important, otherwise .read() with a specified length
217   # might buffer data while poll(2) won't mark its file descriptor as
218   # readable again.
219   socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
220   dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
221   dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
222   exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)
223
224   tp_samples = DD_THROUGHPUT_SAMPLES
225
226   if options.exp_size == constants.IE_CUSTOM_SIZE:
227     exp_size = None
228   else:
229     exp_size = options.exp_size
230
231   child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
232                                            child_logger, tp_samples,
233                                            exp_size)
234   try:
235     fdmap = {
236       child.stderr.fileno():
237         (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
238       socat_stderr_read.fileno():
239         (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
240       dd_pid_read.fileno():
241         (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
242       dd_stderr_read.fileno():
243         (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
244       exp_size_read.fileno():
245         (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
246       signal_notify.fileno(): (signal_notify, None),
247       }
248
249     poller = select.poll()
250     for fd in fdmap:
251       utils.SetNonblockFlag(fd, True)
252       poller.register(fd, select.POLLIN)
253
254     if options.connect_timeout and mode == constants.IEM_IMPORT:
255       listen_timeout = utils.RunningTimeout(options.connect_timeout, True)
256     else:
257       listen_timeout = None
258
259     exit_timeout = None
260     dd_stats_timeout = None
261
262     while True:
263       # Break out of loop if only signal notify FD is left
264       if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
265         break
266
267       timeout = None
268
269       if listen_timeout and not exit_timeout:
270         assert mode == constants.IEM_IMPORT and options.connect_timeout
271         if status_file.GetConnected():
272           listen_timeout = None
273         elif listen_timeout.Remaining() < 0:
274           errmsg = ("Child process didn't establish connection in time"
275                     " (%0.0fs), sending SIGTERM" % options.connect_timeout)
276           logging.error(errmsg)
277           status_file.AddRecentOutput(errmsg)
278           status_file.Update(True)
279
280           child.Kill(signal.SIGTERM)
281           exit_timeout = \
282             utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
283           # Next block will calculate timeout
284         else:
285           # Not yet connected, check again in a second
286           timeout = 1000
287
288       if exit_timeout:
289         timeout = exit_timeout.Remaining() * 1000
290         if timeout < 0:
291           logging.info("Child process didn't exit in time")
292           break
293
294       if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
295         notify_status = child_io_proc.NotifyDd()
296         if notify_status:
297           # Schedule next notification
298           dd_stats_timeout = utils.RunningTimeout(DD_STATISTICS_INTERVAL, True)
299         else:
300           # Try again soon (dd isn't ready yet)
301           dd_stats_timeout = utils.RunningTimeout(1.0, True)
302
303       if dd_stats_timeout:
304         dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)
305
306         if timeout is None:
307           timeout = dd_timeout
308         else:
309           timeout = min(timeout, dd_timeout)
310
311       for fd, event in utils.RetryOnSignal(poller.poll, timeout):
312         if event & (select.POLLIN | event & select.POLLPRI):
313           (from_, to) = fdmap[fd]
314
315           # Read up to 1 KB of data
316           data = from_.read(1024)
317           if data:
318             if to:
319               to.write(data)
320             elif fd == signal_notify.fileno():
321               # Signal handling
322               if signal_handler.called:
323                 signal_handler.Clear()
324                 if exit_timeout:
325                   logging.info("Child process still has about %0.2f seconds"
326                                " to exit", exit_timeout.Remaining())
327                 else:
328                   logging.info("Giving child process %0.2f seconds to exit",
329                                constants.CHILD_LINGER_TIMEOUT)
330                   exit_timeout = \
331                     utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
332           else:
333             poller.unregister(fd)
334             del fdmap[fd]
335
336         elif event & (select.POLLNVAL | select.POLLHUP |
337                       select.POLLERR):
338           poller.unregister(fd)
339           del fdmap[fd]
340
341       child_io_proc.FlushAll()
342
343     # If there was a timeout calculator, we were waiting for the child to
344     # finish, e.g. due to a signal
345     return not bool(exit_timeout)
346   finally:
347     child_io_proc.CloseAll()
348
349
350 def ParseOptions():
351   """Parses the options passed to the program.
352
353   @return: Arguments to program
354
355   """
356   global options # pylint: disable=W0603
357
358   parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
359                                         (constants.IEM_IMPORT,
360                                          constants.IEM_EXPORT)))
361   parser.add_option(cli.DEBUG_OPT)
362   parser.add_option(cli.VERBOSE_OPT)
363   parser.add_option("--key", dest="key", action="store", type="string",
364                     help="RSA key file")
365   parser.add_option("--cert", dest="cert", action="store", type="string",
366                     help="X509 certificate file")
367   parser.add_option("--ca", dest="ca", action="store", type="string",
368                     help="X509 CA file")
369   parser.add_option("--bind", dest="bind", action="store", type="string",
370                     help="Bind address")
371   parser.add_option("--ipv4", dest="ipv4", action="store_true",
372                     help="Use IPv4 only")
373   parser.add_option("--ipv6", dest="ipv6", action="store_true",
374                     help="Use IPv6 only")
375   parser.add_option("--host", dest="host", action="store", type="string",
376                     help="Remote hostname")
377   parser.add_option("--port", dest="port", action="store", type="int",
378                     help="Remote port")
379   parser.add_option("--connect-retries", dest="connect_retries", action="store",
380                     type="int", default=0,
381                     help=("How many times the connection should be retried"
382                           " (export only)"))
383   parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
384                     type="int", default=DEFAULT_CONNECT_TIMEOUT,
385                     help="Timeout for connection to be established (seconds)")
386   parser.add_option("--compress", dest="compress", action="store",
387                     type="choice", help="Compression method",
388                     metavar="[%s]" % "|".join(constants.IEC_ALL),
389                     choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
390   parser.add_option("--expected-size", dest="exp_size", action="store",
391                     type="string", default=None,
392                     help="Expected import/export size (MiB)")
393   parser.add_option("--magic", dest="magic", action="store",
394                     type="string", default=None, help="Magic string")
395   parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
396                     type="string", help="Command prefix")
397   parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
398                     type="string", help="Command suffix")
399
400   (options, args) = parser.parse_args()
401
402   if len(args) != 2:
403     # Won't return
404     parser.error("Expected exactly two arguments")
405
406   (status_file_path, mode) = args
407
408   if mode not in (constants.IEM_IMPORT,
409                   constants.IEM_EXPORT):
410     # Won't return
411     parser.error("Invalid mode: %s" % mode)
412
413   # Normalize and check parameters
414   if options.host is not None and not netutils.IPAddress.IsValid(options.host):
415     try:
416       options.host = netutils.Hostname.GetNormalizedName(options.host)
417     except errors.OpPrereqError, err:
418       parser.error("Invalid hostname '%s': %s" % (options.host, err))
419
420   if options.port is not None:
421     options.port = utils.ValidateServiceName(options.port)
422
423   if (options.exp_size is not None and
424       options.exp_size != constants.IE_CUSTOM_SIZE):
425     try:
426       options.exp_size = int(options.exp_size)
427     except (ValueError, TypeError), err:
428       # Won't return
429       parser.error("Invalid value for --expected-size: %s (%s)" %
430                    (options.exp_size, err))
431
432   if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)):
433     parser.error("Magic must match regular expression %s" %
434                  constants.IE_MAGIC_RE.pattern)
435
436   if options.ipv4 and options.ipv6:
437     parser.error("Can only use one of --ipv4 and --ipv6")
438
439   return (status_file_path, mode)
440
441
442 class ChildProcess(subprocess.Popen):
443   def __init__(self, env, cmd, noclose_fds):
444     """Initializes this class.
445
446     """
447     self._noclose_fds = noclose_fds
448
449     # Not using close_fds because doing so would also close the socat stderr
450     # pipe, which we still need.
451     subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
452                               stderr=subprocess.PIPE, stdout=None, stdin=None,
453                               preexec_fn=self._ChildPreexec)
454     self._SetProcessGroup()
455
456   def _ChildPreexec(self):
457     """Called before child executable is execve'd.
458
459     """
460     # Move to separate process group. By sending a signal to its process group
461     # we can kill the child process and all grandchildren.
462     os.setpgid(0, 0)
463
464     # Close almost all file descriptors
465     utils.CloseFDs(noclose_fds=self._noclose_fds)
466
467   def _SetProcessGroup(self):
468     """Sets the child's process group.
469
470     """
471     assert self.pid, "Can't be called in child process"
472
473     # Avoid race condition by setting child's process group (as good as
474     # possible in Python) before sending signals to child. For an
475     # explanation, see preexec function for child.
476     try:
477       os.setpgid(self.pid, self.pid)
478     except EnvironmentError, err:
479       # If the child process was faster we receive EPERM or EACCES
480       if err.errno not in (errno.EPERM, errno.EACCES):
481         raise
482
483   def Kill(self, signum):
484     """Sends signal to child process.
485
486     """
487     logging.info("Sending signal %s to child process", signum)
488     utils.IgnoreProcessNotFound(os.killpg, self.pid, signum)
489
490   def ForceQuit(self):
491     """Ensure child process is no longer running.
492
493     """
494     # Final check if child process is still alive
495     if utils.RetryOnSignal(self.poll) is None:
496       logging.error("Child process still alive, sending SIGKILL")
497       self.Kill(signal.SIGKILL)
498       utils.RetryOnSignal(self.wait)
499
500
501 def main():
502   """Main function.
503
504   """
505   # Option parsing
506   (status_file_path, mode) = ParseOptions()
507
508   # Configure logging
509   child_logger = SetupLogging()
510
511   status_file = StatusFile(status_file_path)
512   try:
513     try:
514       # Pipe to receive socat's stderr output
515       (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
516
517       # Pipe to receive dd's stderr output
518       (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()
519
520       # Pipe to receive dd's PID
521       (dd_pid_read_fd, dd_pid_write_fd) = os.pipe()
522
523       # Pipe to receive size predicted by export script
524       (exp_size_read_fd, exp_size_write_fd) = os.pipe()
525
526       # Get child process command
527       cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
528                                            dd_stderr_write_fd, dd_pid_write_fd)
529       cmd = cmd_builder.GetCommand()
530
531       # Prepare command environment
532       cmd_env = os.environ.copy()
533
534       if options.exp_size == constants.IE_CUSTOM_SIZE:
535         cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)
536
537       logging.debug("Starting command %r", cmd)
538
539       # Start child process
540       child = ChildProcess(cmd_env, cmd,
541                            [socat_stderr_write_fd, dd_stderr_write_fd,
542                             dd_pid_write_fd, exp_size_write_fd])
543       try:
544
545         def _ForwardSignal(signum, _):
546           """Forwards signals to child process.
547
548           """
549           child.Kill(signum)
550
551         signal_wakeup = utils.SignalWakeupFd()
552         try:
553           # TODO: There is a race condition between starting the child and
554           # handling the signals here. While there might be a way to work around
555           # it by registering the handlers before starting the child and
556           # deferring sent signals until the child is available, doing so can be
557           # complicated.
558           signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
559                                                handler_fn=_ForwardSignal,
560                                                wakeup=signal_wakeup)
561           try:
562             # Close child's side
563             utils.RetryOnSignal(os.close, socat_stderr_write_fd)
564             utils.RetryOnSignal(os.close, dd_stderr_write_fd)
565             utils.RetryOnSignal(os.close, dd_pid_write_fd)
566             utils.RetryOnSignal(os.close, exp_size_write_fd)
567
568             if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
569                               dd_pid_read_fd, exp_size_read_fd,
570                               status_file, child_logger,
571                               signal_wakeup, signal_handler, mode):
572               # The child closed all its file descriptors and there was no
573               # signal
574               # TODO: Implement timeout instead of waiting indefinitely
575               utils.RetryOnSignal(child.wait)
576           finally:
577             signal_handler.Reset()
578         finally:
579           signal_wakeup.Reset()
580       finally:
581         child.ForceQuit()
582
583       if child.returncode == 0:
584         errmsg = None
585       elif child.returncode < 0:
586         errmsg = "Exited due to signal %s" % (-child.returncode, )
587       else:
588         errmsg = "Exited with status %s" % (child.returncode, )
589
590       status_file.SetExitStatus(child.returncode, errmsg)
591     except Exception, err: # pylint: disable=W0703
592       logging.exception("Unhandled error occurred")
593       status_file.SetExitStatus(constants.EXIT_FAILURE,
594                                 "Unhandled error occurred: %s" % (err, ))
595
596     if status_file.ExitStatusIsSuccess():
597       sys.exit(constants.EXIT_SUCCESS)
598
599     sys.exit(constants.EXIT_FAILURE)
600   finally:
601     status_file.Update(True)
602
603
604 if __name__ == "__main__":
605   main()