Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / tools / burnin.py @ 74193008

History | View | Annotate | Download (33.9 kB)

1
#!/usr/bin/env python
2

    
3
# Copyright 2011 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35

    
36
"""Perform integration testing on a running Synnefo deployment"""
37

    
38
import __main__
39
import datetime
40
import inspect
41
import logging
42
import os
43
import paramiko
44
import prctl
45
import subprocess
46
import signal
47
import socket
48
import struct
49
import sys
50
import time
51

    
52
from IPy import IP
53
from multiprocessing import Process, Queue
54
from random import choice
55

    
56
from kamaki.clients import ClientError, ComputeClient, CycladesClient
57
from kamaki.config import Config
58

    
59
from vncauthproxy.d3des import generate_response as d3des_generate_response
60

    
61
# Use backported unittest functionality if Python < 2.7
62
try:
63
    import unittest2 as unittest
64
except ImportError:
65
    if sys.version_info < (2, 7):
66
        raise Exception("The unittest2 package is required for Python < 2.7")
67
    import unittest
68

    
69

    
70
API = None
71
TOKEN = None
72
DEFAULT_API = "http://127.0.0.1:8000/api/v1.1"
73

    
74
# A unique id identifying this test run
75
TEST_RUN_ID = datetime.datetime.strftime(datetime.datetime.now(),
76
                                         "%Y%m%d%H%M%S")
77
SNF_TEST_PREFIX = "snf-test-"
78

    
79
# Setup logging (FIXME - verigak)
80
logging.basicConfig(format="%(message)s")
81
log = logging.getLogger("burnin")
82
log.setLevel(logging.INFO)
83

    
84

    
85
class UnauthorizedTestCase(unittest.TestCase):
86
    def test_unauthorized_access(self):
87
        """Test access without a valid token fails"""
88
        falseToken = '12345'
89
        conf = Config()
90
        conf.set('compute_token', falseToken)
91

    
92
        with self.assertRaises(ClientError) as cm:
93
            c.list_servers()
94
        self.assertEqual(cm.exception.status, 401)
95

    
96

    
97
class ImagesTestCase(unittest.TestCase):
98
    """Test image lists for consistency"""
99
    @classmethod
100
    def setUpClass(cls):
101
        """Initialize kamaki, get (detailed) list of images"""
102
        log.info("Getting simple and detailed list of images")
103

    
104
        conf = Config()
105
        conf.set('compute_token', TOKEN)
106
        cls.client = ComputeClient(conf)
107
        cls.images = cls.client.list_images()
108
        cls.dimages = cls.client.list_images(detail=True)
109

    
110
    def test_001_list_images(self):
111
        """Test image list actually returns images"""
112
        self.assertGreater(len(self.images), 0)
113

    
114
    def test_002_list_images_detailed(self):
115
        """Test detailed image list is the same length as list"""
116
        self.assertEqual(len(self.dimages), len(self.images))
117

    
118
    def test_003_same_image_names(self):
119
        """Test detailed and simple image list contain same names"""
120
        names = sorted(map(lambda x: x["name"], self.images))
121
        dnames = sorted(map(lambda x: x["name"], self.dimages))
122
        self.assertEqual(names, dnames)
123

    
124
    def test_004_unique_image_names(self):
125
        """Test images have unique names"""
126
        names = sorted(map(lambda x: x["name"], self.images))
127
        self.assertEqual(sorted(list(set(names))), names)
128

    
129
    def test_005_image_metadata(self):
130
        """Test every image has specific metadata defined"""
131
        keys = frozenset(["os", "description", "size"])
132
        for i in self.dimages:
133
            self.assertTrue(keys.issubset(i["metadata"]["values"].keys()))
134

    
135

    
136
class FlavorsTestCase(unittest.TestCase):
137
    """Test flavor lists for consistency"""
138
    @classmethod
139
    def setUpClass(cls):
140
        """Initialize kamaki, get (detailed) list of flavors"""
141
        log.info("Getting simple and detailed list of flavors")
142

    
143
        conf = Config()
144
        conf.set('compute_token', TOKEN)
145
        cls.client = ComputeClient(conf)
146
        cls.flavors = cls.client.list_flavors()
147
        cls.dflavors = cls.client.list_flavors(detail=True)
148

    
149
    def test_001_list_flavors(self):
150
        """Test flavor list actually returns flavors"""
151
        self.assertGreater(len(self.flavors), 0)
