Statistics
| Branch: | Tag: | Revision:

root / daemons / import-export @ 2d76b580

History | View | Annotate | Download (18.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Import/export daemon.
23

    
24
"""
25

    
26
# pylint: disable-msg=C0103
27
# C0103: Invalid name import-export
28

    
29
import errno
30
import logging
31
import optparse
32
import os
33
import re
34
import select
35
import signal
36
import socket
37
import subprocess
38
import sys
39
import time
40
from cStringIO import StringIO
41

    
42
from ganeti import constants
43
from ganeti import cli
44
from ganeti import utils
45
from ganeti import serializer
46
from ganeti import objects
47
from ganeti import locking
48

    
49

    
50
#: Matches the message socat(1) logs once it has started listening on its
#: socket. The local address captured here (in particular the port number) is
#: what the remote peer needs in order to connect.
LISTENING_RE = re.compile(
  r"^listening on\s+AF=(?P<family>\d+)\s+(?P<address>.+):(?P<port>\d+)$",
  re.IGNORECASE)

#: Matches the message socat(1) logs when it starts sending data over the wire
TRANSFER_LOOP_RE = re.compile(
  r"^starting data transfer loop with FDs\s+.*$", re.IGNORECASE)

# Single-letter severity markers found in socat's log output
SOCAT_LOG_DEBUG = "D"
SOCAT_LOG_INFO = "I"
SOCAT_LOG_NOTICE = "N"
SOCAT_LOG_WARNING = "W"
SOCAT_LOG_ERROR = "E"
SOCAT_LOG_FATAL = "F"

#: Severities which are not forwarded unless debugging is enabled
SOCAT_LOG_IGNORE = frozenset((
  SOCAT_LOG_DEBUG,
  SOCAT_LOG_INFO,
  SOCAT_LOG_NOTICE,
  ))

#: Socat buffer size: at most this many bytes are transferred per step
SOCAT_BUFSIZE = 1024 * 1024

#: How many lines to keep in the status file
MAX_RECENT_OUTPUT_LINES = 20

#: Don't update status file more than once every 5 seconds (unless forced)
MIN_UPDATE_INTERVAL = 5.0

#: Give child process up to 5 seconds to exit after sending a signal
CHILD_LINGER_TIMEOUT = 5.0

# Common options for socat
SOCAT_TCP_OPTS = [
  "keepalive",
  "keepidle=60",
  "keepintvl=10",
  "keepcnt=5",
  ]
SOCAT_OPENSSL_OPTS = [
  "verify=1",
  "cipher=HIGH",
  "method=TLSv1",
  ]
SOCAT_CONNECT_TIMEOUT = 60


# Parsed command line options; None until ParseOptions has been called
options = None
94

    
95

    
96
class Error(Exception):
  """Generic exception raised by this daemon.

  """
98

    
99

    
100
def SetupLogging():
  """Sets up logging for the daemon and for the child's output.

  A single stream handler is attached to the root logger, whose level is
  chosen from the global C{options}: everything with C{debug}, informational
  messages with C{verbose} and errors only otherwise. The same handler also
  serves a separate logger for the child's output, which always lets all
  messages through.

  @rtype: logging.Logger
  @return: Logger to be used for output of the child process

  """
  handler = logging.StreamHandler()
  handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
  handler.setLevel(logging.NOTSET)

  root = logging.getLogger("")
  root.addHandler(handler)

  if options.debug:
    root_level = logging.NOTSET
  elif options.verbose:
    root_level = logging.INFO
  else:
    root_level = logging.ERROR
  root.setLevel(root_level)

  # The logger for child output is instantiated directly instead of being
  # requested through logging.getLogger, so its level is independent of the
  # one configured for the root logger above
  child_output = logging.Logger("child output")
  child_output.addHandler(handler)
  child_output.setLevel(logging.NOTSET)

  return child_output
126

    
127

    
128
def _VerifyListening(family, address, port):
  """Checks and normalizes the listening address reported by socat.

  @type family: int
  @param family: Address family; only C{socket.AF_INET} is accepted
  @type address: string
  @param address: Address in textual form
  @param port: Port number, returned unchanged
  @rtype: tuple
  @return: Normalized address and port
  @raise Error: If the address family is not supported or the address is
                not valid for the family

  """
  # TODO: Implement IPv6 support
  if family == socket.AF_INET:
    try:
      packed = socket.inet_pton(family, address)
    except socket.error:
      raise Error("Invalid address %r for family %s" % (address, family))

    # Converting back from the packed form yields the canonical notation
    return (socket.inet_ntop(family, packed), port)

  raise Error("Address family %r not supported" % family)
142

    
143

    
144
class StatusFile:
  """Keeps the daemon's status and writes it to the status file.

  """
  def __init__(self, path):
    """Prepares an empty status for the given file.

    Nothing is written until L{Update} is called.

    @type path: string
    @param path: Path of the status file

    """
    self._path = path
    self._data = objects.ImportExportStatus(ctime=time.time(),
                                            mtime=None,
                                            recent_output=[])

  def AddRecentOutput(self, line):
    """Records a line of output, keeping only the most recent ones.

    @type line: string
    @param line: Output line

    """
    recent = self._data.recent_output
    recent.append(line)

    # Drop everything but the last MAX_RECENT_OUTPUT_LINES entries
    del recent[:-MAX_RECENT_OUTPUT_LINES]

  def SetListenPort(self, port):
    """Stores the port the daemon is listening on.

    @type port: int
    @param port: TCP/UDP port

    """
    assert isinstance(port, (int, long)) and 0 < port < 2**16
    self._data.listen_port = port

  def GetListenPort(self):
    """Returns the port stored by L{SetListenPort}.

    """
    return self._data.listen_port

  def SetConnected(self):
    """Marks the daemon as connected.

    """
    self._data.connected = True

  def SetExitStatus(self, exit_status, error_message):
    """Stores the exit status together with an error message.

    @param exit_status: Exit status
    @param error_message: Error message; must be set whenever the exit status
                          isn't 0

    """
    # Require error message when status isn't 0
    assert exit_status == 0 or error_message

    self._data.exit_status = exit_status
    self._data.error_message = error_message

  def ExitStatusIsSuccess(self):
    """Tells whether the recorded outcome means "success".

    Success is defined as the absence of an error message.

    @rtype: bool

    """
    return not self._data.error_message

  def Update(self, force):
    """Writes the status file.

    @type force: bool
    @param force: Write status file in any case, not only when minimum interval
                  is expired

    """
    if not force:
      last_write = self._data.mtime
      if (last_write is not None and
          time.time() <= (last_write + MIN_UPDATE_INTERVAL)):
        # Written recently enough, skip this update
        return

    logging.debug("Updating status file %s", self._path)

    self._data.mtime = time.time()
    contents = serializer.DumpJson(self._data.ToDict(), indent=True)

    # Octal 0400; given as a string so that the number is accepted by every
    # version of the interpreter
    utils.WriteFile(self._path, data=contents, mode=int("0400", 8))
223

    
224

    
225
def _ProcessSocatOutput(status_file, level, msg):
  """Extracts status information from a socat log message.

  Only messages of level "notice" are looked at. As long as no listening
  port is known the message is checked for socat's listening address; it is
  also checked for the start of the data transfer.

  @param status_file: Status file manager
  @type level: string
  @param level: Log level of the message
  @type msg: string
  @param msg: Log message
  @rtype: bool
  @return: Whether the status was changed by this message

  """
  if level != SOCAT_LOG_NOTICE:
    return False

  if status_file.GetListenPort() is None:
    # TODO: Maybe implement timeout to not listen forever
    listening = LISTENING_RE.match(msg)
    if listening:
      (_, port) = _VerifyListening(int(listening.group("family")),
                                   listening.group("address"),
                                   int(listening.group("port")))

      status_file.SetListenPort(port)
      return True

  if TRANSFER_LOOP_RE.match(msg):
    status_file.SetConnected()
    return True

  return False
246

    
247

    
248
def ProcessOutput(line, status_file, logger, socat):
  """Handles one line of output written by the child process.

  Lines not coming from socat are forwarded as they are. Lines from socat
  are split into their fields, evaluated for status changes and forwarded
  with a "socat: " prefix unless their level is one of the ignored ones and
  debugging is disabled. Forwarded lines are logged and added to the recent
  output of the status file, which is then updated.

  @type line: string
  @param line: Child output line
  @param status_file: Status file manager
  @param logger: Child output logger
  @type socat: bool
  @param socat: Whether it's a socat output line

  """
  status_changed = False

  if not socat:
    text = line
  else:
    fields = line.split(None, 4)

    if len(fields) != 5:
      # Not in the expected format, forward as it is
      text = "socat: %s" % line
    else:
      # Only the log level and the message are of interest
      level = fields[3]
      msg = fields[4]

      status_changed = _ProcessSocatOutput(status_file, level, msg)

      if options.debug or (level and level not in SOCAT_LOG_IGNORE):
        text = "socat: %s %s" % (level, msg)
      else:
        text = None

  if text:
    logger.info(text)
    status_file.AddRecentOutput(text)

  status_file.Update(status_changed)
283

    
284

    
285
def GetBashCommand(cmd):
  """Wraps a command line for execution through Bash.

  Bash is started with the "errexit" and "pipefail" options enabled.

  @type cmd: string
  @param cmd: Command line passed to C{bash -c}
  @rtype: list
  @return: Command as argument list

  """
  bash_opts = []
  for name in ("errexit", "pipefail"):
    bash_opts.extend(["-o", name])

  return ["bash"] + bash_opts + ["-c", cmd]
290

    
291

    
292
def GetSocatCommand(mode):
  """Returns the socat command.

  In import mode socat listens for an OpenSSL connection and writes the
  received data to its standard output; in export mode it reads from its
  standard input and connects to the remote host using OpenSSL. Key,
  certificate, CA file, bind address, host and port are taken from the global
  C{options}.

  @param mode: Daemon mode (C{constants.IEM_IMPORT} or
               C{constants.IEM_EXPORT})
  @rtype: list
  @return: socat command as argument list
  @raise Error: If the mode is invalid, if the remote host or port is missing
                in export mode or if one of the address options contains a
                comma

  """
  common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
    "key=%s" % options.key,
    "cert=%s" % options.cert,
    "cafile=%s" % options.ca,
    ]

  if options.bind is not None:
    common_addr_opts.append("bind=%s" % options.bind)

  if mode == constants.IEM_IMPORT:
    # Port 0 is used when no port was requested; the port actually used is
    # taken from socat's log output
    if options.port is None:
      port = 0
    else:
      port = options.port

    addr1 = [
      "OPENSSL-LISTEN:%s" % port,
      "reuseaddr",
      ] + common_addr_opts
    addr2 = ["stdout"]

  elif mode == constants.IEM_EXPORT:
    # Without this check socat would be told to connect to the literal
    # address "None:None" and fail with a misleading message
    if not options.host or options.port is None:
      raise Error("Remote host and port are required in export mode")

    addr1 = ["stdin"]
    addr2 = [
      "OPENSSL:%s:%s" % (options.host, options.port),
      "connect-timeout=%s" % SOCAT_CONNECT_TIMEOUT,
      ] + common_addr_opts

  else:
    raise Error("Invalid mode")

  # The parts of an address are joined using commas below, hence a value
  # must not contain one
  for i in [addr1, addr2]:
    for value in i:
      if "," in value:
        raise Error("Comma not allowed in socat option value: %r" % value)

  return [
    constants.SOCAT_PATH,

    # Log to stderr
    "-ls",

    # Log level
    "-d", "-d",

    # Buffer size
    "-b%s" % SOCAT_BUFSIZE,

    # Unidirectional mode, the first address is only used for reading, and the
    # second address is only used for writing
    "-u",

    ",".join(addr1), ",".join(addr2)
    ]
350

    
351

    
352
def GetTransportCommand(mode, socat_stderr_fd):
  """Builds the command for the transport part of the daemon.

  The transport is a shell pipeline made of socat, whose stderr is
  redirected to the given file descriptor, and gzip: incoming data is
  uncompressed after socat, outgoing data is compressed before it.

  @param mode: Daemon mode (import or export)
  @type socat_stderr_fd: int
  @param socat_stderr_fd: File descriptor socat should write its stderr to
  @rtype: list
  @return: Bash command as argument list
  @raise Error: If the mode is invalid

  """
  quoted_socat = utils.ShellQuoteArgs(GetSocatCommand(mode))
  socat_cmd = "%s 2>&%d" % (quoted_socat, socat_stderr_fd)

  # TODO: Make compression configurable

  if mode == constants.IEM_IMPORT:
    pipeline = (socat_cmd, "gunzip -c")
  elif mode == constants.IEM_EXPORT:
    pipeline = ("gzip -c", socat_cmd)
  else:
    raise Error("Invalid mode")

  # TODO: Use "dd" to measure processed data (allows to give an ETA)
  # TODO: If a connection to socat is dropped (e.g. due to a wrong
  # certificate), socat should be restarted

  # TODO: Run transport as separate user
  # The transport uses its own shell to simplify running it as a separate user
  # in the future.
  return GetBashCommand(" | ".join(pipeline))
381

    
382

    
383
def GetCommand(mode, socat_stderr_fd):
  """Builds the complete command run as child process.

  The shell-quoted transport command is surrounded by the command prefix and
  suffix from the global C{options}, if given, and the result is wrapped in
  another Bash invocation.

  @param mode: Daemon mode (import or export)
  @type socat_stderr_fd: int
  @param socat_stderr_fd: File descriptor socat should write its stderr to
  @rtype: list
  @return: Bash command as argument list

  """
  parts = []

  if options.cmd_prefix:
    parts.append(options.cmd_prefix)

  parts.append(utils.ShellQuoteArgs(GetTransportCommand(mode,
                                                        socat_stderr_fd)))

  if options.cmd_suffix:
    parts.append(options.cmd_suffix)

  return GetBashCommand(" ".join(parts))
400

    
401

    
402
def ProcessChildIO(child, socat_stderr_read, status_file, child_logger,
                   signal_notify, signal_handler):
  """Handles the child processes' output.

  The child's stderr, socat's stderr and the signal notification pipe are
  polled until only the latter is left or, after a signal was received, the
  child didn't finish within L{CHILD_LINGER_TIMEOUT} seconds. Output is split
  into lines which are passed on to L{ProcessOutput}.

  @param child: Child process; its C{stderr} attribute is read
  @param socat_stderr_read: File object for reading socat's stderr
  @param status_file: Status file manager
  @param child_logger: Child output logger
  @param signal_notify: File object which becomes readable upon signals
  @param signal_handler: Signal handler object; its C{called} attribute is
                         checked and it is reset using C{Clear}
  @rtype: bool
  @return: False if a signal was handled while waiting for the child, True
           otherwise

  """
  poller = select.poll()

  script_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
                                           child_logger, False)
  try:
    socat_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
                                            child_logger, True)
    try:
      # Maps file descriptor to (file object to read from, line splitter to
      # write to); the signal notification pipe has no splitter
      fdmap = {
        child.stderr.fileno(): (child.stderr, script_stderr_lines),
        socat_stderr_read.fileno(): (socat_stderr_read, socat_stderr_lines),
        signal_notify.fileno(): (signal_notify, None),
        }

      for fd in fdmap:
        utils.SetNonblockFlag(fd, True)
        poller.register(fd, select.POLLIN)

      # Only set once a signal has been received
      timeout_calculator = None
      while True:
        # Break out of loop if only signal notify FD is left
        if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
          break

        if timeout_calculator:
          # poll(2) expects milliseconds
          timeout = timeout_calculator.Remaining() * 1000
          if timeout < 0:
            logging.info("Child process didn't exit in time")
            break
        else:
          timeout = None

        for fd, event in utils.RetryOnSignal(poller.poll, timeout):
          if event & (select.POLLIN | select.POLLPRI):
            (from_, to) = fdmap[fd]

            # Read up to 1 KB of data
            data = from_.read(1024)
            if data:
              if to:
                to.write(data)
              elif fd == signal_notify.fileno():
                # Signal handling
                if signal_handler.called:
                  signal_handler.Clear()
                  if timeout_calculator:
                    logging.info("Child process still has about %0.2f seconds"
                                 " to exit", timeout_calculator.Remaining())
                  else:
                    logging.info("Giving child process %0.2f seconds to exit",
                                 CHILD_LINGER_TIMEOUT)
                    timeout_calculator = \
                      locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
            else:
              # Nothing read although marked readable, i.e. end of file
              poller.unregister(fd)
              del fdmap[fd]

          elif event & (select.POLLNVAL | select.POLLHUP |
                        select.POLLERR):
            poller.unregister(fd)
            del fdmap[fd]

        script_stderr_lines.flush()
        socat_stderr_lines.flush()

      # If there was a timeout calculator, we were waiting for the child to
      # finish, e.g. due to a signal
      return not bool(timeout_calculator)
    finally:
      socat_stderr_lines.close()
  finally:
    script_stderr_lines.close()
479

    
480

    
481
def ParseOptions():
  """Parses the command line of the program.

  The parsed options are stored in the module-level C{options} variable.
  Unless exactly two arguments are given and the second one is a known mode,
  the parser's C{error} method is called.

  @rtype: tuple
  @return: Status file path and daemon mode

  """
  global options # pylint: disable-msg=W0603

  valid_modes = (constants.IEM_IMPORT, constants.IEM_EXPORT)

  parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
                                        valid_modes))
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)

  # Option name, destination, value type and help text
  value_options = [
    ("--key", "key", "string", "RSA key file"),
    ("--cert", "cert", "string", "X509 certificate file"),
    ("--ca", "ca", "string", "X509 CA file"),
    ("--bind", "bind", "string", "Bind address"),
    ("--host", "host", "string", "Remote hostname"),
    ("--port", "port", "int", "Remote port"),
    ("--cmd-prefix", "cmd_prefix", "string", "Command prefix"),
    ("--cmd-suffix", "cmd_suffix", "string", "Command suffix"),
    ]

  for (name, dest, value_type, help_text) in value_options:
    parser.add_option(name, dest=dest, action="store", type=value_type,
                      help=help_text)

  (options, args) = parser.parse_args()

  if len(args) != 2:
    # Won't return
    parser.error("Expected exactly two arguments")

  (status_file_path, mode) = args

  if mode not in valid_modes:
    # Won't return
    parser.error("Invalid mode: %s" % mode)

  return (status_file_path, mode)
525

    
526

    
527
def main():
  """Runs the import/export daemon.

  After parsing the command line the child process is started in its own
  process group. Its output is processed until it finishes, SIGTERM and
  SIGINT are forwarded to it and its exit status is recorded in the status
  file, which is written one last time before the program exits.

  """
  (status_file_path, mode) = ParseOptions()
  child_logger = SetupLogging()

  status_file = StatusFile(status_file_path)
  try:
    try:
      # Pipe to receive socat's stderr output
      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()

      # Pipe to notify on signals
      (signal_notify_read_fd, signal_notify_write_fd) = os.pipe()

      # Configure signal module's notifier. This is only supported in Python
      # 2.5 and above (some distributions backported it to Python 2.4), hence
      # the function is looked up dynamically.
      set_wakeup_fd_fn = getattr(signal, "set_wakeup_fd", None)
      if set_wakeup_fd_fn is not None:
        set_wakeup_fd_fn(signal_notify_write_fd)

      # Buffer size 0 is important, otherwise .read() with a specified length
      # might buffer data while poll(2) won't mark its file descriptor as
      # readable again.
      socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
      signal_notify_read = os.fdopen(signal_notify_read_fd, "r", 0)

      # Get child process command
      cmd = GetCommand(mode, socat_stderr_write_fd)

      logging.debug("Starting command %r", cmd)

      def _ChildPreexec():
        # Move child to separate process group. By sending a signal to its
        # process group we can kill the child process and all its own
        # child-processes.
        os.setpgid(0, 0)

        # Close almost all file descriptors
        utils.CloseFDs(noclose_fds=[socat_stderr_write_fd])

      # Not using close_fds because doing so would also close the socat stderr
      # pipe, which we still need.
      child = subprocess.Popen(cmd, shell=False, close_fds=False,
                               stderr=subprocess.PIPE, stdout=None, stdin=None,
                               preexec_fn=_ChildPreexec)
      try:
        # Avoid race condition by setting child's process group (as good as
        # possible in Python) before sending signals to child. For an
        # explanation, see preexec function for child.
        try:
          os.setpgid(child.pid, child.pid)
        except EnvironmentError:
          # If the child process was faster we receive EPERM or EACCES
          if sys.exc_info()[1].errno not in (errno.EPERM, errno.EACCES):
            raise

        # Forward signals to child process
        def _ForwardSignal(signum, _):
          # Wake up poll(2)
          os.write(signal_notify_write_fd, "\0")

          # Send signal to child
          os.killpg(child.pid, signum)

        # TODO: There is a race condition between starting the child and
        # handling the signals here. While there might be a way to work around
        # it by registering the handlers before starting the child and
        # deferring sent signals until the child is available, doing so can be
        # complicated.
        signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
                                             handler_fn=_ForwardSignal)
        try:
          # Close child's side
          utils.RetryOnSignal(os.close, socat_stderr_write_fd)

          finished_without_signal = ProcessChildIO(child, socat_stderr_read,
                                                   status_file, child_logger,
                                                   signal_notify_read,
                                                   signal_handler)
          if finished_without_signal:
            # The child closed all its file descriptors and there was no signal
            # TODO: Implement timeout instead of waiting indefinitely
            utils.RetryOnSignal(child.wait)
        finally:
          signal_handler.Reset()
      finally:
        # Final check if child process is still alive
        if utils.RetryOnSignal(child.poll) is None:
          logging.error("Child process still alive, sending SIGKILL")
          os.killpg(child.pid, signal.SIGKILL)
          utils.RetryOnSignal(child.wait)

      returncode = child.returncode
      if returncode == 0:
        errmsg = None
      elif returncode < 0:
        # Negative return codes denote the signal which ended the child
        errmsg = "Exited due to signal %s" % (-returncode, )
      else:
        errmsg = "Exited with status %s" % (returncode, )

      status_file.SetExitStatus(returncode, errmsg)
    except Exception: # pylint: disable-msg=W0703
      err = sys.exc_info()[1]
      logging.exception("Unhandled error occurred")
      status_file.SetExitStatus(constants.EXIT_FAILURE,
                                "Unhandled error occurred: %s" % (err, ))

    if status_file.ExitStatusIsSuccess():
      exit_code = constants.EXIT_SUCCESS
    else:
      exit_code = constants.EXIT_FAILURE

    sys.exit(exit_code)
  finally:
    # Also reached when exiting, hence the final status is always written
    status_file.Update(True)


if __name__ == "__main__":
  main()