Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / tools / burnin.py @ 1c636ad6

History | View | Annotate | Download (33.7 kB)

1
#!/usr/bin/env python
2

    
3
# Copyright 2011 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35

    
36
"""Perform integration testing on a running Synnefo deployment"""
37

    
38
import __main__
39
import datetime
40
import inspect
41
import logging
42
import os
43
import paramiko
44
import prctl
45
import subprocess
46
import signal
47
import socket
48
import struct
49
import sys
50
import time
51

    
52
from IPy import IP
53
from multiprocessing import Process, Queue
54
from random import choice
55

    
56
from kamaki.clients import ClientError, ComputeClient, CycladesClient
57
from kamaki.config import Config
58

    
59
from vncauthproxy.d3des import generate_response as d3des_generate_response
60

    
61
# Use backported unittest functionality if Python < 2.7
62
try:
63
    import unittest2 as unittest
64
except ImportError:
65
    if sys.version_info < (2, 7):
66
        raise Exception("The unittest2 package is required for Python < 2.7")
67
    import unittest
68

    
69

    
70
API = None
71
TOKEN = None
72
DEFAULT_API = "http://127.0.0.1:8000/api/v1.1"
73

    
74
# A unique id identifying this test run
75
TEST_RUN_ID = datetime.datetime.strftime(datetime.datetime.now(),
76
                                         "%Y%m%d%H%M%S")
77
SNF_TEST_PREFIX = "snf-test-"
78

    
79
# Setup logging (FIXME - verigak)
80
logging.basicConfig(format="%(message)s")
81
log = logging.getLogger("burnin")
82
log.setLevel(logging.INFO)
83

    
84

    
85
class UnauthorizedTestCase(unittest.TestCase):
86
    def test_unauthorized_access(self):
87
        """Test access without a valid token fails"""
88
        falseToken = '12345'
89
        conf = Config()
90
        conf.set('compute_token', falseToken)
91

    
92
        with self.assertRaises(ClientError) as cm:
93
            c.list_servers()
94
        self.assertEqual(cm.exception.status, 401)
95

    
96

    
97
class ImagesTestCase(unittest.TestCase):
98
    """Test image lists for consistency"""
99
    @classmethod
100
    def setUpClass(cls):
101
        """Initialize kamaki, get (detailed) list of images"""
102
        log.info("Getting simple and detailed list of images")
103

    
104
        conf = Config()
105
        conf.set('compute_token', TOKEN)
106
        cls.client = ComputeClient(conf)
107
        cls.images = cls.client.list_images()
108
        cls.dimages = cls.client.list_images(detail=True)
109

    
110
    def test_001_list_images(self):
111
        """Test image list actually returns images"""
112
        self.assertGreater(len(self.images), 0)
113

    
114
    def test_002_list_images_detailed(self):
115
        """Test detailed image list is the same length as list"""
116
        self.assertEqual(len(self.dimages), len(self.images))
117

    
118
    def test_003_same_image_names(self):
119
        """Test detailed and simple image list contain same names"""
120
        names = sorted(map(lambda x: x["name"], self.images))
121
        dnames = sorted(map(lambda x: x["name"], self.dimages))
122
        self.assertEqual(names, dnames)
123

    
124
    def test_004_unique_image_names(self):
125
        """Test images have unique names"""
126
        names = sorted(map(lambda x: x["name"], self.images))
127
        self.assertEqual(sorted(list(set(names))), names)
128

    
129
    def test_005_image_metadata(self):
130
        """Test every image has specific metadata defined"""
131
        keys = frozenset(["os", "description", "size"])
132
        for i in self.dimages:
133
            self.assertTrue(keys.issubset(i["metadata"]["values"].keys()))
134

    
135

    
136
class FlavorsTestCase(unittest.TestCase):
137
    """Test flavor lists for consistency"""
138
    @classmethod
139
    def setUpClass(cls):
140
        """Initialize kamaki, get (detailed) list of flavors"""
141
        log.info("Getting simple and detailed list of flavors")
142

    
143
        conf = Config()
144
        conf.set('compute_token', TOKEN)
145
        cls.client = ComputeClient(conf)
146
        cls.flavors = cls.client.list_flavors()
147
        cls.dflavors = cls.client.list_flavors(detail=True)
148

    
149
    def test_001_list_flavors(self):
150
        """Test flavor list actually returns flavors"""
