Revision 2d76b580

b/Makefile.am
241 241
	tools/lvmstrap \
242 242
	tools/sanitize-config
243 243

  
244
pkglib_python_scripts = \
245
	daemons/import-export
246

  
244 247
pkglib_SCRIPTS = \
245
	daemons/daemon-util
248
	daemons/daemon-util \
249
	$(pkglib_python_scripts)
246 250

  
247 251
EXTRA_DIST = \
248 252
	NEWS \
......
256 260
	$(RUN_IN_TEMPDIR) \
257 261
	daemons/daemon-util.in \
258 262
	daemons/ganeti-cleaner.in \
263
	$(pkglib_python_scripts) \
259 264
	devel/upload.in \
260 265
	$(docdot) \
261 266
	$(docpng) \
......
322 327
	test/data/cert1.pem \
323 328
	test/data/proc_drbd8.txt \
324 329
	test/data/proc_drbd80-emptyline.txt \
325
	test/data/proc_drbd83.txt
330
	test/data/proc_drbd83.txt \
331
	test/import-export_unittest-helper
326 332

  
327 333
python_tests = \
328 334
	test/ganeti.backend_unittest.py \
......
351 357

  
352 358
dist_TESTS = \
353 359
	test/daemon-util_unittest.bash \
360
	test/import-export_unittest.bash \
354 361
	$(python_tests)
355 362

  
356 363
nodist_TESTS =
......
368 375
all_python_code = \
369 376
	$(dist_sbin_SCRIPTS) \
370 377
	$(dist_tools_SCRIPTS) \
378
	$(pkglib_python_scripts) \
371 379
	$(python_tests) \
372 380
	$(pkgpython_PYTHON) \
373 381
	$(hypervisor_PYTHON) \
......
379 387
srclink_files = \
380 388
	man/footer.sgml \
381 389
	test/daemon-util_unittest.bash \
390
	test/import-export_unittest.bash \
382 391
	$(all_python_code)
383 392

  
384 393
check_python_code = \
......
389 398
	ganeti \
390 399
	$(dist_sbin_SCRIPTS) \
391 400
	$(dist_tools_SCRIPTS) \
401
	$(pkglib_python_scripts) \
392 402
	$(BUILD_BASH_COMPLETION)
393 403

  
394 404
test/daemon-util_unittest.bash: daemons/daemon-util
b/daemons/import-export
1
#!/usr/bin/python
2
#
3

  
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Import/export daemon.
23

  
24
"""
25

  
26
# pylint: disable-msg=C0103
27
# C0103: Invalid name import-export
28

  
29
import errno
30
import logging
31
import optparse
32
import os
33
import re
34
import select
35
import signal
36
import socket
37
import subprocess
38
import sys
39
import time
40
from cStringIO import StringIO
41

  
42
from ganeti import constants
43
from ganeti import cli
44
from ganeti import utils
45
from ganeti import serializer
46
from ganeti import objects
47
from ganeti import locking
48

  
49

  
50
#: Used to recognize point at which socat(1) starts to listen on its socket.
51
#: The local address is required for the remote peer to connect (in particular
52
#: the port number).
53
LISTENING_RE = re.compile(r"^listening on\s+"
54
                          r"AF=(?P<family>\d+)\s+"
55
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
56

  
57
#: Used to recognize point at which socat(1) is sending data over the wire
58
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
59
                              re.I)
60

  
61
SOCAT_LOG_DEBUG = "D"
62
SOCAT_LOG_INFO = "I"
63
SOCAT_LOG_NOTICE = "N"
64
SOCAT_LOG_WARNING = "W"
65
SOCAT_LOG_ERROR = "E"
66
SOCAT_LOG_FATAL = "F"
67

  
68
SOCAT_LOG_IGNORE = frozenset([
69
  SOCAT_LOG_DEBUG,
70
  SOCAT_LOG_INFO,
71
  SOCAT_LOG_NOTICE,
72
  ])
73

  
74
#: Socat buffer size: at most this many bytes are transferred per step
75
SOCAT_BUFSIZE = 1024 * 1024
76

  
77
#: How many lines to keep in the status file
78
MAX_RECENT_OUTPUT_LINES = 20
79

  
80
#: Don't update status file more than once every 5 seconds (unless forced)
81
MIN_UPDATE_INTERVAL = 5.0
82

  
83
#: Give child process up to 5 seconds to exit after sending a signal
84
CHILD_LINGER_TIMEOUT = 5.0
85

  
86
# Common options for socat
87
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
88
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
89
SOCAT_CONNECT_TIMEOUT = 60
90

  
91

  
92
# Global variable for options
93
options = None
94

  
95

  
96
class Error(Exception):
97
  """Generic exception"""
98

  
99

  
100
def SetupLogging():
101
  """Configures the logging module.
