Statistics
| Branch: | Tag: | Revision:

root / daemons / import-export @ 043f2292

History | View | Annotate | Download (20.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Import/export daemon.
23

    
24
"""
25

    
26
# pylint: disable-msg=C0103
27
# C0103: Invalid name import-export
28

    
29
import errno
30
import logging
31
import optparse
32
import os
33
import re
34
import select
35
import signal
36
import socket
37
import subprocess
38
import sys
39
import time
40
from cStringIO import StringIO
41

    
42
from ganeti import constants
43
from ganeti import cli
44
from ganeti import utils
45
from ganeti import serializer
46
from ganeti import objects
47
from ganeti import locking
48

    
49

    
50
#: Used to recognize point at which socat(1) starts to listen on its socket.
51
#: The local address is required for the remote peer to connect (in particular
52
#: the port number).
53
LISTENING_RE = re.compile(r"^listening on\s+"
54
                          r"AF=(?P<family>\d+)\s+"
55
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
56

    
57
#: Used to recognize point at which socat(1) is sending data over the wire
58
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
59
                              re.I)
60

    
61
SOCAT_LOG_DEBUG = "D"
62
SOCAT_LOG_INFO = "I"
63
SOCAT_LOG_NOTICE = "N"
64
SOCAT_LOG_WARNING = "W"
65
SOCAT_LOG_ERROR = "E"
66
SOCAT_LOG_FATAL = "F"
67

    
68
SOCAT_LOG_IGNORE = frozenset([
69
  SOCAT_LOG_DEBUG,
70
  SOCAT_LOG_INFO,
71
  SOCAT_LOG_NOTICE,
72
  ])
73

    
74
#: Socat buffer size: at most this many bytes are transferred per step
75
SOCAT_BUFSIZE = 1024 * 1024
76

    
77
#: How many lines to keep in the status file
78
MAX_RECENT_OUTPUT_LINES = 20
79

    
80
#: Don't update status file more than once every 5 seconds (unless forced)
81
MIN_UPDATE_INTERVAL = 5.0
82

    
83
#: Give child process up to 5 seconds to exit after sending a signal
84
CHILD_LINGER_TIMEOUT = 5.0
85

    
86
#: How long to wait for a connection to be established
87
DEFAULT_CONNECT_TIMEOUT = 60
88

    
89
# Common options for socat
90
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
91
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
92

    
93

    
94
# Global variable for options
95
options = None
96

    
97

    
98
class Error(Exception):
99
  """Generic exception"""
100

    
101

    
102
def SetupLogging():
103
  """Configures the logging module.
104

    
105
  """
106
  formatter = logging.Formatter("%(asctime)s: %(message)s")
107

    
108
  stderr_handler = logging.StreamHandler()
109
  stderr_handler.setFormatter(formatter)
110
  stderr_handler.setLevel(logging.NOTSET)
111

    
112
  root_logger = logging.getLogger("")
113
  root_logger.addHandler(stderr_handler)
114

    
115
  if options.debug:
116
    root_logger.setLevel(logging.NOTSET)
117
  elif options.verbose:
118
    root_logger.setLevel(logging.INFO)
119
  else:
120
    root_logger.setLevel(logging.ERROR)
121

    
122
  # Create special logger for child process output
123
  child_logger = logging.Logger("child output")
124
  child_logger.addHandler(stderr_handler)
125
  child_logger.setLevel(logging.NOTSET)
126

    
127
  return child_logger
128

    
129

    
130
def _VerifyListening(family, address, port):
131
  """Verify address given as listening address by socat.
132

    
133
  """
134
  # TODO: Implement IPv6 support
135
  if family != socket.AF_INET:
136
    raise Error("Address family %r not supported" % family)
137

    
138
  try:
139
    packed_address = socket.inet_pton(family, address)
140
  except socket.error:
141
    raise Error("Invalid address %r for family %s" % (address, family))
142

    
143
  return (socket.inet_ntop(family, packed_address), port)
144

    
145

    
146
class StatusFile:
147
  """Status file manager.
148

    
149
  """
150
  def __init__(self, path):
151
    """Initializes class.
152

    
153
    """
154
    self._path = path
155
    self._data = objects.ImportExportStatus(ctime=time.time(),
156
                                            mtime=None,
157
                                            recent_output=[])
158

    
159
  def AddRecentOutput(self, line):
160
    """Adds a new line of recent output.
161

    
162
    """
163
    self._data.recent_output.append(line)
164

    
165
    # Remove old lines
166
    del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]
167

    
168
  def SetListenPort(self, port):
169
    """Sets the port the daemon is listening on.
170

    
171
    @type port: int
172
    @param port: TCP/UDP port
173

    
174
    """
175
    assert isinstance(port, (int, long)) and 0 < port < 2**16
176
    self._data.listen_port = port
177

    
178
  def GetListenPort(self):
179
    """Returns the port the daemon is listening on.
180

    
181
    """
182
    return self._data.listen_port
183

    
184
  def SetConnected(self):
185
    """Sets the connected flag.
186

    
187
    """
188
    self._data.connected = True
189

    
190
  def GetConnected(self):
191
    """Determines whether the daemon is connected.
192

    
193
    """
194
    return self._data.connected
195

    
196
  def SetExitStatus(self, exit_status, error_message):
197
    """Sets the exit status and an error message.
198

    
199
    """
200
    # Require error message when status isn't 0
201
    assert exit_status == 0 or error_message
202

    
203
    self._data.exit_status = exit_status
204
    self._data.error_message = error_message
205

    
206
  def ExitStatusIsSuccess(self):
207
    """Returns whether the exit status means "success".
208

    
209
    """
210
    return not bool(self._data.error_message)
211

    
212
  def Update(self, force):
213
    """Updates the status file.
214

    
215
    @type force: bool
216
    @param force: Write status file in any case, not only when minimum interval
217
                  is expired
218

    
219
    """
220
    if not (force or
221
            self._data.mtime is None or
222
            time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
223
      return
224

    
225
    logging.debug("Updating status file %s", self._path)
226

    
227
    self._data.mtime = time.time()
228
    utils.WriteFile(self._path,
229
                    data=serializer.DumpJson(self._data.ToDict(), indent=True),
230
                    mode=0400)
231

    
232

    
233
def _ProcessSocatOutput(status_file, level, msg):
234
  """Interprets socat log output.
235

    
236
  """
237
  if level == SOCAT_LOG_NOTICE:
238
    if status_file.GetListenPort() is None:
239
      # TODO: Maybe implement timeout to not listen forever
240
      m = LISTENING_RE.match(msg)
241
      if m:
242
        (_, port) = _VerifyListening(int(m.group("family")), m.group("address"),
243
                                     int(m.group("port")))
244

    
245
        status_file.SetListenPort(port)
246
        return True
247

    
248
    if not status_file.GetConnected():
249
      m = TRANSFER_LOOP_RE.match(msg)
250
      if m:
251
        status_file.SetConnected()
252
        return True
253

    
254
  return False
255

    
256

    
257
def ProcessOutput(line, status_file, logger, socat):
258
  """Takes care of child process output.
259

    
260
  @param status_file: Status file manager
261
  @param logger: Child output logger
262
  @type socat: bool
263
  @param socat: Whether it's a socat output line
264
  @type line: string
265
  @param line: Child output line
266

    
267
  """
268
  force_update = False
269
  forward_line = line
270

    
271
  if socat:
272
    level = None
273
    parts = line.split(None, 4)
274

    
275
    if len(parts) == 5:
276
      (_, _, _, level, msg) = parts
277

    
278
      force_update = _ProcessSocatOutput(status_file, level, msg)
279

    
280
      if options.debug or (level and level not in SOCAT_LOG_IGNORE):
281
        forward_line = "socat: %s %s" % (level, msg)
282
      else:
283
        forward_line = None
284
    else:
285
      forward_line = "socat: %s" % line
286

    
287
  if forward_line:
288
    logger.info(forward_line)
289
    status_file.AddRecentOutput(forward_line)
290

    
291
  status_file.Update(force_update)
292

    
293

    
294
def GetBashCommand(cmd):
295
  """Prepares a command to be run in Bash.
296

    
297
  """
298
  return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
299

    
300

    
301
def GetSocatCommand(mode):
302
  """Returns the socat command.
303

    
304
  """
305
  common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
306
    "key=%s" % options.key,
307
    "cert=%s" % options.cert,
308
    "cafile=%s" % options.ca,
309
    ]
310

    
311
  if options.bind is not None:
312
    common_addr_opts.append("bind=%s" % options.bind)
313

    
314
  if mode == constants.IEM_IMPORT:
315
    if options.port is None:
316
      port = 0
317
    else:
318
      port = options.port
319

    
320
    addr1 = [
321
      "OPENSSL-LISTEN:%s" % port,
322
      "reuseaddr",
323

    
324
      # Retry to listen if connection wasn't established successfully, up to
325
      # 100 times a second. Note that this still leaves room for DoS attacks.
326
      "forever",
327
      "intervall=0.01",
328
      ] + common_addr_opts
329
    addr2 = ["stdout"]
330

    
331
  elif mode == constants.IEM_EXPORT:
332
    addr1 = ["stdin"]
333
    addr2 = [
334
      "OPENSSL:%s:%s" % (options.host, options.port),
335

    
336
      # How long to wait per connection attempt
337
      "connect-timeout=%s" % options.connect_timeout,
338

    
339
      # Retry a few times before giving up to connect (once per second)
340
      "retry=%s" % options.connect_retries,
341
      "intervall=1",
342
      ] + common_addr_opts
343

    
344
  else:
345
    raise Error("Invalid mode")
346

    
347
  for i in [addr1, addr2]:
348
    for value in i:
349
      if "," in value:
350
        raise Error("Comma not allowed in socat option value: %r" % value)
351

    
352
  return [
353
    constants.SOCAT_PATH,
354

    
355
    # Log to stderr
356
    "-ls",
357

    
358
    # Log level
359
    "-d", "-d",
360

    
361
    # Buffer size
362
    "-b%s" % SOCAT_BUFSIZE,
363

    
364
    # Unidirectional mode, the first address is only used for reading, and the
365
    # second address is only used for writing
366
    "-u",
367

    
368
    ",".join(addr1), ",".join(addr2)
369
    ]
370

    
371

    
372
def GetTransportCommand(mode, socat_stderr_fd):
373
  """Returns the command for the transport part of the daemon.
374

    
375
  @param mode: Daemon mode (import or export)
376
  @type socat_stderr_fd: int
377
  @param socat_stderr_fd: File descriptor socat should write its stderr to
378

    
379
  """
380
  socat_cmd = ("%s 2>&%d" %
381
               (utils.ShellQuoteArgs(GetSocatCommand(mode)),
382
                socat_stderr_fd))
383

    
384
  # TODO: Make compression configurable
385

    
386
  if mode == constants.IEM_IMPORT:
387
    transport_cmd = "%s | gunzip -c" % socat_cmd
388
  elif mode == constants.IEM_EXPORT:
389
    transport_cmd = "gzip -c | %s" % socat_cmd
390
  else:
391
    raise Error("Invalid mode")
392

    
393
  # TODO: Use "dd" to measure processed data (allows to give an ETA)
394

    
395
  # TODO: Run transport as separate user
396
  # The transport uses its own shell to simplify running it as a separate user
397
  # in the future.
398
  return GetBashCommand(transport_cmd)
399

    
400

    
401
def GetCommand(mode, socat_stderr_fd):
402
  """Returns the complete child process command.
403

    
404
  """
405
  buf = StringIO()
406

    
407
  if options.cmd_prefix:
408
    buf.write(options.cmd_prefix)
409
    buf.write(" ")
410

    
411
  buf.write(utils.ShellQuoteArgs(GetTransportCommand(mode, socat_stderr_fd)))
412

    
413
  if options.cmd_suffix:
414
    buf.write(" ")
415
    buf.write(options.cmd_suffix)
416

    
417
  return GetBashCommand(buf.getvalue())
418

    
419

    
420
def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
421
                   signal_notify, signal_handler, mode):
422
  """Handles the child processes' output.
423

    
424
  """
425
  assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
426
         "Other signals are not handled in this function"
427

    
428
  # Buffer size 0 is important, otherwise .read() with a specified length
429
  # might buffer data while poll(2) won't mark its file descriptor as
430
  # readable again.
431
  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
432

    
433
  script_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
434
                                           child_logger, False)
435
  try:
436
    socat_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
437
                                            child_logger, True)
438
    try:
439
      fdmap = {
440
        child.stderr.fileno(): (child.stderr, script_stderr_lines),
441
        socat_stderr_read.fileno(): (socat_stderr_read, socat_stderr_lines),
442
        signal_notify.fileno(): (signal_notify, None),
443
        }
444

    
445
      poller = select.poll()
446
      for fd in fdmap:
447
        utils.SetNonblockFlag(fd, True)
448
        poller.register(fd, select.POLLIN)
449

    
450
      if options.connect_timeout and mode == constants.IEM_IMPORT:
451
        listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
452
      else:
453
        listen_timeout = None
454

    
455
      exit_timeout = None
456

    
457
      while True:
458
        # Break out of loop if only signal notify FD is left
459
        if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
460
          break
461

    
462
        timeout = None
463

    
464
        if listen_timeout and not exit_timeout:
465
          if status_file.GetConnected():
466
            listen_timeout = None
467
          elif listen_timeout.Remaining() < 0:
468
            logging.info("Child process didn't establish connection in time")
469
            child.Kill(signal.SIGTERM)
470
            exit_timeout = \
471
              locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
472
            # Next block will calculate timeout
473
          else:
474
            # Not yet connected, check again in a second
475
            timeout = 1000
476

    
477
        if exit_timeout:
478
          timeout = exit_timeout.Remaining() * 1000
479
          if timeout < 0:
480
            logging.info("Child process didn't exit in time")
481
            break
482

    
483
        for fd, event in utils.RetryOnSignal(poller.poll, timeout):
484
          if event & (select.POLLIN | event & select.POLLPRI):
485
            (from_, to) = fdmap[fd]
486

    
487
            # Read up to 1 KB of data
488
            data = from_.read(1024)
489
            if data:
490
              if to:
491
                to.write(data)
492
              elif fd == signal_notify.fileno():
493
                # Signal handling
494
                if signal_handler.called:
495
                  signal_handler.Clear()
496
                  if exit_timeout:
497
                    logging.info("Child process still has about %0.2f seconds"
498
                                 " to exit", exit_timeout.Remaining())
499
                  else:
500
                    logging.info("Giving child process %0.2f seconds to exit",
501
                                 CHILD_LINGER_TIMEOUT)
502
                    exit_timeout = \
503
                      locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
504
            else:
505
              poller.unregister(fd)
506
              del fdmap[fd]
507

    
508
          elif event & (select.POLLNVAL | select.POLLHUP |
509
                        select.POLLERR):
510
            poller.unregister(fd)
511
            del fdmap[fd]
512

    
513
        script_stderr_lines.flush()
514
        socat_stderr_lines.flush()
515

    
516
      # If there was a timeout calculator, we were waiting for the child to
517
      # finish, e.g. due to a signal
518
      return not bool(exit_timeout)
519
    finally:
520
      socat_stderr_lines.close()
521
  finally:
522
    script_stderr_lines.close()
523

    
524

    
525
def ParseOptions():
526
  """Parses the options passed to the program.
527

    
528
  @return: Arguments to program
529

    
530
  """
531
  global options # pylint: disable-msg=W0603
532

    
533
  parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
534
                                        (constants.IEM_IMPORT,
535
                                         constants.IEM_EXPORT)))
536
  parser.add_option(cli.DEBUG_OPT)
537
  parser.add_option(cli.VERBOSE_OPT)
538
  parser.add_option("--key", dest="key", action="store", type="string",
539
                    help="RSA key file")
540
  parser.add_option("--cert", dest="cert", action="store", type="string",
541
                    help="X509 certificate file")
542
  parser.add_option("--ca", dest="ca", action="store", type="string",
543
                    help="X509 CA file")
544
  parser.add_option("--bind", dest="bind", action="store", type="string",
545
                    help="Bind address")
546
  parser.add_option("--host", dest="host", action="store", type="string",
547
                    help="Remote hostname")
548
  parser.add_option("--port", dest="port", action="store", type="int",
549
                    help="Remote port")
550
  parser.add_option("--connect-retries", dest="connect_retries", action="store",
551
                    type="int", default=0,
552
                    help=("How many times the connection should be retried"
553
                          " (export only)"))
554
  parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
555
                    type="int", default=DEFAULT_CONNECT_TIMEOUT,
556
                    help="Timeout for connection to be established (seconds)")
557
  parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
558
                    type="string", help="Command prefix")
559
  parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
560
                    type="string", help="Command suffix")
561

    
562
  (options, args) = parser.parse_args()
563

    
564
  if len(args) != 2:
565
    # Won't return
566
    parser.error("Expected exactly two arguments")
567

    
568
  (status_file_path, mode) = args
569

    
570
  if mode not in (constants.IEM_IMPORT,
571
                  constants.IEM_EXPORT):
572
    # Won't return
573
    parser.error("Invalid mode: %s" % mode)
574

    
575
  return (status_file_path, mode)
576

    
577

    
578
class ChildProcess(subprocess.Popen):
579
  def __init__(self, cmd, noclose_fds):
580
    """Initializes this class.
581

    
582
    """
583
    self._noclose_fds = noclose_fds
584

    
585
    # Not using close_fds because doing so would also close the socat stderr
586
    # pipe, which we still need.
587
    subprocess.Popen.__init__(self, cmd, shell=False, close_fds=False,
588
                              stderr=subprocess.PIPE, stdout=None, stdin=None,
589
                              preexec_fn=self._ChildPreexec)
590
    self._SetProcessGroup()
591

    
592
  def _ChildPreexec(self):
593
    """Called before child executable is execve'd.
594

    
595
    """
596
    # Move to separate process group. By sending a signal to its process group
597
    # we can kill the child process and all grandchildren.
598
    os.setpgid(0, 0)
599

    
600
    # Close almost all file descriptors
601
    utils.CloseFDs(noclose_fds=self._noclose_fds)
602

    
603
  def _SetProcessGroup(self):
604
    """Sets the child's process group.
605

    
606
    """
607
    assert self.pid, "Can't be called in child process"
608

    
609
    # Avoid race condition by setting child's process group (as good as
610
    # possible in Python) before sending signals to child. For an
611
    # explanation, see preexec function for child.
612
    try:
613
      os.setpgid(self.pid, self.pid)
614
    except EnvironmentError, err:
615
      # If the child process was faster we receive EPERM or EACCES
616
      if err.errno not in (errno.EPERM, errno.EACCES):
617
        raise
618

    
619
  def Kill(self, signum):
620
    """Sends signal to child process.
621

    
622
    """
623
    logging.info("Sending signal %s to child process", signum)
624
    os.killpg(self.pid, signum)
625

    
626
  def ForceQuit(self):
627
    """Ensure child process is no longer running.
628

    
629
    """
630
    # Final check if child process is still alive
631
    if utils.RetryOnSignal(self.poll) is None:
632
      logging.error("Child process still alive, sending SIGKILL")
633
      self.Kill(signal.SIGKILL)
634
      utils.RetryOnSignal(self.wait)
635

    
636

    
637
def main():
638
  """Main function.
639

    
640
  """
641
  # Option parsing
642
  (status_file_path, mode) = ParseOptions()
643

    
644
  # Configure logging
645
  child_logger = SetupLogging()
646

    
647
  status_file = StatusFile(status_file_path)
648
  try:
649
    try:
650
      # Pipe to receive socat's stderr output
651
      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
652

    
653
      # Get child process command
654
      cmd = GetCommand(mode, socat_stderr_write_fd)
655

    
656
      logging.debug("Starting command %r", cmd)
657

    
658
      # Start child process
659
      child = ChildProcess(cmd, [socat_stderr_write_fd])
660
      try:
661
        def _ForwardSignal(signum, _):
662
          """Forwards signals to child process.
663

    
664
          """
665
          child.Kill(signum)
666

    
667
        signal_wakeup = utils.SignalWakeupFd()
668
        try:
669
          # TODO: There is a race condition between starting the child and
670
          # handling the signals here. While there might be a way to work around
671
          # it by registering the handlers before starting the child and
672
          # deferring sent signals until the child is available, doing so can be
673
          # complicated.
674
          signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
675
                                               handler_fn=_ForwardSignal,
676
                                               wakeup=signal_wakeup)
677
          try:
678
            # Close child's side
679
            utils.RetryOnSignal(os.close, socat_stderr_write_fd)
680

    
681
            if ProcessChildIO(child, socat_stderr_read_fd, status_file,
682
                              child_logger, signal_wakeup, signal_handler,
683
                              mode):
684
              # The child closed all its file descriptors and there was no
685
              # signal
686
              # TODO: Implement timeout instead of waiting indefinitely
687
              utils.RetryOnSignal(child.wait)
688
          finally:
689
            signal_handler.Reset()
690
        finally:
691
          signal_wakeup.Reset()
692
      finally:
693
        child.ForceQuit()
694

    
695
      if child.returncode == 0:
696
        errmsg = None
697
      elif child.returncode < 0:
698
        errmsg = "Exited due to signal %s" % (-child.returncode, )
699
      else:
700
        errmsg = "Exited with status %s" % (child.returncode, )
701

    
702
      status_file.SetExitStatus(child.returncode, errmsg)
703
    except Exception, err: # pylint: disable-msg=W0703
704
      logging.exception("Unhandled error occurred")
705
      status_file.SetExitStatus(constants.EXIT_FAILURE,
706
                                "Unhandled error occurred: %s" % (err, ))
707

    
708
    if status_file.ExitStatusIsSuccess():
709
      sys.exit(constants.EXIT_SUCCESS)
710

    
711
    sys.exit(constants.EXIT_FAILURE)
712
  finally:
713
    status_file.Update(True)
714

    
715

    
716
if __name__ == "__main__":
717
  main()