root / daemons / import-export @ 58bb385c

#!/usr/bin/python
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Import/export daemon.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name import-export

import errno
import logging
import optparse
import os
import select
import signal
import subprocess
import sys
import time
import math

from ganeti import constants
from ganeti import cli
from ganeti import utils
from ganeti import errors
from ganeti import serializer
from ganeti import objects
from ganeti import impexpd
from ganeti import netutils


#: How many lines to keep in the status file
MAX_RECENT_OUTPUT_LINES = 20

#: Don't update status file more than once every 5 seconds (unless forced)
MIN_UPDATE_INTERVAL = 5.0

#: How long to wait for a connection to be established
DEFAULT_CONNECT_TIMEOUT = 60

#: Get dd(1) statistics every few seconds
DD_STATISTICS_INTERVAL = 5.0

#: Seconds for throughput calculation
DD_THROUGHPUT_INTERVAL = 60.0

#: Number of samples for throughput calculation
DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) /
                                      DD_STATISTICS_INTERVAL))
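# With the defaults above this is ceil(60.0 / 5.0) = 12 samples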


# Global variable for options
options = None


def SetupLogging():
  """Configures the logging module.

  """
  formatter = logging.Formatter("%(asctime)s: %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  stderr_handler.setLevel(logging.NOTSET)

  root_logger = logging.getLogger("")
  root_logger.addHandler(stderr_handler)

  if options.debug:
    root_logger.setLevel(logging.NOTSET)
  elif options.verbose:
    root_logger.setLevel(logging.INFO)
  else:
    root_logger.setLevel(logging.ERROR)

  # Create special logger for child process output
  child_logger = logging.Logger("child output")
  child_logger.addHandler(stderr_handler)
  child_logger.setLevel(logging.NOTSET)
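  # logging.Logger is instantiated directly (rather than via getLogger) so
  # child output is neither filtered by the root logger's level nor emitted a
  # second time through its handlers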

  return child_logger


class StatusFile:
  """Status file manager.

  """
  def __init__(self, path):
    """Initializes class.

    """
    self._path = path
    self._data = objects.ImportExportStatus(ctime=time.time(),
                                            mtime=None,
                                            recent_output=[])

  def AddRecentOutput(self, line):
    """Adds a new line of recent output.

    """
    self._data.recent_output.append(line)

    # Remove old lines
    del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]

  def SetListenPort(self, port):
    """Sets the port the daemon is listening on.

    @type port: int
    @param port: TCP/UDP port

    """
    assert isinstance(port, (int, long)) and 0 < port < 2**16
    self._data.listen_port = port

  def GetListenPort(self):
    """Returns the port the daemon is listening on.

    """
    return self._data.listen_port

  def SetConnected(self):
    """Sets the connected flag.

    """
    self._data.connected = True

  def GetConnected(self):
    """Determines whether the daemon is connected.

    """
    return self._data.connected

  def SetProgress(self, mbytes, throughput, percent, eta):
    """Sets how much data has been transferred so far.

    @type mbytes: number
    @param mbytes: Transferred amount of data in MiB.
    @type throughput: float
    @param throughput: MiB/second
    @type percent: number
    @param percent: Percent processed
    @type eta: number
    @param eta: Expected number of seconds until done

    """
    self._data.progress_mbytes = mbytes
    self._data.progress_throughput = throughput
    self._data.progress_percent = percent
    self._data.progress_eta = eta

  def SetExitStatus(self, exit_status, error_message):
    """Sets the exit status and an error message.

    """
    # Require error message when status isn't 0
    assert exit_status == 0 or error_message

    self._data.exit_status = exit_status
    self._data.error_message = error_message

  def ExitStatusIsSuccess(self):
    """Returns whether the exit status means "success".

    """
    return not bool(self._data.error_message)

  def Update(self, force):
    """Updates the status file.

    @type force: bool
    @param force: Write status file in any case, not only when minimum interval
                  is expired

    """
    if not (force or
            self._data.mtime is None or
            time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
      return

    logging.debug("Updating status file %s", self._path)

    self._data.mtime = time.time()
    utils.WriteFile(self._path,
                    data=serializer.DumpJson(self._data.ToDict(), indent=True),
                    mode=0400)


def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                   dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
                   signal_notify, signal_handler, mode):
  """Handles the child processes' output.

  """
  assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
         "Other signals are not handled in this function"

  # Buffer size 0 is important, otherwise .read() with a specified length
  # might buffer data while poll(2) won't mark its file descriptor as
  # readable again.
  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
  dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
  dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
  exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)

  tp_samples = DD_THROUGHPUT_SAMPLES

  if options.exp_size == constants.IE_CUSTOM_SIZE:
    exp_size = None
  else:
    exp_size = options.exp_size

  child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
                                           child_logger, tp_samples,
                                           exp_size)
  try:
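    # Map each file descriptor to its file object and the line splitter that
    # consumes its output (None for the signal notification descriptor, which
    # is handled separately below)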
    fdmap = {
      child.stderr.fileno():
        (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
      socat_stderr_read.fileno():
        (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
      dd_pid_read.fileno():
        (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
      dd_stderr_read.fileno():
        (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
      exp_size_read.fileno():
        (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
      signal_notify.fileno(): (signal_notify, None),
      }

    poller = select.poll()
    for fd in fdmap:
      utils.SetNonblockFlag(fd, True)
      poller.register(fd, select.POLLIN)

    if options.connect_timeout and mode == constants.IEM_IMPORT:
      listen_timeout = utils.RunningTimeout(options.connect_timeout, True)
    else:
      listen_timeout = None

    exit_timeout = None
    dd_stats_timeout = None

    while True:
      # Break out of loop if only signal notify FD is left
      if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
        break

      timeout = None

      if listen_timeout and not exit_timeout:
        if status_file.GetConnected():
          listen_timeout = None
        elif listen_timeout.Remaining() < 0:
          logging.info("Child process didn't establish connection in time")
          child.Kill(signal.SIGTERM)
          exit_timeout = \
            utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
          # Next block will calculate timeout
        else:
          # Not yet connected, check again in a second
          timeout = 1000

      if exit_timeout:
        timeout = exit_timeout.Remaining() * 1000
        if timeout < 0:
          logging.info("Child process didn't exit in time")
          break

      if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
        notify_status = child_io_proc.NotifyDd()
        if notify_status:
          # Schedule next notification
          dd_stats_timeout = utils.RunningTimeout(DD_STATISTICS_INTERVAL, True)
        else:
          # Try again soon (dd isn't ready yet)
          dd_stats_timeout = utils.RunningTimeout(1.0, True)

      if dd_stats_timeout:
        dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)

        if timeout is None:
          timeout = dd_timeout
        else:
          timeout = min(timeout, dd_timeout)

      for fd, event in utils.RetryOnSignal(poller.poll, timeout):
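        # Note: poller.poll() takes its timeout in milliseconds; a timeout of
        # None blocks until a registered descriptor becomes ready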
        if event & (select.POLLIN | select.POLLPRI):
          (from_, to) = fdmap[fd]

          # Read up to 1 KB of data
          data = from_.read(1024)
          if data:
            if to:
              to.write(data)
            elif fd == signal_notify.fileno():
              # Signal handling
              if signal_handler.called:
                signal_handler.Clear()
                if exit_timeout:
                  logging.info("Child process still has about %0.2f seconds"
                               " to exit", exit_timeout.Remaining())
                else:
                  logging.info("Giving child process %0.2f seconds to exit",
                               constants.CHILD_LINGER_TIMEOUT)
                  exit_timeout = \
                    utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
          else:
            poller.unregister(fd)
            del fdmap[fd]

        elif event & (select.POLLNVAL | select.POLLHUP |
                      select.POLLERR):
          poller.unregister(fd)
          del fdmap[fd]

      child_io_proc.FlushAll()

    # If there was a timeout calculator, we were waiting for the child to
    # finish, e.g. due to a signal
    return not bool(exit_timeout)
  finally:
    child_io_proc.CloseAll()


def ParseOptions():
  """Parses the options passed to the program.

  @return: Arguments to program

  """
  global options # pylint: disable-msg=W0603

  parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
                                        (constants.IEM_IMPORT,
                                         constants.IEM_EXPORT)))
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option("--key", dest="key", action="store", type="string",
                    help="RSA key file")
  parser.add_option("--cert", dest="cert", action="store", type="string",
                    help="X509 certificate file")
  parser.add_option("--ca", dest="ca", action="store", type="string",
                    help="X509 CA file")
  parser.add_option("--bind", dest="bind", action="store", type="string",
                    help="Bind address")
  parser.add_option("--ipv4", dest="ipv4", action="store_true",
                    help="Use IPv4 only")
  parser.add_option("--ipv6", dest="ipv6", action="store_true",
                    help="Use IPv6 only")
  parser.add_option("--host", dest="host", action="store", type="string",
                    help="Remote hostname")
  parser.add_option("--port", dest="port", action="store", type="int",
                    help="Remote port")
  parser.add_option("--connect-retries", dest="connect_retries", action="store",
                    type="int", default=0,
                    help=("How many times the connection should be retried"
                          " (export only)"))
  parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
                    type="int", default=DEFAULT_CONNECT_TIMEOUT,
                    help="Timeout for connection to be established (seconds)")
  parser.add_option("--compress", dest="compress", action="store",
                    type="choice", help="Compression method",
                    metavar="[%s]" % "|".join(constants.IEC_ALL),
                    choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
  parser.add_option("--expected-size", dest="exp_size", action="store",
                    type="string", default=None,
                    help="Expected import/export size (MiB)")
  parser.add_option("--magic", dest="magic", action="store",
                    type="string", default=None, help="Magic string")
  parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
                    type="string", help="Command prefix")
  parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
                    type="string", help="Command suffix")

  (options, args) = parser.parse_args()

  if len(args) != 2:
    # Won't return
    parser.error("Expected exactly two arguments")

  (status_file_path, mode) = args

  if mode not in (constants.IEM_IMPORT,
                  constants.IEM_EXPORT):
    # Won't return
    parser.error("Invalid mode: %s" % mode)

  # Normalize and check parameters
  if options.host is not None and not netutils.IPAddress.IsValid(options.host):
    try:
      options.host = netutils.Hostname.GetNormalizedName(options.host)
    except errors.OpPrereqError, err:
      parser.error("Invalid hostname '%s': %s" % (options.host, err))

  if options.port is not None:
    options.port = utils.ValidateServiceName(options.port)

  if (options.exp_size is not None and
      options.exp_size != constants.IE_CUSTOM_SIZE):
    try:
      options.exp_size = int(options.exp_size)
    except (ValueError, TypeError), err:
      # Won't return
      parser.error("Invalid value for --expected-size: %s (%s)" %
                   (options.exp_size, err))

  if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)):
    parser.error("Magic must match regular expression %s" %
                 constants.IE_MAGIC_RE.pattern)

  if options.ipv4 and options.ipv6:
    parser.error("Can only use one of --ipv4 and --ipv6")

  return (status_file_path, mode)


class ChildProcess(subprocess.Popen):
  def __init__(self, env, cmd, noclose_fds):
    """Initializes this class.

    """
    self._noclose_fds = noclose_fds

    # Not using close_fds because doing so would also close the socat stderr
    # pipe, which we still need.
    subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
                              stderr=subprocess.PIPE, stdout=None, stdin=None,
                              preexec_fn=self._ChildPreexec)
    self._SetProcessGroup()

  def _ChildPreexec(self):
    """Called before child executable is execve'd.

    """
    # Move to separate process group. By sending a signal to its process group
    # we can kill the child process and all grandchildren.
    os.setpgid(0, 0)

    # Close almost all file descriptors
    utils.CloseFDs(noclose_fds=self._noclose_fds)

  def _SetProcessGroup(self):
    """Sets the child's process group.

    """
    assert self.pid, "Can't be called in child process"

    # Avoid race condition by setting child's process group (as good as
    # possible in Python) before sending signals to child. For an
    # explanation, see preexec function for child.
    try:
      os.setpgid(self.pid, self.pid)
    except EnvironmentError, err:
      # If the child process was faster we receive EPERM or EACCES
      if err.errno not in (errno.EPERM, errno.EACCES):
        raise

  def Kill(self, signum):
    """Sends signal to child process.

    """
    logging.info("Sending signal %s to child process", signum)
    utils.IgnoreProcessNotFound(os.killpg, self.pid, signum)

  def ForceQuit(self):
    """Ensure child process is no longer running.

    """
    # Final check if child process is still alive
    if utils.RetryOnSignal(self.poll) is None:
      logging.error("Child process still alive, sending SIGKILL")
      self.Kill(signal.SIGKILL)
      utils.RetryOnSignal(self.wait)


def main():
  """Main function.

  """
  # Option parsing
  (status_file_path, mode) = ParseOptions()

  # Configure logging
  child_logger = SetupLogging()

  status_file = StatusFile(status_file_path)
  try:
    try:
      # Pipe to receive socat's stderr output
      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()

      # Pipe to receive dd's stderr output
      (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()

      # Pipe to receive dd's PID
      (dd_pid_read_fd, dd_pid_write_fd) = os.pipe()

      # Pipe to receive size predicted by export script
      (exp_size_read_fd, exp_size_write_fd) = os.pipe()

      # Get child process command
      cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
                                           dd_stderr_write_fd, dd_pid_write_fd)
      cmd = cmd_builder.GetCommand()

      # Prepare command environment
      cmd_env = os.environ.copy()

      if options.exp_size == constants.IE_CUSTOM_SIZE:
        cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)

      logging.debug("Starting command %r", cmd)

      # Start child process
      child = ChildProcess(cmd_env, cmd,
                           [socat_stderr_write_fd, dd_stderr_write_fd,
                            dd_pid_write_fd, exp_size_write_fd])
      try:
        def _ForwardSignal(signum, _):
          """Forwards signals to child process.

          """
          child.Kill(signum)

        signal_wakeup = utils.SignalWakeupFd()
        try:
          # TODO: There is a race condition between starting the child and
          # handling the signals here. While there might be a way to work around
          # it by registering the handlers before starting the child and
          # deferring sent signals until the child is available, doing so can be
          # complicated.
          signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
                                               handler_fn=_ForwardSignal,
                                               wakeup=signal_wakeup)
          try:
            # Close child's side
            utils.RetryOnSignal(os.close, socat_stderr_write_fd)
            utils.RetryOnSignal(os.close, dd_stderr_write_fd)
            utils.RetryOnSignal(os.close, dd_pid_write_fd)
            utils.RetryOnSignal(os.close, exp_size_write_fd)

            if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                              dd_pid_read_fd, exp_size_read_fd,
                              status_file, child_logger,
                              signal_wakeup, signal_handler, mode):
              # The child closed all its file descriptors and there was no
              # signal
              # TODO: Implement timeout instead of waiting indefinitely
              utils.RetryOnSignal(child.wait)
          finally:
            signal_handler.Reset()
        finally:
          signal_wakeup.Reset()
      finally:
        child.ForceQuit()

      if child.returncode == 0:
        errmsg = None
      elif child.returncode < 0:
        errmsg = "Exited due to signal %s" % (-child.returncode, )
      else:
        errmsg = "Exited with status %s" % (child.returncode, )

      status_file.SetExitStatus(child.returncode, errmsg)
    except Exception, err: # pylint: disable-msg=W0703
      logging.exception("Unhandled error occurred")
      status_file.SetExitStatus(constants.EXIT_FAILURE,
                                "Unhandled error occurred: %s" % (err, ))

    if status_file.ExitStatusIsSuccess():
      sys.exit(constants.EXIT_SUCCESS)

    sys.exit(constants.EXIT_FAILURE)
  finally:
    status_file.Update(True)


if __name__ == "__main__":
  main()