102

  
103
  """
104
  formatter = logging.Formatter("%(asctime)s: %(message)s")
105

  
106
  stderr_handler = logging.StreamHandler()
107
  stderr_handler.setFormatter(formatter)
108
  stderr_handler.setLevel(logging.NOTSET)
109

  
110
  root_logger = logging.getLogger("")
111
  root_logger.addHandler(stderr_handler)
112

  
113
  if options.debug:
114
    root_logger.setLevel(logging.NOTSET)
115
  elif options.verbose:
116
    root_logger.setLevel(logging.INFO)
117
  else:
118
    root_logger.setLevel(logging.ERROR)
119

  
120
  # Create special logger for child process output
121
  child_logger = logging.Logger("child output")
122
  child_logger.addHandler(stderr_handler)
123
  child_logger.setLevel(logging.NOTSET)
124

  
125
  return child_logger
126

  
127

  
128
def _VerifyListening(family, address, port):
129
  """Verify address given as listening address by socat.
130

  
131
  """
132
  # TODO: Implement IPv6 support
133
  if family != socket.AF_INET:
134
    raise Error("Address family %r not supported" % family)
135

  
136
  try:
137
    packed_address = socket.inet_pton(family, address)
138
  except socket.error:
139
    raise Error("Invalid address %r for family %s" % (address, family))
140

  
141
  return (socket.inet_ntop(family, packed_address), port)
142

  
143

  
144
class StatusFile:
145
  """Status file manager.
146

  
147
  """
148
  def __init__(self, path):
149
    """Initializes class.
150

  
151
    """
152
    self._path = path
153
    self._data = objects.ImportExportStatus(ctime=time.time(),
154
                                            mtime=None,
155
                                            recent_output=[])
156

  
157
  def AddRecentOutput(self, line):
158
    """Adds a new line of recent output.
159

  
160
    """
161
    self._data.recent_output.append(line)
162

  
163
    # Remove old lines
164
    del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]
165

  
166
  def SetListenPort(self, port):
167
    """Sets the port the daemon is listening on.
168

  
169
    @type port: int
170
    @param port: TCP/UDP port
171

  
172
    """
173
    assert isinstance(port, (int, long)) and 0 < port < 2**16
174
    self._data.listen_port = port
175

  
176
  def GetListenPort(self):
177
    """Returns the port the daemon is listening on.
178

  
179
    """
180
    return self._data.listen_port
181

  
182
  def SetConnected(self):
183
    """Sets the connected flag.
184

  
185
    """
186
    self._data.connected = True
187

  
188
  def SetExitStatus(self, exit_status, error_message):
189
    """Sets the exit status and an error message.
190

  
191
    """
192
    # Require error message when status isn't 0
193
    assert exit_status == 0 or error_message
194

  
195
    self._data.exit_status = exit_status
196
    self._data.error_message = error_message
197

  
198
  def ExitStatusIsSuccess(self):
199
    """Returns whether the exit status means "success".
200

  
201
    """
202
    return not bool(self._data.error_message)
203

  
204
  def Update(self, force):
205
    """Updates the status file.
206

  
207
    @type force: bool
208
    @param force: Write status file in any case, not only when minimum interval
209
                  is expired
210

  
211
    """
212
    if not (force or
213
            self._data.mtime is None or
214
            time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
215
      return
216

  
217
    logging.debug("Updating status file %s", self._path)
218

  
219
    self._data.mtime = time.time()
220
    utils.WriteFile(self._path,
221
                    data=serializer.DumpJson(self._data.ToDict(), indent=True),
222
                    mode=0400)
223

  
224

  
225
def _ProcessSocatOutput(status_file, level, msg):
226
  """Interprets socat log output.
