
daemons/import-export @ f9323011


#!/usr/bin/python
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Import/export daemon.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name import-export

import errno
import logging
import optparse
import os
import select
import signal
import subprocess
import sys
import time
import math

from ganeti import constants
from ganeti import cli
from ganeti import utils
from ganeti import serializer
from ganeti import objects
from ganeti import locking
from ganeti import impexpd


#: How many lines to keep in the status file
MAX_RECENT_OUTPUT_LINES = 20

#: Don't update status file more than once every 5 seconds (unless forced)
MIN_UPDATE_INTERVAL = 5.0

#: Give child process up to 5 seconds to exit after sending a signal
CHILD_LINGER_TIMEOUT = 5.0

#: How long to wait for a connection to be established
DEFAULT_CONNECT_TIMEOUT = 60

#: Get dd(1) statistics every few seconds
DD_STATISTICS_INTERVAL = 5.0

#: Seconds for throughput calculation
DD_THROUGHPUT_INTERVAL = 60.0

#: Number of samples for throughput calculation
DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) /
                                      DD_STATISTICS_INTERVAL))
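# With the values above this works out to ceil(60.0 / 5.0) = 12 samples, i.e.
# throughput is calculated over roughly the last minute of dd(1) statistics.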


# Global variable for options
options = None


def SetupLogging():
  """Configures the logging module.

  """
  formatter = logging.Formatter("%(asctime)s: %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  stderr_handler.setLevel(logging.NOTSET)

  root_logger = logging.getLogger("")
  root_logger.addHandler(stderr_handler)

  if options.debug:
    root_logger.setLevel(logging.NOTSET)
  elif options.verbose:
    root_logger.setLevel(logging.INFO)
  else:
    root_logger.setLevel(logging.ERROR)

  # Create special logger for child process output
  child_logger = logging.Logger("child output")
  child_logger.addHandler(stderr_handler)
  child_logger.setLevel(logging.NOTSET)

  return child_logger


class StatusFile:
  """Status file manager.

  """
  def __init__(self, path):
    """Initializes class.

    """
    self._path = path
    self._data = objects.ImportExportStatus(ctime=time.time(),
                                            mtime=None,
                                            recent_output=[])

  def AddRecentOutput(self, line):
    """Adds a new line of recent output.

    """
    self._data.recent_output.append(line)

    # Remove old lines
    del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]

  def SetListenPort(self, port):
    """Sets the port the daemon is listening on.

    @type port: int
    @param port: TCP/UDP port

    """
    assert isinstance(port, (int, long)) and 0 < port < 2**16
    self._data.listen_port = port

  def GetListenPort(self):
    """Returns the port the daemon is listening on.

    """
    return self._data.listen_port

  def SetConnected(self):
    """Sets the connected flag.

    """
    self._data.connected = True

  def GetConnected(self):
    """Determines whether the daemon is connected.

    """
    return self._data.connected

  def SetProgress(self, mbytes, throughput, percent, eta):
    """Sets how much data has been transferred so far.

    @type mbytes: number
    @param mbytes: Transferred amount of data in MiB.
    @type throughput: float
    @param throughput: MiB/second
    @type percent: number
    @param percent: Percent processed
    @type eta: number
    @param eta: Expected number of seconds until done

    """
    self._data.progress_mbytes = mbytes
    self._data.progress_throughput = throughput
    self._data.progress_percent = percent
    self._data.progress_eta = eta

  def SetExitStatus(self, exit_status, error_message):
    """Sets the exit status and an error message.

    """
    # Require error message when status isn't 0
    assert exit_status == 0 or error_message

    self._data.exit_status = exit_status
    self._data.error_message = error_message

  def ExitStatusIsSuccess(self):
    """Returns whether the exit status means "success".

    """
    return not bool(self._data.error_message)

  def Update(self, force):
    """Updates the status file.

    @type force: bool
    @param force: Write status file in any case, not only when minimum interval
                  is expired

    """
    if not (force or
            self._data.mtime is None or
            time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
      return

    logging.debug("Updating status file %s", self._path)

    self._data.mtime = time.time()
    utils.WriteFile(self._path,
                    data=serializer.DumpJson(self._data.ToDict(), indent=True),
                    mode=0400)
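    # For illustration only: the file written above is a JSON document whose
    # keys follow the attributes set on the ImportExportStatus object in this
    # module, roughly (values made up):
    #   {"ctime": 1270000000.1, "mtime": 1270000005.3, "listen_port": 60123,
    #    "connected": true, "progress_mbytes": 512, "progress_throughput": 10.4,
    #    "progress_percent": 50, "progress_eta": 49, "exit_status": 0,
    #    "error_message": null, "recent_output": ["..."]}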


def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                   dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
                   signal_notify, signal_handler, mode):
  """Handles the child processes' output.

  """
  assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
         "Other signals are not handled in this function"

  # Buffer size 0 is important, otherwise .read() with a specified length
  # might buffer data while poll(2) won't mark its file descriptor as
  # readable again.
  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
  dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
  dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
  exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)

  tp_samples = DD_THROUGHPUT_SAMPLES

  if options.exp_size == constants.IE_CUSTOM_SIZE:
    exp_size = None
  else:
    exp_size = options.exp_size

  child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
                                           child_logger, tp_samples,
                                           exp_size)
  try:
    fdmap = {
      child.stderr.fileno():
        (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
      socat_stderr_read.fileno():
        (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
      dd_pid_read.fileno():
        (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
      dd_stderr_read.fileno():
        (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
      exp_size_read.fileno():
        (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
      signal_notify.fileno(): (signal_notify, None),
      }

    poller = select.poll()
    for fd in fdmap:
      utils.SetNonblockFlag(fd, True)
      poller.register(fd, select.POLLIN)

    if options.connect_timeout and mode == constants.IEM_IMPORT:
      listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
    else:
      listen_timeout = None

    exit_timeout = None
    dd_stats_timeout = None

    while True:
      # Break out of loop if only signal notify FD is left
      if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
        break

      timeout = None

      if listen_timeout and not exit_timeout:
        if status_file.GetConnected():
          listen_timeout = None
        elif listen_timeout.Remaining() < 0:
          logging.info("Child process didn't establish connection in time")
          child.Kill(signal.SIGTERM)
          exit_timeout = \
            locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
          # Next block will calculate timeout
        else:
          # Not yet connected, check again in a second
          timeout = 1000
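          # (poller.poll() below takes its timeout in milliseconds, hence
          # the value of 1000 here and the multiplications by 1000 below)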

      if exit_timeout:
        timeout = exit_timeout.Remaining() * 1000
        if timeout < 0:
          logging.info("Child process didn't exit in time")
          break

      if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
        notify_status = child_io_proc.NotifyDd()
        if notify_status:
          # Schedule next notification
          dd_stats_timeout = locking.RunningTimeout(DD_STATISTICS_INTERVAL,
                                                    True)
        else:
          # Try again soon (dd isn't ready yet)
          dd_stats_timeout = locking.RunningTimeout(1.0, True)

      if dd_stats_timeout:
        dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)

        if timeout is None:
          timeout = dd_timeout
        else:
          timeout = min(timeout, dd_timeout)

      for fd, event in utils.RetryOnSignal(poller.poll, timeout):
        if event & (select.POLLIN | select.POLLPRI):
          (from_, to) = fdmap[fd]

          # Read up to 1 KB of data
          data = from_.read(1024)
          if data:
            if to:
              to.write(data)
            elif fd == signal_notify.fileno():
              # Signal handling
              if signal_handler.called:
                signal_handler.Clear()
                if exit_timeout:
                  logging.info("Child process still has about %0.2f seconds"
                               " to exit", exit_timeout.Remaining())
                else:
                  logging.info("Giving child process %0.2f seconds to exit",
                               CHILD_LINGER_TIMEOUT)
                  exit_timeout = \
                    locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
          else:
            poller.unregister(fd)
            del fdmap[fd]

        elif event & (select.POLLNVAL | select.POLLHUP |
                      select.POLLERR):
          poller.unregister(fd)
          del fdmap[fd]

      child_io_proc.FlushAll()

    # If there was a timeout calculator, we were waiting for the child to
    # finish, e.g. due to a signal
    return not bool(exit_timeout)
  finally:
    child_io_proc.CloseAll()


def ParseOptions():
  """Parses the options passed to the program.

  @return: Arguments to program

  """
  global options # pylint: disable-msg=W0603

  parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
                                        (constants.IEM_IMPORT,
                                         constants.IEM_EXPORT)))
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option("--key", dest="key", action="store", type="string",
                    help="RSA key file")
  parser.add_option("--cert", dest="cert", action="store", type="string",
                    help="X509 certificate file")
  parser.add_option("--ca", dest="ca", action="store", type="string",
                    help="X509 CA file")
  parser.add_option("--bind", dest="bind", action="store", type="string",
                    help="Bind address")
  parser.add_option("--host", dest="host", action="store", type="string",
                    help="Remote hostname")
  parser.add_option("--port", dest="port", action="store", type="int",
                    help="Remote port")
  parser.add_option("--connect-retries", dest="connect_retries", action="store",
                    type="int", default=0,
                    help=("How many times the connection should be retried"
                          " (export only)"))
  parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
                    type="int", default=DEFAULT_CONNECT_TIMEOUT,
                    help="Timeout for connection to be established (seconds)")
  parser.add_option("--compress", dest="compress", action="store",
                    type="choice", help="Compression method",
                    metavar="[%s]" % "|".join(constants.IEC_ALL),
                    choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
  parser.add_option("--expected-size", dest="exp_size", action="store",
                    type="string", default=None,
                    help="Expected import/export size (MiB)")
  parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
                    type="string", help="Command prefix")
  parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
                    type="string", help="Command suffix")
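  # Illustrative invocation only (file names, host and port are made up; in
  # practice the Ganeti backend builds this command line):
  #   import-export /var/run/ganeti/ie-status export --key node.pem \
  #     --cert node.pem --ca ca.pem --host dest.example.com --port 60123 \
  #     --compress gzip --connect-timeout 60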

  (options, args) = parser.parse_args()

  if len(args) != 2:
    # Won't return
    parser.error("Expected exactly two arguments")

  (status_file_path, mode) = args

  if mode not in (constants.IEM_IMPORT,
                  constants.IEM_EXPORT):
    # Won't return
    parser.error("Invalid mode: %s" % mode)

  if (options.exp_size is not None and
      options.exp_size != constants.IE_CUSTOM_SIZE):
    try:
      options.exp_size = int(options.exp_size)
    except (ValueError, TypeError), err:
      # Won't return
      parser.error("Invalid value for --expected-size: %s (%s)" %
                   (options.exp_size, err))

  return (status_file_path, mode)


class ChildProcess(subprocess.Popen):
  def __init__(self, env, cmd, noclose_fds):
    """Initializes this class.

    """
    self._noclose_fds = noclose_fds

    # Not using close_fds because doing so would also close the socat stderr
    # pipe, which we still need.
    subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
                              stderr=subprocess.PIPE, stdout=None, stdin=None,
                              preexec_fn=self._ChildPreexec)
    self._SetProcessGroup()

  def _ChildPreexec(self):
    """Called before child executable is execve'd.

    """
    # Move to separate process group. By sending a signal to its process group
    # we can kill the child process and all grandchildren.
    os.setpgid(0, 0)

    # Close almost all file descriptors
    utils.CloseFDs(noclose_fds=self._noclose_fds)

  def _SetProcessGroup(self):
    """Sets the child's process group.

    """
    assert self.pid, "Can't be called in child process"

    # Avoid race condition by setting child's process group (as good as
    # possible in Python) before sending signals to child. For an
    # explanation, see preexec function for child.
    try:
      os.setpgid(self.pid, self.pid)
    except EnvironmentError, err:
      # If the child process was faster we receive EPERM or EACCES
      if err.errno not in (errno.EPERM, errno.EACCES):
        raise

  def Kill(self, signum):
    """Sends signal to child process.

    """
    logging.info("Sending signal %s to child process", signum)
    os.killpg(self.pid, signum)

  def ForceQuit(self):
    """Ensure child process is no longer running.

    """
    # Final check if child process is still alive
    if utils.RetryOnSignal(self.poll) is None:
      logging.error("Child process still alive, sending SIGKILL")
      self.Kill(signal.SIGKILL)
      utils.RetryOnSignal(self.wait)


def main():
  """Main function.

  """
  # Option parsing
  (status_file_path, mode) = ParseOptions()

  # Configure logging
  child_logger = SetupLogging()

  status_file = StatusFile(status_file_path)
  try:
    try:
      # Pipe to receive socat's stderr output
      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()

      # Pipe to receive dd's stderr output
      (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()

      # Pipe to receive dd's PID
      (dd_pid_read_fd, dd_pid_write_fd) = os.pipe()

      # Pipe to receive size predicted by export script
      (exp_size_read_fd, exp_size_write_fd) = os.pipe()

      # Get child process command
      cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
                                           dd_stderr_write_fd, dd_pid_write_fd)
      cmd = cmd_builder.GetCommand()

      # Prepare command environment
      cmd_env = os.environ.copy()

      if options.exp_size == constants.IE_CUSTOM_SIZE:
        cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)

      logging.debug("Starting command %r", cmd)

      # Start child process
      child = ChildProcess(cmd_env, cmd,
                           [socat_stderr_write_fd, dd_stderr_write_fd,
                            dd_pid_write_fd, exp_size_write_fd])
      try:
        def _ForwardSignal(signum, _):
          """Forwards signals to child process.

          """
          child.Kill(signum)

        signal_wakeup = utils.SignalWakeupFd()
        try:
          # TODO: There is a race condition between starting the child and
          # handling the signals here. While there might be a way to work around
          # it by registering the handlers before starting the child and
          # deferring sent signals until the child is available, doing so can be
          # complicated.
          signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
                                               handler_fn=_ForwardSignal,
                                               wakeup=signal_wakeup)
          try:
            # Close child's side
            utils.RetryOnSignal(os.close, socat_stderr_write_fd)
            utils.RetryOnSignal(os.close, dd_stderr_write_fd)
            utils.RetryOnSignal(os.close, dd_pid_write_fd)
            utils.RetryOnSignal(os.close, exp_size_write_fd)

            if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                              dd_pid_read_fd, exp_size_read_fd,
                              status_file, child_logger,
                              signal_wakeup, signal_handler, mode):
              # The child closed all its file descriptors and there was no
              # signal
              # TODO: Implement timeout instead of waiting indefinitely
              utils.RetryOnSignal(child.wait)
          finally:
            signal_handler.Reset()
        finally:
          signal_wakeup.Reset()
      finally:
        child.ForceQuit()

      if child.returncode == 0:
        errmsg = None
      elif child.returncode < 0:
        errmsg = "Exited due to signal %s" % (-child.returncode, )
      else:
        errmsg = "Exited with status %s" % (child.returncode, )

      status_file.SetExitStatus(child.returncode, errmsg)
    except Exception, err: # pylint: disable-msg=W0703
      logging.exception("Unhandled error occurred")
      status_file.SetExitStatus(constants.EXIT_FAILURE,
                                "Unhandled error occurred: %s" % (err, ))

    if status_file.ExitStatusIsSuccess():
      sys.exit(constants.EXIT_SUCCESS)

    sys.exit(constants.EXIT_FAILURE)
  finally:
    status_file.Update(True)


if __name__ == "__main__":
  main()