Revision 21bbbc9b snf-tools/test_suite.py

b/snf-tools/test_suite.py
35 35

  
36 36
"""Perform integration testing on a running Synnefo deployment"""
37 37

  
38
import __main__
38 39
import datetime
39 40
import inspect
40 41
import logging
41 42
import os
42 43
import paramiko
44
import prctl
43 45
import subprocess
46
import signal
44 47
import socket
45 48
import struct
46 49
import sys
47 50
import time
48 51

  
49 52
from IPy import IP
53
from multiprocessing import Process, Queue
50 54
from random import choice
55

  
51 56
from kamaki.client import Client, ClientError
52 57
from vncauthproxy.d3des import generate_response as d3des_generate_response
53 58

  
......
60 65
    import unittest
61 66

  
62 67

  
63
API = "http://dev67.dev.grnet.gr:8000/api/v1.1"
64
TOKEN = "46e427d657b20defe352804f0eb6f8a2"
68
API = None
69
TOKEN = None
70
DEFAULT_API = "http://dev67.dev.grnet.gr:8000/api/v1.1"
71
DEFAULT_TOKEN = "46e427d657b20defe352804f0eb6f8a2"
65 72
# A unique id identifying this test run
66
UNIQUE_RUN_ID = datetime.datetime.strftime(datetime.datetime.now(),
67
                                           "%Y%m%d%H%M%S")
68
SNF_TEST_PREFIX = "snf-test"
73
TEST_RUN_ID = datetime.datetime.strftime(datetime.datetime.now(),
74
                                         "%Y%m%d%H%M%S")
75
SNF_TEST_PREFIX = "snf-test-"
69 76

  
70 77
# Setup logging (FIXME - verigak)
71 78
logging.basicConfig(format="%(message)s")
......
216 223

  
217 224
    def _connect_loginname(self, os):
218 225
        """Return the login name for connections based on the server OS"""
219
        if os in ('ubuntu', 'kubuntu', 'fedora'):
220
            return 'user'
221
        elif os == 'windows':
222
            return 'Administrator'
226
        if os in ("ubuntu", "kubuntu", "fedora"):
227
            return "user"
228
        elif os == "windows":
229
            return "Administrator"
223 230
        else:
224
            return 'root'
231
            return "root"
225 232

  
226 233
    def _verify_server_status(self, current_status, new_status):
227 234
        """Verify a server has switched to a specified status"""
228 235
        server = self.client.get_server_details(self.serverid)
229 236
        self.assertIn(server["status"], (current_status, new_status))
237
        if server["status"] not in (current_status, new_status):
238
            return None  # Do not raise exception, return so the test fails
230 239
        self.assertEquals(server["status"], new_status)
231 240

  
232 241
    def _get_connected_tcp_socket(self, family, host, port):
......
279 288
        fail_tmout = time.time() + fail_timeout
280 289
        while True:
281 290
            self.assertLess(time.time(), fail_tmout,
282
                            "operation '%s' timed out" % opmsg)
291
                            "operation `%s' timed out" % opmsg)
283 292
            if time.time() > warn_tmout:
284 293
                log.warning("Server %d: `%s' operation `%s' not done yet",
285 294
                            self.serverid, self.servername, opmsg)
......
291 300
            time.sleep(self.query_interval)
292 301

  
293 302
    def _insist_on_tcp_connection(self, family, host, port):
294
        familystr = {socket.AF_INET: 'IPv4', socket.AF_INET6: 'IPv6'}
303
        familystr = {socket.AF_INET: "IPv4", socket.AF_INET6: "IPv6",
304
                     socket.AF_UNSPEC: "Unspecified-IPv4/6"}
295 305
        msg = "connect over %s to %s:%s" % \
296 306
              (familystr.get(family, "Unknown"), host, port)