227

  
228
  """
229
  if level == SOCAT_LOG_NOTICE:
230
    if status_file.GetListenPort() is None:
231
      # TODO: Maybe implement timeout to not listen forever
232
      m = LISTENING_RE.match(msg)
233
      if m:
234
        (_, port) = _VerifyListening(int(m.group("family")), m.group("address"),
235
                                     int(m.group("port")))
236

  
237
        status_file.SetListenPort(port)
238
        return True
239

  
240
    m = TRANSFER_LOOP_RE.match(msg)
241
    if m:
242
      status_file.SetConnected()
243
      return True
244

  
245
  return False
246

  
247

  
248
def ProcessOutput(line, status_file, logger, socat):
249
  """Takes care of child process output.
250

  
251
  @param status_file: Status file manager
252
  @param logger: Child output logger
253
  @type socat: bool
254
  @param socat: Whether it's a socat output line
255
  @type line: string
256
  @param line: Child output line
257

  
258
  """
259
  force_update = False
260
  forward_line = line
261

  
262
  if socat:
263
    level = None
264
    parts = line.split(None, 4)
265

  
266
    if len(parts) == 5:
267
      (_, _, _, level, msg) = parts
268

  
269
      force_update = _ProcessSocatOutput(status_file, level, msg)
270

  
271
      if options.debug or (level and level not in SOCAT_LOG_IGNORE):
272
        forward_line = "socat: %s %s" % (level, msg)
273
      else:
274
        forward_line = None
275
    else:
276
      forward_line = "socat: %s" % line
277

  
278
  if forward_line:
279
    logger.info(forward_line)
280
    status_file.AddRecentOutput(forward_line)
281

  
282
  status_file.Update(force_update)
283

  
284

  
285
def GetBashCommand(cmd):
286
  """Prepares a command to be run in Bash.
287

  
288
  """
289
  return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
290

  
291

  
292
def GetSocatCommand(mode):
293
  """Returns the socat command.
294

  
295
  """
296
  common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
297
    "key=%s" % options.key,
298
    "cert=%s" % options.cert,
299
    "cafile=%s" % options.ca,
300
    ]
301

  
302
  if options.bind is not None:
303
    common_addr_opts.append("bind=%s" % options.bind)
304

  
305
  if mode == constants.IEM_IMPORT:
306
    if options.port is None:
307
      port = 0
308
    else:
309
      port = options.port
310

  
311
    addr1 = [
312
      "OPENSSL-LISTEN:%s" % port,
313
      "reuseaddr",
314
      ] + common_addr_opts
315
    addr2 = ["stdout"]
316

  
317
  elif mode == constants.IEM_EXPORT:
318
    addr1 = ["stdin"]
319
    addr2 = [
320
      "OPENSSL:%s:%s" % (options.host, options.port),
321
      "connect-timeout=%s" % SOCAT_CONNECT_TIMEOUT,
322
      ] + common_addr_opts
323

  
324
  else:
325
    raise Error("Invalid mode")
326

  
327
  for i in [addr1, addr2]:
328
    for value in i:
329
      if "," in value:
330
        raise Error("Comma not allowed in socat option value: %r" % value)
331

  
332
  return [
333
    constants.SOCAT_PATH,
334

  
335
    # Log to stderr
336
    "-ls",
337

  
338
    # Log level
339
    "-d", "-d",
340

  
341
    # Buffer size
342
    "-b%s" % SOCAT_BUFSIZE,
343

  
344
    # Unidirectional mode, the first address is only used for reading, and the
345
    # second address is only used for writing
346
    "-u",
347

  
348
    ",".join(addr1), ",".join(addr2)
349
    ]
350

  
351

  
352
def GetTransportCommand(mode, socat_stderr_fd):
353
  """Returns the command for the transport part of the daemon.
354

  
355
  @param mode: Daemon mode (import or export)
356
  @type socat_stderr_fd: int
357
  @param socat_stderr_fd: File descriptor socat should write its stderr to
