Statistics
| Branch: | Tag: | Revision:

root / daemons / import-export @ 557838c1

History | View | Annotate | Download (19.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Import/export daemon.
23

    
24
"""
25

    
26
# pylint: disable-msg=C0103
27
# C0103: Invalid name import-export
28

    
29
import errno
30
import logging
31
import optparse
32
import os
33
import select
34
import signal
35
import subprocess
36
import sys
37
import time
38
import math
39

    
40
from ganeti import constants
41
from ganeti import cli
42
from ganeti import utils
43
from ganeti import errors
44
from ganeti import serializer
45
from ganeti import objects
46
from ganeti import impexpd
47
from ganeti import netutils
48

    
49

    
50
#: Number of most recent output lines kept in the status file
MAX_RECENT_OUTPUT_LINES = 20

#: Minimum number of seconds between two non-forced status file updates
MIN_UPDATE_INTERVAL = 5.0

#: Seconds a child process is given to exit after it was sent a signal
CHILD_LINGER_TIMEOUT = 5.0

#: Default number of seconds to wait for a connection to be established
DEFAULT_CONNECT_TIMEOUT = 60

#: Seconds between two requests for dd(1) statistics
DD_STATISTICS_INTERVAL = 5.0

#: Length, in seconds, of the window used for the throughput calculation
DD_THROUGHPUT_INTERVAL = 60.0

#: Number of statistics samples fitting into the throughput window (rounded
#: up)
DD_THROUGHPUT_SAMPLES = \
  int(math.ceil(DD_THROUGHPUT_INTERVAL / float(DD_STATISTICS_INTERVAL)))


# Parsed command line options; filled in by ParseOptions
options = None
75

    
76

    
77
def SetupLogging():
  """Sets up logging to stderr for the daemon and its child's output.

  The root logger's level is chosen from the global C{options}: everything
  with C{debug}, informational messages with C{verbose}, errors only
  otherwise. The logger for child process output is instantiated directly
  (not retrieved via C{logging.getLogger}) and shares the stderr handler.

  @rtype: logging.Logger
  @return: Logger to be used for output of the child process

  """
  handler = logging.StreamHandler()
  handler.setLevel(logging.NOTSET)
  handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))

  root = logging.getLogger("")
  root.addHandler(handler)

  # Debug takes precedence over verbose
  if options.debug:
    root_level = logging.NOTSET
  elif options.verbose:
    root_level = logging.INFO
  else:
    root_level = logging.ERROR

  root.setLevel(root_level)

  # Separate logger for whatever the child process writes
  child_output = logging.Logger("child output")
  child_output.setLevel(logging.NOTSET)
  child_output.addHandler(handler)

  return child_output
103

    
104

    
105
class StatusFile:
106
  """Status file manager.
107

    
108
  """
109
  def __init__(self, path):
110
    """Initializes class.
111

    
112
    """
113
    self._path = path
114
    self._data = objects.ImportExportStatus(ctime=time.time(),
115
                                            mtime=None,
116
                                            recent_output=[])
117

    
118
  def AddRecentOutput(self, line):
119
    """Adds a new line of recent output.
120

    
121
    """
122
    self._data.recent_output.append(line)
123

    
124
    # Remove old lines
125
    del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]
126

    
127
  def SetListenPort(self, port):
128
    """Sets the port the daemon is listening on.
129

    
130
    @type port: int
131
    @param port: TCP/UDP port
132

    
133
    """
134
    assert isinstance(port, (int, long)) and 0 < port < 2**16
135
    self._data.listen_port = port
136

    
137
  def GetListenPort(self):
138
    """Returns the port the daemon is listening on.
139

    
140
    """
141
    return self._data.listen_port
142

    
143
  def SetConnected(self):
144
    """Sets the connected flag.
145

    
146
    """
147
    self._data.connected = True
148

    
149
  def GetConnected(self):
150
    """Determines whether the daemon is connected.
151

    
152
    """
153
    return self._data.connected
154

    
155
  def SetProgress(self, mbytes, throughput, percent, eta):
156
    """Sets how much data has been transferred so far.
157

    
158
    @type mbytes: number
159
    @param mbytes: Transferred amount of data in MiB.
160
    @type throughput: float
161
    @param throughput: MiB/second
162
    @type percent: number
163
    @param percent: Percent processed
164
    @type eta: number
165
    @param eta: Expected number of seconds until done
166

    
167
    """
168
    self._data.progress_mbytes = mbytes
169
    self._data.progress_throughput = throughput
170
    self._data.progress_percent = percent
171
    self._data.progress_eta = eta
172

    
173
  def SetExitStatus(self, exit_status, error_message):
174
    """Sets the exit status and an error message.
175

    
176
    """
177
    # Require error message when status isn't 0
178
    assert exit_status == 0 or error_message
179

    
180
    self._data.exit_status = exit_status
181
    self._data.error_message = error_message
182

    
183
  def ExitStatusIsSuccess(self):
184
    """Returns whether the exit status means "success".
185

    
186
    """
187
    return not bool(self._data.error_message)
188

    
189
  def Update(self, force):
190
    """Updates the status file.
191

    
192
    @type force: bool
193
    @param force: Write status file in any case, not only when minimum interval
194
                  is expired
195

    
196
    """
197
    if not (force or
198
            self._data.mtime is None or
199
            time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
200
      return
201

    
202
    logging.debug("Updating status file %s", self._path)
203

    
204
    self._data.mtime = time.time()
205
    utils.WriteFile(self._path,
206
                    data=serializer.DumpJson(self._data.ToDict(), indent=True),
207
                    mode=0400)
208

    
209

    
210
def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                   dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
                   signal_notify, signal_handler, mode):
  """Handles the child processes' output.

  Polls the child's stderr, the read ends of the four pipes and the signal
  notification descriptor. Data read from the child is written to the line
  splitters of an L{impexpd.ChildIOProcessor}. The loop ends once only the
  signal notification descriptor is left or when the child didn't exit in
  time after it was asked to.

  Besides its arguments the function reads C{debug}, C{exp_size} and
  C{connect_timeout} from the global C{options}.

  @param child: Child process; its C{stderr} is read and C{Kill} is called
                with SIGTERM if no connection is established in time
  @type socat_stderr_read_fd: int
  @param socat_stderr_read_fd: Read end of the pipe for socat's stderr
  @type dd_stderr_read_fd: int
  @param dd_stderr_read_fd: Read end of the pipe for dd's stderr
  @type dd_pid_read_fd: int
  @param dd_pid_read_fd: Read end of the pipe for dd's PID
  @type exp_size_read_fd: int
  @param exp_size_read_fd: Read end of the pipe for the expected size
  @param status_file: Status file object; passed to the I/O processor and
                      asked whether a connection has been established
  @param child_logger: Logger passed to the I/O processor
  @param signal_notify: Object providing C{fileno} and C{read}, becoming
                        readable when a signal arrived
  @param signal_handler: Signal handler object (C{signum}, C{called} and
                         C{Clear} are used); must only handle SIGTERM and
                         SIGINT
  @param mode: Daemon mode; the connection timeout is only enforced for
               C{constants.IEM_IMPORT}
  @rtype: bool
  @return: False if the child was being waited for to exit (connection
           timeout or received signal), True otherwise

  """
  assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
         "Other signals are not handled in this function"

  # Buffer size 0 is important, otherwise .read() with a specified length
  # might buffer data while poll(2) won't mark its file descriptor as
  # readable again.
  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
  dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
  dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
  exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)

  tp_samples = DD_THROUGHPUT_SAMPLES

  # With a custom size the expected size isn't known up front
  if options.exp_size == constants.IE_CUSTOM_SIZE:
    exp_size = None
  else:
    exp_size = options.exp_size

  child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
                                           child_logger, tp_samples,
                                           exp_size)
  try:
    # Maps file descriptor to (file object to read from, object to write the
    # data to); the signal notification descriptor has no target
    fdmap = {
      child.stderr.fileno():
        (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
      socat_stderr_read.fileno():
        (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
      dd_pid_read.fileno():
        (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
      dd_stderr_read.fileno():
        (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
      exp_size_read.fileno():
        (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
      signal_notify.fileno(): (signal_notify, None),
      }

    poller = select.poll()
    for fd in fdmap:
      utils.SetNonblockFlag(fd, True)
      poller.register(fd, select.POLLIN)

    if options.connect_timeout and mode == constants.IEM_IMPORT:
      listen_timeout = utils.RunningTimeout(options.connect_timeout, True)
    else:
      listen_timeout = None

    exit_timeout = None
    dd_stats_timeout = None

    while True:
      # Break out of loop if only signal notify FD is left
      if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
        break

      # Timeout for poll in milliseconds, None means "wait indefinitely"
      timeout = None

      if listen_timeout and not exit_timeout:
        if status_file.GetConnected():
          listen_timeout = None
        elif listen_timeout.Remaining() < 0:
          logging.info("Child process didn't establish connection in time")
          child.Kill(signal.SIGTERM)
          exit_timeout = \
            utils.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
          # Next block will calculate timeout
        else:
          # Not yet connected, check again in a second
          timeout = 1000

      if exit_timeout:
        timeout = exit_timeout.Remaining() * 1000
        if timeout < 0:
          logging.info("Child process didn't exit in time")
          break

      if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
        notify_status = child_io_proc.NotifyDd()
        if notify_status:
          # Schedule next notification
          dd_stats_timeout = utils.RunningTimeout(DD_STATISTICS_INTERVAL, True)
        else:
          # Try again soon (dd isn't ready yet)
          dd_stats_timeout = utils.RunningTimeout(1.0, True)

      if dd_stats_timeout:
        dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)

        # Wake up for whatever is due first
        if timeout is None:
          timeout = dd_timeout
        else:
          timeout = min(timeout, dd_timeout)

      for fd, event in utils.RetryOnSignal(poller.poll, timeout):
        # Normal or priority data is available
        if event & (select.POLLIN | select.POLLPRI):
          (from_, to) = fdmap[fd]

          # Read up to 1 KB of data
          data = from_.read(1024)
          if data:
            if to:
              to.write(data)
            elif fd == signal_notify.fileno():
              # Signal handling
              if signal_handler.called:
                signal_handler.Clear()
                if exit_timeout:
                  logging.info("Child process still has about %0.2f seconds"
                               " to exit", exit_timeout.Remaining())
                else:
                  logging.info("Giving child process %0.2f seconds to exit",
                               CHILD_LINGER_TIMEOUT)
                  exit_timeout = \
                    utils.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
          else:
            # Nothing was read, stop watching this descriptor
            poller.unregister(fd)
            del fdmap[fd]

        elif event & (select.POLLNVAL | select.POLLHUP |
                      select.POLLERR):
          poller.unregister(fd)
          del fdmap[fd]

      child_io_proc.FlushAll()

    # If there was a timeout calculator, we were waiting for the child to
    # finish, e.g. due to a signal
    return not bool(exit_timeout)
  finally:
    child_io_proc.CloseAll()
345

    
346

    
347
def ParseOptions():
348
  """Parses the options passed to the program.
349

    
350
  @return: Arguments to program
351

    
352
  """
353
  global options # pylint: disable-msg=W0603
354

    
355
  parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
356
                                        (constants.IEM_IMPORT,
357
                                         constants.IEM_EXPORT)))
358
  parser.add_option(cli.DEBUG_OPT)
359
  parser.add_option(cli.VERBOSE_OPT)
360
  parser.add_option("--key", dest="key", action="store", type="string",
361
                    help="RSA key file")
362
  parser.add_option("--cert", dest="cert", action="store", type="string",
363
                    help="X509 certificate file")
364
  parser.add_option("--ca", dest="ca", action="store", type="string",
365
                    help="X509 CA file")
366
  parser.add_option("--bind", dest="bind", action="store", type="string",
367
                    help="Bind address")
368
  parser.add_option("--host", dest="host", action="store", type="string",
369
                    help="Remote hostname")
370
  parser.add_option("--port", dest="port", action="store", type="int",
371
                    help="Remote port")
372
  parser.add_option("--connect-retries", dest="connect_retries", action="store",
373
                    type="int", default=0,
374
                    help=("How many times the connection should be retried"
375
                          " (export only)"))
376
  parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
377
                    type="int", default=DEFAULT_CONNECT_TIMEOUT,
378
                    help="Timeout for connection to be established (seconds)")
379
  parser.add_option("--compress", dest="compress", action="store",
380
                    type="choice", help="Compression method",
381
                    metavar="[%s]" % "|".join(constants.IEC_ALL),
382
                    choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
383
  parser.add_option("--expected-size", dest="exp_size", action="store",
384
                    type="string", default=None,
385
                    help="Expected import/export size (MiB)")
386
  parser.add_option("--magic", dest="magic", action="store",
387
                    type="string", default=None, help="Magic string")
388
  parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
389
                    type="string", help="Command prefix")
390
  parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
391
                    type="string", help="Command suffix")
392

    
393
  (options, args) = parser.parse_args()
394

    
395
  if len(args) != 2:
396
    # Won't return
397
    parser.error("Expected exactly two arguments")
398

    
399
  (status_file_path, mode) = args
400

    
401
  if mode not in (constants.IEM_IMPORT,
402
                  constants.IEM_EXPORT):
403
    # Won't return
404
    parser.error("Invalid mode: %s" % mode)
405

    
406
  # Normalize and check parameters
407
  if options.host is not None:
408
    try:
409
      options.host = netutils.Hostname.GetNormalizedName(options.host)
410
    except errors.OpPrereqError, err:
411
      parser.error("Invalid hostname '%s': %s" % (options.host, err))
412

    
413
  if options.port is not None:
414
    options.port = utils.ValidateServiceName(options.port)
415

    
416
  if (options.exp_size is not None and
417
      options.exp_size != constants.IE_CUSTOM_SIZE):
418
    try:
419
      options.exp_size = int(options.exp_size)
420
    except (ValueError, TypeError), err:
421
      # Won't return
422
      parser.error("Invalid value for --expected-size: %s (%s)" %
423
                   (options.exp_size, err))
424

    
425
  if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)):
426
    parser.error("Magic must match regular expression %s" %
427
                 constants.IE_MAGIC_RE.pattern)
428

    
429
  return (status_file_path, mode)
430

    
431

    
432
class ChildProcess(subprocess.Popen):
433
  def __init__(self, env, cmd, noclose_fds):
434
    """Initializes this class.
435

    
436
    """
437
    self._noclose_fds = noclose_fds
438

    
439
    # Not using close_fds because doing so would also close the socat stderr
440
    # pipe, which we still need.
441
    subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
442
                              stderr=subprocess.PIPE, stdout=None, stdin=None,
443
                              preexec_fn=self._ChildPreexec)
444
    self._SetProcessGroup()
445

    
446
  def _ChildPreexec(self):
447
    """Called before child executable is execve'd.
448

    
449
    """
450
    # Move to separate process group. By sending a signal to its process group
451
    # we can kill the child process and all grandchildren.
452
    os.setpgid(0, 0)
453

    
454
    # Close almost all file descriptors
455
    utils.CloseFDs(noclose_fds=self._noclose_fds)
456

    
457
  def _SetProcessGroup(self):
458
    """Sets the child's process group.
459

    
460
    """
461
    assert self.pid, "Can't be called in child process"
462

    
463
    # Avoid race condition by setting child's process group (as good as
464
    # possible in Python) before sending signals to child. For an
465
    # explanation, see preexec function for child.
466
    try:
467
      os.setpgid(self.pid, self.pid)
468
    except EnvironmentError, err:
469
      # If the child process was faster we receive EPERM or EACCES
470
      if err.errno not in (errno.EPERM, errno.EACCES):
471
        raise
472

    
473
  def Kill(self, signum):
474
    """Sends signal to child process.
475

    
476
    """
477
    logging.info("Sending signal %s to child process", signum)
478
    utils.IgnoreProcessNotFound(os.killpg, self.pid, signum)
479

    
480
  def ForceQuit(self):
481
    """Ensure child process is no longer running.
482

    
483
    """
484
    # Final check if child process is still alive
485
    if utils.RetryOnSignal(self.poll) is None:
486
      logging.error("Child process still alive, sending SIGKILL")
487
      self.Kill(signal.SIGKILL)
488
      utils.RetryOnSignal(self.wait)
489

    
490

    
491
def main():
492
  """Main function.
493

    
494
  """
495
  # Option parsing
496
  (status_file_path, mode) = ParseOptions()
497

    
498
  # Configure logging
499
  child_logger = SetupLogging()
500

    
501
  status_file = StatusFile(status_file_path)
502
  try:
503
    try:
504
      # Pipe to receive socat's stderr output
505
      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
506

    
507
      # Pipe to receive dd's stderr output
508
      (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()
509

    
510
      # Pipe to receive dd's PID
511
      (dd_pid_read_fd, dd_pid_write_fd) = os.pipe()
512

    
513
      # Pipe to receive size predicted by export script
514
      (exp_size_read_fd, exp_size_write_fd) = os.pipe()
515

    
516
      # Get child process command
517
      cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
518
                                           dd_stderr_write_fd, dd_pid_write_fd)
519
      cmd = cmd_builder.GetCommand()
520

    
521
      # Prepare command environment
522
      cmd_env = os.environ.copy()
523

    
524
      if options.exp_size == constants.IE_CUSTOM_SIZE:
525
        cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)
526

    
527
      logging.debug("Starting command %r", cmd)
528

    
529
      # Start child process
530
      child = ChildProcess(cmd_env, cmd,
531
                           [socat_stderr_write_fd, dd_stderr_write_fd,
532
                            dd_pid_write_fd, exp_size_write_fd])
533
      try:
534
        def _ForwardSignal(signum, _):
535
          """Forwards signals to child process.
536

    
537
          """
538
          child.Kill(signum)
539

    
540
        signal_wakeup = utils.SignalWakeupFd()
541
        try:
542
          # TODO: There is a race condition between starting the child and
543
          # handling the signals here. While there might be a way to work around
544
          # it by registering the handlers before starting the child and
545
          # deferring sent signals until the child is available, doing so can be
546
          # complicated.
547
          signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
548
                                               handler_fn=_ForwardSignal,
549
                                               wakeup=signal_wakeup)
550
          try:
551
            # Close child's side
552
            utils.RetryOnSignal(os.close, socat_stderr_write_fd)
553
            utils.RetryOnSignal(os.close, dd_stderr_write_fd)
554
            utils.RetryOnSignal(os.close, dd_pid_write_fd)
555
            utils.RetryOnSignal(os.close, exp_size_write_fd)
556

    
557
            if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
558
                              dd_pid_read_fd, exp_size_read_fd,
559
                              status_file, child_logger,
560
                              signal_wakeup, signal_handler, mode):
561
              # The child closed all its file descriptors and there was no
562
              # signal
563
              # TODO: Implement timeout instead of waiting indefinitely
564
              utils.RetryOnSignal(child.wait)
565
          finally:
566
            signal_handler.Reset()
567
        finally:
568
          signal_wakeup.Reset()
569
      finally:
570
        child.ForceQuit()
571

    
572
      if child.returncode == 0:
573
        errmsg = None
574
      elif child.returncode < 0:
575
        errmsg = "Exited due to signal %s" % (-child.returncode, )
576
      else:
577
        errmsg = "Exited with status %s" % (child.returncode, )
578

    
579
      status_file.SetExitStatus(child.returncode, errmsg)
580
    except Exception, err: # pylint: disable-msg=W0703
581
      logging.exception("Unhandled error occurred")
582
      status_file.SetExitStatus(constants.EXIT_FAILURE,
583
                                "Unhandled error occurred: %s" % (err, ))
584

    
585
    if status_file.ExitStatusIsSuccess():
586
      sys.exit(constants.EXIT_SUCCESS)
587

    
588
    sys.exit(constants.EXIT_FAILURE)
589
  finally:
590
    status_file.Update(True)
591

    
592

    
593
if __name__ == "__main__":
594
  main()