152

    
153
    def test_002_list_flavors_detailed(self):
154
        """Test detailed flavor list is the same length as list"""
155
        self.assertEquals(len(self.dflavors), len(self.flavors))
156

    
157
    def test_003_same_flavor_names(self):
158
        """Test detailed and simple flavor list contain same names"""
159
        names = sorted(map(lambda x: x["name"], self.flavors))
160
        dnames = sorted(map(lambda x: x["name"], self.dflavors))
161
        self.assertEqual(names, dnames)
162

    
163
    def test_004_unique_flavor_names(self):
164
        """Test flavors have unique names"""
165
        names = sorted(map(lambda x: x["name"], self.flavors))
166
        self.assertEqual(sorted(list(set(names))), names)
167

    
168
    def test_005_well_formed_flavor_names(self):
169
        """Test flavors have names of the form CxxRyyDzz
170

171
        Where xx is vCPU count, yy is RAM in MiB, zz is Disk in GiB
172

173
        """
174
        for f in self.dflavors:
175
            self.assertEqual("C%dR%dD%d" % (f["cpu"], f["ram"], f["disk"]),
176
                             f["name"],
177
                             "Flavor %s does not match its specs." % f["name"])
178

    
179

    
180
class ServersTestCase(unittest.TestCase):
181
    """Test server lists for consistency"""
182
    @classmethod
183
    def setUpClass(cls):
184
        """Initialize kamaki, get (detailed) list of servers"""
185
        log.info("Getting simple and detailed list of servers")
186

    
187
        conf = Config()
188
        conf.set('compute_token', TOKEN)
189
        cls.client = ComputeClient(conf)
190
        cls.servers = cls.client.list_servers()
191
        cls.dservers = cls.client.list_servers(detail=True)
192

    
193
    def test_001_list_servers(self):
194
        """Test server list actually returns servers"""
195
        self.assertGreater(len(self.servers), 0)
196

    
197
    def test_002_list_servers_detailed(self):
198
        """Test detailed server list is the same length as list"""
199
        self.assertEqual(len(self.dservers), len(self.servers))
200

    
201
    def test_003_same_server_names(self):
202
        """Test detailed and simple flavor list contain same names"""
203
        names = sorted(map(lambda x: x["name"], self.servers))
204
        dnames = sorted(map(lambda x: x["name"], self.dservers))
205
        self.assertEqual(names, dnames)
206

    
207

    
208
# This class gets replicated into actual TestCases dynamically
209
class SpawnServerTestCase(unittest.TestCase):
210
    """Test scenario for server of the specified image"""
211

    
212
    @classmethod
213
    def setUpClass(cls):
214
        """Initialize a kamaki instance"""
215
        log.info("Spawning server for image `%s'", cls.imagename)
216

    
217
        conf = Config()
218
        conf.set('compute_token', TOKEN)
219
        cls.client = ComputeClient(conf)
220
        cls.cyclades = CycladesClient(conf)
221

    
222
    def _get_ipv4(self, server):
223
        """Get the public IPv4 of a server from the detailed server info"""
224

    
225
        details = 
226
        public_addrs = filter(lambda x: x["id"] == "public",
227
                              server["addresses"]["values"])
228
        self.assertEqual(len(public_addrs), 1)
229
        ipv4_addrs = filter(lambda x: x["version"] == 4,
230
                            public_addrs[0]["values"])
231
        self.assertEqual(len(ipv4_addrs), 1)
232
        return ipv4_addrs[0]["addr"]
233

    
234
    def _get_ipv6(self, server):
235
        """Get the public IPv6 of a server from the detailed server info"""
236
        public_addrs = filter(lambda x: x["id"] == "public",
237
                              server["addresses"]["values"])
238
        self.assertEqual(len(public_addrs), 1)
239
        ipv6_addrs = filter(lambda x: x["version"] == 6,
240
                            public_addrs[0]["values"])
241
        self.assertEqual(len(ipv6_addrs), 1)
242
        return ipv6_addrs[0]["addr"]
243

    
244
    def _connect_loginname(self, os):
245
        """Return the login name for connections based on the server OS"""
246
        if os in ("Ubuntu", "Kubuntu", "Fedora"):
247
            return "user"
248
        elif os in ("windows", "windows_alpha1"):
249
            return "Administrator"
250
        else:
251
            return "root"
252

    
253
    def _verify_server_status(self, current_status, new_status):
