Statistics
| Branch: | Tag: | Revision:

root / daemons / import-export @ c08d76f5

History | View | Annotate | Download (17.2 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Import/export daemon.
23

    
24
"""
25

    
26
# pylint: disable-msg=C0103
27
# C0103: Invalid name import-export
28

    
29
import errno
30
import logging
31
import optparse
32
import os
33
import select
34
import signal
35
import subprocess
36
import sys
37
import time
38
import math
39

    
40
from ganeti import constants
41
from ganeti import cli
42
from ganeti import utils
43
from ganeti import serializer
44
from ganeti import objects
45
from ganeti import locking
46
from ganeti import impexpd
47

    
48

    
49
#: How many lines to keep in the status file
50
MAX_RECENT_OUTPUT_LINES = 20
51

    
52
#: Don't update status file more than once every 5 seconds (unless forced)
53
MIN_UPDATE_INTERVAL = 5.0
54

    
55
#: Give child process up to 5 seconds to exit after sending a signal
56
CHILD_LINGER_TIMEOUT = 5.0
57

    
58
#: How long to wait for a connection to be established
59
DEFAULT_CONNECT_TIMEOUT = 60
60

    
61
#: Get dd(1) statistics every few seconds
62
DD_STATISTICS_INTERVAL = 5.0
63

    
64
#: Seconds for throughput calculation
65
DD_THROUGHPUT_INTERVAL = 60.0
66

    
67
#: Number of samples for throughput calculation
68
DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) /
69
                                      DD_STATISTICS_INTERVAL))
70

    
71

    
72
# Global variable for options
73
options = None
74

    
75

    
76
def SetupLogging():
77
  """Configures the logging module.
78

    
79
  """
80
  formatter = logging.Formatter("%(asctime)s: %(message)s")
81

    
82
  stderr_handler = logging.StreamHandler()
83
  stderr_handler.setFormatter(formatter)
84
  stderr_handler.setLevel(logging.NOTSET)
85

    
86
  root_logger = logging.getLogger("")
87
  root_logger.addHandler(stderr_handler)
88

    
89
  if options.debug:
90
    root_logger.setLevel(logging.NOTSET)
91
  elif options.verbose:
92
    root_logger.setLevel(logging.INFO)
93
  else:
94
    root_logger.setLevel(logging.ERROR)
95

    
96
  # Create special logger for child process output
97
  child_logger = logging.Logger("child output")
98
  child_logger.addHandler(stderr_handler)
99
  child_logger.setLevel(logging.NOTSET)
100

    
101
  return child_logger
102

    
103

    
104
class StatusFile:
105
  """Status file manager.
106

    
107
  """
108
  def __init__(self, path):
109
    """Initializes class.
110

    
111
    """
112
    self._path = path
113
    self._data = objects.ImportExportStatus(ctime=time.time(),
114
                                            mtime=None,
115
                                            recent_output=[])
116

    
117
  def AddRecentOutput(self, line):
118
    """Adds a new line of recent output.
119

    
120
    """
121
    self._data.recent_output.append(line)
122

    
123
    # Remove old lines
124
    del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]
125

    
126
  def SetListenPort(self, port):
127
    """Sets the port the daemon is listening on.
128

    
129
    @type port: int
130
    @param port: TCP/UDP port
131

    
132
    """
133
    assert isinstance(port, (int, long)) and 0 < port < 2**16
134
    self._data.listen_port = port
135

    
136
  def GetListenPort(self):
137
    """Returns the port the daemon is listening on.
138

    
139
    """
140
    return self._data.listen_port
141

    
142
  def SetConnected(self):
143
    """Sets the connected flag.
144

    
145
    """
146
    self._data.connected = True
147

    
148
  def GetConnected(self):
149
    """Determines whether the daemon is connected.
150

    
151
    """
152
    return self._data.connected
153

    
154
  def SetProgress(self, mbytes, throughput, percent, eta):
155
    """Sets how much data has been transferred so far.
156

    
157
    @type mbytes: number
158
    @param mbytes: Transferred amount of data in MiB.
159
    @type throughput: float
160
    @param throughput: MiB/second
161
    @type percent: number
162
    @param percent: Percent processed
163
    @type eta: number
164
    @param eta: Expected number of seconds until done
165

    
166
    """
167
    self._data.progress_mbytes = mbytes
168
    self._data.progress_throughput = throughput
169
    self._data.progress_percent = percent
170
    self._data.progress_eta = eta
171

    
172
  def SetExitStatus(self, exit_status, error_message):
173
    """Sets the exit status and an error message.
174

    
175
    """
176
    # Require error message when status isn't 0
177
    assert exit_status == 0 or error_message
178

    
179
    self._data.exit_status = exit_status
180
    self._data.error_message = error_message
181

    
182
  def ExitStatusIsSuccess(self):
183
    """Returns whether the exit status means "success".
184

    
185
    """
186
    return not bool(self._data.error_message)
187

    
188
  def Update(self, force):
189
    """Updates the status file.
190

    
191
    @type force: bool
192
    @param force: Write status file in any case, not only when minimum interval
193
                  is expired
194

    
195
    """
196
    if not (force or
197
            self._data.mtime is None or
198
            time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
199
      return
200

    
201
    logging.debug("Updating status file %s", self._path)
202

    
203
    self._data.mtime = time.time()
204
    utils.WriteFile(self._path,
205
                    data=serializer.DumpJson(self._data.ToDict(), indent=True),
206
                    mode=0400)
207

    
208

    
209
def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
210
                   dd_pid_read_fd, status_file, child_logger,
211
                   signal_notify, signal_handler, mode):
212
  """Handles the child processes' output.
213

    
214
  """
215
  assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
216
         "Other signals are not handled in this function"
217

    
218
  # Buffer size 0 is important, otherwise .read() with a specified length
219
  # might buffer data while poll(2) won't mark its file descriptor as
220
  # readable again.
221
  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
222
  dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
223
  dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
224

    
225
  tp_samples = DD_THROUGHPUT_SAMPLES
226

    
227
  child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
228
                                           child_logger,
229
                                           throughput_samples=tp_samples)
230
  try:
231
    fdmap = {
232
      child.stderr.fileno():
233
        (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
234
      socat_stderr_read.fileno():
235
        (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
236
      dd_pid_read.fileno():
237
        (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
238
      dd_stderr_read.fileno():
239
        (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
240
      signal_notify.fileno(): (signal_notify, None),
241
      }
242

    
243
    poller = select.poll()
244
    for fd in fdmap:
245
      utils.SetNonblockFlag(fd, True)
246
      poller.register(fd, select.POLLIN)
247

    
248
    if options.connect_timeout and mode == constants.IEM_IMPORT:
249
      listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
250
    else:
251
      listen_timeout = None
252

    
253
    exit_timeout = None
254
    dd_stats_timeout = None
255

    
256
    while True:
257
      # Break out of loop if only signal notify FD is left
258
      if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
259
        break
260

    
261
      timeout = None
262

    
263
      if listen_timeout and not exit_timeout:
264
        if status_file.GetConnected():
265
          listen_timeout = None
266
        elif listen_timeout.Remaining() < 0:
267
          logging.info("Child process didn't establish connection in time")
268
          child.Kill(signal.SIGTERM)
269
          exit_timeout = \
270
            locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
271
          # Next block will calculate timeout
272
        else:
273
          # Not yet connected, check again in a second
274
          timeout = 1000
275

    
276
      if exit_timeout:
277
        timeout = exit_timeout.Remaining() * 1000
278
        if timeout < 0:
279
          logging.info("Child process didn't exit in time")
280
          break
281

    
282
      if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
283
        notify_status = child_io_proc.NotifyDd()
284
        if notify_status:
285
          # Schedule next notification
286
          dd_stats_timeout = locking.RunningTimeout(DD_STATISTICS_INTERVAL,
287
                                                    True)
288
        else:
289
          # Try again soon (dd isn't ready yet)
290
          dd_stats_timeout = locking.RunningTimeout(1.0, True)
291

    
292
      if dd_stats_timeout:
293
        dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)
294

    
295
        if timeout is None:
296
          timeout = dd_timeout
297
        else:
298
          timeout = min(timeout, dd_timeout)
299

    
300
      for fd, event in utils.RetryOnSignal(poller.poll, timeout):
301
        if event & (select.POLLIN | event & select.POLLPRI):
302
          (from_, to) = fdmap[fd]
303

    
304
          # Read up to 1 KB of data
305
          data = from_.read(1024)
306
          if data:
307
            if to:
308
              to.write(data)
309
            elif fd == signal_notify.fileno():
310
              # Signal handling
311
              if signal_handler.called:
312
                signal_handler.Clear()
313
                if exit_timeout:
314
                  logging.info("Child process still has about %0.2f seconds"
315
                               " to exit", exit_timeout.Remaining())
316
                else:
317
                  logging.info("Giving child process %0.2f seconds to exit",
318
                               CHILD_LINGER_TIMEOUT)
319
                  exit_timeout = \
320
                    locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
321
          else:
322
            poller.unregister(fd)
323
            del fdmap[fd]
324

    
325
        elif event & (select.POLLNVAL | select.POLLHUP |
326
                      select.POLLERR):
327
          poller.unregister(fd)
328
          del fdmap[fd]
329

    
330
      child_io_proc.FlushAll()
331

    
332
    # If there was a timeout calculator, we were waiting for the child to
333
    # finish, e.g. due to a signal
334
    return not bool(exit_timeout)
335
  finally:
336
    child_io_proc.CloseAll()
337

    
338

    
339
def ParseOptions():
340
  """Parses the options passed to the program.
341

    
342
  @return: Arguments to program
343

    
344
  """
345
  global options # pylint: disable-msg=W0603
346

    
347
  parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
348
                                        (constants.IEM_IMPORT,
349
                                         constants.IEM_EXPORT)))
350
  parser.add_option(cli.DEBUG_OPT)
351
  parser.add_option(cli.VERBOSE_OPT)
352
  parser.add_option("--key", dest="key", action="store", type="string",
353
                    help="RSA key file")
354
  parser.add_option("--cert", dest="cert", action="store", type="string",
355
                    help="X509 certificate file")
356
  parser.add_option("--ca", dest="ca", action="store", type="string",
357
                    help="X509 CA file")
358
  parser.add_option("--bind", dest="bind", action="store", type="string",
359
                    help="Bind address")
360
  parser.add_option("--host", dest="host", action="store", type="string",
361
                    help="Remote hostname")
362
  parser.add_option("--port", dest="port", action="store", type="int",
363
                    help="Remote port")
364
  parser.add_option("--connect-retries", dest="connect_retries", action="store",
365
                    type="int", default=0,
366
                    help=("How many times the connection should be retried"
367
                          " (export only)"))
368
  parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
369
                    type="int", default=DEFAULT_CONNECT_TIMEOUT,
370
                    help="Timeout for connection to be established (seconds)")
371
  parser.add_option("--compress", dest="compress", action="store",
372
                    type="choice", help="Compression method",
373
                    metavar="[%s]" % "|".join(constants.IEC_ALL),
374
                    choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
375
  parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
376
                    type="string", help="Command prefix")
377
  parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
378
                    type="string", help="Command suffix")
379

    
380
  (options, args) = parser.parse_args()
381

    
382
  if len(args) != 2:
383
    # Won't return
384
    parser.error("Expected exactly two arguments")
385

    
386
  (status_file_path, mode) = args
387

    
388
  if mode not in (constants.IEM_IMPORT,
389
                  constants.IEM_EXPORT):
390
    # Won't return
391
    parser.error("Invalid mode: %s" % mode)
392

    
393
  return (status_file_path, mode)
394

    
395

    
396
class ChildProcess(subprocess.Popen):
397
  def __init__(self, cmd, noclose_fds):
398
    """Initializes this class.
399

    
400
    """
401
    self._noclose_fds = noclose_fds
402

    
403
    # Not using close_fds because doing so would also close the socat stderr
404
    # pipe, which we still need.
405
    subprocess.Popen.__init__(self, cmd, shell=False, close_fds=False,
406
                              stderr=subprocess.PIPE, stdout=None, stdin=None,
407
                              preexec_fn=self._ChildPreexec)
408
    self._SetProcessGroup()
409

    
410
  def _ChildPreexec(self):
411
    """Called before child executable is execve'd.
412

    
413
    """
414
    # Move to separate process group. By sending a signal to its process group
415
    # we can kill the child process and all grandchildren.
416
    os.setpgid(0, 0)
417

    
418
    # Close almost all file descriptors
419
    utils.CloseFDs(noclose_fds=self._noclose_fds)
420

    
421
  def _SetProcessGroup(self):
422
    """Sets the child's process group.
423

    
424
    """
425
    assert self.pid, "Can't be called in child process"
426

    
427
    # Avoid race condition by setting child's process group (as good as
428
    # possible in Python) before sending signals to child. For an
429
    # explanation, see preexec function for child.
430
    try:
431
      os.setpgid(self.pid, self.pid)
432
    except EnvironmentError, err:
433
      # If the child process was faster we receive EPERM or EACCES
434
      if err.errno not in (errno.EPERM, errno.EACCES):
435
        raise
436

    
437
  def Kill(self, signum):
438
    """Sends signal to child process.
439

    
440
    """
441
    logging.info("Sending signal %s to child process", signum)
442
    os.killpg(self.pid, signum)
443

    
444
  def ForceQuit(self):
445
    """Ensure child process is no longer running.
446

    
447
    """
448
    # Final check if child process is still alive
449
    if utils.RetryOnSignal(self.poll) is None:
450
      logging.error("Child process still alive, sending SIGKILL")
451
      self.Kill(signal.SIGKILL)
452
      utils.RetryOnSignal(self.wait)
453

    
454

    
455
def main():
456
  """Main function.
457

    
458
  """
459
  # Option parsing
460
  (status_file_path, mode) = ParseOptions()
461

    
462
  # Configure logging
463
  child_logger = SetupLogging()
464

    
465
  status_file = StatusFile(status_file_path)
466
  try:
467
    try:
468
      # Pipe to receive socat's stderr output
469
      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
470

    
471
      # Pipe to receive dd's stderr output
472
      (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()
473

    
474
      # Pipe to receive dd's PID
475
      (dd_pid_read_fd, dd_pid_write_fd) = os.pipe()
476

    
477
      # Get child process command
478
      cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
479
                                           dd_stderr_write_fd, dd_pid_write_fd)
480
      cmd = cmd_builder.GetCommand()
481

    
482
      logging.debug("Starting command %r", cmd)
483

    
484
      # Start child process
485
      child = ChildProcess(cmd, [socat_stderr_write_fd, dd_stderr_write_fd,
486
                                 dd_pid_write_fd])
487
      try:
488
        def _ForwardSignal(signum, _):
489
          """Forwards signals to child process.
490

    
491
          """
492
          child.Kill(signum)
493

    
494
        signal_wakeup = utils.SignalWakeupFd()
495
        try:
496
          # TODO: There is a race condition between starting the child and
497
          # handling the signals here. While there might be a way to work around
498
          # it by registering the handlers before starting the child and
499
          # deferring sent signals until the child is available, doing so can be
500
          # complicated.
501
          signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
502
                                               handler_fn=_ForwardSignal,
503
                                               wakeup=signal_wakeup)
504
          try:
505
            # Close child's side
506
            utils.RetryOnSignal(os.close, socat_stderr_write_fd)
507
            utils.RetryOnSignal(os.close, dd_stderr_write_fd)
508
            utils.RetryOnSignal(os.close, dd_pid_write_fd)
509

    
510
            if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
511
                              dd_pid_read_fd, status_file, child_logger,
512
                              signal_wakeup, signal_handler, mode):
513
              # The child closed all its file descriptors and there was no
514
              # signal
515
              # TODO: Implement timeout instead of waiting indefinitely
516
              utils.RetryOnSignal(child.wait)
517
          finally:
518
            signal_handler.Reset()
519
        finally:
520
          signal_wakeup.Reset()
521
      finally:
522
        child.ForceQuit()
523

    
524
      if child.returncode == 0:
525
        errmsg = None
526
      elif child.returncode < 0:
527
        errmsg = "Exited due to signal %s" % (-child.returncode, )
528
      else:
529
        errmsg = "Exited with status %s" % (child.returncode, )
530

    
531
      status_file.SetExitStatus(child.returncode, errmsg)
532
    except Exception, err: # pylint: disable-msg=W0703
533
      logging.exception("Unhandled error occurred")
534
      status_file.SetExitStatus(constants.EXIT_FAILURE,
535
                                "Unhandled error occurred: %s" % (err, ))
536

    
537
    if status_file.ExitStatusIsSuccess():
538
      sys.exit(constants.EXIT_SUCCESS)
539

    
540
    sys.exit(constants.EXIT_FAILURE)
541
  finally:
542
    status_file.Update(True)
543

    
544

    
545
if __name__ == "__main__":
546
  main()