151
        self.assertGreater(len(self.flavors), 0)
152

    
153
    def test_002_list_flavors_detailed(self):
154
        """Test detailed flavor list is the same length as list"""
155
        self.assertEquals(len(self.dflavors), len(self.flavors))
156

    
157
    def test_003_same_flavor_names(self):
158
        """Test detailed and simple flavor list contain same names"""
159
        names = sorted(map(lambda x: x["name"], self.flavors))
160
        dnames = sorted(map(lambda x: x["name"], self.dflavors))
161
        self.assertEqual(names, dnames)
162

    
163
    def test_004_unique_flavor_names(self):
164
        """Test flavors have unique names"""
165
        names = sorted(map(lambda x: x["name"], self.flavors))
166
        self.assertEqual(sorted(list(set(names))), names)
167

    
168
    def test_005_well_formed_flavor_names(self):
169
        """Test flavors have names of the form CxxRyyDzz
170

171
        Where xx is vCPU count, yy is RAM in MiB, zz is Disk in GiB
172

173
        """
174
        for f in self.dflavors:
175
            self.assertEqual("C%dR%dD%d" % (f["cpu"], f["ram"], f["disk"]),
176
                             f["name"],
177
                             "Flavor %s does not match its specs." % f["name"])
178

    
179

    
180
class ServersTestCase(unittest.TestCase):
181
    """Test server lists for consistency"""
182
    @classmethod
183
    def setUpClass(cls):
184
        """Initialize kamaki, get (detailed) list of servers"""
185
        log.info("Getting simple and detailed list of servers")
186

    
187
        conf = Config()
188
        conf.set('compute_token', TOKEN)
189
        cls.client = ComputeClient(conf)
190
        cls.servers = cls.client.list_servers()
191
        cls.dservers = cls.client.list_servers(detail=True)
192

    
193
    def test_001_list_servers(self):
194
        """Test server list actually returns servers"""
195
        self.assertGreater(len(self.servers), 0)
196

    
197
    def test_002_list_servers_detailed(self):
198
        """Test detailed server list is the same length as list"""
199
        self.assertEqual(len(self.dservers), len(self.servers))
200

    
201
    def test_003_same_server_names(self):
202
        """Test detailed and simple flavor list contain same names"""
203
        names = sorted(map(lambda x: x["name"], self.servers))
204
        dnames = sorted(map(lambda x: x["name"], self.dservers))
205
        self.assertEqual(names, dnames)
206

    
207

    
208
# This class gets replicated into actual TestCases dynamically
209
class SpawnServerTestCase(unittest.TestCase):
210
    """Test scenario for server of the specified image"""
211

    
212
    @classmethod
213
    def setUpClass(cls):
214
        """Initialize a kamaki instance"""
215
        log.info("Spawning server for image `%s'", cls.imagename)
216

    
217
        conf = Config()
218
        conf.set('compute_token', TOKEN)
219
        cls.client = ComputeClient(conf)
220

    
221
    def _get_ipv4(self, server):
222
        """Get the public IPv4 of a server from the detailed server info"""
223

    
224
        details = 
225
        public_addrs = filter(lambda x: x["id"] == "public",
226
                              server["addresses"]["values"])
227
        self.assertEqual(len(public_addrs), 1)
228
        ipv4_addrs = filter(lambda x: x["version"] == 4,
229
                            public_addrs[0]["values"])
230
        self.assertEqual(len(ipv4_addrs), 1)
231
        return ipv4_addrs[0]["addr"]
232

    
233
    def _get_ipv6(self, server):
234
        """Get the public IPv6 of a server from the detailed server info"""
235
        public_addrs = filter(lambda x: x["id"] == "public",
236
                              server["addresses"]["values"])
237
        self.assertEqual(len(public_addrs), 1)
238
        ipv6_addrs = filter(lambda x: x["version"] == 6,
239
                            public_addrs[0]["values"])
240
        self.assertEqual(len(ipv6_addrs), 1)
241
        return ipv6_addrs[0]["addr"]
242

    
243
    def _connect_loginname(self, os):
244
        """Return the login name for connections based on the server OS"""
245
        if os in ("Ubuntu", "Kubuntu", "Fedora"):
246
            return "user"
247
        elif os in ("windows", "windows_alpha1"):
248
            return "Administrator"
249
        else:
250
            return "root"