358

  
359
  """
360
  socat_cmd = ("%s 2>&%d" %
361
               (utils.ShellQuoteArgs(GetSocatCommand(mode)),
362
                socat_stderr_fd))
363

  
364
  # TODO: Make compression configurable
365

  
366
  if mode == constants.IEM_IMPORT:
367
    transport_cmd = "%s | gunzip -c" % socat_cmd
368
  elif mode == constants.IEM_EXPORT:
369
    transport_cmd = "gzip -c | %s" % socat_cmd
370
  else:
371
    raise Error("Invalid mode")
372

  
373
  # TODO: Use "dd" to measure processed data (allows to give an ETA)
374
  # TODO: If a connection to socat is dropped (e.g. due to a wrong
375
  # certificate), socat should be restarted
376

  
377
  # TODO: Run transport as separate user
378
  # The transport uses its own shell to simplify running it as a separate user
379
  # in the future.
380
  return GetBashCommand(transport_cmd)
381

  
382

  
383
def GetCommand(mode, socat_stderr_fd):
384
  """Returns the complete child process command.
385

  
386
  """
387
  buf = StringIO()
388

  
389
  if options.cmd_prefix:
390
    buf.write(options.cmd_prefix)
391
    buf.write(" ")
392

  
393
  buf.write(utils.ShellQuoteArgs(GetTransportCommand(mode, socat_stderr_fd)))
394

  
395
  if options.cmd_suffix:
396
    buf.write(" ")
397
    buf.write(options.cmd_suffix)
398

  
399
  return GetBashCommand(buf.getvalue())
400

  
401

  
402
def ProcessChildIO(child, socat_stderr_read, status_file, child_logger,
403
                   signal_notify, signal_handler):
404
  """Handles the child processes' output.
405

  
406
  """
407
  poller = select.poll()
408

  
409
  script_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
410
                                           child_logger, False)
411
  try:
412
    socat_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
413
                                            child_logger, True)
414
    try:
415
      fdmap = {
416
        child.stderr.fileno(): (child.stderr, script_stderr_lines),
417
        socat_stderr_read.fileno(): (socat_stderr_read, socat_stderr_lines),
418
        signal_notify.fileno(): (signal_notify, None),
419
        }
420

  
421
      for fd in fdmap:
422
        utils.SetNonblockFlag(fd, True)
423
        poller.register(fd, select.POLLIN)
424

  
425
      timeout_calculator = None
426
      while True:
427
        # Break out of loop if only signal notify FD is left
428
        if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
429
          break
430

  
431
        if timeout_calculator:
432
          timeout = timeout_calculator.Remaining() * 1000
433
          if timeout < 0:
434
            logging.info("Child process didn't exit in time")
435
            break
436
        else:
437
          timeout = None
438

  
439
        for fd, event in utils.RetryOnSignal(poller.poll, timeout):
440
          if event & (select.POLLIN | event & select.POLLPRI):
441
            (from_, to) = fdmap[fd]
442

  
443
            # Read up to 1 KB of data
444
            data = from_.read(1024)
445
            if data:
446
              if to:
447
                to.write(data)
448
              elif fd == signal_notify.fileno():
449
                # Signal handling
450
                if signal_handler.called:
451
                  signal_handler.Clear()
452
                  if timeout_calculator:
453
                    logging.info("Child process still has about %0.2f seconds"
454
                                 " to exit", timeout_calculator.Remaining())
455
                  else:
456
                    logging.info("Giving child process %0.2f seconds to exit",
457
                                 CHILD_LINGER_TIMEOUT)
458
                    timeout_calculator = \
459
                      locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
460
            else:
461
              poller.unregister(fd)
462
              del fdmap[fd]
463

  
464
          elif event & (select.POLLNVAL | select.POLLHUP |
465
                        select.POLLERR):
466
            poller.unregister(fd)
467
            del fdmap[fd]
468

  
469
        script_stderr_lines.flush()
470
        socat_stderr_lines.flush()
471

  
472
      # If there was a timeout calculator, we were waiting for the child to
473
      # finish, e.g. due to a signal
474
      return not bool(timeout_calculator)
475
    finally:
476
      socat_stderr_lines.close()
477
  finally:
478
    script_stderr_lines.close()
479

  
480

  
481
def ParseOptions():
482
  """Parses the options passed to the program.
