#!/usr/bin/python
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Import/export daemon.

"""

# pylint: disable=C0103
# C0103: Invalid name import-export
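#
# Example invocation (the status file path and certificate file names below
# are purely illustrative, not taken from this file):
#   import-export /var/run/ganeti/instance-import.status import \
#     --key server.key --cert server.pem --ca ca.pem --bind 0.0.0.0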

import errno
import logging
import optparse
import os
import select
import signal
import subprocess
import sys
import time
import math

from ganeti import constants
from ganeti import cli
from ganeti import utils
from ganeti import errors
from ganeti import serializer
from ganeti import objects
from ganeti import impexpd
from ganeti import netutils


#: How many lines to keep in the status file
MAX_RECENT_OUTPUT_LINES = 20

#: Don't update status file more than once every 5 seconds (unless forced)
MIN_UPDATE_INTERVAL = 5.0

#: How long to wait for a connection to be established
DEFAULT_CONNECT_TIMEOUT = 60

#: Get dd(1) statistics every few seconds
DD_STATISTICS_INTERVAL = 5.0

#: Seconds for throughput calculation
DD_THROUGHPUT_INTERVAL = 60.0

#: Number of samples for throughput calculation
DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) /
                                      DD_STATISTICS_INTERVAL))
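# (With the defaults above this works out to int(math.ceil(60.0 / 5.0)) = 12
# samples for the throughput calculation.)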


# Global variable for options
options = None


def SetupLogging():
  """Configures the logging module.

  """
  formatter = logging.Formatter("%(asctime)s: %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  stderr_handler.setLevel(logging.NOTSET)

  root_logger = logging.getLogger("")
  root_logger.addHandler(stderr_handler)

  if options.debug:
    root_logger.setLevel(logging.NOTSET)
  elif options.verbose:
    root_logger.setLevel(logging.INFO)
  else:
    root_logger.setLevel(logging.ERROR)

  # Create special logger for child process output
  child_logger = logging.Logger("child output")
  child_logger.addHandler(stderr_handler)
  child_logger.setLevel(logging.NOTSET)
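  # Note: because this logger is instantiated directly rather than obtained
  # via logging.getLogger(), it is not attached to the root logger; with its
  # level left at NOTSET, child process output is emitted regardless of the
  # verbosity selected above.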

  return child_logger


class StatusFile:
  """Status file manager.

  """
  def __init__(self, path):
    """Initializes class.

    """
    self._path = path
    self._data = objects.ImportExportStatus(ctime=time.time(),
                                            mtime=None,
                                            recent_output=[])

  def AddRecentOutput(self, line):
    """Adds a new line of recent output.

    """
    self._data.recent_output.append(line)

    # Remove old lines
    del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]
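    # (The negative slice keeps only the last MAX_RECENT_OUTPUT_LINES entries.)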

  def SetListenPort(self, port):
    """Sets the port the daemon is listening on.

    @type port: int
    @param port: TCP/UDP port

    """
    assert isinstance(port, (int, long)) and 0 < port < (2 ** 16)
    self._data.listen_port = port

  def GetListenPort(self):
    """Returns the port the daemon is listening on.

    """
    return self._data.listen_port

  def SetConnected(self):
    """Sets the connected flag.

    """
    self._data.connected = True

  def GetConnected(self):
    """Determines whether the daemon is connected.

    """
    return self._data.connected

  def SetProgress(self, mbytes, throughput, percent, eta):
    """Sets how much data has been transferred so far.

    @type mbytes: number
    @param mbytes: Transferred amount of data in MiB.
    @type throughput: float
    @param throughput: MiB/second
    @type percent: number
    @param percent: Percent processed
    @type eta: number
    @param eta: Expected number of seconds until done

    """
    self._data.progress_mbytes = mbytes
    self._data.progress_throughput = throughput
    self._data.progress_percent = percent
    self._data.progress_eta = eta

  def SetExitStatus(self, exit_status, error_message):
    """Sets the exit status and an error message.

    """
    # Require error message when status isn't 0
    assert exit_status == 0 or error_message

    self._data.exit_status = exit_status
    self._data.error_message = error_message

  def ExitStatusIsSuccess(self):
    """Returns whether the exit status means "success".

    """
    return not bool(self._data.error_message)

  def Update(self, force):
    """Updates the status file.

    @type force: bool
    @param force: Write status file in any case, not only when the minimum
                  update interval has expired

    """
    if not (force or
            self._data.mtime is None or
            time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
      return

    logging.debug("Updating status file %s", self._path)

    self._data.mtime = time.time()
    utils.WriteFile(self._path,
                    data=serializer.DumpJson(self._data.ToDict()),
                    mode=0400)
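    # (Octal mode 0400 makes the written status file read-only for its owner.)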


def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                   dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
                   signal_notify, signal_handler, mode):
  """Handles the child processes' output.

  """
  assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
         "Other signals are not handled in this function"

  # Buffer size 0 is important, otherwise .read() with a specified length
  # might buffer data while poll(2) won't mark its file descriptor as
  # readable again.
  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
  dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
  dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
  exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)

  tp_samples = DD_THROUGHPUT_SAMPLES

  if options.exp_size == constants.IE_CUSTOM_SIZE:
    exp_size = None
  else:
    exp_size = options.exp_size

  child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
                                           child_logger, tp_samples,
                                           exp_size)
  try:
    fdmap = {
      child.stderr.fileno():
        (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
      socat_stderr_read.fileno():
        (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
      dd_pid_read.fileno():
        (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
      dd_stderr_read.fileno():
        (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
      exp_size_read.fileno():
        (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
      signal_notify.fileno(): (signal_notify, None),
      }
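    # Each entry above maps a file descriptor to the object to read from and
    # the line splitter that routes its output to the matching parser; the
    # signal wakeup FD has no splitter and is handled separately below.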

    poller = select.poll()
    for fd in fdmap:
      utils.SetNonblockFlag(fd, True)
      poller.register(fd, select.POLLIN)

    if options.connect_timeout and mode == constants.IEM_IMPORT:
      listen_timeout = utils.RunningTimeout(options.connect_timeout, True)
    else:
      listen_timeout = None

    exit_timeout = None
    dd_stats_timeout = None

    while True:
      # Break out of loop if only signal notify FD is left
      if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
        break

      timeout = None

      if listen_timeout and not exit_timeout:
        assert mode == constants.IEM_IMPORT and options.connect_timeout
        if status_file.GetConnected():
          listen_timeout = None
        elif listen_timeout.Remaining() < 0:
          errmsg = ("Child process didn't establish connection in time"
                    " (%0.0fs), sending SIGTERM" % options.connect_timeout)
          logging.error(errmsg)
          status_file.AddRecentOutput(errmsg)
          status_file.Update(True)

          child.Kill(signal.SIGTERM)
          exit_timeout = \
            utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
          # Next block will calculate timeout
        else:
          # Not yet connected, check again in a second
          timeout = 1000

      if exit_timeout:
        timeout = exit_timeout.Remaining() * 1000
        if timeout < 0:
          logging.info("Child process didn't exit in time")
          break

      if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
        notify_status = child_io_proc.NotifyDd()
        if notify_status:
          # Schedule next notification
          dd_stats_timeout = utils.RunningTimeout(DD_STATISTICS_INTERVAL, True)
        else:
          # Try again soon (dd isn't ready yet)
          dd_stats_timeout = utils.RunningTimeout(1.0, True)

      if dd_stats_timeout:
        dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)

        if timeout is None:
          timeout = dd_timeout
        else:
          timeout = min(timeout, dd_timeout)
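      # poller.poll() below takes its timeout in milliseconds (None blocks
      # indefinitely), hence the "* 1000" conversions above.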

      for fd, event in utils.RetryOnSignal(poller.poll, timeout):
        if event & (select.POLLIN | select.POLLPRI):
          (from_, to) = fdmap[fd]

          # Read up to 1 KB of data
          data = from_.read(1024)
          if data:
            if to:
              to.write(data)
            elif fd == signal_notify.fileno():
              # Signal handling
              if signal_handler.called:
                signal_handler.Clear()
                if exit_timeout:
                  logging.info("Child process still has about %0.2f seconds"
                               " to exit", exit_timeout.Remaining())
                else:
                  logging.info("Giving child process %0.2f seconds to exit",
                               constants.CHILD_LINGER_TIMEOUT)
                  exit_timeout = \
                    utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
          else:
            poller.unregister(fd)
            del fdmap[fd]

        elif event & (select.POLLNVAL | select.POLLHUP |
                      select.POLLERR):
          poller.unregister(fd)
          del fdmap[fd]

      child_io_proc.FlushAll()

    # If there was a timeout calculator, we were waiting for the child to
    # finish, e.g. due to a signal
    return not bool(exit_timeout)
  finally:
    child_io_proc.CloseAll()

def ParseOptions():
  """Parses the options passed to the program.

  @return: Arguments to program

  """
  global options # pylint: disable=W0603

  parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
                                        (constants.IEM_IMPORT,
                                         constants.IEM_EXPORT)))
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option("--key", dest="key", action="store", type="string",
                    help="RSA key file")
  parser.add_option("--cert", dest="cert", action="store", type="string",
                    help="X509 certificate file")
  parser.add_option("--ca", dest="ca", action="store", type="string",
                    help="X509 CA file")
  parser.add_option("--bind", dest="bind", action="store", type="string",
                    help="Bind address")
  parser.add_option("--ipv4", dest="ipv4", action="store_true",
                    help="Use IPv4 only")
  parser.add_option("--ipv6", dest="ipv6", action="store_true",
                    help="Use IPv6 only")
  parser.add_option("--host", dest="host", action="store", type="string",
                    help="Remote hostname")
  parser.add_option("--port", dest="port", action="store", type="int",
                    help="Remote port")
  parser.add_option("--connect-retries", dest="connect_retries", action="store",
                    type="int", default=0,
                    help=("How many times the connection should be retried"
                          " (export only)"))
  parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
                    type="int", default=DEFAULT_CONNECT_TIMEOUT,
                    help="Timeout for connection to be established (seconds)")
  parser.add_option("--compress", dest="compress", action="store",
                    type="choice", help="Compression method",
                    metavar="[%s]" % "|".join(constants.IEC_ALL),
                    choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
  parser.add_option("--expected-size", dest="exp_size", action="store",
                    type="string", default=None,
                    help="Expected import/export size (MiB)")
  parser.add_option("--magic", dest="magic", action="store",
                    type="string", default=None, help="Magic string")
  parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
                    type="string", help="Command prefix")
  parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
                    type="string", help="Command suffix")

  (options, args) = parser.parse_args()

  if len(args) != 2:
    # Won't return
    parser.error("Expected exactly two arguments")

  (status_file_path, mode) = args

  if mode not in (constants.IEM_IMPORT,
                  constants.IEM_EXPORT):
    # Won't return
    parser.error("Invalid mode: %s" % mode)

  # Normalize and check parameters
  if options.host is not None and not netutils.IPAddress.IsValid(options.host):
    try:
      options.host = netutils.Hostname.GetNormalizedName(options.host)
    except errors.OpPrereqError, err:
      parser.error("Invalid hostname '%s': %s" % (options.host, err))

  if options.port is not None:
    options.port = utils.ValidateServiceName(options.port)

  if (options.exp_size is not None and
      options.exp_size != constants.IE_CUSTOM_SIZE):
    try:
      options.exp_size = int(options.exp_size)
    except (ValueError, TypeError), err:
      # Won't return
      parser.error("Invalid value for --expected-size: %s (%s)" %
                   (options.exp_size, err))

  if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)):
    parser.error("Magic must match regular expression %s" %
                 constants.IE_MAGIC_RE.pattern)

  if options.ipv4 and options.ipv6:
    parser.error("Can only use one of --ipv4 and --ipv6")

  return (status_file_path, mode)


class ChildProcess(subprocess.Popen):
  def __init__(self, env, cmd, noclose_fds):
    """Initializes this class.

    """
    self._noclose_fds = noclose_fds

    # Not using close_fds because doing so would also close the socat stderr
    # pipe, which we still need.
    subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
                              stderr=subprocess.PIPE, stdout=None, stdin=None,
                              preexec_fn=self._ChildPreexec)
    self._SetProcessGroup()

  def _ChildPreexec(self):
    """Called before child executable is execve'd.

    """
    # Move to separate process group. By sending a signal to its process group
    # we can kill the child process and all grandchildren.
    os.setpgid(0, 0)

    # Close almost all file descriptors
    utils.CloseFDs(noclose_fds=self._noclose_fds)

  def _SetProcessGroup(self):
    """Sets the child's process group.

    """
    assert self.pid, "Can't be called in child process"

    # Avoid race condition by setting child's process group (as good as
    # possible in Python) before sending signals to child. For an
    # explanation, see preexec function for child.
    try:
      os.setpgid(self.pid, self.pid)
    except EnvironmentError, err:
      # If the child process was faster we receive EPERM or EACCES
      if err.errno not in (errno.EPERM, errno.EACCES):
        raise

  def Kill(self, signum):
    """Sends signal to child process.

    """
    logging.info("Sending signal %s to child process", signum)
    utils.IgnoreProcessNotFound(os.killpg, self.pid, signum)
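    # os.killpg targets the process group created in _ChildPreexec, so the
    # signal also reaches grandchildren such as the spawned socat and dd.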

  def ForceQuit(self):
    """Ensure child process is no longer running.

    """
    # Final check if child process is still alive
    if utils.RetryOnSignal(self.poll) is None:
      logging.error("Child process still alive, sending SIGKILL")
      self.Kill(signal.SIGKILL)
      utils.RetryOnSignal(self.wait)


def main():
  """Main function.

  """
  # Option parsing
  (status_file_path, mode) = ParseOptions()

  # Configure logging
  child_logger = SetupLogging()

  status_file = StatusFile(status_file_path)
  try:
    try:
      # Pipe to receive socat's stderr output
      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()

      # Pipe to receive dd's stderr output
      (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()

      # Pipe to receive dd's PID
      (dd_pid_read_fd, dd_pid_write_fd) = os.pipe()

      # Pipe to receive size predicted by export script
      (exp_size_read_fd, exp_size_write_fd) = os.pipe()

      # Get child process command
      cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
                                           dd_stderr_write_fd, dd_pid_write_fd)
      cmd = cmd_builder.GetCommand()

      # Prepare command environment
      cmd_env = os.environ.copy()

      if options.exp_size == constants.IE_CUSTOM_SIZE:
        cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)
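        # The export script can then write its size prediction to this file
        # descriptor; the read end (exp_size_read_fd) is consumed in
        # ProcessChildIO.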

      logging.debug("Starting command %r", cmd)

      # Start child process
      child = ChildProcess(cmd_env, cmd,
                           [socat_stderr_write_fd, dd_stderr_write_fd,
                            dd_pid_write_fd, exp_size_write_fd])
      try:

        def _ForwardSignal(signum, _):
          """Forwards signals to child process.

          """
          child.Kill(signum)

        signal_wakeup = utils.SignalWakeupFd()
        try:
          # TODO: There is a race condition between starting the child and
          # handling the signals here. While there might be a way to work around
          # it by registering the handlers before starting the child and
          # deferring sent signals until the child is available, doing so can be
          # complicated.
          signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
                                               handler_fn=_ForwardSignal,
                                               wakeup=signal_wakeup)
          try:
            # Close child's side
            utils.RetryOnSignal(os.close, socat_stderr_write_fd)
            utils.RetryOnSignal(os.close, dd_stderr_write_fd)
            utils.RetryOnSignal(os.close, dd_pid_write_fd)
            utils.RetryOnSignal(os.close, exp_size_write_fd)

            if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                              dd_pid_read_fd, exp_size_read_fd,
                              status_file, child_logger,
                              signal_wakeup, signal_handler, mode):
              # The child closed all its file descriptors and there was no
              # signal
              # TODO: Implement timeout instead of waiting indefinitely
              utils.RetryOnSignal(child.wait)
          finally:
            signal_handler.Reset()
        finally:
          signal_wakeup.Reset()
      finally:
        child.ForceQuit()

      if child.returncode == 0:
        errmsg = None
      elif child.returncode < 0:
        errmsg = "Exited due to signal %s" % (-child.returncode, )
      else:
        errmsg = "Exited with status %s" % (child.returncode, )

      status_file.SetExitStatus(child.returncode, errmsg)
    except Exception, err: # pylint: disable=W0703
      logging.exception("Unhandled error occurred")
      status_file.SetExitStatus(constants.EXIT_FAILURE,
                                "Unhandled error occurred: %s" % (err, ))

    if status_file.ExitStatusIsSuccess():
      sys.exit(constants.EXIT_SUCCESS)

    sys.exit(constants.EXIT_FAILURE)
  finally:
    status_file.Update(True)


if __name__ == "__main__":
  main()