251

    
252
    def _verify_server_status(self, current_status, new_status):
253
        """Verify a server has switched to a specified status"""
254
        server = self.client.get_server_details(self.serverid)
255
        if server["status"] not in (current_status, new_status):
256
            return None  # Do not raise exception, return so the test fails
257
        self.assertEquals(server["status"], new_status)
258

    
259
    def _get_connected_tcp_socket(self, family, host, port):
260
        """Get a connected socket from the specified family to host:port"""
261
        sock = None
262
        for res in \
263
            socket.getaddrinfo(host, port, family, socket.SOCK_STREAM, 0,
264
                               socket.AI_PASSIVE):
265
            af, socktype, proto, canonname, sa = res
266
            try:
267
                sock = socket.socket(af, socktype, proto)
268
            except socket.error as msg:
269
                sock = None
270
                continue
271
            try:
272
                sock.connect(sa)
273
            except socket.error as msg:
274
                sock.close()
275
                sock = None
276
                continue
277
        self.assertIsNotNone(sock)
278
        return sock
279

    
280
    def _ping_once(self, ipv6, ip):
281
        """Test server responds to a single IPv4 or IPv6 ping"""
282
        cmd = "ping%s -c 2 -w 3 %s" % ("6" if ipv6 else "", ip)
283
        ping = subprocess.Popen(cmd, shell=True,
284
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
285
        (stdout, stderr) = ping.communicate()
286
        ret = ping.wait()
287
        self.assertEquals(ret, 0)
288

    
289
    def _get_hostname_over_ssh(self, hostip, username, password):
290
        ssh = paramiko.SSHClient()
291
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
292
        try:
293
            ssh.connect(hostip, username=username, password=password)
294
        except socket.error:
295
            raise AssertionError
296
        stdin, stdout, stderr = ssh.exec_command("hostname")
297
        lines = stdout.readlines()
298
        self.assertEqual(len(lines), 1)
299
        return lines[0]
300

    
301
    def _try_until_timeout_expires(self, warn_timeout, fail_timeout,
302
                                   opmsg, callable, *args, **kwargs):
303
        if warn_timeout == fail_timeout:
304
            warn_timeout = fail_timeout + 1
305
        warn_tmout = time.time() + warn_timeout
306
        fail_tmout = time.time() + fail_timeout
307
        while True:
308
            self.assertLess(time.time(), fail_tmout,
309
                            "operation `%s' timed out" % opmsg)
310
            if time.time() > warn_tmout:
311
                log.warning("Server %d: `%s' operation `%s' not done yet",
312
                            self.serverid, self.servername, opmsg)
313
            try:
314
                log.info("%s... " % opmsg)
315
                return callable(*args, **kwargs)
316
            except AssertionError:
317
                pass
318
            time.sleep(self.query_interval)
319

    
320
    def _insist_on_tcp_connection(self, family, host, port):
321
        familystr = {socket.AF_INET: "IPv4", socket.AF_INET6: "IPv6",
322
                     socket.AF_UNSPEC: "Unspecified-IPv4/6"}
323
        msg = "connect over %s to %s:%s" % \
324
              (familystr.get(family, "Unknown"), host, port)
325
        sock = self._try_until_timeout_expires(
326
                self.action_timeout, self.action_timeout,
327
                msg, self._get_connected_tcp_socket,
328
                family, host, port)
329
        return sock
330

    
331
    def _insist_on_status_transition(self, current_status, new_status,
332
                                    fail_timeout, warn_timeout=None):
333
        msg = "Server %d: `%s', waiting for %s -> %s" % \
334
              (self.serverid, self.servername, current_status, new_status)
335
        if warn_timeout is None:
336
            warn_timeout = fail_timeout
337
        self._try_until_timeout_expires(warn_timeout, fail_timeout,
338
                                        msg, self._verify_server_status,
339
                                        current_status, new_status)
340
        # Ensure the status is actually the expected one
341
        server = self.client.get_server_details(self.serverid)
342
        self.assertEquals(server["status"], new_status)
343

    
344
    def _insist_on_ssh_hostname(self, hostip, username, password):
345
        msg = "SSH to %s, as %s/%s" % (hostip, username, password)
346
        hostname = self._try_until_timeout_expires(
347
                self.action_timeout, self.action_timeout,
348
                msg, self._get_hostname_over_ssh,
349
                hostip, username, password)
350

    
351
        # The hostname must be of the form 'prefix-id'
352
        self.assertTrue(hostname.endswith("-%d\n" % self.serverid))
353

    
354
    def _skipIf(self, condition, msg):
355
        if condition:
356
            self.skipTest(msg)
357

    
358
    def test_001_submit_create_server(self):
359
        """Test submit create server request"""
360
        server = self.client.create_server(self.servername, self.flavorid,
361
                                           self.imageid, self.personality)
362
        self.assertEqual(server["name"], self.servername)
363
        self.assertEqual(server["flavorRef"], self.flavorid)
364
        self.assertEqual(server["imageRef"], self.imageid)
365
        self.assertEqual(server["status"], "BUILD")
366

    
367
        # Update class attributes to reflect data on building server
368
        cls = type(self)
369
        cls.serverid = server["id"]
370
        cls.username = None
371
        cls.passwd = server["adminPass"]
372

    
373
    def test_002a_server_is_building_in_list(self):
374
        """Test server is in BUILD state, in server list"""
375
        servers = self.client.list_servers(detail=True)
376
        servers = filter(lambda x: x["name"] == self.servername, servers)
377
        self.assertEqual(len(servers), 1)
378
        server = servers[0]
379
        self.assertEqual(server["name"], self.servername)
380
        self.assertEqual(server["flavorRef"], self.flavorid)
381
        self.assertEqual(server["imageRef"], self.imageid)
382
        self.assertEqual(server["status"], "BUILD")
383

    
384
    def test_002b_server_is_building_in_details(self):
385
        """Test server is in BUILD state, in details"""
386
        server = self.client.get_server_details(self.serverid)
387
        self.assertEqual(server["name"], self.servername)
388
        self.assertEqual(server["flavorRef"], self.flavorid)
389
        self.assertEqual(server["imageRef"], self.imageid)
390
        self.assertEqual(server["status"], "BUILD")
391

    
392
    def test_002c_set_server_metadata(self):
393
        image = self.client.get_image_details(self.imageid)
394
        os = image["metadata"]["values"]["OS"]
395
        loginname = image["metadata"]["values"].get("loginname", None)
396
        self.client.update_server_metadata(self.serverid, OS=os)
397

    
398
        # Determine the username to use for future connections
399
        # to this host
400
        cls = type(self)
401
        cls.username = loginname
402
        if not cls.username:
403
            cls.username = self._connect_loginname(os)
404
        self.assertIsNotNone(cls.username)
405

    
406
    def test_002d_verify_server_metadata(self):
407
        """Test server metadata keys are set based on image metadata"""
408
        servermeta = self.client.get_server_metadata(self.serverid)
409
        imagemeta = self.client.get_image_metadata(self.imageid)
410
        self.assertEqual(servermeta["OS"], imagemeta["OS"])
411

    
412
    def test_003_server_becomes_active(self):
413
        """Test server becomes ACTIVE"""
414
        self._insist_on_status_transition("BUILD", "ACTIVE",
415
                                         self.build_fail, self.build_warning)
416

    
417
    def test_003a_get_server_oob_console(self):
418
        """Test getting OOB server console over VNC
419

420
        Implementation of RFB protocol follows
421
        http://www.realvnc.com/docs/rfbproto.pdf.
422

423
        """
424
        console = self.client.get_server_console(self.serverid)
425
        self.assertEquals(console['type'], "vnc")
426
        sock = self._insist_on_tcp_connection(socket.AF_UNSPEC,
427
                                        console["host"], console["port"])
428

    
429
        # Step 1. ProtocolVersion message (par. 6.1.1)
430
        version = sock.recv(1024)
431
        self.assertEquals(version, 'RFB 003.008\n')
432
        sock.send(version)
433

    
434
        # Step 2. Security (par 6.1.2): Only VNC Authentication supported
435
        sec = sock.recv(1024)
436
        self.assertEquals(list(sec), ['\x01', '\x02'])
437

    
438
        # Step 3. Request VNC Authentication (par 6.1.2)
439
        sock.send('\x02')
440

    
441
        # Step 4. Receive Challenge (par 6.2.2)
442
        challenge = sock.recv(1024)
443
        self.assertEquals(len(challenge), 16)
444

    
445
        # Step 5. DES-Encrypt challenge, use password as key (par 6.2.2)
446
        response = d3des_generate_response(
447
            (console["password"] + '\0' * 8)[:8], challenge)
448
        sock.send(response)
449

    
450
        # Step 6. SecurityResult (par 6.1.3)
451
        result = sock.recv(4)
452
        self.assertEquals(list(result), ['\x00', '\x00', '\x00', '\x00'])
453
        sock.close()
454

    
455
    def test_004_server_has_ipv4(self):
456
        """Test active server has a valid IPv4 address"""
457
        server = self.client.get_server_details(self.serverid)
458
        ipv4 = self._get_ipv4(server)
459
        self.assertEquals(IP(ipv4).version(), 4)
460

    
461
    def test_005_server_has_ipv6(self):
462
        """Test active server has a valid IPv6 address"""
463
        server = self.client.get_server_details(self.serverid)
464
        ipv6 = self._get_ipv6(server)
465
        self.assertEquals(IP(ipv6).version(), 6)
466

    
467
    def test_006_server_responds_to_ping_IPv4(self):
468
        """Test server responds to ping on IPv4 address"""
469
        server = self.client.get_server_details(self.serverid)
470
        ip = self._get_ipv4(server)
471
        self._try_until_timeout_expires(self.action_timeout,
472
                                        self.action_timeout,
473
                                        "PING IPv4 to %s" % ip,
474
                                        self._ping_once,
475
                                        False, ip)
476

    
477
    def test_007_server_responds_to_ping_IPv6(self):
478
        """Test server responds to ping on IPv6 address"""
479
        server = self.client.get_server_details(self.serverid)
480
        ip = self._get_ipv6(server)
481
        self._try_until_timeout_expires(self.action_timeout,
482
                                        self.action_timeout,
483
                                        "PING IPv6 to %s" % ip,
484
                                        self._ping_once,
485
                                        True, ip)
486

    
487
    def test_008_submit_shutdown_request(self):
488
        """Test submit request to shutdown server"""
489
        self.client.shutdown_server(self.serverid)
490

    
491
    def test_009_server_becomes_stopped(self):
492
        """Test server becomes STOPPED"""
493
        self._insist_on_status_transition("ACTIVE", "STOPPED",
494
                                         self.action_timeout,
495
                                         self.action_timeout)
496

    
497
    def test_010_submit_start_request(self):
498
        """Test submit start server request"""
499
        self.client.start_server(self.serverid)
500

    
501
    def test_011_server_becomes_active(self):
502
        """Test server becomes ACTIVE again"""
503
        self._insist_on_status_transition("STOPPED", "ACTIVE",
504
                                         self.action_timeout,
505
                                         self.action_timeout)
506

    
507
    def test_011a_server_responds_to_ping_IPv4(self):
508
        """Test server OS is actually up and running again"""
509
        self.test_006_server_responds_to_ping_IPv4()
510

    
511
    def test_012_ssh_to_server_IPv4(self):
512
        """Test SSH to server public IPv4 works, verify hostname"""
513
        self._skipIf(self.is_windows, "only valid for Linux servers")
514
        server = self.client.get_server_details(self.serverid)
515
        self._insist_on_ssh_hostname(self._get_ipv4(server),
516
                                     self.username, self.passwd)
517

    
518
    def test_013_ssh_to_server_IPv6(self):
519
        """Test SSH to server public IPv6 works, verify hostname"""
520
        self._skipIf(self.is_windows, "only valid for Linux servers")
521
        server = self.client.get_server_details(self.serverid)
522
        self._insist_on_ssh_hostname(self._get_ipv6(server),
523
                                     self.username, self.passwd)
524

    
525
    def test_014_rdp_to_server_IPv4(self):
526
        "Test RDP connection to server public IPv4 works"""
527
        self._skipIf(not self.is_windows, "only valid for Windows servers")
528
        server = self.client.get_server_details(self.serverid)
529
        ipv4 = self._get_ipv4(server)
530
        sock = _insist_on_tcp_connection(socket.AF_INET, ipv4, 3389)
531

    
532
        # No actual RDP processing done. We assume the RDP server is there
533
        # if the connection to the RDP port is successful.
534
        # FIXME: Use rdesktop, analyze exit code? see manpage [costasd]
535
        sock.close()
536

    
537
    def test_015_rdp_to_server_IPv6(self):
538
        "Test RDP connection to server public IPv6 works"""
539
        self._skipIf(not self.is_windows, "only valid for Windows servers")
540
        server = self.client.get_server_details(self.serverid)
541
        ipv6 = self._get_ipv6(server)
542
        sock = _get_tcp_connection(socket.AF_INET6, ipv6, 3389)
543

    
544
        # No actual RDP processing done. We assume the RDP server is there
545
        # if the connection to the RDP port is successful.
546
        sock.close()
547

    
548
    def test_016_personality_is_enforced(self):
549
        """Test file injection for personality enforcement"""
550
        self._skipIf(self.is_windows, "only implemented for Linux servers")
551
        self.assertTrue(False, "test not implemented, will fail")
552

    
553
    def test_017_submit_delete_request(self):
554
        """Test submit request to delete server"""
555
        self.client.delete_server(self.serverid)
556

    
557
    def test_018_server_becomes_deleted(self):
558
        """Test server becomes DELETED"""
559
        self._insist_on_status_transition("ACTIVE", "DELETED",
560
                                         self.action_timeout,
561
                                         self.action_timeout)
562

    
563
    def test_019_server_no_longer_in_server_list(self):
564
        """Test server is no longer in server list"""
565
        servers = self.client.list_servers()
566
        self.assertNotIn(self.serverid, [s["id"] for s in servers])
567

    
568

    
569
class TestRunnerProcess(Process):
570
    """A distinct process used to execute part of the tests in parallel"""
571
    def __init__(self, **kw):
572
        Process.__init__(self, **kw)
573
        kwargs = kw["kwargs"]
574
        self.testq = kwargs["testq"]
575
        self.runner = kwargs["runner"]
576

    
577
    def run(self):
578
        # Make sure this test runner process dies with the parent
579
        # and is not left behind.
580
        #
581
        # WARNING: This uses the prctl(2) call and is
582
        # Linux-specific.
583
        prctl.set_pdeathsig(signal.SIGHUP)
584

    
585
        while True:
586
            log.debug("I am process %d, GETting from queue is %s",
587
                     os.getpid(), self.testq)
588
            msg = self.testq.get()
589
            log.debug("Dequeued msg: %s", msg)
590

    
591
            if msg == "TEST_RUNNER_TERMINATE":
592
                raise SystemExit
593
            elif issubclass(msg, unittest.TestCase):
594
                # Assemble a TestSuite, and run it
595
                suite = unittest.TestLoader().loadTestsFromTestCase(msg)
596
                self.runner.run(suite)
597
            else:
598
                raise Exception("Cannot handle msg: %s" % msg)
599

    
600

    
601
def _run_cases_in_parallel(cases, fanout=1, runner=None):
602
    """Run instances of TestCase in parallel, in a number of distinct processes
603

604
    The cases iterable specifies the TestCases to be executed in parallel,
605
    by test runners running in distinct processes.
606
    The fanout parameter specifies the number of processes to spawn,
607
    and defaults to 1.
608
    The runner argument specifies the test runner class to use inside each
609
    runner process.
610

611
    """
612
    if runner is None:
613
        runner = unittest.TextTestRunner(verbosity=2, failfast=True)
614

    
615
    # testq: The master process enqueues TestCase objects into this queue,
616
    #        test runner processes pick them up for execution, in parallel.
617
    testq = Queue()
618
    runners = []
619
    for i in xrange(0, fanout):
620
        kwargs = dict(testq=testq, runner=runner)
621
        runners.append(TestRunnerProcess(kwargs=kwargs))
622

    
623
    log.info("Spawning %d test runner processes", len(runners))
624
    for p in runners:
625
        p.start()
626
    log.debug("Spawned %d test runners, PIDs are %s",
627
              len(runners), [p.pid for p in runners])
628

    
629
    # Enqueue test cases
630
    map(testq.put, cases)
631
    map(testq.put, ["TEST_RUNNER_TERMINATE"] * len(runners))
632

    
633
    log.debug("Joining %d processes", len(runners))
634
    for p in runners:
635
        p.join()
636
    log.debug("Done joining %d processes", len(runners))
637

    
638

    
639
def _spawn_server_test_case(**kwargs):
640
    """Construct a new unit test case class from SpawnServerTestCase"""
641

    
642
    name = "SpawnServerTestCase_%d" % kwargs["imageid"]
643
    cls = type(name, (SpawnServerTestCase,), kwargs)
644

    
645
    # Patch extra parameters into test names by manipulating method docstrings
646
    for (mname, m) in \
647
        inspect.getmembers(cls, lambda x: inspect.ismethod(x)):
648
            if hasattr(m, __doc__):
649
                m.__func__.__doc__ = "[%s] %s" % (imagename, m.__doc__)
650

    
651
    # Make sure the class can be pickled, by listing it among
652
    # the attributes of __main__. A PicklingError is raised otherwise.
653
    setattr(__main__, name, cls)
654
    return cls
655

    
656

    
657
def cleanup_servers(delete_stale=False):
658
    c = Client(API, TOKEN)
659
    servers = c.list_servers()
660
    stale = [s for s in servers if s["name"].startswith(SNF_TEST_PREFIX)]
661

    
662
    if len(stale) == 0:
663
        return
664

    
665
    print >> sys.stderr, "Found these stale servers from previous runs:"
666
    print "    " + \
667
          "\n    ".join(["%d: %s" % (s["id"], s["name"]) for s in stale])
668

    
669
    if delete_stale:
670
        print >> sys.stderr, "Deleting %d stale servers:" % len(stale)
671
        for server in stale:
672
            c.delete_server(server["id"])
673
        print >> sys.stderr, "    ...done"
674
    else:
675
        print >> sys.stderr, "Use --delete-stale to delete them."
676

    
677

    
678
def parse_arguments(args):
679
    from optparse import OptionParser
680

    
681
    kw = {}
682
    kw["usage"] = "%prog [options]"
683
    kw["description"] = \
684
        "%prog runs a number of test scenarios on a " \
685
        "Synnefo deployment."
686

    
687
    parser = OptionParser(**kw)
688
    parser.disable_interspersed_args()
689
    parser.add_option("--api",
690
                      action="store", type="string", dest="api",
691
                      help="The API URI to use to reach the Synnefo API",
692
                      default=DEFAULT_API)
693
    parser.add_option("--token",
694
                      action="store", type="string", dest="token",
695
                      help="The token to use for authentication to the API")
696
    parser.add_option("--nofailfast",
697
                      action="store_true", dest="nofailfast",
698
                      help="Do not fail immediately if one of the tests " \
699
                           "fails (EXPERIMENTAL)",
700
                      default=False)
701
    parser.add_option("--action-timeout",
702
                      action="store", type="int", dest="action_timeout",
703
                      metavar="TIMEOUT",
704
                      help="Wait SECONDS seconds for a server action to " \
705
                           "complete, then the test is considered failed",
706
                      default=20)
707
    parser.add_option("--build-warning",
708
                      action="store", type="int", dest="build_warning",
709
                      metavar="TIMEOUT",
710
                      help="Warn if TIMEOUT seconds have passed and a " \
711
                           "build operation is still pending",
712
                      default=600)
713
    parser.add_option("--build-fail",
714
                      action="store", type="int", dest="build_fail",
715
                      metavar="BUILD_TIMEOUT",
716
                      help="Fail the test if TIMEOUT seconds have passed " \
717
                           "and a build operation is still incomplete",
718
                      default=900)
719
    parser.add_option("--query-interval",
720
                      action="store", type="int", dest="query_interval",
721
                      metavar="INTERVAL",
722
                      help="Query server status when requests are pending " \
723
                           "every INTERVAL seconds",
724
                      default=3)
725
    parser.add_option("--fanout",
726
                      action="store", type="int", dest="fanout",
727
                      metavar="COUNT",
728
                      help="Spawn up to COUNT child processes to execute " \
729
                           "in parallel, essentially have up to COUNT " \
730
                           "server build requests outstanding (EXPERIMENTAL)",
731
                      default=1)
732
    parser.add_option("--force-flavor",
733
                      action="store", type="int", dest="force_flavorid",
734
                      metavar="FLAVOR ID",
735
                      help="Force all server creations to use the specified "\
736
                           "FLAVOR ID instead of a randomly chosen one, " \
737
                           "useful if disk space is scarce",
738
                      default=None)
739
    parser.add_option("--image-id",
740
                      action="store", type="string", dest="force_imageid",
741
                      metavar="IMAGE ID",
742
                      help="Test the specified image id, use 'all' to test " \
743
                           "all available images (mandatory argument)",
744
                      default=None)
745
    parser.add_option("--show-stale",
746
                      action="store_true", dest="show_stale",
747
                      help="Show stale servers from previous runs, whose "\
748
                           "name starts with `%s'" % SNF_TEST_PREFIX,
749
                      default=False)
750
    parser.add_option("--delete-stale",
751
                      action="store_true", dest="delete_stale",
752
                      help="Delete stale servers from previous runs, whose "\
753
                           "name starts with `%s'" % SNF_TEST_PREFIX,
754
                      default=False)
755

    
756
    # FIXME: Change the default for build-fanout to 10
757
    # FIXME: Allow the user to specify a specific set of Images to test
758

    
759
    (opts, args) = parser.parse_args(args)
760

    
761
    # Verify arguments
762
    if opts.delete_stale:
763
        opts.show_stale = True
764

    
765
    if not opts.show_stale:
766
        if not opts.force_imageid:
767
            print >>sys.stderr, "The --image-id argument is mandatory."
768
            parser.print_help()
769
            sys.exit(1)
770

    
771
        if opts.force_imageid != 'all':
772
            try:
773
                opts.force_imageid = int(opts.force_imageid)
774
            except ValueError:
775
                print >>sys.stderr, "Invalid value specified for --image-id." \
776
                                    "Use a numeric id, or `all'."
777
                sys.exit(1)
778

    
779
    return (opts, args)
780

    
781

    
782
def main():
783
    """Assemble test cases into a test suite, and run it
784

785
    IMPORTANT: Tests have dependencies and have to be run in the specified
786
    order inside a single test case. They communicate through attributes of the
787
    corresponding TestCase class (shared fixtures). Distinct subclasses of
788
    TestCase MAY SHARE NO DATA, since they are run in parallel, in distinct
789
    test runner processes.
790

791
    """
792
    (opts, args) = parse_arguments(sys.argv[1:])
793

    
794
    global API, TOKEN
795
    API = opts.api
796
    TOKEN = opts.token
797

    
798
    # Cleanup stale servers from previous runs
799
    if opts.show_stale:
800
        cleanup_servers(delete_stale=opts.delete_stale)
801
        return 0
802

    
803
    # Initialize a kamaki instance, get flavors, images
804
    c = Client(API, TOKEN)
805
    DIMAGES = c.list_images(detail=True)
806
    DFLAVORS = c.list_flavors(detail=True)
807

    
808
    # FIXME: logging, log, LOG PID, TEST_RUN_ID, arguments
809
    # FIXME: Network testing? Create, destroy, connect, ping, disconnect VMs?
810
    # Run them: FIXME: In parallel, FAILEARLY, catchbreak?
811
    #unittest.main(verbosity=2, catchbreak=True)
812

    
813
    runner = unittest.TextTestRunner(verbosity=2, failfast=not opts.nofailfast)
814
    # The following cases run sequentially
815
    seq_cases = [UnauthorizedTestCase, FlavorsTestCase, ImagesTestCase]
816
    _run_cases_in_parallel(seq_cases, fanout=3, runner=runner)
817

    
818
    # The following cases run in parallel
819
    par_cases = []
820

    
821
    if opts.force_imageid == 'all':
822
        test_images = DIMAGES
823
    else:
824
        test_images = filter(lambda x: x["id"] == opts.force_imageid, DIMAGES)
825

    
826
    for image in test_images:
827
        imageid = image["id"]
828
        imagename = image["name"]
829
        if opts.force_flavorid:
830
            flavorid = opts.force_flavorid
831
        else:
832
            flavorid = choice([f["id"] for f in DFLAVORS if f["disk"] >= 20])
833
        personality = None   # FIXME
834
        servername = "%s%s for %s" % (SNF_TEST_PREFIX, TEST_RUN_ID, imagename)
835
        is_windows = imagename.lower().find("windows") >= 0
836
        case = _spawn_server_test_case(imageid=imageid, flavorid=flavorid,
837
                                       imagename=imagename,
838
                                       personality=personality,
839
                                       servername=servername,
840
                                       is_windows=is_windows,
841
                                       action_timeout=opts.action_timeout,
842
                                       build_warning=opts.build_warning,
843
                                       build_fail=opts.build_fail,
844
                                       query_interval=opts.query_interval)
845
        par_cases.append(case)
846

    
847
    _run_cases_in_parallel(par_cases, fanout=opts.fanout, runner=runner)
848

    
849
if __name__ == "__main__":
850
    sys.exit(main())