root / daemons / import-export @ bd275a93


#!/usr/bin/python
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Import/export daemon.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name import-export

import errno
import logging
import optparse
import os
import select
import signal
import subprocess
import sys
import time
import math

from ganeti import constants
from ganeti import cli
from ganeti import utils
from ganeti import errors
from ganeti import serializer
from ganeti import objects
from ganeti import locking
from ganeti import impexpd
from ganeti import netutils


#: How many lines to keep in the status file
MAX_RECENT_OUTPUT_LINES = 20

#: Don't update status file more than once every 5 seconds (unless forced)
MIN_UPDATE_INTERVAL = 5.0

#: Give child process up to 5 seconds to exit after sending a signal
CHILD_LINGER_TIMEOUT = 5.0

#: How long to wait for a connection to be established
DEFAULT_CONNECT_TIMEOUT = 60

#: Get dd(1) statistics every few seconds
DD_STATISTICS_INTERVAL = 5.0

#: Seconds for throughput calculation
DD_THROUGHPUT_INTERVAL = 60.0

#: Number of samples for throughput calculation
DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) /
                                      DD_STATISTICS_INTERVAL))
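# With the values above this is ceil(60.0 / 5.0) = 12 samples, i.e. throughput
# is calculated over roughly the last minute of dd(1) statistics.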


# Global variable for options
options = None


def SetupLogging():
  """Configures the logging module.

  """
  formatter = logging.Formatter("%(asctime)s: %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  stderr_handler.setLevel(logging.NOTSET)

  root_logger = logging.getLogger("")
  root_logger.addHandler(stderr_handler)

  if options.debug:
    root_logger.setLevel(logging.NOTSET)
  elif options.verbose:
    root_logger.setLevel(logging.INFO)
  else:
    root_logger.setLevel(logging.ERROR)

  # Create special logger for child process output
  child_logger = logging.Logger("child output")
  child_logger.addHandler(stderr_handler)
  child_logger.setLevel(logging.NOTSET)
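  # Note: this logger is instantiated directly and uses level NOTSET, so child
  # process output is logged regardless of the root logger's verbosity.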

  return child_logger


class StatusFile:
  """Status file manager.

  """
  def __init__(self, path):
    """Initializes class.

    """
    self._path = path
    self._data = objects.ImportExportStatus(ctime=time.time(),
                                            mtime=None,
                                            recent_output=[])

  def AddRecentOutput(self, line):
    """Adds a new line of recent output.

    """
    self._data.recent_output.append(line)

    # Remove old lines
    del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]

  def SetListenPort(self, port):
    """Sets the port the daemon is listening on.

    @type port: int
    @param port: TCP/UDP port

    """
    assert isinstance(port, (int, long)) and 0 < port < 2**16
    self._data.listen_port = port

  def GetListenPort(self):
    """Returns the port the daemon is listening on.

    """
    return self._data.listen_port

  def SetConnected(self):
    """Sets the connected flag.

    """
    self._data.connected = True

  def GetConnected(self):
    """Determines whether the daemon is connected.

    """
    return self._data.connected

  def SetProgress(self, mbytes, throughput, percent, eta):
    """Sets how much data has been transferred so far.

    @type mbytes: number
    @param mbytes: Transferred amount of data in MiB.
    @type throughput: float
    @param throughput: MiB/second
    @type percent: number
    @param percent: Percent processed
    @type eta: number
    @param eta: Expected number of seconds until done

    """
    self._data.progress_mbytes = mbytes
    self._data.progress_throughput = throughput
    self._data.progress_percent = percent
    self._data.progress_eta = eta

  def SetExitStatus(self, exit_status, error_message):
    """Sets the exit status and an error message.

    """
    # Require error message when status isn't 0
    assert exit_status == 0 or error_message

    self._data.exit_status = exit_status
    self._data.error_message = error_message

  def ExitStatusIsSuccess(self):
    """Returns whether the exit status means "success".

    """
    return not bool(self._data.error_message)

  def Update(self, force):
    """Updates the status file.

    @type force: bool
    @param force: Write status file in any case, not only when minimum interval
                  is expired

    """
    if not (force or
            self._data.mtime is None or
            time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
      return

    logging.debug("Updating status file %s", self._path)

    self._data.mtime = time.time()
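    # The status is dumped as indented JSON; mode 0400 keeps the file read-only
    # for its owner.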
    utils.WriteFile(self._path,
                    data=serializer.DumpJson(self._data.ToDict(), indent=True),
                    mode=0400)


def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                   dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
                   signal_notify, signal_handler, mode):
  """Handles the child processes' output.

  """
  assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
         "Other signals are not handled in this function"

  # Buffer size 0 is important, otherwise .read() with a specified length
  # might buffer data while poll(2) won't mark its file descriptor as
  # readable again.
  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
  dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
  dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
  exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)

  tp_samples = DD_THROUGHPUT_SAMPLES

  if options.exp_size == constants.IE_CUSTOM_SIZE:
    exp_size = None
  else:
    exp_size = options.exp_size

  child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
                                           child_logger, tp_samples,
                                           exp_size)
  try:
    fdmap = {
      child.stderr.fileno():
        (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
      socat_stderr_read.fileno():
        (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
      dd_pid_read.fileno():
        (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
      dd_stderr_read.fileno():
        (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
      exp_size_read.fileno():
        (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
      signal_notify.fileno(): (signal_notify, None),
      }
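    # Each entry maps a file descriptor to a (file object, line splitter) pair;
    # data read from the descriptor is fed to the per-program line splitter.
    # The signal wakeup descriptor has no splitter and is handled below.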

    poller = select.poll()
    for fd in fdmap:
      utils.SetNonblockFlag(fd, True)
      poller.register(fd, select.POLLIN)

    if options.connect_timeout and mode == constants.IEM_IMPORT:
      listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
    else:
      listen_timeout = None

    exit_timeout = None
    dd_stats_timeout = None

    while True:
      # Break out of loop if only signal notify FD is left
      if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
        break

      timeout = None

      if listen_timeout and not exit_timeout:
        assert mode == constants.IEM_IMPORT and options.connect_timeout
        if status_file.GetConnected():
          listen_timeout = None
        elif listen_timeout.Remaining() < 0:
          errmsg = ("Child process didn't establish connection in time"
                    " (%0.0fs), sending SIGTERM" % options.connect_timeout)
          logging.error(errmsg)
          status_file.AddRecentOutput(errmsg)
          status_file.Update(True)

          child.Kill(signal.SIGTERM)
          exit_timeout = \
            locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
          # Next block will calculate timeout
        else:
          # Not yet connected, check again in a second
          timeout = 1000

      if exit_timeout:
        timeout = exit_timeout.Remaining() * 1000
        if timeout < 0:
          logging.info("Child process didn't exit in time")
          break

      if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
        notify_status = child_io_proc.NotifyDd()
        if notify_status:
          # Schedule next notification
          dd_stats_timeout = locking.RunningTimeout(DD_STATISTICS_INTERVAL,
                                                    True)
        else:
          # Try again soon (dd isn't ready yet)
          dd_stats_timeout = locking.RunningTimeout(1.0, True)

      if dd_stats_timeout:
        dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)

        if timeout is None:
          timeout = dd_timeout
        else:
          timeout = min(timeout, dd_timeout)
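
      # The effective poll timeout is the nearest pending deadline: the
      # one-second connection re-check, the remaining child linger time or the
      # next dd statistics update; None blocks until a descriptor is ready.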

      for fd, event in utils.RetryOnSignal(poller.poll, timeout):
        if event & (select.POLLIN | select.POLLPRI):
          (from_, to) = fdmap[fd]

          # Read up to 1 KB of data
          data = from_.read(1024)
          if data:
            if to:
              to.write(data)
            elif fd == signal_notify.fileno():
              # Signal handling
              if signal_handler.called:
                signal_handler.Clear()
                if exit_timeout:
                  logging.info("Child process still has about %0.2f seconds"
                               " to exit", exit_timeout.Remaining())
                else:
                  logging.info("Giving child process %0.2f seconds to exit",
                               CHILD_LINGER_TIMEOUT)
                  exit_timeout = \
                    locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
          else:
            poller.unregister(fd)
            del fdmap[fd]

        elif event & (select.POLLNVAL | select.POLLHUP |
                      select.POLLERR):
          poller.unregister(fd)
          del fdmap[fd]

      child_io_proc.FlushAll()

    # If there was a timeout calculator, we were waiting for the child to
    # finish, e.g. due to a signal
    return not bool(exit_timeout)
  finally:
    child_io_proc.CloseAll()


def ParseOptions():
  """Parses the options passed to the program.

  @return: Arguments to program

  """
  global options # pylint: disable-msg=W0603

  parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
                                        (constants.IEM_IMPORT,
                                         constants.IEM_EXPORT)))
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option("--key", dest="key", action="store", type="string",
                    help="RSA key file")
  parser.add_option("--cert", dest="cert", action="store", type="string",
                    help="X509 certificate file")
  parser.add_option("--ca", dest="ca", action="store", type="string",
                    help="X509 CA file")
  parser.add_option("--bind", dest="bind", action="store", type="string",
                    help="Bind address")
  parser.add_option("--host", dest="host", action="store", type="string",
                    help="Remote hostname")
  parser.add_option("--port", dest="port", action="store", type="int",
                    help="Remote port")
  parser.add_option("--connect-retries", dest="connect_retries", action="store",
                    type="int", default=0,
                    help=("How many times the connection should be retried"
                          " (export only)"))
  parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
                    type="int", default=DEFAULT_CONNECT_TIMEOUT,
                    help="Timeout for connection to be established (seconds)")
  parser.add_option("--compress", dest="compress", action="store",
                    type="choice", help="Compression method",
                    metavar="[%s]" % "|".join(constants.IEC_ALL),
                    choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
  parser.add_option("--expected-size", dest="exp_size", action="store",
                    type="string", default=None,
                    help="Expected import/export size (MiB)")
  parser.add_option("--magic", dest="magic", action="store",
                    type="string", default=None, help="Magic string")
  parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
                    type="string", help="Command prefix")
  parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
                    type="string", help="Command suffix")

  (options, args) = parser.parse_args()

  if len(args) != 2:
    # Won't return
    parser.error("Expected exactly two arguments")

  (status_file_path, mode) = args

  if mode not in (constants.IEM_IMPORT,
                  constants.IEM_EXPORT):
    # Won't return
    parser.error("Invalid mode: %s" % mode)

  # Normalize and check parameters
  if options.host is not None:
    try:
      options.host = netutils.Hostname.GetNormalizedName(options.host)
    except errors.OpPrereqError, err:
      parser.error("Invalid hostname '%s': %s" % (options.host, err))

  if options.port is not None:
    options.port = utils.ValidateServiceName(options.port)

  if (options.exp_size is not None and
      options.exp_size != constants.IE_CUSTOM_SIZE):
    try:
      options.exp_size = int(options.exp_size)
    except (ValueError, TypeError), err:
      # Won't return
      parser.error("Invalid value for --expected-size: %s (%s)" %
                   (options.exp_size, err))

  if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)):
    parser.error("Magic must match regular expression %s" %
                 constants.IE_MAGIC_RE.pattern)

  return (status_file_path, mode)


class ChildProcess(subprocess.Popen):
  def __init__(self, env, cmd, noclose_fds):
    """Initializes this class.

    """
    self._noclose_fds = noclose_fds

    # Not using close_fds because doing so would also close the socat stderr
    # pipe, which we still need.
    subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
                              stderr=subprocess.PIPE, stdout=None, stdin=None,
                              preexec_fn=self._ChildPreexec)
    self._SetProcessGroup()

  def _ChildPreexec(self):
    """Called before child executable is execve'd.

    """
    # Move to separate process group. By sending a signal to its process group
    # we can kill the child process and all grandchildren.
    os.setpgid(0, 0)

    # Close almost all file descriptors
    utils.CloseFDs(noclose_fds=self._noclose_fds)

  def _SetProcessGroup(self):
    """Sets the child's process group.

    """
    assert self.pid, "Can't be called in child process"

    # Avoid race condition by setting child's process group (as good as
    # possible in Python) before sending signals to child. For an
    # explanation, see preexec function for child.
    try:
      os.setpgid(self.pid, self.pid)
    except EnvironmentError, err:
      # If the child process was faster we receive EPERM or EACCES
      if err.errno not in (errno.EPERM, errno.EACCES):
        raise

  def Kill(self, signum):
    """Sends signal to child process.

    """
    logging.info("Sending signal %s to child process", signum)
    utils.IgnoreProcessNotFound(os.killpg, self.pid, signum)

  def ForceQuit(self):
    """Ensure child process is no longer running.

    """
    # Final check if child process is still alive
    if utils.RetryOnSignal(self.poll) is None:
      logging.error("Child process still alive, sending SIGKILL")
      self.Kill(signal.SIGKILL)
      utils.RetryOnSignal(self.wait)


def main():
  """Main function.

  """
  # Option parsing
  (status_file_path, mode) = ParseOptions()

  # Configure logging
  child_logger = SetupLogging()

  status_file = StatusFile(status_file_path)
  try:
    try:
      # Pipe to receive socat's stderr output
      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()

      # Pipe to receive dd's stderr output
      (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()

      # Pipe to receive dd's PID
      (dd_pid_read_fd, dd_pid_write_fd) = os.pipe()

      # Pipe to receive size predicted by export script
      (exp_size_read_fd, exp_size_write_fd) = os.pipe()
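      # The write ends of these pipes are inherited by the child process (they
      # are listed in noclose_fds below); the read ends are watched in
      # ProcessChildIO.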

      # Get child process command
      cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
                                           dd_stderr_write_fd, dd_pid_write_fd)
      cmd = cmd_builder.GetCommand()

      # Prepare command environment
      cmd_env = os.environ.copy()

      if options.exp_size == constants.IE_CUSTOM_SIZE:
        cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)

      logging.debug("Starting command %r", cmd)

      # Start child process
      child = ChildProcess(cmd_env, cmd,
                           [socat_stderr_write_fd, dd_stderr_write_fd,
                            dd_pid_write_fd, exp_size_write_fd])
      try:
        def _ForwardSignal(signum, _):
          """Forwards signals to child process.

          """
          child.Kill(signum)

        signal_wakeup = utils.SignalWakeupFd()
        try:
          # TODO: There is a race condition between starting the child and
          # handling the signals here. While there might be a way to work around
          # it by registering the handlers before starting the child and
          # deferring sent signals until the child is available, doing so can be
          # complicated.
          signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
                                               handler_fn=_ForwardSignal,
                                               wakeup=signal_wakeup)
          try:
            # Close child's side
            utils.RetryOnSignal(os.close, socat_stderr_write_fd)
            utils.RetryOnSignal(os.close, dd_stderr_write_fd)
            utils.RetryOnSignal(os.close, dd_pid_write_fd)
            utils.RetryOnSignal(os.close, exp_size_write_fd)

            if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                              dd_pid_read_fd, exp_size_read_fd,
                              status_file, child_logger,
                              signal_wakeup, signal_handler, mode):
              # The child closed all its file descriptors and there was no
              # signal
              # TODO: Implement timeout instead of waiting indefinitely
              utils.RetryOnSignal(child.wait)
          finally:
            signal_handler.Reset()
        finally:
          signal_wakeup.Reset()
      finally:
        child.ForceQuit()

      if child.returncode == 0:
        errmsg = None
      elif child.returncode < 0:
        errmsg = "Exited due to signal %s" % (-child.returncode, )
      else:
        errmsg = "Exited with status %s" % (child.returncode, )

      status_file.SetExitStatus(child.returncode, errmsg)
    except Exception, err: # pylint: disable-msg=W0703
      logging.exception("Unhandled error occurred")
      status_file.SetExitStatus(constants.EXIT_FAILURE,
                                "Unhandled error occurred: %s" % (err, ))

    if status_file.ExitStatusIsSuccess():
      sys.exit(constants.EXIT_SUCCESS)

    sys.exit(constants.EXIT_FAILURE)
  finally:
    status_file.Update(True)


if __name__ == "__main__":
  main()