Statistics
| Branch: | Tag: | Revision:

root / daemons / import-export @ c74cda62

History | View | Annotate | Download (19 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Import/export daemon.
23

    
24
"""
25

    
26
# pylint: disable-msg=C0103
27
# C0103: Invalid name import-export
28

    
29
import errno
30
import logging
31
import optparse
32
import os
33
import select
34
import signal
35
import subprocess
36
import sys
37
import time
38
import math
39

    
40
from ganeti import constants
41
from ganeti import cli
42
from ganeti import utils
43
from ganeti import errors
44
from ganeti import serializer
45
from ganeti import objects
46
from ganeti import impexpd
47
from ganeti import netutils
48

    
49

    
50
#: How many lines to keep in the status file
MAX_RECENT_OUTPUT_LINES = 20

#: Don't update status file more than once every 5 seconds (unless forced)
MIN_UPDATE_INTERVAL = 5.0

#: How long to wait for a connection to be established (seconds)
DEFAULT_CONNECT_TIMEOUT = 60

#: Get dd(1) statistics every few seconds
DD_STATISTICS_INTERVAL = 5.0

#: Seconds for throughput calculation
DD_THROUGHPUT_INTERVAL = 60.0

#: Number of samples for throughput calculation (how many statistics
#: intervals fit into one throughput interval, rounded up)
DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) /
                                      DD_STATISTICS_INTERVAL))


# Global variable for options (set by ParseOptions, read throughout)
options = None
72

    
73

    
74
def SetupLogging():
  """Configures the logging module.

  Installs a stderr handler on the root logger, picks the root log level
  from the global options (debug > verbose > errors only) and returns a
  dedicated logger for child process output.

  """
  fmt = logging.Formatter("%(asctime)s: %(message)s")

  handler = logging.StreamHandler()
  handler.setFormatter(fmt)
  handler.setLevel(logging.NOTSET)

  root = logging.getLogger("")
  root.addHandler(handler)

  if options.debug:
    level = logging.NOTSET
  elif options.verbose:
    level = logging.INFO
  else:
    level = logging.ERROR
  root.setLevel(level)

  # Create special logger for child process output
  child_output_logger = logging.Logger("child output")
  child_output_logger.addHandler(handler)
  child_output_logger.setLevel(logging.NOTSET)

  return child_output_logger
100

    
101

    
102
class StatusFile:
103
  """Status file manager.
104

    
105
  """
106
  def __init__(self, path):
107
    """Initializes class.
108

    
109
    """
110
    self._path = path
111
    self._data = objects.ImportExportStatus(ctime=time.time(),
112
                                            mtime=None,
113
                                            recent_output=[])
114

    
115
  def AddRecentOutput(self, line):
116
    """Adds a new line of recent output.
117

    
118
    """
119
    self._data.recent_output.append(line)
120

    
121
    # Remove old lines
122
    del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]
123

    
124
  def SetListenPort(self, port):
125
    """Sets the port the daemon is listening on.
126

    
127
    @type port: int
128
    @param port: TCP/UDP port
129

    
130
    """
131
    assert isinstance(port, (int, long)) and 0 < port < 2**16
132
    self._data.listen_port = port
133

    
134
  def GetListenPort(self):
135
    """Returns the port the daemon is listening on.
136

    
137
    """
138
    return self._data.listen_port
139

    
140
  def SetConnected(self):
141
    """Sets the connected flag.
142

    
143
    """
144
    self._data.connected = True
145

    
146
  def GetConnected(self):
147
    """Determines whether the daemon is connected.
148

    
149
    """
150
    return self._data.connected
151

    
152
  def SetProgress(self, mbytes, throughput, percent, eta):
153
    """Sets how much data has been transferred so far.
154

    
155
    @type mbytes: number
156
    @param mbytes: Transferred amount of data in MiB.
157
    @type throughput: float
158
    @param throughput: MiB/second
159
    @type percent: number
160
    @param percent: Percent processed
161
    @type eta: number
162
    @param eta: Expected number of seconds until done
163

    
164
    """
165
    self._data.progress_mbytes = mbytes
166
    self._data.progress_throughput = throughput
167
    self._data.progress_percent = percent
168
    self._data.progress_eta = eta
169

    
170
  def SetExitStatus(self, exit_status, error_message):
171
    """Sets the exit status and an error message.
172

    
173
    """
174
    # Require error message when status isn't 0
175
    assert exit_status == 0 or error_message
176

    
177
    self._data.exit_status = exit_status
178
    self._data.error_message = error_message
179

    
180
  def ExitStatusIsSuccess(self):
181
    """Returns whether the exit status means "success".
182

    
183
    """
184
    return not bool(self._data.error_message)
185

    
186
  def Update(self, force):
187
    """Updates the status file.
188

    
189
    @type force: bool
190
    @param force: Write status file in any case, not only when minimum interval
191
                  is expired
192

    
193
    """
194
    if not (force or
195
            self._data.mtime is None or
196
            time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
197
      return
198

    
199
    logging.debug("Updating status file %s", self._path)
200

    
201
    self._data.mtime = time.time()
202
    utils.WriteFile(self._path,
203
                    data=serializer.DumpJson(self._data.ToDict(), indent=True),
204
                    mode=0400)
205

    
206

    
207
def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                   dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
                   signal_notify, signal_handler, mode):
  """Handles the child processes' output.

  Polls the child's output pipes, feeds each one into the matching line
  splitter of an L{impexpd.ChildIOProcessor} and periodically requests
  dd(1) statistics.  The loop ends once the child has closed all of its
  file descriptors (only the signal notification FD remains) or an exit
  timeout expired.

  @param child: Child process object (provides C{stderr} and C{Kill})
  @param socat_stderr_read_fd: Read end of pipe carrying socat's stderr
  @param dd_stderr_read_fd: Read end of pipe carrying dd's stderr
  @param dd_pid_read_fd: Read end of pipe carrying dd's PID
  @param exp_size_read_fd: Read end of pipe carrying the predicted size
  @param status_file: L{StatusFile} instance used for progress reporting
  @param child_logger: Logger for child process output
  @param signal_notify: Signal wakeup FD object; becomes readable when a
    signal was received
  @param signal_handler: Signal handler object; only SIGTERM and SIGINT
    may be registered on it
  @param mode: Daemon mode (import or export)
  @rtype: bool
  @return: Whether the loop finished without an exit timeout being
    started (i.e. the child is expected to have exited normally)

  """
  assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
         "Other signals are not handled in this function"

  # Buffer size 0 is important, otherwise .read() with a specified length
  # might buffer data while poll(2) won't mark its file descriptor as
  # readable again.
  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
  dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
  dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
  exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)

  tp_samples = DD_THROUGHPUT_SAMPLES

  if options.exp_size == constants.IE_CUSTOM_SIZE:
    # Size will be received from the export script at runtime
    exp_size = None
  else:
    exp_size = options.exp_size

  child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
                                           child_logger, tp_samples,
                                           exp_size)
  try:
    # Map file descriptors to their file object and the line splitter
    # consuming their output (None for the signal notification FD)
    fdmap = {
      child.stderr.fileno():
        (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
      socat_stderr_read.fileno():
        (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
      dd_pid_read.fileno():
        (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
      dd_stderr_read.fileno():
        (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
      exp_size_read.fileno():
        (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
      signal_notify.fileno(): (signal_notify, None),
      }

    poller = select.poll()
    for fd in fdmap:
      utils.SetNonblockFlag(fd, True)
      poller.register(fd, select.POLLIN)

    # Only imports wait for an incoming connection; exports connect out
    if options.connect_timeout and mode == constants.IEM_IMPORT:
      listen_timeout = utils.RunningTimeout(options.connect_timeout, True)
    else:
      listen_timeout = None

    exit_timeout = None
    dd_stats_timeout = None

    while True:
      # Break out of loop if only signal notify FD is left
      if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
        break

      timeout = None

      if listen_timeout and not exit_timeout:
        if status_file.GetConnected():
          listen_timeout = None
        elif listen_timeout.Remaining() < 0:
          logging.info("Child process didn't establish connection in time")
          child.Kill(signal.SIGTERM)
          exit_timeout = \
            utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
          # Next block will calculate timeout
        else:
          # Not yet connected, check again in a second
          timeout = 1000

      if exit_timeout:
        timeout = exit_timeout.Remaining() * 1000
        if timeout < 0:
          logging.info("Child process didn't exit in time")
          break

      if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
        notify_status = child_io_proc.NotifyDd()
        if notify_status:
          # Schedule next notification
          dd_stats_timeout = utils.RunningTimeout(DD_STATISTICS_INTERVAL, True)
        else:
          # Try again soon (dd isn't ready yet)
          dd_stats_timeout = utils.RunningTimeout(1.0, True)

      if dd_stats_timeout:
        dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)

        if timeout is None:
          timeout = dd_timeout
        else:
          timeout = min(timeout, dd_timeout)

      for fd, event in utils.RetryOnSignal(poller.poll, timeout):
        # Fixed operator-precedence accident: the mask is POLLIN|POLLPRI
        # (the original "POLLIN | event & POLLPRI" evaluated equivalently
        # only by coincidence)
        if event & (select.POLLIN | select.POLLPRI):
          (from_, to) = fdmap[fd]

          # Read up to 1 KB of data
          data = from_.read(1024)
          if data:
            if to:
              to.write(data)
            elif fd == signal_notify.fileno():
              # Signal handling
              if signal_handler.called:
                signal_handler.Clear()
                if exit_timeout:
                  logging.info("Child process still has about %0.2f seconds"
                               " to exit", exit_timeout.Remaining())
                else:
                  logging.info("Giving child process %0.2f seconds to exit",
                               constants.CHILD_LINGER_TIMEOUT)
                  exit_timeout = \
                    utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
          else:
            # Zero-length read means EOF; stop watching this descriptor
            poller.unregister(fd)
            del fdmap[fd]

        elif event & (select.POLLNVAL | select.POLLHUP |
                      select.POLLERR):
          poller.unregister(fd)
          del fdmap[fd]

      child_io_proc.FlushAll()

    # If there was a timeout calculator, we were waiting for the child to
    # finish, e.g. due to a signal
    return not bool(exit_timeout)
  finally:
    child_io_proc.CloseAll()
342

    
343

    
344
def ParseOptions():
345
  """Parses the options passed to the program.
346

    
347
  @return: Arguments to program
348

    
349
  """
350
  global options # pylint: disable-msg=W0603
351

    
352
  parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
353
                                        (constants.IEM_IMPORT,
354
                                         constants.IEM_EXPORT)))
355
  parser.add_option(cli.DEBUG_OPT)
356
  parser.add_option(cli.VERBOSE_OPT)
357
  parser.add_option("--key", dest="key", action="store", type="string",
358
                    help="RSA key file")
359
  parser.add_option("--cert", dest="cert", action="store", type="string",
360
                    help="X509 certificate file")
361
  parser.add_option("--ca", dest="ca", action="store", type="string",
362
                    help="X509 CA file")
363
  parser.add_option("--bind", dest="bind", action="store", type="string",
364
                    help="Bind address")
365
  parser.add_option("--host", dest="host", action="store", type="string",
366
                    help="Remote hostname")
367
  parser.add_option("--port", dest="port", action="store", type="int",
368
                    help="Remote port")
369
  parser.add_option("--connect-retries", dest="connect_retries", action="store",
370
                    type="int", default=0,
371
                    help=("How many times the connection should be retried"
372
                          " (export only)"))
373
  parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
374
                    type="int", default=DEFAULT_CONNECT_TIMEOUT,
375
                    help="Timeout for connection to be established (seconds)")
376
  parser.add_option("--compress", dest="compress", action="store",
377
                    type="choice", help="Compression method",
378
                    metavar="[%s]" % "|".join(constants.IEC_ALL),
379
                    choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
380
  parser.add_option("--expected-size", dest="exp_size", action="store",
381
                    type="string", default=None,
382
                    help="Expected import/export size (MiB)")
383
  parser.add_option("--magic", dest="magic", action="store",
384
                    type="string", default=None, help="Magic string")
385
  parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
386
                    type="string", help="Command prefix")
387
  parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
388
                    type="string", help="Command suffix")
389

    
390
  (options, args) = parser.parse_args()
391

    
392
  if len(args) != 2:
393
    # Won't return
394
    parser.error("Expected exactly two arguments")
395

    
396
  (status_file_path, mode) = args
397

    
398
  if mode not in (constants.IEM_IMPORT,
399
                  constants.IEM_EXPORT):
400
    # Won't return
401
    parser.error("Invalid mode: %s" % mode)
402

    
403
  # Normalize and check parameters
404
  if options.host is not None:
405
    try:
406
      options.host = netutils.Hostname.GetNormalizedName(options.host)
407
    except errors.OpPrereqError, err:
408
      parser.error("Invalid hostname '%s': %s" % (options.host, err))
409

    
410
  if options.port is not None:
411
    options.port = utils.ValidateServiceName(options.port)
412

    
413
  if (options.exp_size is not None and
414
      options.exp_size != constants.IE_CUSTOM_SIZE):
415
    try:
416
      options.exp_size = int(options.exp_size)
417
    except (ValueError, TypeError), err:
418
      # Won't return
419
      parser.error("Invalid value for --expected-size: %s (%s)" %
420
                   (options.exp_size, err))
421

    
422
  if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)):
423
    parser.error("Magic must match regular expression %s" %
424
                 constants.IE_MAGIC_RE.pattern)
425

    
426
  return (status_file_path, mode)
427

    
428

    
429
class ChildProcess(subprocess.Popen):
  """Child process in its own process group.

  The child is moved into a separate process group so that signals sent
  via L{Kill} (using C{os.killpg}) reach the child and all of its own
  children (socat, dd, ...) at once.

  """
  def __init__(self, env, cmd, noclose_fds):
    """Initializes this class.

    @param env: Environment for the child process
    @param cmd: Command to run (passed to C{subprocess.Popen} with
      C{shell=False})
    @param noclose_fds: File descriptors the child must keep open

    """
    # Remembered for _ChildPreexec, which runs in the child
    self._noclose_fds = noclose_fds

    # Not using close_fds because doing so would also close the socat stderr
    # pipe, which we still need.
    subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
                              stderr=subprocess.PIPE, stdout=None, stdin=None,
                              preexec_fn=self._ChildPreexec)
    self._SetProcessGroup()

  def _ChildPreexec(self):
    """Called before child executable is execve'd.

    Runs in the child process, between fork and exec.

    """
    # Move to separate process group. By sending a signal to its process group
    # we can kill the child process and all grandchildren.
    os.setpgid(0, 0)

    # Close almost all file descriptors
    utils.CloseFDs(noclose_fds=self._noclose_fds)

  def _SetProcessGroup(self):
    """Sets the child's process group.

    """
    assert self.pid, "Can't be called in child process"

    # Avoid race condition by setting child's process group (as good as
    # possible in Python) before sending signals to child. For an
    # explanation, see preexec function for child.
    try:
      os.setpgid(self.pid, self.pid)
    except EnvironmentError, err:
      # If the child process was faster we receive EPERM or EACCES
      if err.errno not in (errno.EPERM, errno.EACCES):
        raise

  def Kill(self, signum):
    """Sends signal to child process.

    @param signum: Signal number (delivered to the whole process group)

    """
    logging.info("Sending signal %s to child process", signum)
    utils.IgnoreProcessNotFound(os.killpg, self.pid, signum)

  def ForceQuit(self):
    """Ensure child process is no longer running.

    Sends SIGKILL and reaps the child if it is still alive.

    """
    # Final check if child process is still alive
    if utils.RetryOnSignal(self.poll) is None:
      logging.error("Child process still alive, sending SIGKILL")
      self.Kill(signal.SIGKILL)
      utils.RetryOnSignal(self.wait)
486

    
487

    
488
def main():
  """Main function.

  Parses options, starts the child process and feeds its output through
  L{ProcessChildIO} until it exits, recording the result in the status
  file.  Exits the process with C{constants.EXIT_SUCCESS} or
  C{constants.EXIT_FAILURE}.

  """
  # Option parsing
  (status_file_path, mode) = ParseOptions()

  # Configure logging
  child_logger = SetupLogging()

  status_file = StatusFile(status_file_path)
  try:
    try:
      # Pipe to receive socat's stderr output
      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()

      # Pipe to receive dd's stderr output
      (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()

      # Pipe to receive dd's PID
      (dd_pid_read_fd, dd_pid_write_fd) = os.pipe()

      # Pipe to receive size predicted by export script
      (exp_size_read_fd, exp_size_write_fd) = os.pipe()

      # Get child process command
      cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
                                           dd_stderr_write_fd, dd_pid_write_fd)
      cmd = cmd_builder.GetCommand()

      # Prepare command environment
      cmd_env = os.environ.copy()

      if options.exp_size == constants.IE_CUSTOM_SIZE:
        # Tell the export script which FD to write the predicted size to
        cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)

      logging.debug("Starting command %r", cmd)

      # Start child process; the write ends of the pipes must stay open
      # across the exec, hence they're passed as noclose_fds
      child = ChildProcess(cmd_env, cmd,
                           [socat_stderr_write_fd, dd_stderr_write_fd,
                            dd_pid_write_fd, exp_size_write_fd])
      try:
        def _ForwardSignal(signum, _):
          """Forwards signals to child process.

          """
          child.Kill(signum)

        signal_wakeup = utils.SignalWakeupFd()
        try:
          # TODO: There is a race condition between starting the child and
          # handling the signals here. While there might be a way to work around
          # it by registering the handlers before starting the child and
          # deferring sent signals until the child is available, doing so can be
          # complicated.
          signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
                                               handler_fn=_ForwardSignal,
                                               wakeup=signal_wakeup)
          try:
            # Close child's side
            utils.RetryOnSignal(os.close, socat_stderr_write_fd)
            utils.RetryOnSignal(os.close, dd_stderr_write_fd)
            utils.RetryOnSignal(os.close, dd_pid_write_fd)
            utils.RetryOnSignal(os.close, exp_size_write_fd)

            if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                              dd_pid_read_fd, exp_size_read_fd,
                              status_file, child_logger,
                              signal_wakeup, signal_handler, mode):
              # The child closed all its file descriptors and there was no
              # signal
              # TODO: Implement timeout instead of waiting indefinitely
              utils.RetryOnSignal(child.wait)
          finally:
            signal_handler.Reset()
        finally:
          signal_wakeup.Reset()
      finally:
        child.ForceQuit()

      # Translate the child's exit code into an error message (negative
      # returncode means it died from a signal)
      if child.returncode == 0:
        errmsg = None
      elif child.returncode < 0:
        errmsg = "Exited due to signal %s" % (-child.returncode, )
      else:
        errmsg = "Exited with status %s" % (child.returncode, )

      status_file.SetExitStatus(child.returncode, errmsg)
    except Exception, err: # pylint: disable-msg=W0703
      logging.exception("Unhandled error occurred")
      status_file.SetExitStatus(constants.EXIT_FAILURE,
                                "Unhandled error occurred: %s" % (err, ))

    if status_file.ExitStatusIsSuccess():
      sys.exit(constants.EXIT_SUCCESS)

    sys.exit(constants.EXIT_FAILURE)
  finally:
    # Always write the final status, regardless of how we got here
    status_file.Update(True)
588

    
589

    
590
# Script entry point
if __name__ == "__main__":
  main()