483

  
484
  @return: Arguments to program
485

  
486
  """
487
  global options # pylint: disable-msg=W0603
488

  
489
  parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
490
                                        (constants.IEM_IMPORT,
491
                                         constants.IEM_EXPORT)))
492
  parser.add_option(cli.DEBUG_OPT)
493
  parser.add_option(cli.VERBOSE_OPT)
494
  parser.add_option("--key", dest="key", action="store", type="string",
495
                    help="RSA key file")
496
  parser.add_option("--cert", dest="cert", action="store", type="string",
497
                    help="X509 certificate file")
498
  parser.add_option("--ca", dest="ca", action="store", type="string",
499
                    help="X509 CA file")
500
  parser.add_option("--bind", dest="bind", action="store", type="string",
501
                    help="Bind address")
502
  parser.add_option("--host", dest="host", action="store", type="string",
503
                    help="Remote hostname")
504
  parser.add_option("--port", dest="port", action="store", type="int",
505
                    help="Remote port")
506
  parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
507
                    type="string", help="Command prefix")
508
  parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
509
                    type="string", help="Command suffix")
510

  
511
  (options, args) = parser.parse_args()
512

  
513
  if len(args) != 2:
514
    # Won't return
515
    parser.error("Expected exactly two arguments")
516

  
517
  (status_file_path, mode) = args
518

  
519
  if mode not in (constants.IEM_IMPORT,
520
                  constants.IEM_EXPORT):
521
    # Won't return
522
    parser.error("Invalid mode: %s" % mode)
523

  
524
  return (status_file_path, mode)
525

  
526

  
527
def main():
528
  """Main function.
