Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / tools / burnin.py @ 99d41650

History | View | Annotate | Download (35.2 kB)

1
#!/usr/bin/env python
2

    
3
# Copyright 2011 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35

    
36
"""Perform integration testing on a running Synnefo deployment"""
37

    
38
import __main__
39
import datetime
40
import inspect
41
import logging
42
import os
43
import paramiko
44
import prctl
45
import subprocess
46
import signal
47
import socket
48
import struct
49
import sys
50
import time
51

    
52
from IPy import IP
53
from multiprocessing import Process, Queue
54
from random import choice
55

    
56
from kamaki.clients import ClientError, ComputeClient, CycladesClient
57
from kamaki.config import Config
58

    
59
from vncauthproxy.d3des import generate_response as d3des_generate_response
60

    
61
# Use backported unittest functionality if Python < 2.7
62
try:
63
    import unittest2 as unittest
64
except ImportError:
65
    if sys.version_info < (2, 7):
66
        raise Exception("The unittest2 package is required for Python < 2.7")
67
    import unittest
68

    
69

    
70
API = None
71
TOKEN = None
72
DEFAULT_API = "http://127.0.0.1:8000/api/v1.1"
73

    
74
# A unique id identifying this test run
75
TEST_RUN_ID = datetime.datetime.strftime(datetime.datetime.now(),
76
                                         "%Y%m%d%H%M%S")
77
SNF_TEST_PREFIX = "snf-test-"
78

    
79
# Setup logging (FIXME - verigak)
80
logging.basicConfig(format="%(message)s")
81
log = logging.getLogger("burnin")
82
log.setLevel(logging.INFO)
83

    
84
class UnauthorizedTestCase(unittest.TestCase):
85
    def test_unauthorized_access(self):
86
        """Test access without a valid token fails"""
87
        falseToken = '12345'
88
        conf = Config()
89
        conf.set('compute_token', falseToken)
90
        c=ComputeClient(conf)
91

    
92
        with self.assertRaises(ClientError) as cm:
93
            c.list_servers()
94
        self.assertEqual(cm.exception.status, 401)
95

    
96

    
97
class ImagesTestCase(unittest.TestCase):
98
    """Test image lists for consistency"""
99
    @classmethod
100
    def setUpClass(cls):
101
        """Initialize kamaki, get (detailed) list of images"""
102
        log.info("Getting simple and detailed list of images")
103

    
104
        conf = Config()
105
        conf.set('compute_token', TOKEN)
106
        cls.client = ComputeClient(conf)
107
        cls.images = cls.client.list_images()
108
        cls.dimages = cls.client.list_images(detail=True)
109

    
110
    def test_001_list_images(self):
111
        """Test image list actually returns images"""
112
        self.assertGreater(len(self.images), 0)
113

    
114
    def test_002_list_images_detailed(self):
115
        """Test detailed image list is the same length as list"""
116
        self.assertEqual(len(self.dimages), len(self.images))
117

    
118
    def test_003_same_image_names(self):
119
        """Test detailed and simple image list contain same names"""
120
        names = sorted(map(lambda x: x["name"], self.images))
121
        dnames = sorted(map(lambda x: x["name"], self.dimages))
122
        self.assertEqual(names, dnames)
123

    
124
    def test_004_unique_image_names(self):
125
        """Test images have unique names"""
126
        names = sorted(map(lambda x: x["name"], self.images))
127
        self.assertEqual(sorted(list(set(names))), names)
128

    
129
    def test_005_image_metadata(self):
130
        """Test every image has specific metadata defined"""
131
        keys = frozenset(["os", "description", "size"])
132
        for i in self.dimages:
133
            self.assertTrue(keys.issubset(i["metadata"]["values"].keys()))
134

    
135

    
136
class FlavorsTestCase(unittest.TestCase):
137
    """Test flavor lists for consistency"""
138
    @classmethod
139
    def setUpClass(cls):
140
        """Initialize kamaki, get (detailed) list of flavors"""
141
        log.info("Getting simple and detailed list of flavors")
142

    
143
        conf = Config()
144
        conf.set('compute_token', TOKEN)
145
        cls.client = ComputeClient(conf)
146
        cls.flavors = cls.client.list_flavors()
147
        cls.dflavors = cls.client.list_flavors(detail=True)
148

    
149
    def test_001_list_flavors(self):
150
        """Test flavor list actually returns flavors"""
151
        self.assertGreater(len(self.flavors), 0)
152

    
153
    def test_002_list_flavors_detailed(self):
154
        """Test detailed flavor list is the same length as list"""
