Statistics
| Branch: | Tag: | Revision:

root / daemons / import-export @ 29da446a

History | View | Annotate | Download (19 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Import/export daemon.
23

    
24
"""
25

    
26
# pylint: disable-msg=C0103
27
# C0103: Invalid name import-export
28

    
29
import errno
30
import logging
31
import optparse
32
import os
33
import re
34
import select
35
import signal
36
import socket
37
import subprocess
38
import sys
39
import time
40
from cStringIO import StringIO
41

    
42
from ganeti import constants
43
from ganeti import cli
44
from ganeti import utils
45
from ganeti import serializer
46
from ganeti import objects
47
from ganeti import locking
48

    
49

    
50
#: Matches the socat(1) log message announcing the socket it listens on.
#: The reported local address (in particular the port number) is what the
#: remote peer needs in order to connect.
LISTENING_RE = re.compile(r"^listening on\s+"
                          r"AF=(?P<family>\d+)\s+"
                          r"(?P<address>.+):(?P<port>\d+)$",
                          re.IGNORECASE)

#: Matches the socat(1) log message emitted once data goes over the wire
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
                              re.IGNORECASE)

#: Single-letter severity codes as compared against socat log lines
SOCAT_LOG_DEBUG = "D"
SOCAT_LOG_INFO = "I"
SOCAT_LOG_NOTICE = "N"
SOCAT_LOG_WARNING = "W"
SOCAT_LOG_ERROR = "E"
SOCAT_LOG_FATAL = "F"

#: Severities whose messages are only forwarded when debugging is enabled
SOCAT_LOG_IGNORE = frozenset((
  SOCAT_LOG_DEBUG,
  SOCAT_LOG_INFO,
  SOCAT_LOG_NOTICE,
  ))

#: Socat buffer size: at most this many bytes are transferred per step
SOCAT_BUFSIZE = 1024 ** 2

#: Number of most recent output lines kept in the status file
MAX_RECENT_OUTPUT_LINES = 20

#: Minimum number of seconds between two status file writes (unless forced)
MIN_UPDATE_INTERVAL = 5.0

#: Number of seconds a child process gets to exit after being sent a signal
CHILD_LINGER_TIMEOUT = 5.0

# Options shared by all socat invocations
SOCAT_TCP_OPTS = [
  "keepalive",
  "keepidle=60",
  "keepintvl=10",
  "keepcnt=5",
  ]
SOCAT_OPENSSL_OPTS = [
  "verify=1",
  "cipher=HIGH",
  "method=TLSv1",
  ]
SOCAT_CONNECT_TIMEOUT = 60


# Parsed command line options; filled in by ParseOptions
options = None
94

    
95

    
96
class Error(Exception):
  """Generic error raised by the import/export daemon.

  """
98

    
99

    
100
def SetupLogging():
  """Sets up logging for the daemon and for child process output.

  Everything is written to stderr. The root logger's level depends on the
  global options: all messages with C{debug}, informational messages with
  C{verbose} and only errors otherwise.

  @return: Logger to be used for output of the child process; it accepts
           messages of any level

  """
  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
  stderr_handler.setLevel(logging.NOTSET)

  root_logger = logging.getLogger("")
  root_logger.addHandler(stderr_handler)

  if options.debug:
    root_level = logging.NOTSET
  elif options.verbose:
    root_level = logging.INFO
  else:
    root_level = logging.ERROR

  root_logger.setLevel(root_level)

  # The logger for child process output is instantiated directly and gets a
  # level of its own, so child output is emitted no matter what level the
  # root logger was given above
  child_logger = logging.Logger("child output")
  child_logger.addHandler(stderr_handler)
  child_logger.setLevel(logging.NOTSET)

  return child_logger
126

    
127

    
128
def _VerifyListening(family, address, port):
129
  """Verify address given as listening address by socat.
130

    
131
  """
132
  # TODO: Implement IPv6 support
133
  if family != socket.AF_INET:
134
    raise Error("Address family %r not supported" % family)
135

    
136
  try:
137
    packed_address = socket.inet_pton(family, address)
138
  except socket.error:
139
    raise Error("Invalid address %r for family %s" % (address, family))
140

    
141
  return (socket.inet_ntop(family, packed_address), port)
142

    
143

    
144
class StatusFile:
145
  """Status file manager.
146

    
147
  """
148
  def __init__(self, path):
149
    """Initializes class.
150

    
151
    """
152
    self._path = path
153
    self._data = objects.ImportExportStatus(ctime=time.time(),
154
                                            mtime=None,
155
                                            recent_output=[])
156

    
157
  def AddRecentOutput(self, line):
158
    """Adds a new line of recent output.
159

    
160
    """
161
    self._data.recent_output.append(line)
162

    
163
    # Remove old lines
164
    del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]
165

    
166
  def SetListenPort(self, port):
167
    """Sets the port the daemon is listening on.
168

    
169
    @type port: int
170
    @param port: TCP/UDP port
171

    
172
    """
173
    assert isinstance(port, (int, long)) and 0 < port < 2**16
174
    self._data.listen_port = port
175

    
176
  def GetListenPort(self):
177
    """Returns the port the daemon is listening on.
178

    
179
    """
180
    return self._data.listen_port
181

    
182
  def SetConnected(self):
183
    """Sets the connected flag.
184

    
185
    """
186
    self._data.connected = True
187

    
188
  def SetExitStatus(self, exit_status, error_message):
189
    """Sets the exit status and an error message.
190

    
191
    """
192
    # Require error message when status isn't 0
193
    assert exit_status == 0 or error_message
194

    
195
    self._data.exit_status = exit_status
196
    self._data.error_message = error_message
197

    
198
  def ExitStatusIsSuccess(self):
199
    """Returns whether the exit status means "success".
200

    
201
    """
202
    return not bool(self._data.error_message)
203

    
204
  def Update(self, force):
205
    """Updates the status file.
206

    
207
    @type force: bool
208
    @param force: Write status file in any case, not only when minimum interval
209
                  is expired
210

    
211
    """
212
    if not (force or
213
            self._data.mtime is None or
214
            time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
215
      return
216

    
217
    logging.debug("Updating status file %s", self._path)
218

    
219
    self._data.mtime = time.time()
220
    utils.WriteFile(self._path,
221
                    data=serializer.DumpJson(self._data.ToDict(), indent=True),
222
                    mode=0400)
223

    
224

    
225
def _ProcessSocatOutput(status_file, level, msg):
  """Evaluates a log message written by socat.

  Only messages of level "notice" are looked at. They're used to detect the
  port socat is listening on (as long as none was recorded yet) and the start
  of the data transfer.

  @param status_file: Status file manager
  @param level: Log level of the message
  @param msg: Log message
  @rtype: bool
  @return: Whether the status was changed by this message

  """
  if level != SOCAT_LOG_NOTICE:
    return False

  if status_file.GetListenPort() is None:
    # TODO: Maybe implement timeout to not listen forever
    listening = LISTENING_RE.match(msg)
    if listening:
      family = int(listening.group("family"))
      reported_port = int(listening.group("port"))
      (_, port) = _VerifyListening(family, listening.group("address"),
                                   reported_port)

      status_file.SetListenPort(port)
      return True

  if TRANSFER_LOOP_RE.match(msg):
    status_file.SetConnected()
    return True

  return False
246

    
247

    
248
def ProcessOutput(line, status_file, logger, socat):
  """Handles one line of child process output.

  Lines worth forwarding are logged and added to the recent output kept in
  the status file. The status file is updated afterwards, forcibly if the
  line changed the status.

  @type line: string
  @param line: Child output line
  @param status_file: Status file manager
  @param logger: Child output logger
  @type socat: bool
  @param socat: Whether it's a socat output line

  """
  force_update = False

  if not socat:
    forward_line = line
  else:
    fields = line.split(None, 4)

    if len(fields) != 5:
      # Not in the expected format, forward it as it is
      forward_line = "socat: %s" % line
    else:
      # The level is the fourth field, the message is everything after it
      level = fields[3]
      msg = fields[4]

      force_update = _ProcessSocatOutput(status_file, level, msg)

      if options.debug or (level and level not in SOCAT_LOG_IGNORE):
        forward_line = "socat: %s %s" % (level, msg)
      else:
        forward_line = None

  if forward_line:
    logger.info(forward_line)
    status_file.AddRecentOutput(forward_line)

  status_file.Update(force_update)
283

    
284

    
285
def GetBashCommand(cmd):
  """Wraps a shell command into an argument list invoking Bash.

  Bash is started with the C{errexit} and C{pipefail} options enabled.

  @param cmd: Command passed to Bash via C{-c}
  @rtype: list
  @return: Argument list

  """
  bash_opts = ["-o", "errexit", "-o", "pipefail"]

  return ["bash"] + bash_opts + ["-c", cmd]
290

    
291

    
292
def GetSocatCommand(mode):
  """Returns the socat command.

  In import mode socat listens for an SSL connection and writes received
  data to stdout. In export mode it reads from stdin and sends the data via
  SSL to the host and port given in the global options.

  @param mode: Daemon mode (import or export)
  @rtype: list
  @return: Path to socat followed by its arguments
  @raise Error: If the mode is invalid, if host or port are missing in export
                mode or if a socat address option contains a comma

  """
  common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
    "key=%s" % options.key,
    "cert=%s" % options.cert,
    "cafile=%s" % options.ca,
    ]

  if options.bind is not None:
    common_addr_opts.append("bind=%s" % options.bind)

  if mode == constants.IEM_IMPORT:
    # Without an explicit port socat is told to listen on port 0
    # NOTE(review): presumably this lets the system choose a free port, which
    # is then taken from socat's log output -- confirm
    if options.port is None:
      port = 0
    else:
      port = options.port

    addr1 = [
      "OPENSSL-LISTEN:%s" % port,
      "reuseaddr",
      ] + common_addr_opts
    addr2 = ["stdout"]

  elif mode == constants.IEM_EXPORT:
    # Fail early instead of formatting "None" into the address and leaving it
    # to socat to fail on a bogus destination
    if not options.host or options.port is None:
      raise Error("Remote host and port are required in export mode")

    addr1 = ["stdin"]
    addr2 = [
      "OPENSSL:%s:%s" % (options.host, options.port),
      "connect-timeout=%s" % SOCAT_CONNECT_TIMEOUT,
      ] + common_addr_opts

  else:
    raise Error("Invalid mode")

  # Options are joined using commas below, hence a comma within a value would
  # be taken as the start of another option
  for value in addr1 + addr2:
    if "," in value:
      raise Error("Comma not allowed in socat option value: %r" % value)

  return [
    constants.SOCAT_PATH,

    # Log to stderr
    "-ls",

    # Log level
    "-d", "-d",

    # Buffer size
    "-b%s" % SOCAT_BUFSIZE,

    # Unidirectional mode, the first address is only used for reading, and the
    # second address is only used for writing
    "-u",

    ",".join(addr1), ",".join(addr2)
    ]
350

    
351

    
352
def GetTransportCommand(mode, socat_stderr_fd):
  """Builds the command for the transport part of the daemon.

  The transport consists of socat combined with gzip (export) or gunzip
  (import) in a shell pipeline.

  @param mode: Daemon mode (import or export)
  @type socat_stderr_fd: int
  @param socat_stderr_fd: File descriptor socat should write its stderr to
  @rtype: list
  @return: Bash command running the transport pipeline
  @raise Error: If the mode is invalid

  """
  quoted_socat = utils.ShellQuoteArgs(GetSocatCommand(mode))

  # socat's stderr is redirected to the given file descriptor
  socat_cmd = "%s 2>&%d" % (quoted_socat, socat_stderr_fd)

  # TODO: Make compression configurable

  if mode == constants.IEM_EXPORT:
    pipeline = "gzip -c | %s" % socat_cmd
  elif mode == constants.IEM_IMPORT:
    pipeline = "%s | gunzip -c" % socat_cmd
  else:
    raise Error("Invalid mode")

  # TODO: Use "dd" to measure processed data (allows to give an ETA)
  # TODO: If a connection to socat is dropped (e.g. due to a wrong
  # certificate), socat should be restarted

  # TODO: Run transport as separate user
  # The transport uses its own shell to simplify running it as a separate user
  # in the future.
  return GetBashCommand(pipeline)
381

    
382

    
383
def GetCommand(mode, socat_stderr_fd):
  """Builds the complete child process command.

  The shell-quoted transport command is surrounded by the command prefix and
  suffix from the global options (if given), separated by spaces, and the
  result is wrapped into a Bash invocation.

  @param mode: Daemon mode (import or export)
  @type socat_stderr_fd: int
  @param socat_stderr_fd: File descriptor socat should write its stderr to
  @rtype: list
  @return: Argument list of the child process

  """
  pieces = []

  if options.cmd_prefix:
    pieces.append(options.cmd_prefix)

  pieces.append(utils.ShellQuoteArgs(GetTransportCommand(mode,
                                                         socat_stderr_fd)))

  if options.cmd_suffix:
    pieces.append(options.cmd_suffix)

  return GetBashCommand(" ".join(pieces))
400

    
401

    
402
def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
                   signal_notify, signal_handler):
  """Handles the child processes' output.

  Polls the child's stderr, the pipe carrying socat's stderr and the signal
  notification file descriptor. Output is split into lines which are passed
  to L{ProcessOutput}. Once a signal was noticed, the child is given
  L{CHILD_LINGER_TIMEOUT} seconds to exit. The loop ends when only the signal
  notification file descriptor is left or when that time is over.

  @param child: Child process; its C{stderr} attribute is read from
  @type socat_stderr_read_fd: int
  @param socat_stderr_read_fd: Read end of the pipe for socat's stderr
  @param status_file: Status file manager
  @param child_logger: Child output logger
  @param signal_notify: Object providing C{fileno} and C{read}, readable once
                        a signal arrived
  @param signal_handler: Signal handler; must only handle SIGTERM and SIGINT
  @rtype: bool
  @return: False if the function was waiting for the child to exit after a
           signal, True otherwise

  """
  assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
         "Other signals are not handled in this function"

  # Buffer size 0 is important, otherwise .read() with a specified length
  # might buffer data while poll(2) won't mark its file descriptor as
  # readable again.
  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)

  script_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
                                           child_logger, False)
  try:
    socat_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
                                            child_logger, True)
    try:
      # Maps file descriptors to the object to read from and the line
      # splitter receiving the data (None for signal notifications)
      fdmap = {
        child.stderr.fileno(): (child.stderr, script_stderr_lines),
        socat_stderr_read.fileno(): (socat_stderr_read, socat_stderr_lines),
        signal_notify.fileno(): (signal_notify, None),
        }

      poller = select.poll()
      for fd in fdmap:
        utils.SetNonblockFlag(fd, True)
        poller.register(fd, select.POLLIN)

      # Only set once a signal was received
      timeout_calculator = None
      while True:
        # Break out of loop if only signal notify FD is left
        if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
          break

        if timeout_calculator:
          # poll(2) expects milliseconds
          timeout = timeout_calculator.Remaining() * 1000
          if timeout < 0:
            logging.info("Child process didn't exit in time")
            break
        else:
          timeout = None

        for fd, event in utils.RetryOnSignal(poller.poll, timeout):
          # The original mask read "POLLIN | event & POLLPRI"; operator
          # precedence made that equivalent to the mask intended here
          if event & (select.POLLIN | select.POLLPRI):
            (from_, to) = fdmap[fd]

            # Read up to 1 KB of data
            data = from_.read(1024)
            if data:
              if to:
                to.write(data)
              elif fd == signal_notify.fileno():
                # Signal handling
                if signal_handler.called:
                  signal_handler.Clear()
                  if timeout_calculator:
                    logging.info("Child process still has about %0.2f seconds"
                                 " to exit", timeout_calculator.Remaining())
                  else:
                    logging.info("Giving child process %0.2f seconds to exit",
                                 CHILD_LINGER_TIMEOUT)
                    timeout_calculator = \
                      locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
            else:
              # Nothing was read, stop watching this file descriptor
              poller.unregister(fd)
              del fdmap[fd]

          elif event & (select.POLLNVAL | select.POLLHUP |
                        select.POLLERR):
            poller.unregister(fd)
            del fdmap[fd]

        script_stderr_lines.flush()
        socat_stderr_lines.flush()

      # If there was a timeout calculator, we were waiting for the child to
      # finish, e.g. due to a signal
      return not bool(timeout_calculator)
    finally:
      socat_stderr_lines.close()
  finally:
    script_stderr_lines.close()
486

    
487

    
488
def ParseOptions():
  """Parses the command line.

  The parsed options are stored in the module-level C{options} variable. The
  program is terminated by the option parser if the arguments are invalid.

  @rtype: tuple
  @return: Status file path and daemon mode

  """
  global options # pylint: disable-msg=W0603

  usage = ("%%prog <status-file> {%s|%s}" %
           (constants.IEM_IMPORT,
            constants.IEM_EXPORT))
  parser = optparse.OptionParser(usage=usage)

  for generic_opt in (cli.DEBUG_OPT, cli.VERBOSE_OPT):
    parser.add_option(generic_opt)

  # Flag, destination, value type and help text of options taking a value
  value_opts = [
    ("--key", "key", "string", "RSA key file"),
    ("--cert", "cert", "string", "X509 certificate file"),
    ("--ca", "ca", "string", "X509 CA file"),
    ("--bind", "bind", "string", "Bind address"),
    ("--host", "host", "string", "Remote hostname"),
    ("--port", "port", "int", "Remote port"),
    ("--cmd-prefix", "cmd_prefix", "string", "Command prefix"),
    ("--cmd-suffix", "cmd_suffix", "string", "Command suffix"),
    ]

  for (flag, dest, value_type, help_text) in value_opts:
    parser.add_option(flag, dest=dest, action="store", type=value_type,
                      help=help_text)

  (options, args) = parser.parse_args()

  if len(args) != 2:
    # Won't return
    parser.error("Expected exactly two arguments")

  (status_file_path, mode) = args

  valid_modes = (constants.IEM_IMPORT, constants.IEM_EXPORT)
  if mode not in valid_modes:
    # Won't return
    parser.error("Invalid mode: %s" % mode)

  return (status_file_path, mode)
532

    
533

    
534
class ChildProcess(subprocess.Popen):
535
  def __init__(self, cmd, noclose_fds):
536
    """Initializes this class.
537

    
538
    """
539
    self._noclose_fds = noclose_fds
540

    
541
    # Not using close_fds because doing so would also close the socat stderr
542
    # pipe, which we still need.
543
    subprocess.Popen.__init__(self, cmd, shell=False, close_fds=False,
544
                              stderr=subprocess.PIPE, stdout=None, stdin=None,
545
                              preexec_fn=self._ChildPreexec)
546
    self._SetProcessGroup()
547

    
548
  def _ChildPreexec(self):
549
    """Called before child executable is execve'd.
550

    
551
    """
552
    # Move to separate process group. By sending a signal to its process group
553
    # we can kill the child process and all grandchildren.
554
    os.setpgid(0, 0)
555

    
556
    # Close almost all file descriptors
557
    utils.CloseFDs(noclose_fds=self._noclose_fds)
558

    
559
  def _SetProcessGroup(self):
560
    """Sets the child's process group.
561

    
562
    """
563
    assert self.pid, "Can't be called in child process"
564

    
565
    # Avoid race condition by setting child's process group (as good as
566
    # possible in Python) before sending signals to child. For an
567
    # explanation, see preexec function for child.
568
    try:
569
      os.setpgid(self.pid, self.pid)
570
    except EnvironmentError, err:
571
      # If the child process was faster we receive EPERM or EACCES
572
      if err.errno not in (errno.EPERM, errno.EACCES):
573
        raise
574

    
575
  def Kill(self, signum):
576
    """Sends signal to child process.
577

    
578
    """
579
    logging.info("Sending signal %s to child process", signum)
580
    os.killpg(self.pid, signum)
581

    
582
  def ForceQuit(self):
583
    """Ensure child process is no longer running.
584

    
585
    """
586
    # Final check if child process is still alive
587
    if utils.RetryOnSignal(self.poll) is None:
588
      logging.error("Child process still alive, sending SIGKILL")
589
      self.Kill(signal.SIGKILL)
590
      utils.RetryOnSignal(self.wait)
591

    
592

    
593
def main():
  """Main function.

  Parses the options, starts the child process and processes its output
  until it has exited. The exit status of the child (or any unhandled error)
  is recorded in the status file, which is written in any case before the
  program exits. The program exits with C{constants.EXIT_SUCCESS} if no error
  message was recorded and with C{constants.EXIT_FAILURE} otherwise.

  """
  # Option parsing
  (status_file_path, mode) = ParseOptions()

  # Configure logging
  child_logger = SetupLogging()

  status_file = StatusFile(status_file_path)
  try:
    try:
      # Pipe to receive socat's stderr output
      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()

      # Get child process command
      cmd = GetCommand(mode, socat_stderr_write_fd)

      logging.debug("Starting command %r", cmd)

      # Start child process
      child = ChildProcess(cmd, [socat_stderr_write_fd])
      try:
        def _ForwardSignal(signum, _):
          """Forwards signals to child process.

          """
          child.Kill(signum)

        signal_wakeup = utils.SignalWakeupFd()
        try:
          # TODO: There is a race condition between starting the child and
          # handling the signals here. While there might be a way to work around
          # it by registering the handlers before starting the child and
          # deferring sent signals until the child is available, doing so can be
          # complicated.
          signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
                                               handler_fn=_ForwardSignal,
                                               wakeup=signal_wakeup)
          try:
            # Close child's side
            utils.RetryOnSignal(os.close, socat_stderr_write_fd)

            if ProcessChildIO(child, socat_stderr_read_fd, status_file,
                              child_logger, signal_wakeup,
                              signal_handler):
              # The child closed all its file descriptors and there was no
              # signal
              # TODO: Implement timeout instead of waiting indefinitely
              utils.RetryOnSignal(child.wait)
          finally:
            # Cleanup happens in reverse order of setup
            signal_handler.Reset()
        finally:
          signal_wakeup.Reset()
      finally:
        # Whatever happened above, the child must not be left running
        child.ForceQuit()

      # A negative return code is reported as the number of the signal which
      # ended the child
      if child.returncode == 0:
        errmsg = None
      elif child.returncode < 0:
        errmsg = "Exited due to signal %s" % (-child.returncode, )
      else:
        errmsg = "Exited with status %s" % (child.returncode, )

      status_file.SetExitStatus(child.returncode, errmsg)
    except Exception, err: # pylint: disable-msg=W0703
      # Any error is recorded in the status file instead of being propagated
      logging.exception("Unhandled error occurred")
      status_file.SetExitStatus(constants.EXIT_FAILURE,
                                "Unhandled error occurred: %s" % (err, ))

    if status_file.ExitStatusIsSuccess():
      sys.exit(constants.EXIT_SUCCESS)

    sys.exit(constants.EXIT_FAILURE)
  finally:
    # Also runs when leaving through sys.exit above, so the final status is
    # always written
    status_file.Update(True)
670

    
671

    
672
# Run the daemon only when executed as a program
if __name__ == "__main__":
  main()