daemons/import-export @ 4b63dc7a
#!/usr/bin/python
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Import/export daemon.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name import-export

import errno
import logging
import optparse
import os
import select
import signal
import subprocess
import sys
import time
import math

from ganeti import constants
from ganeti import cli
from ganeti import utils
from ganeti import errors
from ganeti import serializer
from ganeti import objects
from ganeti import locking
from ganeti import impexpd
from ganeti import netutils


#: How many lines to keep in the status file
MAX_RECENT_OUTPUT_LINES = 20

#: Don't update status file more than once every 5 seconds (unless forced)
MIN_UPDATE_INTERVAL = 5.0

#: Give child process up to 5 seconds to exit after sending a signal
CHILD_LINGER_TIMEOUT = 5.0

#: How long to wait for a connection to be established
DEFAULT_CONNECT_TIMEOUT = 60

#: Get dd(1) statistics every few seconds
DD_STATISTICS_INTERVAL = 5.0

#: Seconds for throughput calculation
DD_THROUGHPUT_INTERVAL = 60.0

#: Number of samples for throughput calculation
DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) /
                                      DD_STATISTICS_INTERVAL))
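# With the intervals above this works out to ceil(60.0 / 5.0) = 12 samples.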


# Global variable for options
options = None


def SetupLogging():
  """Configures the logging module.

  """
  formatter = logging.Formatter("%(asctime)s: %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  stderr_handler.setLevel(logging.NOTSET)

  root_logger = logging.getLogger("")
  root_logger.addHandler(stderr_handler)

  if options.debug:
    root_logger.setLevel(logging.NOTSET)
  elif options.verbose:
    root_logger.setLevel(logging.INFO)
  else:
    root_logger.setLevel(logging.ERROR)

  # Create special logger for child process output
  child_logger = logging.Logger("child output")
  child_logger.addHandler(stderr_handler)
  child_logger.setLevel(logging.NOTSET)

  return child_logger


class StatusFile:
  """Status file manager.

  """
  def __init__(self, path):
    """Initializes class.

    """
    self._path = path
    self._data = objects.ImportExportStatus(ctime=time.time(),
                                            mtime=None,
                                            recent_output=[])

  def AddRecentOutput(self, line):
    """Adds a new line of recent output.

    """
    self._data.recent_output.append(line)

    # Remove old lines
    del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]

  def SetListenPort(self, port):
    """Sets the port the daemon is listening on.

    @type port: int
    @param port: TCP/UDP port

    """
    assert isinstance(port, (int, long)) and 0 < port < 2**16
    self._data.listen_port = port

  def GetListenPort(self):
    """Returns the port the daemon is listening on.

    """
    return self._data.listen_port

  def SetConnected(self):
    """Sets the connected flag.

    """
    self._data.connected = True

  def GetConnected(self):
    """Determines whether the daemon is connected.

    """
    return self._data.connected

  def SetProgress(self, mbytes, throughput, percent, eta):
    """Sets how much data has been transferred so far.

    @type mbytes: number
    @param mbytes: Transferred amount of data in MiB.
    @type throughput: float
    @param throughput: MiB/second
    @type percent: number
    @param percent: Percent processed
    @type eta: number
    @param eta: Expected number of seconds until done

    """
    self._data.progress_mbytes = mbytes
    self._data.progress_throughput = throughput
    self._data.progress_percent = percent
    self._data.progress_eta = eta

  def SetExitStatus(self, exit_status, error_message):
    """Sets the exit status and an error message.

    """
    # Require error message when status isn't 0
    assert exit_status == 0 or error_message

    self._data.exit_status = exit_status
    self._data.error_message = error_message

  def ExitStatusIsSuccess(self):
    """Returns whether the exit status means "success".

    """
    return not bool(self._data.error_message)

  def Update(self, force):
    """Updates the status file.

    @type force: bool
    @param force: Write status file in any case, not only when minimum interval
                  is expired

    """
    if not (force or
            self._data.mtime is None or
            time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
      return

    logging.debug("Updating status file %s", self._path)

    self._data.mtime = time.time()
    utils.WriteFile(self._path,
                    data=serializer.DumpJson(self._data.ToDict(), indent=True),
                    mode=0400)
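
# For illustration: the status file is a JSON dump of the fields set above
# (ctime, mtime, recent_output, listen_port, connected, progress_*,
# exit_status, error_message), written read-only for the owner (mode 0400).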


def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                   dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
                   signal_notify, signal_handler, mode):
  """Handles the child processes' output.

  """
  assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
         "Other signals are not handled in this function"

  # Buffer size 0 is important, otherwise .read() with a specified length
  # might buffer data while poll(2) won't mark its file descriptor as
  # readable again.
  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
  dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
  dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
  exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)

  tp_samples = DD_THROUGHPUT_SAMPLES

  if options.exp_size == constants.IE_CUSTOM_SIZE:
    exp_size = None
  else:
    exp_size = options.exp_size

  child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
                                           child_logger, tp_samples,
                                           exp_size)
  try:
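    # Map each readable file descriptor to (file object, line splitter).  The
    # output of socat, dd and the export script is parsed by a program-specific
    # line splitter; the signal notification FD has no splitter and is handled
    # separately in the loop below.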
    fdmap = {
      child.stderr.fileno():
        (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
      socat_stderr_read.fileno():
        (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
      dd_pid_read.fileno():
        (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
      dd_stderr_read.fileno():
        (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
      exp_size_read.fileno():
        (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
      signal_notify.fileno(): (signal_notify, None),
      }

    poller = select.poll()
    for fd in fdmap:
      utils.SetNonblockFlag(fd, True)
      poller.register(fd, select.POLLIN)

    if options.connect_timeout and mode == constants.IEM_IMPORT:
      listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
    else:
      listen_timeout = None

    exit_timeout = None
    dd_stats_timeout = None
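
    # The poll(2) timeout below is derived from up to three sources: the
    # connection timeout while waiting for the remote end (import only), the
    # linger timeout after the child has been signalled, and the interval at
    # which dd is asked for statistics.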

    while True:
      # Break out of loop if only signal notify FD is left
      if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
        break

      timeout = None

      if listen_timeout and not exit_timeout:
        if status_file.GetConnected():
          listen_timeout = None
        elif listen_timeout.Remaining() < 0:
          logging.info("Child process didn't establish connection in time")
          child.Kill(signal.SIGTERM)
          exit_timeout = \
            locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
          # Next block will calculate timeout
        else:
          # Not yet connected, check again in a second
          timeout = 1000

      if exit_timeout:
        timeout = exit_timeout.Remaining() * 1000
        if timeout < 0:
          logging.info("Child process didn't exit in time")
          break

      if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
        notify_status = child_io_proc.NotifyDd()
        if notify_status:
          # Schedule next notification
          dd_stats_timeout = locking.RunningTimeout(DD_STATISTICS_INTERVAL,
                                                    True)
        else:
          # Try again soon (dd isn't ready yet)
          dd_stats_timeout = locking.RunningTimeout(1.0, True)

      if dd_stats_timeout:
        dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)

        if timeout is None:
          timeout = dd_timeout
        else:
          timeout = min(timeout, dd_timeout)

      for fd, event in utils.RetryOnSignal(poller.poll, timeout):
        if event & (select.POLLIN | event & select.POLLPRI):
          (from_, to) = fdmap[fd]

          # Read up to 1 KB of data
          data = from_.read(1024)
          if data:
            if to:
              to.write(data)
            elif fd == signal_notify.fileno():
              # Signal handling
              if signal_handler.called:
                signal_handler.Clear()
                if exit_timeout:
                  logging.info("Child process still has about %0.2f seconds"
                               " to exit", exit_timeout.Remaining())
                else:
                  logging.info("Giving child process %0.2f seconds to exit",
                               CHILD_LINGER_TIMEOUT)
                  exit_timeout = \
                    locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
          else:
            poller.unregister(fd)
            del fdmap[fd]

        elif event & (select.POLLNVAL | select.POLLHUP |
                      select.POLLERR):
          poller.unregister(fd)
          del fdmap[fd]

      child_io_proc.FlushAll()

    # If there was a timeout calculator, we were waiting for the child to
    # finish, e.g. due to a signal
    return not bool(exit_timeout)
  finally:
    child_io_proc.CloseAll()


def ParseOptions():
  """Parses the options passed to the program.

  @return: Arguments to program

  """
  global options # pylint: disable-msg=W0603

  parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
                                        (constants.IEM_IMPORT,
                                         constants.IEM_EXPORT)))
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option("--key", dest="key", action="store", type="string",
                    help="RSA key file")
  parser.add_option("--cert", dest="cert", action="store", type="string",
                    help="X509 certificate file")
  parser.add_option("--ca", dest="ca", action="store", type="string",
                    help="X509 CA file")
  parser.add_option("--bind", dest="bind", action="store", type="string",
                    help="Bind address")
  parser.add_option("--host", dest="host", action="store", type="string",
                    help="Remote hostname")
  parser.add_option("--port", dest="port", action="store", type="int",
                    help="Remote port")
  parser.add_option("--connect-retries", dest="connect_retries", action="store",
                    type="int", default=0,
                    help=("How many times the connection should be retried"
                          " (export only)"))
  parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
                    type="int", default=DEFAULT_CONNECT_TIMEOUT,
                    help="Timeout for connection to be established (seconds)")
  parser.add_option("--compress", dest="compress", action="store",
                    type="choice", help="Compression method",
                    metavar="[%s]" % "|".join(constants.IEC_ALL),
                    choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
  parser.add_option("--expected-size", dest="exp_size", action="store",
                    type="string", default=None,
                    help="Expected import/export size (MiB)")
  parser.add_option("--magic", dest="magic", action="store",
                    type="string", default=None, help="Magic string")
  parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
                    type="string", help="Command prefix")
  parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
                    type="string", help="Command suffix")

  (options, args) = parser.parse_args()

  if len(args) != 2:
    # Won't return
    parser.error("Expected exactly two arguments")

  (status_file_path, mode) = args

  if mode not in (constants.IEM_IMPORT,
                  constants.IEM_EXPORT):
    # Won't return
    parser.error("Invalid mode: %s" % mode)

  # Normalize and check parameters
  if options.host is not None:
    try:
      options.host = netutils.Hostname.GetNormalizedName(options.host)
    except errors.OpPrereqError, err:
      parser.error("Invalid hostname '%s': %s" % (options.host, err))

  if options.port is not None:
    options.port = utils.ValidateServiceName(options.port)

  if (options.exp_size is not None and
      options.exp_size != constants.IE_CUSTOM_SIZE):
    try:
      options.exp_size = int(options.exp_size)
    except (ValueError, TypeError), err:
      # Won't return
      parser.error("Invalid value for --expected-size: %s (%s)" %
                   (options.exp_size, err))

  if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)):
    parser.error("Magic must match regular expression %s" %
                 constants.IE_MAGIC_RE.pattern)

  return (status_file_path, mode)
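
# Example invocation (illustrative values only):
#   import-export status-file import \
#     --key=server.key --cert=server.pem --ca=ca.pem --bind=0.0.0.0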


class ChildProcess(subprocess.Popen):
  def __init__(self, env, cmd, noclose_fds):
    """Initializes this class.

    """
    self._noclose_fds = noclose_fds

    # Not using close_fds because doing so would also close the socat stderr
    # pipe, which we still need.
    subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
                              stderr=subprocess.PIPE, stdout=None, stdin=None,
                              preexec_fn=self._ChildPreexec)
    self._SetProcessGroup()

  def _ChildPreexec(self):
    """Called before child executable is execve'd.

    """
    # Move to separate process group. By sending a signal to its process group
    # we can kill the child process and all grandchildren.
    os.setpgid(0, 0)

    # Close almost all file descriptors
    utils.CloseFDs(noclose_fds=self._noclose_fds)

  def _SetProcessGroup(self):
    """Sets the child's process group.

    """
    assert self.pid, "Can't be called in child process"

    # Avoid race condition by setting child's process group (as good as
    # possible in Python) before sending signals to child. For an
    # explanation, see preexec function for child.
    try:
      os.setpgid(self.pid, self.pid)
    except EnvironmentError, err:
      # If the child process was faster we receive EPERM or EACCES
      if err.errno not in (errno.EPERM, errno.EACCES):
        raise

  def Kill(self, signum):
    """Sends signal to child process.

    """
    logging.info("Sending signal %s to child process", signum)
    utils.IgnoreProcessNotFound(os.killpg, self.pid, signum)

  def ForceQuit(self):
    """Ensure child process is no longer running.

    """
    # Final check if child process is still alive
    if utils.RetryOnSignal(self.poll) is None:
      logging.error("Child process still alive, sending SIGKILL")
      self.Kill(signal.SIGKILL)
      utils.RetryOnSignal(self.wait)


def main():
  """Main function.

  """
  # Option parsing
  (status_file_path, mode) = ParseOptions()

  # Configure logging
  child_logger = SetupLogging()

  status_file = StatusFile(status_file_path)
  try:
    try:
      # Pipe to receive socat's stderr output
      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()

      # Pipe to receive dd's stderr output
      (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()

      # Pipe to receive dd's PID
      (dd_pid_read_fd, dd_pid_write_fd) = os.pipe()

      # Pipe to receive size predicted by export script
      (exp_size_read_fd, exp_size_write_fd) = os.pipe()

      # Get child process command
      cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
                                           dd_stderr_write_fd, dd_pid_write_fd)
      cmd = cmd_builder.GetCommand()

      # Prepare command environment
      cmd_env = os.environ.copy()

      if options.exp_size == constants.IE_CUSTOM_SIZE:
        cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)

      logging.debug("Starting command %r", cmd)

      # Start child process
      child = ChildProcess(cmd_env, cmd,
                           [socat_stderr_write_fd, dd_stderr_write_fd,
                            dd_pid_write_fd, exp_size_write_fd])
      try:
        def _ForwardSignal(signum, _):
          """Forwards signals to child process.

          """
          child.Kill(signum)

        signal_wakeup = utils.SignalWakeupFd()
        try:
          # TODO: There is a race condition between starting the child and
          # handling the signals here. While there might be a way to work around
          # it by registering the handlers before starting the child and
          # deferring sent signals until the child is available, doing so can be
          # complicated.
          signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
                                               handler_fn=_ForwardSignal,
                                               wakeup=signal_wakeup)
          try:
            # Close child's side
            utils.RetryOnSignal(os.close, socat_stderr_write_fd)
            utils.RetryOnSignal(os.close, dd_stderr_write_fd)
            utils.RetryOnSignal(os.close, dd_pid_write_fd)
            utils.RetryOnSignal(os.close, exp_size_write_fd)
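            # With the parent's copies of the write ends closed, the read ends
            # see EOF as soon as the child processes exit, which is what lets
            # the loop in ProcessChildIO terminate.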

            if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                              dd_pid_read_fd, exp_size_read_fd,
                              status_file, child_logger,
                              signal_wakeup, signal_handler, mode):
              # The child closed all its file descriptors and there was no
              # signal
              # TODO: Implement timeout instead of waiting indefinitely
              utils.RetryOnSignal(child.wait)
          finally:
            signal_handler.Reset()
        finally:
          signal_wakeup.Reset()
      finally:
        child.ForceQuit()

      if child.returncode == 0:
        errmsg = None
      elif child.returncode < 0:
        errmsg = "Exited due to signal %s" % (-child.returncode, )
      else:
        errmsg = "Exited with status %s" % (child.returncode, )

      status_file.SetExitStatus(child.returncode, errmsg)
    except Exception, err: # pylint: disable-msg=W0703
      logging.exception("Unhandled error occurred")
      status_file.SetExitStatus(constants.EXIT_FAILURE,
                                "Unhandled error occurred: %s" % (err, ))

    if status_file.ExitStatusIsSuccess():
      sys.exit(constants.EXIT_SUCCESS)

    sys.exit(constants.EXIT_FAILURE)
  finally:
    status_file.Update(True)


if __name__ == "__main__":
  main()