155
        self.assertEquals(len(self.dflavors), len(self.flavors))
156

    
157
    def test_003_same_flavor_names(self):
158
        """Test detailed and simple flavor list contain same names"""
159
        names = sorted(map(lambda x: x["name"], self.flavors))
160
        dnames = sorted(map(lambda x: x["name"], self.dflavors))
161
        self.assertEqual(names, dnames)
162

    
163
    def test_004_unique_flavor_names(self):
164
        """Test flavors have unique names"""
165
        names = sorted(map(lambda x: x["name"], self.flavors))
166
        self.assertEqual(sorted(list(set(names))), names)
167

    
168
    def test_005_well_formed_flavor_names(self):
169
        """Test flavors have names of the form CxxRyyDzz
170

171
        Where xx is vCPU count, yy is RAM in MiB, zz is Disk in GiB
172

173
        """
174
        for f in self.dflavors:
175
            self.assertEqual("C%dR%dD%d" % (f["cpu"], f["ram"], f["disk"]),
176
                             f["name"],
177
                             "Flavor %s does not match its specs." % f["name"])
178

    
179

    
180
class ServersTestCase(unittest.TestCase):
181
    """Test server lists for consistency"""
182
    @classmethod
183
    def setUpClass(cls):
184
        """Initialize kamaki, get (detailed) list of servers"""
185
        log.info("Getting simple and detailed list of servers")
186

    
187
        conf = Config()
188
        conf.set('compute_token', TOKEN)
189
        cls.client = ComputeClient(conf)
190
        cls.servers = cls.client.list_servers()
191
        cls.dservers = cls.client.list_servers(detail=True)
192

    
193
    def test_001_list_servers(self):
194
        """Test server list actually returns servers"""
195
        self.assertGreater(len(self.servers), 0)
196

    
197
    def test_002_list_servers_detailed(self):
198
        """Test detailed server list is the same length as list"""
199
        self.assertEqual(len(self.dservers), len(self.servers))
200

    
201
    def test_003_same_server_names(self):
202
        """Test detailed and simple flavor list contain same names"""
203
        names = sorted(map(lambda x: x["name"], self.servers))
204
        dnames = sorted(map(lambda x: x["name"], self.dservers))
205
        self.assertEqual(names, dnames)
206

    
207

    
208
# This class gets replicated into actual TestCases dynamically
209
class SpawnServerTestCase(unittest.TestCase):
210
    """Test scenario for server of the specified image"""
211

    
212
    @classmethod
213
    def setUpClass(cls):
214
        """Initialize a kamaki instance"""
215
        log.info("Spawning server for image `%s'", cls.imagename)
216

    
217
        conf = Config()
218
        conf.set('compute_token', TOKEN)
219
        cls.client = ComputeClient(conf)
220
        cls.cyclades = CycladesClient(conf)
221

    
222
    def _get_ipv4(self, server):
223
        """Get the public IPv4 of a server from the detailed server info"""
224

    
225
        public_addrs = filter(lambda x: x["id"] == "public",
226
                              server["addresses"]["values"])
227
        self.assertEqual(len(public_addrs), 1)
228
        ipv4_addrs = filter(lambda x: x["version"] == 4,
229
                            public_addrs[0]["values"])
230
        self.assertEqual(len(ipv4_addrs), 1)
231
        return ipv4_addrs[0]["addr"]
232

    
233
    def _get_ipv6(self, server):
234
        """Get the public IPv6 of a server from the detailed server info"""
235
        public_addrs = filter(lambda x: x["id"] == "public",
236
                              server["addresses"]["values"])
237
        self.assertEqual(len(public_addrs), 1)
238
        ipv6_addrs = filter(lambda x: x["version"] == 6,
239
                            public_addrs[0]["values"])
240
        self.assertEqual(len(ipv6_addrs), 1)
241
        return ipv6_addrs[0]["addr"]
242

    
243
    def _connect_loginname(self, os):
244
        """Return the login name for connections based on the server OS"""
245
        if os in ("Ubuntu", "Kubuntu", "Fedora"):
246
            return "user"
247
        elif os in ("windows", "windows_alpha1"):
248
            return "Administrator"
249
        else:
250
            return "root"
251

    
252
    def _verify_server_status(self, current_status, new_status):
253
        """Verify a server has switched to a specified status"""
254
        server = self.client.get_server_details(self.serverid)
255
        if server["status"] not in (current_status, new_status):
256
            return None  # Do not raise exception, return so the test fails
257
        self.assertEquals(server["status"], new_status)