529

  
530
  """
531
  # Option parsing
532
  (status_file_path, mode) = ParseOptions()
533

  
534
  # Configure logging
535
  child_logger = SetupLogging()
536

  
537
  status_file = StatusFile(status_file_path)
538
  try:
539
    try:
540
      # Pipe to receive socat's stderr output
541
      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
542

  
543
      # Pipe to notify on signals
544
      (signal_notify_read_fd, signal_notify_write_fd) = os.pipe()
545

  
546
      # Configure signal module's notifier
547
      try:
548
        # This is only supported in Python 2.5 and above (some distributions
549
        # backported it to Python 2.4)
550
        set_wakeup_fd_fn = signal.set_wakeup_fd
551
      except AttributeError:
552
        pass
553
      else:
554
        set_wakeup_fd_fn(signal_notify_write_fd)
555

  
556
      # Buffer size 0 is important, otherwise .read() with a specified length
557
      # might buffer data while poll(2) won't mark its file descriptor as
558
      # readable again.
559
      socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
560
      signal_notify_read = os.fdopen(signal_notify_read_fd, "r", 0)
561

  
562
      # Get child process command
563
      cmd = GetCommand(mode, socat_stderr_write_fd)
564

  
565
      logging.debug("Starting command %r", cmd)
566

  
567
      def _ChildPreexec():
568
        # Move child to separate process group. By sending a signal to its
569
        # process group we can kill the child process and all its own
570
        # child-processes.
571
        os.setpgid(0, 0)
572

  
573
        # Close almost all file descriptors
574
        utils.CloseFDs(noclose_fds=[socat_stderr_write_fd])
575

  
576
      # Not using close_fds because doing so would also close the socat stderr
577
      # pipe, which we still need.
578
      child = subprocess.Popen(cmd, shell=False, close_fds=False,
579
                               stderr=subprocess.PIPE, stdout=None, stdin=None,
580
                               preexec_fn=_ChildPreexec)
581
      try:
582
        # Avoid race condition by setting child's process group (as good as
583
        # possible in Python) before sending signals to child. For an
584
        # explanation, see preexec function for child.
585
        try:
586
          os.setpgid(child.pid, child.pid)
587
        except EnvironmentError, err:
588
          # If the child process was faster we receive EPERM or EACCES
589
          if err.errno not in (errno.EPERM, errno.EACCES):
590
            raise
591

  
592
        # Forward signals to child process
593
        def _ForwardSignal(signum, _):
594
          # Wake up poll(2)
595
          os.write(signal_notify_write_fd, "\0")
596

  
597
          # Send signal to child
598
          os.killpg(child.pid, signum)
599

  
600
        # TODO: There is a race condition between starting the child and
601
        # handling the signals here. While there might be a way to work around
602
        # it by registering the handlers before starting the child and
603
        # deferring sent signals until the child is available, doing so can be
604
        # complicated.
605
        signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
606
                                             handler_fn=_ForwardSignal)
607
        try:
608
          # Close child's side
609
          utils.RetryOnSignal(os.close, socat_stderr_write_fd)
610

  
611
          if ProcessChildIO(child, socat_stderr_read, status_file, child_logger,
612
                            signal_notify_read, signal_handler):
613
            # The child closed all its file descriptors and there was no signal
614
            # TODO: Implement timeout instead of waiting indefinitely
615
            utils.RetryOnSignal(child.wait)
616
        finally:
617
          signal_handler.Reset()
618
      finally:
619
        # Final check if child process is still alive
620
        if utils.RetryOnSignal(child.poll) is None:
621
          logging.error("Child process still alive, sending SIGKILL")
622
          os.killpg(child.pid, signal.SIGKILL)
623
          utils.RetryOnSignal(child.wait)
624

  
625
      if child.returncode == 0:
626
        errmsg = None
627
      elif child.returncode < 0:
628
        errmsg = "Exited due to signal %s" % (-child.returncode, )
629
      else:
630
        errmsg = "Exited with status %s" % (child.returncode, )
631

  
632
      status_file.SetExitStatus(child.returncode, errmsg)
633
    except Exception, err: # pylint: disable-msg=W0703
634
      logging.exception("Unhandled error occurred")
635
      status_file.SetExitStatus(constants.EXIT_FAILURE,
636
                                "Unhandled error occurred: %s" % (err, ))
637

  
638
    if status_file.ExitStatusIsSuccess():
639
      sys.exit(constants.EXIT_SUCCESS)
640

  
641
    sys.exit(constants.EXIT_FAILURE)
642
  finally:
643
    status_file.Update(True)
644

  
645

  
646
if __name__ == "__main__":
647
  main()
b/epydoc.conf
8 8
# note: the wildcards means the directories should be cleaned up after each
9 9
# run, otherwise there will be stale '*c' (compiled) files that will not be
10 10
# parsable and will break the epydoc run
11
modules: ganeti, scripts/gnt-*, daemons/ganeti-confd, daemons/ganeti-masterd, daemons/ganeti-noded, daemons/ganeti-rapi, daemons/ganeti-watcher, tools/*
11
modules: ganeti, scripts/gnt-*, daemons/ganeti-confd, daemons/ganeti-masterd, daemons/ganeti-noded, daemons/ganeti-rapi, daemons/ganeti-watcher, daemons/import-export, tools/*
12 12

  
13 13
graph: all
14 14

  
b/lib/constants.py
193 193

  
194 194
X509_CERT_SIGNATURE_HEADER = "X-Ganeti-Signature"
195 195

  
196
IMPORT_EXPORT_DAEMON = _autoconf.PKGLIBDIR + "/import-export"
197

  
198
# Import/export daemon mode
199
IEM_IMPORT = "import"
200
IEM_EXPORT = "export"
201

  
196 202
VALUE_DEFAULT = "default"
197 203
VALUE_AUTO = "auto"
198 204
VALUE_GENERATE = "generate"
b/lib/objects.py
1007 1007
    ]
1008 1008

  
1009 1009

  
1010
class ImportExportStatus(ConfigObject):
1011
  """Config object representing the status of an import or export."""
1012
  __slots__ = [
1013
    "recent_output",
1014
    "listen_port",
1015
    "connected",
1016
    "exit_status",
1017
    "error_message",
1018
    ] + _TIMESTAMPS
1019

  
1020

  
1010 1021
class ConfdRequest(ConfigObject):
1011 1022
  """Object holding a confd request.
