Statistics
| Branch: | Tag: | Revision:

root / daemons / import-export @ 34c9ee7b

History | View | Annotate | Download (14.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Import/export daemon.
23

    
24
"""
25

    
26
# pylint: disable-msg=C0103
27
# C0103: Invalid name import-export
28

    
29
import errno
30
import logging
31
import optparse
32
import os
33
import select
34
import signal
35
import subprocess
36
import sys
37
import time
38

    
39
from ganeti import constants
40
from ganeti import cli
41
from ganeti import utils
42
from ganeti import serializer
43
from ganeti import objects
44
from ganeti import locking
45
from ganeti import impexpd
46

    
47

    
48
#: Maximum number of recent child output lines retained in the status file
MAX_RECENT_OUTPUT_LINES = 20

#: Minimum number of seconds between two status file writes (unless forced)
MIN_UPDATE_INTERVAL = 5.0

#: Number of seconds a child process gets to exit after being sent a signal
CHILD_LINGER_TIMEOUT = 5.0

#: Default number of seconds to wait for a connection to be established
DEFAULT_CONNECT_TIMEOUT = 60


# Parsed command line options; module-wide, assigned by ParseOptions()
options = None
63

    
64

    
65
def SetupLogging():
  """Sets up the logging module for this daemon.

  A single stream handler is attached to the root logger, whose level depends
  on the C{debug} and C{verbose} attributes of the global options. The same
  handler also serves a separate logger used for child process output.

  @return: Logger for child process output

  """
  handler = logging.StreamHandler()
  handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
  handler.setLevel(logging.NOTSET)

  root = logging.getLogger("")
  root.addHandler(handler)

  # Debug wins over verbose; without either only errors are shown
  if options.debug:
    root_level = logging.NOTSET
  elif options.verbose:
    root_level = logging.INFO
  else:
    root_level = logging.ERROR

  root.setLevel(root_level)

  # Child output goes through a logger instantiated directly, so its level is
  # configured independently of the root logger's
  output_logger = logging.Logger("child output")
  output_logger.addHandler(handler)
  output_logger.setLevel(logging.NOTSET)

  return output_logger
91

    
92

    
93
class StatusFile:
  """Holds the daemon's status and writes it to the status file.

  """
  #: Permissions of the written file, owner read-only (octal 0400); spelled
  #: via int() so that no old-style octal literal is needed
  _FILE_MODE = int("0400", 8)

  def __init__(self, path):
    """Creates an empty status bound to a file.

    @param path: Path of the status file

    """
    self._path = path

    created = time.time()
    self._data = objects.ImportExportStatus(ctime=created,
                                            mtime=None,
                                            recent_output=[])

  def AddRecentOutput(self, line):
    """Remembers a line of output, dropping the oldest ones if necessary.

    @param line: Line to be added

    """
    recent = self._data.recent_output
    recent.append(line)

    # Trim in place, keeping only the newest MAX_RECENT_OUTPUT_LINES entries
    del recent[:-MAX_RECENT_OUTPUT_LINES]

  def SetListenPort(self, port):
    """Records the port the daemon is listening on.

    @type port: int
    @param port: TCP/UDP port

    """
    assert isinstance(port, (int, long)) and 0 < port < 2**16
    self._data.listen_port = port

  def GetListenPort(self):
    """Returns the recorded listening port.

    """
    return self._data.listen_port

  def SetConnected(self):
    """Marks the daemon as connected.

    """
    self._data.connected = True

  def GetConnected(self):
    """Returns the connected flag.

    """
    return self._data.connected

  def SetExitStatus(self, exit_status, error_message):
    """Records the exit status together with an error message.

    @param exit_status: Exit status
    @param error_message: Error message; required unless the status is 0

    """
    # A non-zero status without a message is a programming error
    assert exit_status == 0 or error_message

    self._data.error_message = error_message
    self._data.exit_status = exit_status

  def ExitStatusIsSuccess(self):
    """Tells whether the recorded result counts as "success".

    Success means that no error message has been recorded.

    """
    return not bool(self._data.error_message)

  def Update(self, force):
    """Writes the status file if necessary.

    @type force: bool
    @param force: Write status file in any case, not only when minimum interval
                  is expired

    """
    if not force:
      last_write = self._data.mtime
      # Skip the write if the previous one is too recent
      if (last_write is not None and
          time.time() <= (last_write + MIN_UPDATE_INTERVAL)):
        return

    logging.debug("Updating status file %s", self._path)

    self._data.mtime = time.time()
    contents = serializer.DumpJson(self._data.ToDict(), indent=True)
    utils.WriteFile(self._path, data=contents, mode=self._FILE_MODE)
178

    
179

    
180
def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
                   signal_notify, signal_handler, mode):
  """Handles the child processes' output.

  Polls the child's standard error, socat's standard error and the signal
  notification file. The loop ends once both output streams are gone, or once
  the child was asked to exit (signal received or connection timeout) and did
  not do so within L{CHILD_LINGER_TIMEOUT} seconds.

  @param child: Child process; its C{stderr} is read and C{Kill} is used when
                the connection timeout expires
  @type socat_stderr_read_fd: int
  @param socat_stderr_read_fd: File descriptor to read socat's stderr from
  @param status_file: Status file manager; queried for the connected flag and
                      passed on to the child I/O processor
  @param child_logger: Logger passed on to the child I/O processor
  @param signal_notify: File-like object which becomes readable after a signal
                        was received
  @param signal_handler: Signal handler, must only cover SIGTERM and SIGINT
  @param mode: Daemon mode; the connection timeout is only used for
               L{constants.IEM_IMPORT}
  @rtype: bool
  @return: False if the child was given a deadline to exit (signal received or
           connection not established in time), True otherwise

  """
  assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
         "Other signals are not handled in this function"

  # Buffer size 0 is important, otherwise .read() with a specified length
  # might buffer data while poll(2) won't mark its file descriptor as
  # readable again.
  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)

  # Events meaning "data can be read". The original condition spelled this as
  # "POLLIN | event & POLLPRI", a typo which only by accident evaluated to the
  # same result.
  readable_mask = select.POLLIN | select.POLLPRI

  # Events meaning "this file descriptor is of no more use"
  gone_mask = select.POLLNVAL | select.POLLHUP | select.POLLERR

  child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
                                           child_logger)
  try:
    signal_fd = signal_notify.fileno()

    # Maps file descriptor to (source file, destination line splitter); the
    # signal notification file has no destination
    fdmap = {
      child.stderr.fileno():
        (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
      socat_stderr_read.fileno():
        (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
      signal_fd: (signal_notify, None),
      }

    poller = select.poll()
    for fd in fdmap:
      utils.SetNonblockFlag(fd, True)
      poller.register(fd, select.POLLIN)

    if options.connect_timeout and mode == constants.IEM_IMPORT:
      listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
    else:
      listen_timeout = None

    # Set as soon as the child is expected to exit
    exit_timeout = None

    while True:
      # Break out of loop if only signal notify FD is left
      if len(fdmap) == 1 and signal_fd in fdmap:
        break

      # None makes poll(2) wait without limit
      timeout = None

      if listen_timeout and not exit_timeout:
        if status_file.GetConnected():
          # Connection established, no need to watch the clock anymore
          listen_timeout = None
        elif listen_timeout.Remaining() < 0:
          logging.info("Child process didn't establish connection in time")
          child.Kill(signal.SIGTERM)
          exit_timeout = \
            locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
          # Next block will calculate timeout
        else:
          # Not yet connected, check again in a second
          timeout = 1000

      if exit_timeout:
        # poll(2) expects milliseconds
        timeout = exit_timeout.Remaining() * 1000
        if timeout < 0:
          logging.info("Child process didn't exit in time")
          break

      for fd, event in utils.RetryOnSignal(poller.poll, timeout):
        if event & readable_mask:
          (from_, to) = fdmap[fd]

          # Read up to 1 KB of data
          data = from_.read(1024)
          if data:
            if to:
              to.write(data)
            elif fd == signal_fd:
              # Signal handling
              if signal_handler.called:
                signal_handler.Clear()
                if exit_timeout:
                  logging.info("Child process still has about %0.2f seconds"
                               " to exit", exit_timeout.Remaining())
                else:
                  logging.info("Giving child process %0.2f seconds to exit",
                               CHILD_LINGER_TIMEOUT)
                  exit_timeout = \
                    locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
          else:
            # Nothing read from a readable file descriptor, stop watching it
            poller.unregister(fd)
            del fdmap[fd]

        elif event & gone_mask:
          poller.unregister(fd)
          del fdmap[fd]

      child_io_proc.FlushAll()

    # If there was a timeout calculator, we were waiting for the child to
    # finish, e.g. due to a signal
    return not bool(exit_timeout)
  finally:
    child_io_proc.CloseAll()
279

    
280

    
281
def ParseOptions():
  """Parses the command line and stores the options globally.

  The parsed options end up in the module-level C{options} variable. The
  program is aborted by the option parser if the number of arguments or the
  mode is wrong.

  @rtype: tuple
  @return: Status file path and mode, in this order

  """
  global options # pylint: disable-msg=W0603

  valid_modes = (constants.IEM_IMPORT, constants.IEM_EXPORT)

  parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
                                        valid_modes))
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)

  # Plain string options: (flag, destination, help text)
  for (flag, dest, text) in [
    ("--key", "key", "RSA key file"),
    ("--cert", "cert", "X509 certificate file"),
    ("--ca", "ca", "X509 CA file"),
    ("--bind", "bind", "Bind address"),
    ("--host", "host", "Remote hostname"),
    ]:
    parser.add_option(flag, dest=dest, action="store", type="string",
                      help=text)

  parser.add_option("--port", dest="port", action="store", type="int",
                    help="Remote port")
  parser.add_option("--connect-retries", dest="connect_retries", action="store",
                    type="int", default=0,
                    help=("How many times the connection should be retried"
                          " (export only)"))
  parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
                    type="int", default=DEFAULT_CONNECT_TIMEOUT,
                    help="Timeout for connection to be established (seconds)")
  parser.add_option("--compress", dest="compress", action="store",
                    type="choice", help="Compression method",
                    metavar="[%s]" % "|".join(constants.IEC_ALL),
                    choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)

  for (flag, dest, text) in [
    ("--cmd-prefix", "cmd_prefix", "Command prefix"),
    ("--cmd-suffix", "cmd_suffix", "Command suffix"),
    ]:
    parser.add_option(flag, dest=dest, action="store", type="string",
                      help=text)

  (options, args) = parser.parse_args()

  if len(args) != 2:
    # Won't return
    parser.error("Expected exactly two arguments")

  (status_file_path, mode) = args

  if mode not in valid_modes:
    # Won't return
    parser.error("Invalid mode: %s" % mode)

  return (status_file_path, mode)
336

    
337

    
338
class ChildProcess(subprocess.Popen):
  """Child process running in a process group of its own.

  """
  def __init__(self, cmd, noclose_fds):
    """Starts the child process.

    @param cmd: Command to run (no shell is involved)
    @param noclose_fds: File descriptors to be kept open in the child

    """
    self._noclose_fds = noclose_fds

    # Not using close_fds because doing so would also close the socat stderr
    # pipe, which we still need.
    subprocess.Popen.__init__(self, cmd, shell=False, close_fds=False,
                              stdin=None, stdout=None, stderr=subprocess.PIPE,
                              preexec_fn=self._ChildPreexec)
    self._SetProcessGroup()

  def _ChildPreexec(self):
    """Prepares the child right before its executable is execve'd.

    """
    # A process group of its own allows signalling the child process and all
    # grandchildren at once via that group.
    os.setpgid(0, 0)

    # Close almost all file descriptors
    utils.CloseFDs(noclose_fds=self._noclose_fds)

  def _SetProcessGroup(self):
    """Moves the child into its own process group, from the parent's side.

    """
    assert self.pid, "Can't be called in child process"

    # The child does the same in its preexec function; doing it here as well
    # closes (as good as possible in Python) the window in which a signal
    # could be sent before the group exists.
    try:
      os.setpgid(self.pid, self.pid)
    except EnvironmentError:
      # Fetched via sys.exc_info() to avoid version-specific except syntax
      err = sys.exc_info()[1]

      # If the child process was faster we receive EPERM or EACCES
      if err.errno not in (errno.EPERM, errno.EACCES):
        raise

  def Kill(self, signum):
    """Signals the child's whole process group.

    @param signum: Signal number

    """
    logging.info("Sending signal %s to child process", signum)
    os.killpg(self.pid, signum)

  def ForceQuit(self):
    """Makes sure the child process is gone, killing it if need be.

    """
    # Nothing to do if the child has exited already
    if utils.RetryOnSignal(self.poll) is not None:
      return

    logging.error("Child process still alive, sending SIGKILL")
    self.Kill(signal.SIGKILL)
    utils.RetryOnSignal(self.wait)
395

    
396

    
397
def main():
  """Runs the daemon.

  Starts the child process, relays its output and SIGTERM/SIGINT, records the
  result in the status file and exits with L{constants.EXIT_SUCCESS} or
  L{constants.EXIT_FAILURE}. The status file is written in any case before
  the program ends.

  """
  # Option parsing
  (status_file_path, mode) = ParseOptions()

  # Configure logging
  child_logger = SetupLogging()

  status_file = StatusFile(status_file_path)
  try:
    try:
      # Pipe to receive socat's stderr output
      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()

      # Get child process command
      cmd = impexpd.CommandBuilder(mode, options,
                                   socat_stderr_write_fd).GetCommand()

      logging.debug("Starting command %r", cmd)

      # Start child process
      child = ChildProcess(cmd, [socat_stderr_write_fd])
      try:
        def _ForwardSignal(signum, _):
          """Passes a received signal on to the child process.

          """
          child.Kill(signum)

        signal_wakeup = utils.SignalWakeupFd()
        try:
          # TODO: There is a race condition between starting the child and
          # handling the signals here. While there might be a way to work around
          # it by registering the handlers before starting the child and
          # deferring sent signals until the child is available, doing so can be
          # complicated.
          signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
                                               handler_fn=_ForwardSignal,
                                               wakeup=signal_wakeup)
          try:
            # Close child's side
            utils.RetryOnSignal(os.close, socat_stderr_write_fd)

            finished_normally = ProcessChildIO(child, socat_stderr_read_fd,
                                               status_file, child_logger,
                                               signal_wakeup, signal_handler,
                                               mode)
            if finished_normally:
              # The child closed all its file descriptors and there was no
              # signal
              # TODO: Implement timeout instead of waiting indefinitely
              utils.RetryOnSignal(child.wait)
          finally:
            signal_handler.Reset()
        finally:
          signal_wakeup.Reset()
      finally:
        child.ForceQuit()

      # Translate the child's return code into an error message
      returncode = child.returncode
      if returncode == 0:
        errmsg = None
      elif returncode < 0:
        errmsg = "Exited due to signal %s" % (-returncode, )
      else:
        errmsg = "Exited with status %s" % (returncode, )

      status_file.SetExitStatus(returncode, errmsg)
    except Exception: # pylint: disable-msg=W0703
      # Fetched via sys.exc_info() to avoid version-specific except syntax
      err = sys.exc_info()[1]
      logging.exception("Unhandled error occurred")
      status_file.SetExitStatus(constants.EXIT_FAILURE,
                                "Unhandled error occurred: %s" % (err, ))

    if status_file.ExitStatusIsSuccess():
      exit_code = constants.EXIT_SUCCESS
    else:
      exit_code = constants.EXIT_FAILURE

    sys.exit(exit_code)
  finally:
    # Also runs while sys.exit() unwinds, so the final status always gets
    # written
    status_file.Update(True)
475

    
476

    
477
if __name__ == "__main__":
  # Only run the daemon when executed as a program, not when imported
  main()