Statistics
| Branch: | Tag: | Revision:

root / daemons / import-export @ acd65a16

History | View | Annotate | Download (18.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Import/export daemon.
23

    
24
"""
25

    
26
# pylint: disable-msg=C0103
27
# C0103: Invalid name import-export
28

    
29
import errno
30
import logging
31
import optparse
32
import os
33
import select
34
import signal
35
import subprocess
36
import sys
37
import time
38
import math
39

    
40
from ganeti import constants
41
from ganeti import cli
42
from ganeti import utils
43
from ganeti import errors
44
from ganeti import serializer
45
from ganeti import objects
46
from ganeti import locking
47
from ganeti import impexpd
48

    
49

    
50
#: How many lines to keep in the status file
51
MAX_RECENT_OUTPUT_LINES = 20
52

    
53
#: Don't update status file more than once every 5 seconds (unless forced)
54
MIN_UPDATE_INTERVAL = 5.0
55

    
56
#: Give child process up to 5 seconds to exit after sending a signal
57
CHILD_LINGER_TIMEOUT = 5.0
58

    
59
#: How long to wait for a connection to be established
60
DEFAULT_CONNECT_TIMEOUT = 60
61

    
62
#: Get dd(1) statistics every few seconds
63
DD_STATISTICS_INTERVAL = 5.0
64

    
65
#: Seconds for throughput calculation
66
DD_THROUGHPUT_INTERVAL = 60.0
67

    
68
#: Number of samples for throughput calculation
69
DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) /
70
                                      DD_STATISTICS_INTERVAL))
71

    
72

    
73
# Global variable for options
74
options = None
75

    
76

    
77
def SetupLogging():
  """Configures the logging module.

  All messages are sent to stderr; the root logger's level depends on the
  C{--debug}/C{--verbose} options.

  @return: Logger dedicated to child process output (not subject to the
    root logger's level filtering)

  """
  fmt = logging.Formatter("%(asctime)s: %(message)s")

  handler = logging.StreamHandler()
  handler.setFormatter(fmt)
  handler.setLevel(logging.NOTSET)

  root = logging.getLogger("")
  root.addHandler(handler)

  # Pick the root level from the command-line options
  if options.debug:
    level = logging.NOTSET
  elif options.verbose:
    level = logging.INFO
  else:
    level = logging.ERROR
  root.setLevel(level)

  # Child process output gets a dedicated logger which always passes
  # everything through to the stderr handler
  child_logger = logging.Logger("child output")
  child_logger.addHandler(handler)
  child_logger.setLevel(logging.NOTSET)

  return child_logger
103

    
104

    
105
class StatusFile:
106
  """Status file manager.
107

    
108
  """
109
  def __init__(self, path):
110
    """Initializes class.
111

    
112
    """
113
    self._path = path
114
    self._data = objects.ImportExportStatus(ctime=time.time(),
115
                                            mtime=None,
116
                                            recent_output=[])
117

    
118
  def AddRecentOutput(self, line):
119
    """Adds a new line of recent output.
120

    
121
    """
122
    self._data.recent_output.append(line)
123

    
124
    # Remove old lines
125
    del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]
126

    
127
  def SetListenPort(self, port):
128
    """Sets the port the daemon is listening on.
129

    
130
    @type port: int
131
    @param port: TCP/UDP port
132

    
133
    """
134
    assert isinstance(port, (int, long)) and 0 < port < 2**16
135
    self._data.listen_port = port
136

    
137
  def GetListenPort(self):
138
    """Returns the port the daemon is listening on.
139

    
140
    """
141
    return self._data.listen_port
142

    
143
  def SetConnected(self):
144
    """Sets the connected flag.
145

    
146
    """
147
    self._data.connected = True
148

    
149
  def GetConnected(self):
150
    """Determines whether the daemon is connected.
151

    
152
    """
153
    return self._data.connected
154

    
155
  def SetProgress(self, mbytes, throughput, percent, eta):
156
    """Sets how much data has been transferred so far.
157

    
158
    @type mbytes: number
159
    @param mbytes: Transferred amount of data in MiB.
160
    @type throughput: float
161
    @param throughput: MiB/second
162
    @type percent: number
163
    @param percent: Percent processed
164
    @type eta: number
165
    @param eta: Expected number of seconds until done
166

    
167
    """
168
    self._data.progress_mbytes = mbytes
169
    self._data.progress_throughput = throughput
170
    self._data.progress_percent = percent
171
    self._data.progress_eta = eta
172

    
173
  def SetExitStatus(self, exit_status, error_message):
174
    """Sets the exit status and an error message.
175

    
176
    """
177
    # Require error message when status isn't 0
178
    assert exit_status == 0 or error_message
179

    
180
    self._data.exit_status = exit_status
181
    self._data.error_message = error_message
182

    
183
  def ExitStatusIsSuccess(self):
184
    """Returns whether the exit status means "success".
185

    
186
    """
187
    return not bool(self._data.error_message)
188

    
189
  def Update(self, force):
190
    """Updates the status file.
191

    
192
    @type force: bool
193
    @param force: Write status file in any case, not only when minimum interval
194
                  is expired
195

    
196
    """
197
    if not (force or
198
            self._data.mtime is None or
199
            time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
200
      return
201

    
202
    logging.debug("Updating status file %s", self._path)
203

    
204
    self._data.mtime = time.time()
205
    utils.WriteFile(self._path,
206
                    data=serializer.DumpJson(self._data.ToDict(), indent=True),
207
                    mode=0400)
208

    
209

    
210
def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                   dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
                   signal_notify, signal_handler, mode):
  """Handles the child processes' output.

  Polls the child's stderr, the helper pipes (socat/dd stderr, dd PID,
  expected size) and the signal notification FD until only the latter is
  left, forwarding each program's output to the appropriate line splitter.
  Also enforces the connection timeout (import mode) and the linger timeout
  after a termination signal was sent.

  @param child: L{ChildProcess} instance whose stderr is monitored
  @param socat_stderr_read_fd: Read end of pipe with socat's stderr
  @param dd_stderr_read_fd: Read end of pipe with dd's stderr
  @param dd_pid_read_fd: Read end of pipe receiving dd's PID
  @param exp_size_read_fd: Read end of pipe receiving the expected size
  @param status_file: L{StatusFile} instance to update
  @param child_logger: Logger for child process output
  @param signal_notify: Wakeup FD written to by the signal handler
  @param signal_handler: L{utils.SignalHandler} for SIGTERM/SIGINT
  @param mode: One of L{constants.IEM_IMPORT} or L{constants.IEM_EXPORT}
  @return: Whether the child exited without us having to wait for it after
    a signal (i.e. no exit timeout was started)

  """
  assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
         "Other signals are not handled in this function"

  # Buffer size 0 is important, otherwise .read() with a specified length
  # might buffer data while poll(2) won't mark its file descriptor as
  # readable again.
  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
  dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
  dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
  exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)

  tp_samples = DD_THROUGHPUT_SAMPLES

  # IE_CUSTOM_SIZE means the size will arrive via the exp_size pipe instead
  if options.exp_size == constants.IE_CUSTOM_SIZE:
    exp_size = None
  else:
    exp_size = options.exp_size

  child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
                                           child_logger, tp_samples,
                                           exp_size)
  try:
    # Map each file descriptor to (file object, line splitter); the signal
    # notification FD has no splitter and is handled specially below
    fdmap = {
      child.stderr.fileno():
        (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
      socat_stderr_read.fileno():
        (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
      dd_pid_read.fileno():
        (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
      dd_stderr_read.fileno():
        (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
      exp_size_read.fileno():
        (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
      signal_notify.fileno(): (signal_notify, None),
      }

    poller = select.poll()
    for fd in fdmap:
      utils.SetNonblockFlag(fd, True)
      poller.register(fd, select.POLLIN)

    # Only imports wait for an incoming connection, hence only they get a
    # listen timeout
    if options.connect_timeout and mode == constants.IEM_IMPORT:
      listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
    else:
      listen_timeout = None

    exit_timeout = None
    dd_stats_timeout = None

    while True:
      # Break out of loop if only signal notify FD is left
      if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
        break

      timeout = None

      if listen_timeout and not exit_timeout:
        if status_file.GetConnected():
          listen_timeout = None
        elif listen_timeout.Remaining() < 0:
          logging.info("Child process didn't establish connection in time")
          child.Kill(signal.SIGTERM)
          exit_timeout = \
            locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
          # Next block will calculate timeout
        else:
          # Not yet connected, check again in a second
          timeout = 1000

      if exit_timeout:
        timeout = exit_timeout.Remaining() * 1000
        if timeout < 0:
          logging.info("Child process didn't exit in time")
          break

      if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
        notify_status = child_io_proc.NotifyDd()
        if notify_status:
          # Schedule next notification
          dd_stats_timeout = locking.RunningTimeout(DD_STATISTICS_INTERVAL,
                                                    True)
        else:
          # Try again soon (dd isn't ready yet)
          dd_stats_timeout = locking.RunningTimeout(1.0, True)

      if dd_stats_timeout:
        dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)

        if timeout is None:
          timeout = dd_timeout
        else:
          timeout = min(timeout, dd_timeout)

      for fd, event in utils.RetryOnSignal(poller.poll, timeout):
        # FIX: the mask was garbled as "select.POLLIN | event & select.POLLPRI";
        # the intended test is simply for readable data (POLLIN or POLLPRI)
        if event & (select.POLLIN | select.POLLPRI):
          (from_, to) = fdmap[fd]

          # Read up to 1 KB of data
          data = from_.read(1024)
          if data:
            if to:
              to.write(data)
            elif fd == signal_notify.fileno():
              # Signal handling
              if signal_handler.called:
                signal_handler.Clear()
                if exit_timeout:
                  logging.info("Child process still has about %0.2f seconds"
                               " to exit", exit_timeout.Remaining())
                else:
                  logging.info("Giving child process %0.2f seconds to exit",
                               CHILD_LINGER_TIMEOUT)
                  exit_timeout = \
                    locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
          else:
            # EOF on this descriptor, stop watching it
            poller.unregister(fd)
            del fdmap[fd]

        elif event & (select.POLLNVAL | select.POLLHUP |
                      select.POLLERR):
          poller.unregister(fd)
          del fdmap[fd]

      child_io_proc.FlushAll()

    # If there was a timeout calculator, we were waiting for the child to
    # finish, e.g. due to a signal
    return not bool(exit_timeout)
  finally:
    child_io_proc.CloseAll()
346

    
347

    
348
def ParseOptions():
  """Parses the options passed to the program.

  Also normalizes and validates hostname, port and expected size, calling
  C{parser.error} (which exits) on invalid input.

  @return: Arguments to program, a tuple of (status file path, mode)

  """
  global options # pylint: disable-msg=W0603

  parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
                                        (constants.IEM_IMPORT,
                                         constants.IEM_EXPORT)))
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option("--key", dest="key", action="store", type="string",
                    help="RSA key file")
  parser.add_option("--cert", dest="cert", action="store", type="string",
                    help="X509 certificate file")
  parser.add_option("--ca", dest="ca", action="store", type="string",
                    help="X509 CA file")
  parser.add_option("--bind", dest="bind", action="store", type="string",
                    help="Bind address")
  parser.add_option("--host", dest="host", action="store", type="string",
                    help="Remote hostname")
  parser.add_option("--port", dest="port", action="store", type="int",
                    help="Remote port")
  parser.add_option("--connect-retries", dest="connect_retries", action="store",
                    type="int", default=0,
                    help=("How many times the connection should be retried"
                          " (export only)"))
  parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
                    type="int", default=DEFAULT_CONNECT_TIMEOUT,
                    help="Timeout for connection to be established (seconds)")
  parser.add_option("--compress", dest="compress", action="store",
                    type="choice", help="Compression method",
                    metavar="[%s]" % "|".join(constants.IEC_ALL),
                    choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
  parser.add_option("--expected-size", dest="exp_size", action="store",
                    type="string", default=None,
                    help="Expected import/export size (MiB)")
  parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
                    type="string", help="Command prefix")
  parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
                    type="string", help="Command suffix")

  (options, args) = parser.parse_args()

  if len(args) != 2:
    # Won't return
    parser.error("Expected exactly two arguments")

  (status_file_path, mode) = args

  if mode not in (constants.IEM_IMPORT,
                  constants.IEM_EXPORT):
    # Won't return
    parser.error("Invalid mode: %s" % mode)

  # Normalize and check parameters
  if options.host is not None:
    try:
      options.host = utils.HostInfo.NormalizeName(options.host)
    except errors.OpPrereqError, err:
      parser.error("Invalid hostname '%s': %s" % (options.host, err))

  if options.port is not None:
    options.port = utils.ValidateServiceName(options.port)

  if (options.exp_size is not None and
      options.exp_size != constants.IE_CUSTOM_SIZE):
    # --expected-size is parsed as a string because it may also carry the
    # special IE_CUSTOM_SIZE marker; convert to int only for real sizes
    try:
      options.exp_size = int(options.exp_size)
    except (ValueError, TypeError), err:
      # Won't return
      parser.error("Invalid value for --expected-size: %s (%s)" %
                   (options.exp_size, err))

  return (status_file_path, mode)
425

    
426

    
427
class ChildProcess(subprocess.Popen):
  """subprocess.Popen subclass running the child in its own process group.

  This allows signalling the child and all its descendants at once via
  C{os.killpg}.

  """
  def __init__(self, env, cmd, noclose_fds):
    """Initializes this class.

    @param env: Environment for the child process
    @param cmd: Command (list) to execute
    @param noclose_fds: File descriptors to keep open in the child

    """
    self._noclose_fds = noclose_fds

    # Not using close_fds because doing so would also close the socat stderr
    # pipe, which we still need.
    subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
                              stderr=subprocess.PIPE, stdout=None, stdin=None,
                              preexec_fn=self._ChildPreexec)
    self._SetProcessGroup()

  def _ChildPreexec(self):
    """Called before child executable is execve'd.

    Runs in the child process after fork(2) and before exec.

    """
    # Move to separate process group. By sending a signal to its process group
    # we can kill the child process and all grandchildren.
    os.setpgid(0, 0)

    # Close almost all file descriptors
    utils.CloseFDs(noclose_fds=self._noclose_fds)

  def _SetProcessGroup(self):
    """Sets the child's process group.

    """
    assert self.pid, "Can't be called in child process"

    # Avoid race condition by setting child's process group (as good as
    # possible in Python) before sending signals to child. For an
    # explanation, see preexec function for child.
    try:
      os.setpgid(self.pid, self.pid)
    except EnvironmentError, err:
      # If the child process was faster we receive EPERM or EACCES
      if err.errno not in (errno.EPERM, errno.EACCES):
        raise

  def Kill(self, signum):
    """Sends signal to child process.

    The signal goes to the whole process group, reaching grandchildren
    (socat, dd) too.

    """
    logging.info("Sending signal %s to child process", signum)
    utils.IgnoreProcessNotFound(os.killpg, self.pid, signum)

  def ForceQuit(self):
    """Ensure child process is no longer running.

    Sends SIGKILL and reaps the child if it is still alive.

    """
    # Final check if child process is still alive
    if utils.RetryOnSignal(self.poll) is None:
      logging.error("Child process still alive, sending SIGKILL")
      self.Kill(signal.SIGKILL)
      utils.RetryOnSignal(self.wait)
484

    
485

    
486
def main():
  """Main function.

  Parses options, starts the child process pipeline (socat/dd), processes
  its output until completion and records the outcome in the status file.
  Exits the process with L{constants.EXIT_SUCCESS} or
  L{constants.EXIT_FAILURE}.

  """
  # Option parsing
  (status_file_path, mode) = ParseOptions()

  # Configure logging
  child_logger = SetupLogging()

  status_file = StatusFile(status_file_path)
  try:
    try:
      # Pipe to receive socat's stderr output
      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()

      # Pipe to receive dd's stderr output
      (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()

      # Pipe to receive dd's PID
      (dd_pid_read_fd, dd_pid_write_fd) = os.pipe()

      # Pipe to receive size predicted by export script
      (exp_size_read_fd, exp_size_write_fd) = os.pipe()

      # Get child process command
      cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
                                           dd_stderr_write_fd, dd_pid_write_fd)
      cmd = cmd_builder.GetCommand()

      # Prepare command environment
      cmd_env = os.environ.copy()

      if options.exp_size == constants.IE_CUSTOM_SIZE:
        # Tell the export script which FD to write the predicted size to
        cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)

      logging.debug("Starting command %r", cmd)

      # Start child process; the write ends of the pipes must stay open
      # across the exec, hence they're passed as noclose_fds
      child = ChildProcess(cmd_env, cmd,
                           [socat_stderr_write_fd, dd_stderr_write_fd,
                            dd_pid_write_fd, exp_size_write_fd])
      try:
        def _ForwardSignal(signum, _):
          """Forwards signals to child process.

          """
          child.Kill(signum)

        signal_wakeup = utils.SignalWakeupFd()
        try:
          # TODO: There is a race condition between starting the child and
          # handling the signals here. While there might be a way to work around
          # it by registering the handlers before starting the child and
          # deferring sent signals until the child is available, doing so can be
          # complicated.
          signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
                                               handler_fn=_ForwardSignal,
                                               wakeup=signal_wakeup)
          try:
            # Close child's side so EOF is seen when the child exits
            utils.RetryOnSignal(os.close, socat_stderr_write_fd)
            utils.RetryOnSignal(os.close, dd_stderr_write_fd)
            utils.RetryOnSignal(os.close, dd_pid_write_fd)
            utils.RetryOnSignal(os.close, exp_size_write_fd)

            if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                              dd_pid_read_fd, exp_size_read_fd,
                              status_file, child_logger,
                              signal_wakeup, signal_handler, mode):
              # The child closed all its file descriptors and there was no
              # signal
              # TODO: Implement timeout instead of waiting indefinitely
              utils.RetryOnSignal(child.wait)
          finally:
            signal_handler.Reset()
        finally:
          signal_wakeup.Reset()
      finally:
        # Make sure the child is gone, whatever happened above
        child.ForceQuit()

      # Translate the child's return code into an error message
      if child.returncode == 0:
        errmsg = None
      elif child.returncode < 0:
        errmsg = "Exited due to signal %s" % (-child.returncode, )
      else:
        errmsg = "Exited with status %s" % (child.returncode, )

      status_file.SetExitStatus(child.returncode, errmsg)
    except Exception, err: # pylint: disable-msg=W0703
      logging.exception("Unhandled error occurred")
      status_file.SetExitStatus(constants.EXIT_FAILURE,
                                "Unhandled error occurred: %s" % (err, ))

    if status_file.ExitStatusIsSuccess():
      sys.exit(constants.EXIT_SUCCESS)

    sys.exit(constants.EXIT_FAILURE)
  finally:
    # Always write the final status, regardless of how we got here
    status_file.Update(True)
586

    
587

    
588
# Entry point when executed as a script
if __name__ == "__main__":
  main()