1012 1023

  
b/test/import-export_unittest-helper
1
#!/usr/bin/python
2
#
3

  
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Helpers for testing import-export daemon"""
23

  
24
import os
25
import sys
26
import errno
27

  
28
from ganeti import constants
29
from ganeti import utils
30
from ganeti import objects
31
from ganeti import serializer
32

  
33

  
34
RETRY_INTERVAL = 0.1
35
TIMEOUT = int(os.getenv("TIMEOUT", 10))
36

  
37

  
38
def _GetImportExportData(filename):
39
  try:
40
    data = utils.ReadFile(filename)
41
  except EnvironmentError, err:
42
    if err.errno != errno.ENOENT:
43
      raise
44
    raise utils.RetryAgain()
45

  
46
  return objects.ImportExportStatus.FromDict(serializer.LoadJson(data))
47

  
48

  
49
def _CheckConnected(filename):
50
  if not _GetImportExportData(filename).connected:
51
    raise utils.RetryAgain()
52

  
53

  
54
def WaitForListenPort(filename):
55
  return utils.Retry(lambda: _GetImportExportData(filename).listen_port,
56
                     RETRY_INTERVAL, TIMEOUT)
57

  
58

  
59
def WaitForConnected(filename):
60
  utils.Retry(_CheckConnected, RETRY_INTERVAL, TIMEOUT, args=(filename, ))
61

  
62

  
63
def main():
64
  (filename, what) = sys.argv[1:]
65

  
66
  if what == "listen-port":
67
    print WaitForListenPort(filename)
68
  elif what == "connected":
69
    WaitForConnected(filename)
70
  elif what == "gencert":
71
    utils.GenerateSelfSignedSslCert(filename, validity=1)
72
  else:
73
    raise Exception("Unknown command '%s'" % what)
74

  
75

  
76
if __name__ == "__main__":
77
  main()
b/test/import-export_unittest.bash
1
#!/bin/bash
2
#
3

  
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21
set -e
22
set -o pipefail
23

  
24
export PYTHON=${PYTHON:=python}
25

  
26
impexpd="$PYTHON daemons/import-export"
27

  
28
# Add "-d" for debugging
29
#impexpd+=' -d'
30

  
31
err() {
32
  echo "$@"
33
  echo 'Aborting'
34
  exit 1
35
}
36

  
37
checkpids() {
38
  local result=0
39

  
40
  # Unlike combining the "wait" commands using || or &&, this ensures we
41
  # actually wait for all PIDs.
42
  for pid in "$@"; do
43
    if ! wait $pid; then
44
      result=1
45
    fi
46
  done
47

  
48
  return $result
49
}
50

  
51
get_testpath() {
52
  echo "${TOP_SRCDIR:-.}/test"
53
}
54

  
55
get_testfile() {
56
  echo "$(get_testpath)/data/$1"
57
}
58

  
59
statusdir=$(mktemp -d)
60
trap "rm -rf $statusdir" EXIT
61

  
62
src_statusfile=$statusdir/src.status
63
src_x509=$statusdir/src.pem
64

  
65
dst_statusfile=$statusdir/dst.status
66
dst_x509=$statusdir/dst.pem
67
dst_portfile=$statusdir/dst.port
68

  
69
other_x509=$statusdir/other.pem
70

  
71
testdata=$statusdir/data1
72

  
73
cmd_prefix=
74
cmd_suffix=
75

  
76
$impexpd >/dev/null 2>&1 &&
77
  err "daemon-util succeeded without parameters"
78

  
79
$impexpd foo bar baz moo boo >/dev/null 2>&1 &&
80
  err "daemon-util succeeded with wrong parameters"
81

  
82
$impexpd $src_statusfile >/dev/null 2>&1 &&
83
  err "daemon-util succeeded with insufficient parameters"
84

  
85
$impexpd $src_statusfile invalidmode >/dev/null 2>&1 &&
86
  err "daemon-util succeeded with invalid mode"
87

  
88
cat $(get_testfile proc_drbd8.txt) $(get_testfile cert1.pem) > $testdata
89

  
90
impexpd_helper() {
91
  $PYTHON $(get_testpath)/import-export_unittest-helper "$@"
92
}
93

  
94
reset_status() {
95
  rm -f $src_statusfile $dst_statusfile $dst_portfile
96
}
97

  
98
write_data() {
99
  # Wait for connection to be established
100
  impexpd_helper $dst_statusfile connected
101

  
102
  cat $testdata
103
}
104

  
105
do_export() {
106
  # Wait for listening port
107
  impexpd_helper $dst_statusfile listen-port > $dst_portfile
108

  
109
  local port=$(< $dst_portfile)
110

  
111
  test -n "$port" || err 'Empty port file'
112

  
113
  do_export_to_port $port
114
}
115

  
116
do_export_to_port() {
117
  local port=$1
118

  
119
  $impexpd $src_statusfile export --bind=127.0.0.1 \
120
    --host=127.0.0.1 --port=$port \
121
    --key=$src_x509 --cert=$src_x509 --ca=$dst_x509 \
122
    --cmd-prefix="$cmd_prefix" --cmd-suffix="$cmd_suffix"
123
}
124

  
125
do_import() {
126
  $impexpd $dst_statusfile import --bind=127.0.0.1 \
127
    --host=127.0.0.1 \
128
    --key=$dst_x509 --cert=$dst_x509 --ca=$src_x509 \
129
    --cmd-prefix="$cmd_prefix" --cmd-suffix="$cmd_suffix"
130
}
131

  
132
# Generate X509 certificates and keys
133
impexpd_helper $src_x509 gencert
134
impexpd_helper $dst_x509 gencert
135
impexpd_helper $other_x509 gencert
136

  
137
# Normal case
138
reset_status
139
do_import > $statusdir/recv1 & imppid=$!
140
write_data | do_export & exppid=$!
141
checkpids $exppid $imppid || err 'An error occurred'
142
cmp $testdata $statusdir/recv1 || err 'Received data does not match input'
143

  
144
# Export using wrong CA
145
reset_status
146
do_import > /dev/null 2>&1 & imppid=$!
147
: | dst_x509=$other_x509 do_export 2>/dev/null & exppid=$!
148
checkpids $exppid $imppid && err 'Export did not fail when using wrong CA'
149

  
150
# Import using wrong CA
151
reset_status
152
src_x509=$other_x509 do_import > /dev/null 2>&1 & imppid=$!
153
: | do_export 2> /dev/null & exppid=$!
154
checkpids $exppid $imppid && err 'Import did not fail when using wrong CA'
155

  
156
# Suffix command on import
157
reset_status
158
cmd_suffix="| cksum > $statusdir/recv2" do_import & imppid=$!
159
write_data | do_export & exppid=$!
160
checkpids $exppid $imppid || err 'Testing additional commands failed'
161
cmp $statusdir/recv2 <(cksum < $testdata) || \
162
  err 'Checksum of received data does not match'
163

  
164
# Prefix command on export
165
reset_status
166
do_import > $statusdir/recv3 & imppid=$!
167
write_data | cmd_prefix="cksum |" do_export & exppid=$!
168
checkpids $exppid $imppid || err 'Testing additional commands failed'
169
cmp $statusdir/recv3 <(cksum < $testdata) || \
170
  err 'Received checksum does not match'
171

  
172
# Failing prefix command on export
173
reset_status
174
: | cmd_prefix='exit 1;' do_export_to_port 0 & exppid=$!
175
checkpids $exppid && err 'Prefix command on export did not fail when it should'
176

  
177
# Failing suffix command on export
178
reset_status
179
do_import > /dev/null & imppid=$!
180
: | cmd_suffix='| exit 1' do_export & exppid=$!
181
checkpids $imppid $exppid && \
182
  err 'Suffix command on export did not fail when it should'
183

  
184
# Failing prefix command on import
185
reset_status
186
cmd_prefix='exit 1;' do_import > /dev/null & imppid=$!
187
checkpids $imppid && err 'Prefix command on import did not fail when it should'
188

  
189
# Failing suffix command on import
190
reset_status
191
cmd_suffix='| exit 1' do_import > /dev/null & imppid=$!
192
: | do_export & exppid=$!
193
checkpids $imppid $exppid && \
194
  err 'Suffix command on import did not fail when it should'
195

  
196
exit 0

Also available in: Unified diff