297 307
        sock = self._try_until_timeout_expires(
......
309 319
        self._try_until_timeout_expires(warn_timeout, fail_timeout,
310 320
                                        msg, self._verify_server_status,
311 321
                                        current_status, new_status)
322
        # Ensure the status is actually the expected one
323
        server = self.client.get_server_details(self.serverid)
324
        self.assertEquals(server["status"], new_status)
312 325

  
313 326
    def _insist_on_ssh_hostname(self, hostip, username, password):
314 327
        msg = "SSH to %s, as %s/%s" % (hostip, username, password)
......
531 544
    def test_019_server_no_longer_in_server_list(self):
532 545
        """Test server is no longer in server list"""
533 546
        servers = self.client.list_servers()
534
        self.assertNotIn(self.serverid, [s['id'] for s in servers])
547
        self.assertNotIn(self.serverid, [s["id"] for s in servers])
548

  
549

  
550
class TestRunnerProcess(Process):
551
    """A distinct process used to execute part of the tests in parallel"""
552
    def __init__(self, **kw):
553
        Process.__init__(self, **kw)
554
        kwargs = kw["kwargs"]
555
        self.testq = kwargs["testq"]
556
        self.runner = kwargs["runner"]
557

  
558
    def run(self):
559
        # Make sure this test runner process dies with the parent
560
        # and is not left behind.
561
        #
562
        # WARNING: This uses the prctl(2) call and is
563
        # Linux-specific.
564
        prctl.set_pdeathsig(signal.SIGHUP)
565

  
566
        while True:
567
            log.debug("I am process %d, GETting from queue is %s",
568
                     os.getpid(), self.testq)
569
            msg = self.testq.get()
570
            log.debug("Dequeued msg: %s", msg)
571

  
572
            if msg == "TEST_RUNNER_TERMINATE":
573
                raise SystemExit
574
            elif issubclass(msg, unittest.TestCase):
575
                # Assemble a TestSuite, and run it
576
                suite = unittest.TestLoader().loadTestsFromTestCase(msg)
577
                self.runner.run(suite)
578
            else:
579
                raise Exception("Cannot handle msg: %s" % msg)
580

  
581

  
582
def _run_cases_in_parallel(cases, fanout=1, runner=None):
583
    """Run instances of TestCase in parallel, in a number of distinct processes
584

  
585
    The cases iterable specifies the TestCases to be executed in parallel,
586
    by test runners running in distinct processes.
587

  
588
    The fanout parameter specifies the number of processes to spawn,
589
    and defaults to 1.
590

  
591
    The runner argument specifies the test runner class to use inside each
592
    runner process.
593

  
594
    """
595
    if runner is None:
596
        runner = unittest.TextTestRunner()
597

  
598
    # testq: The master process enqueues TestCase objects into this queue,
599
    #        test runner processes pick them up for execution, in parallel.
600
    testq = Queue()
601
    runners = []
602
    for i in xrange(0, fanout):
603
        kwargs = dict(testq=testq, runner=runner)
604
        runners.append(TestRunnerProcess(kwargs=kwargs))
605

  
606
    log.info("Spawning %d test runner processes", len(runners))
607
    for p in runners:
608
        p.start()
609
    log.debug("Spawned %d test runners, PIDs are %s",
610
              len(runners), [p.pid for p in runners])
611

  
612
    # Enqueue test cases
613
    map(testq.put, cases)
614
    map(testq.put, ["TEST_RUNNER_TERMINATE"] * len(runners))
615

  
616
    log.debug("Joining %d processes", len(runners))
617
    for p in runners:
618
        p.join()
619
    log.debug("Done joining %d processes", len(runners))
535 620

  
536 621

  
537 622
def _spawn_server_test_case(**kwargs):
538 623
    """Construct a new unit test case class from SpawnServerTestCase"""
539 624

  
540
    name = "SpawnServerTestCase_%s" % kwargs["imagename"].replace(" ", "_")
625
    name = "SpawnServerTestCase_%d" % kwargs["imageid"]
541 626
    cls = type(name, (SpawnServerTestCase,), kwargs)
542 627

  
543 628
    # Patch extra parameters into test names by manipulating method docstrings
......
545 630
        inspect.getmembers(cls, lambda x: inspect.ismethod(x)):
546 631
            if hasattr(m, __doc__):
547 632
                m.__func__.__doc__ = "[%s] %s" % (imagename, m.__doc__)
548

  
633
    setattr(__main__,name,cls)
549 634
    return cls
550 635

  
551 636

  
......
559 644

  
560 645
    print >> sys.stderr, "Found these stale servers from previous runs:"
561 646
    print "    " + \
562
          "\n    ".join(["%d: %s" % (s['id'], s['name']) for s in stale])
647
          "\n    ".join(["%d: %s" % (s["id"], s["name"]) for s in stale])
563 648

  
564 649
    if delete_stale:
565 650
        print >> sys.stderr, "Deleting %d stale servers:" % len(stale)
566 651
        for server in stale:
567
            c.delete_server(server['id'])
652
            c.delete_server(server["id"])
568 653
        print >> sys.stderr, "    ...done"
569 654
    else:
570 655
        print >> sys.stderr, "Use --delete-stale to delete them."
......
581 666

  
582 667
    parser = OptionParser(**kw)
583 668
    parser.disable_interspersed_args()
669
    parser.add_option("--api",
670
                      action="store", type="string", dest="api",
671
                      help="The API URI to use to reach the Synnefo API",
672
                      default=DEFAULT_API)
673
    parser.add_option("--token",
674
                      action="store", type="string", dest="token",
675
                      help="The token to use for authentication to the API",
676
                      default=DEFAULT_TOKEN)
584 677
    parser.add_option("--failfast",
585 678
                      action="store_true", dest="failfast",
586 679
                      help="Fail immediately if one of the tests fails",
......
609 702
                      help="Query server status when requests are pending " \
610 703
                           "every INTERVAL seconds",
611 704
                      default=3)
612
    parser.add_option("--build-fanout",
613
                      action="store", type="int", dest="build_fanout",
705
    parser.add_option("--fanout",
706
                      action="store", type="int", dest="fanout",
614 707
                      metavar="COUNT",
615
                      help="Test COUNT images in parallel, by submitting " \
616
                           "COUNT server build requests simultaneously",
708
                      help="Spawn up to COUNT child processes to execute " \
709
                           "in parallel, essentially have up to COUNT " \
710
                           "server build requests outstanding",
617 711
                      default=1)
618 712
    parser.add_option("--force-flavor",
619 713
                      action="store", type="int", dest="force_flavorid",
......
625 719
    parser.add_option("--show-stale",
626 720
                      action="store_true", dest="show_stale",
627 721
                      help="Show stale servers from previous runs, whose "\
628
                           "name starts with '%s'" % SNF_TEST_PREFIX,
722
                           "name starts with `%s'" % SNF_TEST_PREFIX,
629 723
                      default=False)
630 724
    parser.add_option("--delete-stale",
631 725
                      action="store_true", dest="delete_stale",
632 726
                      help="Delete stale servers from previous runs, whose "\
633
                           "name starts with '%s'" % SNF_TEST_PREFIX,
727
                           "name starts with `%s'" % SNF_TEST_PREFIX,
634 728
                      default=False)
635 729

  
636 730
    # FIXME: Change the default for build-fanout to 10
......
650 744

  
651 745
    IMPORTANT: Tests have dependencies and have to be run in the specified
652 746
    order inside a single test case. They communicate through attributes of the
653
    corresponding TestCase class (shared fixtures). TestCase classes for
654
    distinct Images may be run in parallel.
747
    corresponding TestCase class (shared fixtures). Distinct subclasses of
748
    TestCase MAY SHARE NO DATA, since they are run in parallel, in distinct
749
    test runner processes.
655 750

  
656 751
    """
657

  
658 752
    (opts, args) = parse_arguments(sys.argv[1:])
659 753

  
754
    global API, TOKEN
755
    API = opts.api
756
    TOKEN = opts.token
757

  
660 758
    # Cleanup stale servers from previous runs
661 759
    if opts.show_stale:
662 760
        cleanup_servers(delete_stale=opts.delete_stale)
......
667 765
    DIMAGES = c.list_images(detail=True)
668 766
    DFLAVORS = c.list_flavors(detail=True)
669 767

  
670
    #
671
    # Assemble all test cases
672
    #
673

  
674
    # Initial test cases
675
    cases = [UnauthorizedTestCase, FlavorsTestCase, ImagesTestCase]
676

  
677
    # Image test cases
678
    imageid = 1
679
    flavorid = opts.force_flavorid if opts.force_flavorid \
680
               else choice(filter(lambda x: x["disk"] >= 20, DFLAVORS))["id"]
681
    imagename = "Debian Base"
682
    personality = None
683
    servername = "%s-%s for %s" % (SNF_TEST_PREFIX, UNIQUE_RUN_ID, imagename)
684
    is_windows = imagename.lower().find("windows") >= 0
685
    case = _spawn_server_test_case(imageid=imageid, flavorid=flavorid,
686
                                   imagename=imagename,
687
                                   personality=personality,
688
                                   servername=servername,
689
                                   is_windows=is_windows,
690
                                   action_timeout=opts.action_timeout,
691
                                   build_warning=opts.build_warning,
692
                                   build_fail=opts.build_fail,
693
                                   query_interval=opts.query_interval)
694
    cases.append(case)
695

  
696
    # FIXME: logging, log, UNIQUE_RUN_ID arguments
768
    # FIXME: logging, log, LOG PID, TEST_RUN_ID, arguments
769
    # FIXME: Network testing? Create, destroy, connect, ping, disconnect VMs?
697 770
    # Run them: FIXME: In parallel, FAILEARLY, catchbreak?
698 771
    #unittest.main(verbosity=2, catchbreak=True)
699 772

  
700
    #
701
    # Run the resulting test suite
702
    #
703
    suites = map(unittest.TestLoader().loadTestsFromTestCase, cases)
704
    alltests = unittest.TestSuite(suites)
705
    unittest.TextTestRunner(verbosity=2, failfast=opts.failfast).run(alltests)
773
    runner = unittest.TextTestRunner(verbosity=2, failfast=opts.failfast)
774
    # The following cases run sequentially
775
    seq_cases = [UnauthorizedTestCase, FlavorsTestCase, ImagesTestCase]
776
    _run_cases_in_parallel(seq_cases, fanout=3, runner=runner)
777

  
778
    # The following cases run in parallel
779
    par_cases = []
706 780

  
781
    for image in DIMAGES:
782
        imageid = image["id"]
783
        imagename = image["name"]
784
        if opts.force_flavorid:
785
            flavorid = opts.force_flavorid
786
        else:
787
            flavorid = choice([f["id"] for f in DFLAVORS if f["disk"] >= 20])
788
        personality = None   # FIXME
789
        servername = "%s%s for %s" % (SNF_TEST_PREFIX, TEST_RUN_ID, imagename)
790
        is_windows = imagename.lower().find("windows") >= 0
791
        case = _spawn_server_test_case(imageid=imageid, flavorid=flavorid,
792
                                       imagename=imagename,
793
                                       personality=personality,
794
                                       servername=servername,
795
                                       is_windows=is_windows,
796
                                       action_timeout=opts.action_timeout,
797
                                       build_warning=opts.build_warning,
798
                                       build_fail=opts.build_fail,
799
                                       query_interval=opts.query_interval)
800
        par_cases.append(case)
801

  
802
    print "%s" % FlavorsTestCase
803
    print "dict", __main__.__dict__
804
    _run_cases_in_parallel(par_cases, fanout=opts.fanout, runner=runner)
707 805

  
708 806
if __name__ == "__main__":
709 807
    sys.exit(main())

Also available in: Unified diff