254
        """Verify a server has switched to a specified status"""
255
        server = self.client.get_server_details(self.serverid)
256
        if server["status"] not in (current_status, new_status):
257
            return None  # Do not raise exception, return so the test fails
258
        self.assertEquals(server["status"], new_status)
259

    
260
    def _get_connected_tcp_socket(self, family, host, port):
261
        """Get a connected socket from the specified family to host:port"""
262
        sock = None
263
        for res in \
264
            socket.getaddrinfo(host, port, family, socket.SOCK_STREAM, 0,
265
                               socket.AI_PASSIVE):
266
            af, socktype, proto, canonname, sa = res
267
            try:
268
                sock = socket.socket(af, socktype, proto)
269
            except socket.error as msg:
270
                sock = None
271
                continue
272
            try:
273
                sock.connect(sa)
274
            except socket.error as msg:
275
                sock.close()
276
                sock = None
277
                continue
278
        self.assertIsNotNone(sock)
279
        return sock
280

    
281
    def _ping_once(self, ipv6, ip):
282
        """Test server responds to a single IPv4 or IPv6 ping"""
283
        cmd = "ping%s -c 2 -w 3 %s" % ("6" if ipv6 else "", ip)
284
        ping = subprocess.Popen(cmd, shell=True,
285
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
286
        (stdout, stderr) = ping.communicate()
287
        ret = ping.wait()
288
        self.assertEquals(ret, 0)
289

    
290
    def _get_hostname_over_ssh(self, hostip, username, password):
291
        ssh = paramiko.SSHClient()
292
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
293
        try:
294
            ssh.connect(hostip, username=username, password=password)
295
        except socket.error:
296
            raise AssertionError
297
        stdin, stdout, stderr = ssh.exec_command("hostname")
298
        lines = stdout.readlines()
299
        self.assertEqual(len(lines), 1)
300
        return lines[0]
301

    
302
    def _try_until_timeout_expires(self, warn_timeout, fail_timeout,
303
                                   opmsg, callable, *args, **kwargs):
304
        if warn_timeout == fail_timeout:
305
            warn_timeout = fail_timeout + 1
306
        warn_tmout = time.time() + warn_timeout
307
        fail_tmout = time.time() + fail_timeout
308
        while True:
309
            self.assertLess(time.time(), fail_tmout,
310
                            "operation `%s' timed out" % opmsg)
311
            if time.time() > warn_tmout:
312
                log.warning("Server %d: `%s' operation `%s' not done yet",
313
                            self.serverid, self.servername, opmsg)
314
            try:
315
                log.info("%s... " % opmsg)
316
                return callable(*args, **kwargs)
317
            except AssertionError:
318
                pass
319
            time.sleep(self.query_interval)
320

    
321
    def _insist_on_tcp_connection(self, family, host, port):
322
        familystr = {socket.AF_INET: "IPv4", socket.AF_INET6: "IPv6",
323
                     socket.AF_UNSPEC: "Unspecified-IPv4/6"}
324
        msg = "connect over %s to %s:%s" % \
325
              (familystr.get(family, "Unknown"), host, port)
326
        sock = self._try_until_timeout_expires(
327
                self.action_timeout, self.action_timeout,
328
                msg, self._get_connected_tcp_socket,
329
                family, host, port)
330
        return sock
331

    
332
    def _insist_on_status_transition(self, current_status, new_status,
333
                                    fail_timeout, warn_timeout=None):
334
        msg = "Server %d: `%s', waiting for %s -> %s" % \
335
              (self.serverid, self.servername, current_status, new_status)
336
        if warn_timeout is None:
337
            warn_timeout = fail_timeout
338
        self._try_until_timeout_expires(warn_timeout, fail_timeout,
339
                                        msg, self._verify_server_status,
340
                                        current_status, new_status)
341
        # Ensure the status is actually the expected one
342
        server = self.client.get_server_details(self.serverid)
343
        self.assertEquals(server["status"], new_status)
344

    
345
    def _insist_on_ssh_hostname(self, hostip, username, password):
346
        msg = "SSH to %s, as %s/%s" % (hostip, username, password)
347
        hostname = self._try_until_timeout_expires(
348
                self.action_timeout, self.action_timeout,
349
                msg, self._get_hostname_over_ssh,
350
                hostip, username, password)
351

    
352
        # The hostname must be of the form 'prefix-id'
353
        self.assertTrue(hostname.endswith("-%d\n" % self.serverid))
354

    
355
    def _skipIf(self, condition, msg):
356
        if condition:
357
            self.skipTest(msg)
358

    
359
    def test_001_submit_create_server(self):
360
        """Test submit create server request"""
361
        server = self.client.create_server(self.servername, self.flavorid,
362
                                           self.imageid, self.personality)
363
        self.assertEqual(server["name"], self.servername)
364
        self.assertEqual(server["flavorRef"], self.flavorid)
365
        self.assertEqual(server["imageRef"], self.imageid)
366
        self.assertEqual(server["status"], "BUILD")
367

    
368
        # Update class attributes to reflect data on building server
369
        cls = type(self)
370
        cls.serverid = server["id"]
371
        cls.username = None
372
        cls.passwd = server["adminPass"]
373

    
374
    def test_002a_server_is_building_in_list(self):
375
        """Test server is in BUILD state, in server list"""
376
        servers = self.client.list_servers(detail=True)
377
        servers = filter(lambda x: x["name"] == self.servername, servers)
378
        self.assertEqual(len(servers), 1)
379
        server = servers[0]
380
        self.assertEqual(server["name"], self.servername)
381
        self.assertEqual(server["flavorRef"], self.flavorid)
382
        self.assertEqual(server["imageRef"], self.imageid)
383
        self.assertEqual(server["status"], "BUILD")
384

    
385
    def test_002b_server_is_building_in_details(self):
386
        """Test server is in BUILD state, in details"""
387
        server = self.client.get_server_details(self.serverid)
388
        self.assertEqual(server["name"], self.servername)
389
        self.assertEqual(server["flavorRef"], self.flavorid)
390
        self.assertEqual(server["imageRef"], self.imageid)
391
        self.assertEqual(server["status"], "BUILD")
392

    
393
    def test_002c_set_server_metadata(self):
394
        image = self.client.get_image_details(self.imageid)
395
        os = image["metadata"]["values"]["os"]
396
        loginname = image["metadata"]["values"].get("users", None)
397
        self.client.update_server_metadata(self.serverid, OS=os)
398

    
399
        # Determine the username to use for future connections
400
        # to this host
401
        cls = type(self)
402
        cls.username = loginname
403
        if not cls.username:
404
            cls.username = self._connect_loginname(os)
405
        self.assertIsNotNone(cls.username)
406

    
407
    def test_002d_verify_server_metadata(self):
408
        """Test server metadata keys are set based on image metadata"""
409
        servermeta = self.client.get_server_metadata(self.serverid)
410
        imagemeta = self.client.get_image_metadata(self.imageid)
411
        self.assertEqual(servermeta["OS"], imagemeta["OS"])
412

    
413
    def test_003_server_becomes_active(self):
414
        """Test server becomes ACTIVE"""
415
        self._insist_on_status_transition("BUILD", "ACTIVE",
416
                                         self.build_fail, self.build_warning)
417

    
418
    def test_003a_get_server_oob_console(self):
419
        """Test getting OOB server console over VNC
420

421
        Implementation of RFB protocol follows
422
        http://www.realvnc.com/docs/rfbproto.pdf.
423

424
        """
425
        
426
        console = self.cyclades.get_server_console(self.serverid)
427
        self.assertEquals(console['type'], "vnc")
428
        sock = self._insist_on_tcp_connection(socket.AF_UNSPEC,
429
                                        console["host"], console["port"])
430

    
431
        # Step 1. ProtocolVersion message (par. 6.1.1)
432
        version = sock.recv(1024)
433
        self.assertEquals(version, 'RFB 003.008\n')
434
        sock.send(version)
435

    
436
        # Step 2. Security (par 6.1.2): Only VNC Authentication supported
437
        sec = sock.recv(1024)
438
        self.assertEquals(list(sec), ['\x01', '\x02'])
439

    
440
        # Step 3. Request VNC Authentication (par 6.1.2)
441
        sock.send('\x02')
442

    
443
        # Step 4. Receive Challenge (par 6.2.2)
444
        challenge = sock.recv(1024)
445
        self.assertEquals(len(challenge), 16)
446

    
447
        # Step 5. DES-Encrypt challenge, use password as key (par 6.2.2)
448
        response = d3des_generate_response(
449
            (console["password"] + '\0' * 8)[:8], challenge)
450
        sock.send(response)
451

    
452
        # Step 6. SecurityResult (par 6.1.3)
453
        result = sock.recv(4)
454
        self.assertEquals(list(result), ['\x00', '\x00', '\x00', '\x00'])
455
        sock.close()
456

    
457
    def test_004_server_has_ipv4(self):
458
        """Test active server has a valid IPv4 address"""
459
        server = self.client.get_server_details(self.serverid)
460
        ipv4 = self._get_ipv4(server)
461
        self.assertEquals(IP(ipv4).version(), 4)
462

    
463
    def test_005_server_has_ipv6(self):
464
        """Test active server has a valid IPv6 address"""
465
        server = self.client.get_server_details(self.serverid)
466
        ipv6 = self._get_ipv6(server)
467
        self.assertEquals(IP(ipv6).version(), 6)
468

    
469
    def test_006_server_responds_to_ping_IPv4(self):
470
        """Test server responds to ping on IPv4 address"""
471
        server = self.client.get_server_details(self.serverid)
472
        ip = self._get_ipv4(server)
473
        self._try_until_timeout_expires(self.action_timeout,
474
                                        self.action_timeout,
475
                                        "PING IPv4 to %s" % ip,
476
                                        self._ping_once,
477
                                        False, ip)
478

    
479
    def test_007_server_responds_to_ping_IPv6(self):
480
        """Test server responds to ping on IPv6 address"""
481
        server = self.client.get_server_details(self.serverid)
482
        ip = self._get_ipv6(server)
483
        self._try_until_timeout_expires(self.action_timeout,
484
                                        self.action_timeout,
485
                                        "PING IPv6 to %s" % ip,
486
                                        self._ping_once,
487
                                        True, ip)
488

    
489
    def test_008_submit_shutdown_request(self):
490
        """Test submit request to shutdown server"""
491
        self.cyclades.shutdown_server(self.serverid)
492

    
493
    def test_009_server_becomes_stopped(self):
494
        """Test server becomes STOPPED"""
495
        self._insist_on_status_transition("ACTIVE", "STOPPED",
496
                                         self.action_timeout,
497
                                         self.action_timeout)
498

    
499
    def test_010_submit_start_request(self):
500
        """Test submit start server request"""
501
        self.cyclades.start_server(self.serverid)
502

    
503
    def test_011_server_becomes_active(self):
504
        """Test server becomes ACTIVE again"""
505
        self._insist_on_status_transition("STOPPED", "ACTIVE",
506
                                         self.action_timeout,
507
                                         self.action_timeout)
508

    
509
    def test_011a_server_responds_to_ping_IPv4(self):
510
        """Test server OS is actually up and running again"""
511
        self.test_006_server_responds_to_ping_IPv4()
512

    
513
    def test_012_ssh_to_server_IPv4(self):
514
        """Test SSH to server public IPv4 works, verify hostname"""
515
        self._skipIf(self.is_windows, "only valid for Linux servers")
516
        server = self.client.get_server_details(self.serverid)
517
        self._insist_on_ssh_hostname(self._get_ipv4(server),
518
                                     self.username, self.passwd)
519

    
520
    def test_013_ssh_to_server_IPv6(self):
521
        """Test SSH to server public IPv6 works, verify hostname"""
522
        self._skipIf(self.is_windows, "only valid for Linux servers")
523
        server = self.client.get_server_details(self.serverid)
524
        self._insist_on_ssh_hostname(self._get_ipv6(server),
525
                                     self.username, self.passwd)
526

    
527
    def test_014_rdp_to_server_IPv4(self):
528
        "Test RDP connection to server public IPv4 works"""
529
        self._skipIf(not self.is_windows, "only valid for Windows servers")
530
        server = self.client.get_server_details(self.serverid)
531
        ipv4 = self._get_ipv4(server)
532
        sock = _insist_on_tcp_connection(socket.AF_INET, ipv4, 3389)
533

    
534
        # No actual RDP processing done. We assume the RDP server is there
535
        # if the connection to the RDP port is successful.
536
        # FIXME: Use rdesktop, analyze exit code? see manpage [costasd]
537
        sock.close()
538

    
539
    def test_015_rdp_to_server_IPv6(self):
540
        "Test RDP connection to server public IPv6 works"""
541
        self._skipIf(not self.is_windows, "only valid for Windows servers")
542
        server = self.client.get_server_details(self.serverid)
543
        ipv6 = self._get_ipv6(server)
544
        sock = _get_tcp_connection(socket.AF_INET6, ipv6, 3389)
545

    
546
        # No actual RDP processing done. We assume the RDP server is there
547
        # if the connection to the RDP port is successful.
548
        sock.close()
549

    
550
    def test_016_personality_is_enforced(self):
551
        """Test file injection for personality enforcement"""
552
        self._skipIf(self.is_windows, "only implemented for Linux servers")
553
        self.assertTrue(False, "test not implemented, will fail")
554

    
555
    def test_017_submit_delete_request(self):
556
        """Test submit request to delete server"""
557
        self.client.delete_server(self.serverid)
558

    
559
    def test_018_server_becomes_deleted(self):
560
        """Test server becomes DELETED"""
561
        self._insist_on_status_transition("ACTIVE", "DELETED",
562
                                         self.action_timeout,
563
                                         self.action_timeout)
564

    
565
    def test_019_server_no_longer_in_server_list(self):
566
        """Test server is no longer in server list"""
567
        servers = self.client.list_servers()
568
        self.assertNotIn(self.serverid, [s["id"] for s in servers])
569

    
570

    
571
class TestRunnerProcess(Process):
572
    """A distinct process used to execute part of the tests in parallel"""
573
    def __init__(self, **kw):
574
        Process.__init__(self, **kw)
575
        kwargs = kw["kwargs"]
576
        self.testq = kwargs["testq"]
577
        self.runner = kwargs["runner"]
578

    
579
    def run(self):
580
        # Make sure this test runner process dies with the parent
581
        # and is not left behind.
582
        #
583
        # WARNING: This uses the prctl(2) call and is
584
        # Linux-specific.
585
        prctl.set_pdeathsig(signal.SIGHUP)
586

    
587
        while True:
588
            log.debug("I am process %d, GETting from queue is %s",
589
                     os.getpid(), self.testq)
590
            msg = self.testq.get()
591
            log.debug("Dequeued msg: %s", msg)
592

    
593
            if msg == "TEST_RUNNER_TERMINATE":
594
                raise SystemExit
595
            elif issubclass(msg, unittest.TestCase):
596
                # Assemble a TestSuite, and run it
597
                suite = unittest.TestLoader().loadTestsFromTestCase(msg)
598
                self.runner.run(suite)
599
            else:
600
                raise Exception("Cannot handle msg: %s" % msg)
601

    
602

    
603
def _run_cases_in_parallel(cases, fanout=1, runner=None):
604
    """Run instances of TestCase in parallel, in a number of distinct processes
605

606
    The cases iterable specifies the TestCases to be executed in parallel,
607
    by test runners running in distinct processes.
608
    The fanout parameter specifies the number of processes to spawn,
609
    and defaults to 1.
610
    The runner argument specifies the test runner class to use inside each
611
    runner process.
612

613
    """
614
    if runner is None:
615
        runner = unittest.TextTestRunner(verbosity=2, failfast=True)
616

    
617
    # testq: The master process enqueues TestCase objects into this queue,
618
    #        test runner processes pick them up for execution, in parallel.
619
    testq = Queue()
620
    runners = []
621
    for i in xrange(0, fanout):
622
        kwargs = dict(testq=testq, runner=runner)
623
        runners.append(TestRunnerProcess(kwargs=kwargs))
624

    
625
    log.info("Spawning %d test runner processes", len(runners))
626
    for p in runners:
627
        p.start()
628
    log.debug("Spawned %d test runners, PIDs are %s",
629
              len(runners), [p.pid for p in runners])
630

    
631
    # Enqueue test cases
632
    map(testq.put, cases)
633
    map(testq.put, ["TEST_RUNNER_TERMINATE"] * len(runners))
634

    
635
    log.debug("Joining %d processes", len(runners))
636
    for p in runners:
637
        p.join()
638
    log.debug("Done joining %d processes", len(runners))
639

    
640

    
641
def _spawn_server_test_case(**kwargs):
642
    """Construct a new unit test case class from SpawnServerTestCase"""
643

    
644
    name = "SpawnServerTestCase_%d" % kwargs["imageid"]
645
    cls = type(name, (SpawnServerTestCase,), kwargs)
646

    
647
    # Patch extra parameters into test names by manipulating method docstrings
648
    for (mname, m) in \
649
        inspect.getmembers(cls, lambda x: inspect.ismethod(x)):
650
            if hasattr(m, __doc__):
651
                m.__func__.__doc__ = "[%s] %s" % (imagename, m.__doc__)
652

    
653
    # Make sure the class can be pickled, by listing it among
654
    # the attributes of __main__. A PicklingError is raised otherwise.
655
    setattr(__main__, name, cls)
656
    return cls
657

    
658

    
659
def cleanup_servers(delete_stale=False):
660

    
661
    conf = Config()
662
    conf.set('compute_token', TOKEN)
663
    c = ComputeClient(conf)
664

    
665
    servers = c.list_servers()
666
    stale = [s for s in servers if s["name"].startswith(SNF_TEST_PREFIX)]
667

    
668
    if len(stale) == 0:
669
        return
670

    
671
    print >> sys.stderr, "Found these stale servers from previous runs:"
672
    print "    " + \
673
          "\n    ".join(["%d: %s" % (s["id"], s["name"]) for s in stale])
674

    
675
    if delete_stale:
676
        print >> sys.stderr, "Deleting %d stale servers:" % len(stale)
677
        for server in stale:
678
            c.delete_server(server["id"])
679
        print >> sys.stderr, "    ...done"
680
    else:
681
        print >> sys.stderr, "Use --delete-stale to delete them."
682

    
683

    
684
def parse_arguments(args):
685
    from optparse import OptionParser
686

    
687
    kw = {}
688
    kw["usage"] = "%prog [options]"
689
    kw["description"] = \
690
        "%prog runs a number of test scenarios on a " \
691
        "Synnefo deployment."
692

    
693
    parser = OptionParser(**kw)
694
    parser.disable_interspersed_args()
695
    parser.add_option("--api",
696
                      action="store", type="string", dest="api",
697
                      help="The API URI to use to reach the Synnefo API",
698
                      default=DEFAULT_API)
699
    parser.add_option("--token",
700
                      action="store", type="string", dest="token",
701
                      help="The token to use for authentication to the API")
702
    parser.add_option("--nofailfast",
703
                      action="store_true", dest="nofailfast",
704
                      help="Do not fail immediately if one of the tests " \
705
                           "fails (EXPERIMENTAL)",
706
                      default=False)
707
    parser.add_option("--action-timeout",
708
                      action="store", type="int", dest="action_timeout",
709
                      metavar="TIMEOUT",
710
                      help="Wait SECONDS seconds for a server action to " \
711
                           "complete, then the test is considered failed",
712
                      default=20)
713
    parser.add_option("--build-warning",
714
                      action="store", type="int", dest="build_warning",
715
                      metavar="TIMEOUT",
716
                      help="Warn if TIMEOUT seconds have passed and a " \
717
                           "build operation is still pending",
718
                      default=600)
719
    parser.add_option("--build-fail",
720
                      action="store", type="int", dest="build_fail",
721
                      metavar="BUILD_TIMEOUT",
722
                      help="Fail the test if TIMEOUT seconds have passed " \
723
                           "and a build operation is still incomplete",
724
                      default=900)
725
    parser.add_option("--query-interval",
726
                      action="store", type="int", dest="query_interval",
727
                      metavar="INTERVAL",
728
                      help="Query server status when requests are pending " \
729
                           "every INTERVAL seconds",
730
                      default=3)
731
    parser.add_option("--fanout",
732
                      action="store", type="int", dest="fanout",
733
                      metavar="COUNT",
734
                      help="Spawn up to COUNT child processes to execute " \
735
                           "in parallel, essentially have up to COUNT " \
736
                           "server build requests outstanding (EXPERIMENTAL)",
737
                      default=1)
738
    parser.add_option("--force-flavor",
739
                      action="store", type="int", dest="force_flavorid",
740
                      metavar="FLAVOR ID",
741
                      help="Force all server creations to use the specified "\
742
                           "FLAVOR ID instead of a randomly chosen one, " \
743
                           "useful if disk space is scarce",
744
                      default=None)
745
    parser.add_option("--image-id",
746
                      action="store", type="string", dest="force_imageid",
747
                      metavar="IMAGE ID",
748
                      help="Test the specified image id, use 'all' to test " \
749
                           "all available images (mandatory argument)",
750
                      default=None)
751
    parser.add_option("--show-stale",
752
                      action="store_true", dest="show_stale",
753
                      help="Show stale servers from previous runs, whose "\
754
                           "name starts with `%s'" % SNF_TEST_PREFIX,
755
                      default=False)
756
    parser.add_option("--delete-stale",
757
                      action="store_true", dest="delete_stale",
758
                      help="Delete stale servers from previous runs, whose "\
759
                           "name starts with `%s'" % SNF_TEST_PREFIX,
760
                      default=False)
761

    
762
    # FIXME: Change the default for build-fanout to 10
763
    # FIXME: Allow the user to specify a specific set of Images to test
764

    
765
    (opts, args) = parser.parse_args(args)
766

    
767
    # Verify arguments
768
    if opts.delete_stale:
769
        opts.show_stale = True
770

    
771
    if not opts.show_stale:
772
        if not opts.force_imageid:
773
            print >>sys.stderr, "The --image-id argument is mandatory."
774
            parser.print_help()
775
            sys.exit(1)
776

    
777
        if opts.force_imageid != 'all':
778
            try:
779
                opts.force_imageid = int(opts.force_imageid)
780
            except ValueError:
781
                print >>sys.stderr, "Invalid value specified for --image-id." \
782
                                    "Use a numeric id, or `all'."
783
                sys.exit(1)
784

    
785
    return (opts, args)
786

    
787

    
788
def main():
789
    """Assemble test cases into a test suite, and run it
790

791
    IMPORTANT: Tests have dependencies and have to be run in the specified
792
    order inside a single test case. They communicate through attributes of the
793
    corresponding TestCase class (shared fixtures). Distinct subclasses of
794
    TestCase MAY SHARE NO DATA, since they are run in parallel, in distinct
795
    test runner processes.
796

797
    """
798
    (opts, args) = parse_arguments(sys.argv[1:])
799

    
800
    global API, TOKEN
801
    API = opts.api
802
    TOKEN = opts.token
803

    
804
    # Cleanup stale servers from previous runs
805
    if opts.show_stale:
806
        cleanup_servers(delete_stale=opts.delete_stale)
807
        return 0
808

    
809
    # Initialize a kamaki instance, get flavors, images
810

    
811
    conf = Config()
812
    conf.set('compute_token', TOKEN)
813
    c = ComputeClient(conf)
814

    
815
    DIMAGES = c.list_images(detail=True)
816
    DFLAVORS = c.list_flavors(detail=True)
817

    
818
    # FIXME: logging, log, LOG PID, TEST_RUN_ID, arguments
819
    # FIXME: Network testing? Create, destroy, connect, ping, disconnect VMs?
820
    # Run them: FIXME: In parallel, FAILEARLY, catchbreak?
821
    #unittest.main(verbosity=2, catchbreak=True)
822

    
823
    runner = unittest.TextTestRunner(verbosity=2, failfast=not opts.nofailfast)
824
    # The following cases run sequentially
825
    seq_cases = [UnauthorizedTestCase, FlavorsTestCase, ImagesTestCase]
826
    _run_cases_in_parallel(seq_cases, fanout=3, runner=runner)
827

    
828
    # The following cases run in parallel
829
    par_cases = []
830

    
831
    if opts.force_imageid == 'all':
832
        test_images = DIMAGES
833
    else:
834
        test_images = filter(lambda x: x["id"] == opts.force_imageid, DIMAGES)
835

    
836
    for image in test_images:
837
        imageid = image["id"]
838
        imagename = image["name"]
839
        if opts.force_flavorid:
840
            flavorid = opts.force_flavorid
841
        else:
842
            flavorid = choice([f["id"] for f in DFLAVORS if f["disk"] >= 20])
843
        personality = None   # FIXME
844
        servername = "%s%s for %s" % (SNF_TEST_PREFIX, TEST_RUN_ID, imagename)
845
        is_windows = imagename.lower().find("windows") >= 0
846
        case = _spawn_server_test_case(imageid=imageid, flavorid=flavorid,
847
                                       imagename=imagename,
848
                                       personality=personality,
849
                                       servername=servername,
850
                                       is_windows=is_windows,
851
                                       action_timeout=opts.action_timeout,
852
                                       build_warning=opts.build_warning,
853
                                       build_fail=opts.build_fail,
854
                                       query_interval=opts.query_interval)
855
        par_cases.append(case)
856

    
857
    _run_cases_in_parallel(par_cases, fanout=opts.fanout, runner=runner)
858

    
859
if __name__ == "__main__":
860
    sys.exit(main())