258

    
259
    def _get_connected_tcp_socket(self, family, host, port):
260
        """Get a connected socket from the specified family to host:port"""
261
        sock = None
262
        for res in \
263
            socket.getaddrinfo(host, port, family, socket.SOCK_STREAM, 0,
264
                               socket.AI_PASSIVE):
265
            af, socktype, proto, canonname, sa = res
266
            try:
267
                sock = socket.socket(af, socktype, proto)
268
            except socket.error as msg:
269
                sock = None
270
                continue
271
            try:
272
                sock.connect(sa)
273
            except socket.error as msg:
274
                sock.close()
275
                sock = None
276
                continue
277
        self.assertIsNotNone(sock)
278
        return sock
279

    
280
    def _ping_once(self, ipv6, ip):
281
        """Test server responds to a single IPv4 or IPv6 ping"""
282
        cmd = "ping%s -c 2 -w 3 %s" % ("6" if ipv6 else "", ip)
283
        ping = subprocess.Popen(cmd, shell=True,
284
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
285
        (stdout, stderr) = ping.communicate()
286
        ret = ping.wait()
287
        self.assertEquals(ret, 0)
288

    
289
    def _get_hostname_over_ssh(self, hostip, username, password):
290
        ssh = paramiko.SSHClient()
291
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
292
        try:
293
            ssh.connect(hostip, username=username, password=password)
294
        except socket.error:
295
            raise AssertionError
296
        stdin, stdout, stderr = ssh.exec_command("hostname")
297
        lines = stdout.readlines()
298
        self.assertEqual(len(lines), 1)
299
        return lines[0]
300

    
301
    def _try_until_timeout_expires(self, warn_timeout, fail_timeout,
302
                                   opmsg, callable, *args, **kwargs):
303
        if warn_timeout == fail_timeout:
304
            warn_timeout = fail_timeout + 1
305
        warn_tmout = time.time() + warn_timeout
306
        fail_tmout = time.time() + fail_timeout
307
        while True:
308
            self.assertLess(time.time(), fail_tmout,
309
                            "operation `%s' timed out" % opmsg)
310
            if time.time() > warn_tmout:
311
                log.warning("Server %d: `%s' operation `%s' not done yet",
312
                            self.serverid, self.servername, opmsg)
313
            try:
314
                log.info("%s... " % opmsg)
315
                return callable(*args, **kwargs)
316
            except AssertionError:
317
                pass
318
            time.sleep(self.query_interval)
319

    
320
    def _insist_on_tcp_connection(self, family, host, port):
321
        familystr = {socket.AF_INET: "IPv4", socket.AF_INET6: "IPv6",
322
                     socket.AF_UNSPEC: "Unspecified-IPv4/6"}
323
        msg = "connect over %s to %s:%s" % \
324
              (familystr.get(family, "Unknown"), host, port)
325
        sock = self._try_until_timeout_expires(
326
                self.action_timeout, self.action_timeout,
327
                msg, self._get_connected_tcp_socket,
328
                family, host, port)
329
        return sock
330

    
331
    def _insist_on_status_transition(self, current_status, new_status,
332
                                    fail_timeout, warn_timeout=None):
333
        msg = "Server %d: `%s', waiting for %s -> %s" % \
334
              (self.serverid, self.servername, current_status, new_status)
335
        if warn_timeout is None:
336
            warn_timeout = fail_timeout
337
        self._try_until_timeout_expires(warn_timeout, fail_timeout,
338
                                        msg, self._verify_server_status,
339
                                        current_status, new_status)
340
        # Ensure the status is actually the expected one
341
        server = self.client.get_server_details(self.serverid)
342
        self.assertEquals(server["status"], new_status)
343

    
344
    def _insist_on_ssh_hostname(self, hostip, username, password):
345
        msg = "SSH to %s, as %s/%s" % (hostip, username, password)
346
        hostname = self._try_until_timeout_expires(
347
                self.action_timeout, self.action_timeout,
348
                msg, self._get_hostname_over_ssh,
349
                hostip, username, password)
350

    
351
        # The hostname must be of the form 'prefix-id'
352
        self.assertTrue(hostname.endswith("-%d\n" % self.serverid))
353

    
354
    def _skipIf(self, condition, msg):
355
        if condition:
356
            self.skipTest(msg)
357

    
358
    def test_001_submit_create_server(self):
359
        """Test submit create server request"""
360
        server = self.client.create_server(self.servername, self.flavorid,
361
                                           self.imageid, self.personality)
362
        self.assertEqual(server["name"], self.servername)
363
        self.assertEqual(server["flavorRef"], self.flavorid)
364
        self.assertEqual(server["imageRef"], self.imageid)
365
        self.assertEqual(server["status"], "BUILD")
366

    
367
        # Update class attributes to reflect data on building server
368
        cls = type(self)
369
        cls.serverid = server["id"]
370
        cls.username = None
371
        cls.passwd = server["adminPass"]
372

    
373
    def test_002a_server_is_building_in_list(self):
374
        """Test server is in BUILD state, in server list"""
375
        servers = self.client.list_servers(detail=True)
376
        servers = filter(lambda x: x["name"] == self.servername, servers)
377
        self.assertEqual(len(servers), 1)
378
        server = servers[0]
379
        self.assertEqual(server["name"], self.servername)
380
        self.assertEqual(server["flavorRef"], self.flavorid)
381
        self.assertEqual(server["imageRef"], self.imageid)
382
        self.assertEqual(server["status"], "BUILD")
383

    
384
    def test_002b_server_is_building_in_details(self):
385
        """Test server is in BUILD state, in details"""
386
        server = self.client.get_server_details(self.serverid)
387
        self.assertEqual(server["name"], self.servername)
388
        self.assertEqual(server["flavorRef"], self.flavorid)
389
        self.assertEqual(server["imageRef"], self.imageid)
390
        self.assertEqual(server["status"], "BUILD")
391

    
392
    def test_002c_set_server_metadata(self):
393
        image = self.client.get_image_details(self.imageid)
394
        os = image["metadata"]["values"]["os"]
395
        loginname = image["metadata"]["values"].get("users", None)
396
        self.client.update_server_metadata(self.serverid, OS=os)
397

    
398
        # Determine the username to use for future connections
399
        # to this host
400
        cls = type(self)
401
        cls.username = loginname
402
        if not cls.username:
403
            cls.username = self._connect_loginname(os)
404
        self.assertIsNotNone(cls.username)
405

    
406
    def test_002d_verify_server_metadata(self):
407
        """Test server metadata keys are set based on image metadata"""
408
        servermeta = self.client.get_server_metadata(self.serverid)
409
        imagemeta = self.client.get_image_metadata(self.imageid)
410
        self.assertEqual(servermeta["OS"], imagemeta["os"])
411

    
412
    def test_003_server_becomes_active(self):
413
        """Test server becomes ACTIVE"""
414
        self._insist_on_status_transition("BUILD", "ACTIVE",
415
                                         self.build_fail, self.build_warning)
416

    
417
    def test_003a_get_server_oob_console(self):
418
        """Test getting OOB server console over VNC
419

420
        Implementation of RFB protocol follows
421
        http://www.realvnc.com/docs/rfbproto.pdf.
422

423
        """
424
        
425
        console = self.cyclades.get_server_console(self.serverid)
426
        self.assertEquals(console['type'], "vnc")
427
        sock = self._insist_on_tcp_connection(socket.AF_UNSPEC,
428
                                        console["host"], console["port"])
429

    
430
        # Step 1. ProtocolVersion message (par. 6.1.1)
431
        version = sock.recv(1024)
432
        self.assertEquals(version, 'RFB 003.008\n')
433
        sock.send(version)
434

    
435
        # Step 2. Security (par 6.1.2): Only VNC Authentication supported
436
        sec = sock.recv(1024)
437
        self.assertEquals(list(sec), ['\x01', '\x02'])
438

    
439
        # Step 3. Request VNC Authentication (par 6.1.2)
440
        sock.send('\x02')
441

    
442
        # Step 4. Receive Challenge (par 6.2.2)
443
        challenge = sock.recv(1024)
444
        self.assertEquals(len(challenge), 16)
445

    
446
        # Step 5. DES-Encrypt challenge, use password as key (par 6.2.2)
447
        response = d3des_generate_response(
448
            (console["password"] + '\0' * 8)[:8], challenge)
449
        sock.send(response)
450

    
451
        # Step 6. SecurityResult (par 6.1.3)
452
        result = sock.recv(4)
453
        self.assertEquals(list(result), ['\x00', '\x00', '\x00', '\x00'])
454
        sock.close()
455

    
456
    def test_004_server_has_ipv4(self):
457
        """Test active server has a valid IPv4 address"""
458
        server = self.client.get_server_details(self.serverid)
459
        ipv4 = self._get_ipv4(server)
460
        self.assertEquals(IP(ipv4).version(), 4)
461

    
462
    def test_005_server_has_ipv6(self):
463
        """Test active server has a valid IPv6 address"""
464
        server = self.client.get_server_details(self.serverid)
465
        ipv6 = self._get_ipv6(server)
466
        self.assertEquals(IP(ipv6).version(), 6)
467

    
468
    def test_006_server_responds_to_ping_IPv4(self):
469
        """Test server responds to ping on IPv4 address"""
470
        server = self.client.get_server_details(self.serverid)
471
        ip = self._get_ipv4(server)
472
        self._try_until_timeout_expires(self.action_timeout,
473
                                        self.action_timeout,
474
                                        "PING IPv4 to %s" % ip,
475
                                        self._ping_once,
476
                                        False, ip)
477

    
478
    def test_007_server_responds_to_ping_IPv6(self):
479
        """Test server responds to ping on IPv6 address"""
480
        server = self.client.get_server_details(self.serverid)
481
        ip = self._get_ipv6(server)
482
        self._try_until_timeout_expires(self.action_timeout,
483
                                        self.action_timeout,
484
                                        "PING IPv6 to %s" % ip,
485
                                        self._ping_once,
486
                                        True, ip)
487

    
488
    def test_008_submit_shutdown_request(self):
489
        """Test submit request to shutdown server"""
490
        self.cyclades.shutdown_server(self.serverid)
491

    
492
    def test_009_server_becomes_stopped(self):
493
        """Test server becomes STOPPED"""
494
        self._insist_on_status_transition("ACTIVE", "STOPPED",
495
                                         self.action_timeout,
496
                                         self.action_timeout)
497

    
498
    def test_010_submit_start_request(self):
499
        """Test submit start server request"""
500
        self.cyclades.start_server(self.serverid)
501

    
502
    def test_011_server_becomes_active(self):
503
        """Test server becomes ACTIVE again"""
504
        self._insist_on_status_transition("STOPPED", "ACTIVE",
505
                                         self.action_timeout,
506
                                         self.action_timeout)
507

    
508
    def test_011a_server_responds_to_ping_IPv4(self):
509
        """Test server OS is actually up and running again"""
510
        self.test_006_server_responds_to_ping_IPv4()
511

    
512
    def test_012_ssh_to_server_IPv4(self):
513
        """Test SSH to server public IPv4 works, verify hostname"""
514
        self._skipIf(self.is_windows, "only valid for Linux servers")
515
        server = self.client.get_server_details(self.serverid)
516
        self._insist_on_ssh_hostname(self._get_ipv4(server),
517
                                     self.username, self.passwd)
518

    
519
    def test_013_ssh_to_server_IPv6(self):
520
        """Test SSH to server public IPv6 works, verify hostname"""
521
        self._skipIf(self.is_windows, "only valid for Linux servers")
522
        server = self.client.get_server_details(self.serverid)
523
        self._insist_on_ssh_hostname(self._get_ipv6(server),
524
                                     self.username, self.passwd)
525

    
526
    def test_014_rdp_to_server_IPv4(self):
527
        "Test RDP connection to server public IPv4 works"""
528
        self._skipIf(not self.is_windows, "only valid for Windows servers")
529
        server = self.client.get_server_details(self.serverid)
530
        ipv4 = self._get_ipv4(server)
531
        sock = _insist_on_tcp_connection(socket.AF_INET, ipv4, 3389)
532

    
533
        # No actual RDP processing done. We assume the RDP server is there
534
        # if the connection to the RDP port is successful.
535
        # FIXME: Use rdesktop, analyze exit code? see manpage [costasd]
536
        sock.close()
537

    
538
    def test_015_rdp_to_server_IPv6(self):
539
        "Test RDP connection to server public IPv6 works"""
540
        self._skipIf(not self.is_windows, "only valid for Windows servers")
541
        server = self.client.get_server_details(self.serverid)
542
        ipv6 = self._get_ipv6(server)
543
        sock = _get_tcp_connection(socket.AF_INET6, ipv6, 3389)
544

    
545
        # No actual RDP processing done. We assume the RDP server is there
546
        # if the connection to the RDP port is successful.
547
        sock.close()
548

    
549
    def test_016_personality_is_enforced(self):
550
        """Test file injection for personality enforcement"""
551
        self._skipIf(self.is_windows, "only implemented for Linux servers")
552
        self.assertTrue(False, "test not implemented, will fail")
553

    
554
    def test_017_submit_delete_request(self):
555
        """Test submit request to delete server"""
556
        self.client.delete_server(self.serverid)
557

    
558
    def test_018_server_becomes_deleted(self):
559
        """Test server becomes DELETED"""
560
        self._insist_on_status_transition("ACTIVE", "DELETED",
561
                                         self.action_timeout,
562
                                         self.action_timeout)
563

    
564
    def test_019_server_no_longer_in_server_list(self):
565
        """Test server is no longer in server list"""
566
        servers = self.client.list_servers()
567
        self.assertNotIn(self.serverid, [s["id"] for s in servers])
568

    
569

    
570
class TestRunnerProcess(Process):
571
    """A distinct process used to execute part of the tests in parallel"""
572
    def __init__(self, **kw):
573
        Process.__init__(self, **kw)
574
        kwargs = kw["kwargs"]
575
        self.testq = kwargs["testq"]
576
        self.runner = kwargs["runner"]
577

    
578
    def run(self):
579
        # Make sure this test runner process dies with the parent
580
        # and is not left behind.
581
        #
582
        # WARNING: This uses the prctl(2) call and is
583
        # Linux-specific.
584
        prctl.set_pdeathsig(signal.SIGHUP)
585

    
586
        while True:
587
            log.debug("I am process %d, GETting from queue is %s",
588
                     os.getpid(), self.testq)
589
            msg = self.testq.get()
590
            log.debug("Dequeued msg: %s", msg)
591

    
592
            if msg == "TEST_RUNNER_TERMINATE":
593
                raise SystemExit
594
            elif issubclass(msg, unittest.TestCase):
595
                # Assemble a TestSuite, and run it
596
                suite = unittest.TestLoader().loadTestsFromTestCase(msg)
597
                self.runner.run(suite)
598
            else:
599
                raise Exception("Cannot handle msg: %s" % msg)
600

    
601

    
602

    
603
def _run_cases_in_parallel(cases, fanout=1, runner=None):
604
    """Run instances of TestCase in parallel, in a number of distinct processes
605

606
    The cases iterable specifies the TestCases to be executed in parallel,
607
    by test runners running in distinct processes.
608
    The fanout parameter specifies the number of processes to spawn,
609
    and defaults to 1.
610
    The runner argument specifies the test runner class to use inside each
611
    runner process.
612

613
    """
614
    if runner is None:
615
        runner = unittest.TextTestRunner(verbosity=2, failfast=True)
616

    
617
    # testq: The master process enqueues TestCase objects into this queue,
618
    #        test runner processes pick them up for execution, in parallel.
619
    testq = Queue()
620
    runners = []
621
    for i in xrange(0, fanout):
622
        kwargs = dict(testq=testq, runner=runner)
623
        runners.append(TestRunnerProcess(kwargs=kwargs))
624

    
625
    log.info("Spawning %d test runner processes", len(runners))
626
    for p in runners:
627
        p.start()
628
    log.debug("Spawned %d test runners, PIDs are %s",
629
              len(runners), [p.pid for p in runners])
630

    
631
    # Enqueue test cases
632
    map(testq.put, cases)
633
    map(testq.put, ["TEST_RUNNER_TERMINATE"] * len(runners))
634

    
635
    log.debug("Joining %d processes", len(runners))
636
    for p in runners:
637
        p.join()
638
    log.debug("Done joining %d processes", len(runners))
639

    
640

    
641
def _spawn_server_test_case(**kwargs):
642
    """Construct a new unit test case class from SpawnServerTestCase"""
643

    
644
    name = "SpawnServerTestCase_%s" % kwargs["imageid"]
645
    cls = type(name, (SpawnServerTestCase,), kwargs)
646

    
647
    # Patch extra parameters into test names by manipulating method docstrings
648
    for (mname, m) in \
649
        inspect.getmembers(cls, lambda x: inspect.ismethod(x)):
650
            if hasattr(m, __doc__):
651
                m.__func__.__doc__ = "[%s] %s" % (imagename, m.__doc__)
652

    
653
    # Make sure the class can be pickled, by listing it among
654
    # the attributes of __main__. A PicklingError is raised otherwise.
655
    setattr(__main__, name, cls)
656
    return cls
657

    
658

    
659
def cleanup_servers(delete_stale=False):
660

    
661
    conf = Config()
662
    conf.set('compute_token', TOKEN)
663
    c = ComputeClient(conf)
664

    
665
    servers = c.list_servers()
666
    stale = [s for s in servers if s["name"].startswith(SNF_TEST_PREFIX)]
667

    
668
    if len(stale) == 0:
669
        return
670

    
671
    print >> sys.stderr, "Found these stale servers from previous runs:"
672
    print "    " + \
673
          "\n    ".join(["%d: %s" % (s["id"], s["name"]) for s in stale])
674

    
675
    if delete_stale:
676
        print >> sys.stderr, "Deleting %d stale servers:" % len(stale)
677
        for server in stale:
678
            c.delete_server(server["id"])
679
        print >> sys.stderr, "    ...done"
680
    else:
681
        print >> sys.stderr, "Use --delete-stale to delete them."
682

    
683

    
684
def parse_arguments(args):
685
    from optparse import OptionParser
686

    
687
    kw = {}
688
    kw["usage"] = "%prog [options]"
689
    kw["description"] = \
690
        "%prog runs a number of test scenarios on a " \
691
        "Synnefo deployment."
692

    
693
    parser = OptionParser(**kw)
694
    parser.disable_interspersed_args()
695
    parser.add_option("--api",
696
                      action="store", type="string", dest="api",
697
                      help="The API URI to use to reach the Synnefo API",
698
                      default=DEFAULT_API)
699
    parser.add_option("--token",
700
                      action="store", type="string", dest="token",
701
                      help="The token to use for authentication to the API")
702
    parser.add_option("--nofailfast",
703
                      action="store_true", dest="nofailfast",
704
                      help="Do not fail immediately if one of the tests " \
705
                           "fails (EXPERIMENTAL)",
706
                      default=False)
707
    parser.add_option("--action-timeout",
708
                      action="store", type="int", dest="action_timeout",
709
                      metavar="TIMEOUT",
710
                      help="Wait SECONDS seconds for a server action to " \
711
                           "complete, then the test is considered failed",
712
                      default=50)
713
    parser.add_option("--build-warning",
714
                      action="store", type="int", dest="build_warning",
715
                      metavar="TIMEOUT",
716
                      help="Warn if TIMEOUT seconds have passed and a " \
717
                           "build operation is still pending",
718
                      default=600)
719
    parser.add_option("--build-fail",
720
                      action="store", type="int", dest="build_fail",
721
                      metavar="BUILD_TIMEOUT",
722
                      help="Fail the test if TIMEOUT seconds have passed " \
723
                           "and a build operation is still incomplete",
724
                      default=900)
725
    parser.add_option("--query-interval",
726
                      action="store", type="int", dest="query_interval",
727
                      metavar="INTERVAL",
728
                      help="Query server status when requests are pending " \
729
                           "every INTERVAL seconds",
730
                      default=3)
731
    parser.add_option("--fanout",
732
                      action="store", type="int", dest="fanout",
733
                      metavar="COUNT",
734
                      help="Spawn up to COUNT child processes to execute " \
735
                           "in parallel, essentially have up to COUNT " \
736
                           "server build requests outstanding (EXPERIMENTAL)",
737
                      default=1)
738
    parser.add_option("--force-flavor",
739
                      action="store", type="int", dest="force_flavorid",
740
                      metavar="FLAVOR ID",
741
                      help="Force all server creations to use the specified "\
742
                           "FLAVOR ID instead of a randomly chosen one, " \
743
                           "useful if disk space is scarce",
744
                      default=None)
745
    parser.add_option("--image-id",
746
                      action="store", type="string", dest="force_imageid",
747
                      metavar="IMAGE ID",
748
                      help="Test the specified image id, use 'all' to test " \
749
                           "all available images (mandatory argument)",
750
                      default=None)
751
    parser.add_option("--show-stale",
752
                      action="store_true", dest="show_stale",
753
                      help="Show stale servers from previous runs, whose "\
754
                           "name starts with `%s'" % SNF_TEST_PREFIX,
755
                      default=False)
756
    parser.add_option("--delete-stale",
757
                      action="store_true", dest="delete_stale",
758
                      help="Delete stale servers from previous runs, whose "\
759
                           "name starts with `%s'" % SNF_TEST_PREFIX,
760
                      default=False)
761

    
762
    # FIXME: Change the default for build-fanout to 10
763
    # FIXME: Allow the user to specify a specific set of Images to test
764

    
765
    (opts, args) = parser.parse_args(args)
766

    
767
    # Verify arguments
768
    if opts.delete_stale:
769
        opts.show_stale = True
770

    
771
    if not opts.show_stale:
772
        if not opts.force_imageid:
773
            print >>sys.stderr, "The --image-id argument is mandatory."
774
            parser.print_help()
775
            sys.exit(1)
776

    
777
        if opts.force_imageid != 'all':
778
            try:
779
                opts.force_imageid = str(opts.force_imageid)
780
            except ValueError:
781
                print >>sys.stderr, "Invalid value specified for --image-id." \
782
                                    "Use a numeric id, or `all'."
783
                sys.exit(1)
784

    
785
    return (opts, args)
786

    
787

    
788
def main():
789
    """Assemble test cases into a test suite, and run it
790

791
    IMPORTANT: Tests have dependencies and have to be run in the specified
792
    order inside a single test case. They communicate through attributes of the
793
    corresponding TestCase class (shared fixtures). Distinct subclasses of
794
    TestCase MAY SHARE NO DATA, since they are run in parallel, in distinct
795
    test runner processes.
796

797
    """
798
    (opts, args) = parse_arguments(sys.argv[1:])
799

    
800
    global API, TOKEN
801
    API = opts.api
802
    TOKEN = opts.token
803

    
804
    # Cleanup stale servers from previous runs
805
    if opts.show_stale:
806
        cleanup_servers(delete_stale=opts.delete_stale)
807
        return 0
808

    
809
    # Initialize a kamaki instance, get flavors, images
810

    
811
    conf = Config()
812
    conf.set('compute_token', TOKEN)
813
    c = ComputeClient(conf)
814

    
815
    DIMAGES = c.list_images(detail=True)
816
    DFLAVORS = c.list_flavors(detail=True)
817

    
818
    # FIXME: logging, log, LOG PID, TEST_RUN_ID, arguments
819
    # FIXME: Network testing? Create, destroy, connect, ping, disconnect VMs?
820
    # Run them: FIXME: In parallel, FAILEARLY, catchbreak?
821
    #unittest.main(verbosity=2, catchbreak=True)
822

    
823
    test_images = filter(lambda x: x["id"] == opts.force_imageid, DIMAGES)
824
    for image in test_images:
825
        imageid = str(image["id"])
826
        flavorid = choice([f["id"] for f in DFLAVORS if f["disk"] >= 20])
827
        imagename = image["name"]
828
        personality = None   # FIXME
829
        servername = "%s%s for %s" % (SNF_TEST_PREFIX, TEST_RUN_ID, imagename)
830
        is_windows = imagename.lower().find("windows") >= 0
831
        case = _spawn_server_test_case(imageid=imageid, flavorid=flavorid,
832
                                       imagename=imagename,
833
                                       personality=personality,
834
                                       servername=servername,
835
                                       is_windows=is_windows,
836
                                       action_timeout=opts.action_timeout,
837
                                       build_warning=opts.build_warning,
838
                                       build_fail=opts.build_fail,
839
                                       query_interval=opts.query_interval)
840

    
841

    
842
    seq_cases = [UnauthorizedTestCase, FlavorsTestCase, ImagesTestCase, case]
843

    
844
    for case in seq_cases:
845
        suite = unittest.TestLoader().loadTestsFromTestCase(case)
846
        unittest.TextTestRunner(verbosity=2).run(suite)
847

    
848
    
849

    
850
    # # The Following cases run sequentially
851
    # seq_cases = [UnauthorizedTestCase, FlavorsTestCase, ImagesTestCase]
852
    # _run_cases_in_parallel(seq_cases, fanout=3, runner=runner)
853

    
854
    # # The following cases run in parallel
855
    # par_cases = []
856

    
857
    # if opts.force_imageid == 'all':
858
    #     test_images = DIMAGES
859
    # else:
860
    #     test_images = filter(lambda x: x["id"] == opts.force_imageid, DIMAGES)
861

    
862
    # for image in test_images:
863
    #     imageid = image["id"]
864
    #     imagename = image["name"]
865
    #     if opts.force_flavorid:
866
    #         flavorid = opts.force_flavorid
867
    #     else:
868
    #         flavorid = choice([f["id"] for f in DFLAVORS if f["disk"] >= 20])
869
    #     personality = None   # FIXME
870
    #     servername = "%s%s for %s" % (SNF_TEST_PREFIX, TEST_RUN_ID, imagename)
871
    #     is_windows = imagename.lower().find("windows") >= 0
872
    #     case = _spawn_server_test_case(imageid=str(imageid), flavorid=flavorid,
873
    #                                    imagename=imagename,
874
    #                                    personality=personality,
875
    #                                    servername=servername,
876
    #                                    is_windows=is_windows,
877
    #                                    action_timeout=opts.action_timeout,
878
    #                                    build_warning=opts.build_warning,
879
    #                                    build_fail=opts.build_fail,
880
    #                                    query_interval=opts.query_interval)
881
    #     par_cases.append(case)
882

    
883
    # _run_cases_in_parallel(par_cases, fanout=opts.fanout, runner=runner)
884

    
885
if __name__ == "__main__":
886
    sys.exit(main())