Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / tools / burnin.py @ c1d11f96

History | View | Annotate | Download (33.9 kB)

1
#!/usr/bin/env python
2

    
3
# Copyright 2011 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35

    
36
"""Perform integration testing on a running Synnefo deployment"""
37

    
38
import __main__
39
import datetime
40
import inspect
41
import logging
42
import os
43
import paramiko
44
import prctl
45
import subprocess
46
import signal
47
import socket
48
import struct
49
import sys
50
import time
51

    
52
from IPy import IP
53
from multiprocessing import Process, Queue
54
from random import choice
55

    
56
from kamaki.clients import ClientError, ComputeClient, CycladesClient
57
from kamaki.config import Config
58

    
59
from vncauthproxy.d3des import generate_response as d3des_generate_response
60

    
61
# Use backported unittest functionality if Python < 2.7
62
try:
63
    import unittest2 as unittest
64
except ImportError:
65
    if sys.version_info < (2, 7):
66
        raise Exception("The unittest2 package is required for Python < 2.7")
67
    import unittest
68

    
69

    
70
API = None
71
TOKEN = None
72
DEFAULT_API = "http://127.0.0.1:8000/api/v1.1"
73

    
74
# A unique id identifying this test run
75
TEST_RUN_ID = datetime.datetime.strftime(datetime.datetime.now(),
76
                                         "%Y%m%d%H%M%S")
77
SNF_TEST_PREFIX = "snf-test-"
78

    
79
# Setup logging (FIXME - verigak)
80
logging.basicConfig(format="%(message)s")
81
log = logging.getLogger("burnin")
82
log.setLevel(logging.INFO)
83

    
84

    
85
class UnauthorizedTestCase(unittest.TestCase):
86
    def test_unauthorized_access(self):
87
        """Test access without a valid token fails"""
88
        falseToken = '12345'
89
        conf = Config()
90
        conf.set('compute_token', falseToken)
91

    
92
        with self.assertRaises(ClientError) as cm:
93
            c.list_servers()
94
        self.assertEqual(cm.exception.status, 401)
95

    
96

    
97
class ImagesTestCase(unittest.TestCase):
98
    """Test image lists for consistency"""
99
    @classmethod
100
    def setUpClass(cls):
101
        """Initialize kamaki, get (detailed) list of images"""
102
        log.info("Getting simple and detailed list of images")
103

    
104
        conf = Config()
105
        conf.set('compute_token', TOKEN)
106
        cls.client = ComputeClient(conf)
107
        cls.images = cls.client.list_images()
108
        cls.dimages = cls.client.list_images(detail=True)
109

    
110
    def test_001_list_images(self):
111
        """Test image list actually returns images"""
112
        self.assertGreater(len(self.images), 0)
113

    
114
    def test_002_list_images_detailed(self):
115
        """Test detailed image list is the same length as list"""
116
        self.assertEqual(len(self.dimages), len(self.images))
117

    
118
    def test_003_same_image_names(self):
119
        """Test detailed and simple image list contain same names"""
120
        names = sorted(map(lambda x: x["name"], self.images))
121
        dnames = sorted(map(lambda x: x["name"], self.dimages))
122
        self.assertEqual(names, dnames)
123

    
124
    def test_004_unique_image_names(self):
125
        """Test images have unique names"""
126
        names = sorted(map(lambda x: x["name"], self.images))
127
        self.assertEqual(sorted(list(set(names))), names)
128

    
129
    def test_005_image_metadata(self):
130
        """Test every image has specific metadata defined"""
131
        keys = frozenset(["os", "description", "size"])
132
        for i in self.dimages:
133
            self.assertTrue(keys.issubset(i["metadata"]["values"].keys()))
134

    
135

    
136
class FlavorsTestCase(unittest.TestCase):
137
    """Test flavor lists for consistency"""
138
    @classmethod
139
    def setUpClass(cls):
140
        """Initialize kamaki, get (detailed) list of flavors"""
141
        log.info("Getting simple and detailed list of flavors")
142

    
143
        conf = Config()
144
        conf.set('compute_token', TOKEN)
145
        cls.client = ComputeClient(conf)
146
        cls.flavors = cls.client.list_flavors()
147
        cls.dflavors = cls.client.list_flavors(detail=True)
148

    
149
    def test_001_list_flavors(self):
150
        """Test flavor list actually returns flavors"""
151
        self.assertGreater(len(self.flavors), 0)
152

    
153
    def test_002_list_flavors_detailed(self):
154
        """Test detailed flavor list is the same length as list"""
155
        self.assertEquals(len(self.dflavors), len(self.flavors))
156

    
157
    def test_003_same_flavor_names(self):
158
        """Test detailed and simple flavor list contain same names"""
159
        names = sorted(map(lambda x: x["name"], self.flavors))
160
        dnames = sorted(map(lambda x: x["name"], self.dflavors))
161
        self.assertEqual(names, dnames)
162

    
163
    def test_004_unique_flavor_names(self):
164
        """Test flavors have unique names"""
165
        names = sorted(map(lambda x: x["name"], self.flavors))
166
        self.assertEqual(sorted(list(set(names))), names)
167

    
168
    def test_005_well_formed_flavor_names(self):
169
        """Test flavors have names of the form CxxRyyDzz
170

171
        Where xx is vCPU count, yy is RAM in MiB, zz is Disk in GiB
172

173
        """
174
        for f in self.dflavors:
175
            self.assertEqual("C%dR%dD%d" % (f["cpu"], f["ram"], f["disk"]),
176
                             f["name"],
177
                             "Flavor %s does not match its specs." % f["name"])
178

    
179

    
180
class ServersTestCase(unittest.TestCase):
181
    """Test server lists for consistency"""
182
    @classmethod
183
    def setUpClass(cls):
184
        """Initialize kamaki, get (detailed) list of servers"""
185
        log.info("Getting simple and detailed list of servers")
186

    
187
        conf = Config()
188
        conf.set('compute_token', TOKEN)
189
        cls.client = ComputeClient(conf)
190
        cls.servers = cls.client.list_servers()
191
        cls.dservers = cls.client.list_servers(detail=True)
192

    
193
    def test_001_list_servers(self):
194
        """Test server list actually returns servers"""
195
        self.assertGreater(len(self.servers), 0)
196

    
197
    def test_002_list_servers_detailed(self):
198
        """Test detailed server list is the same length as list"""
199
        self.assertEqual(len(self.dservers), len(self.servers))
200

    
201
    def test_003_same_server_names(self):
202
        """Test detailed and simple flavor list contain same names"""
203
        names = sorted(map(lambda x: x["name"], self.servers))
204
        dnames = sorted(map(lambda x: x["name"], self.dservers))
205
        self.assertEqual(names, dnames)
206

    
207

    
208
# This class gets replicated into actual TestCases dynamically
209
class SpawnServerTestCase(unittest.TestCase):
210
    """Test scenario for server of the specified image"""
211

    
212
    @classmethod
213
    def setUpClass(cls):
214
        """Initialize a kamaki instance"""
215
        log.info("Spawning server for image `%s'", cls.imagename)
216

    
217
        conf = Config()
218
        conf.set('compute_token', TOKEN)
219
        cls.client = ComputeClient(conf)
220
        cls.cyclades = CycladesClient(conf)
221

    
222
    def _get_ipv4(self, server):
223
        """Get the public IPv4 of a server from the detailed server info"""
224

    
225
        public_addrs = filter(lambda x: x["id"] == "public",
226
                              server["addresses"]["values"])
227
        self.assertEqual(len(public_addrs), 1)
228
        ipv4_addrs = filter(lambda x: x["version"] == 4,
229
                            public_addrs[0]["values"])
230
        self.assertEqual(len(ipv4_addrs), 1)
231
        return ipv4_addrs[0]["addr"]
232

    
233
    def _get_ipv6(self, server):
234
        """Get the public IPv6 of a server from the detailed server info"""
235
        public_addrs = filter(lambda x: x["id"] == "public",
236
                              server["addresses"]["values"])
237
        self.assertEqual(len(public_addrs), 1)
238
        ipv6_addrs = filter(lambda x: x["version"] == 6,
239
                            public_addrs[0]["values"])
240
        self.assertEqual(len(ipv6_addrs), 1)
241
        return ipv6_addrs[0]["addr"]
242

    
243
    def _connect_loginname(self, os):
244
        """Return the login name for connections based on the server OS"""
245
        if os in ("Ubuntu", "Kubuntu", "Fedora"):
246
            return "user"
247
        elif os in ("windows", "windows_alpha1"):
248
            return "Administrator"
249
        else:
250
            return "root"
251

    
252
    def _verify_server_status(self, current_status, new_status):
253
        """Verify a server has switched to a specified status"""
254
        server = self.client.get_server_details(self.serverid)
255
        if server["status"] not in (current_status, new_status):
256
            return None  # Do not raise exception, return so the test fails
257
        self.assertEquals(server["status"], new_status)
258

    
259
    def _get_connected_tcp_socket(self, family, host, port):
260
        """Get a connected socket from the specified family to host:port"""
261
        sock = None
262
        for res in \
263
            socket.getaddrinfo(host, port, family, socket.SOCK_STREAM, 0,
264
                               socket.AI_PASSIVE):
265
            af, socktype, proto, canonname, sa = res
266
            try:
267
                sock = socket.socket(af, socktype, proto)
268
            except socket.error as msg:
269
                sock = None
270
                continue
271
            try:
272
                sock.connect(sa)
273
            except socket.error as msg:
274
                sock.close()
275
                sock = None
276
                continue
277
        self.assertIsNotNone(sock)
278
        return sock
279

    
280
    def _ping_once(self, ipv6, ip):
281
        """Test server responds to a single IPv4 or IPv6 ping"""
282
        cmd = "ping%s -c 2 -w 3 %s" % ("6" if ipv6 else "", ip)
283
        ping = subprocess.Popen(cmd, shell=True,
284
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
285
        (stdout, stderr) = ping.communicate()
286
        ret = ping.wait()
287
        self.assertEquals(ret, 0)
288

    
289
    def _get_hostname_over_ssh(self, hostip, username, password):
290
        ssh = paramiko.SSHClient()
291
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
292
        try:
293
            ssh.connect(hostip, username=username, password=password)
294
        except socket.error:
295
            raise AssertionError
296
        stdin, stdout, stderr = ssh.exec_command("hostname")
297
        lines = stdout.readlines()
298
        self.assertEqual(len(lines), 1)
299
        return lines[0]
300

    
301
    def _try_until_timeout_expires(self, warn_timeout, fail_timeout,
302
                                   opmsg, callable, *args, **kwargs):
303
        if warn_timeout == fail_timeout:
304
            warn_timeout = fail_timeout + 1
305
        warn_tmout = time.time() + warn_timeout
306
        fail_tmout = time.time() + fail_timeout
307
        while True:
308
            self.assertLess(time.time(), fail_tmout,
309
                            "operation `%s' timed out" % opmsg)
310
            if time.time() > warn_tmout:
311
                log.warning("Server %d: `%s' operation `%s' not done yet",
312
                            self.serverid, self.servername, opmsg)
313
            try:
314
                log.info("%s... " % opmsg)
315
                return callable(*args, **kwargs)
316
            except AssertionError:
317
                pass
318
            time.sleep(self.query_interval)
319

    
320
    def _insist_on_tcp_connection(self, family, host, port):
321
        familystr = {socket.AF_INET: "IPv4", socket.AF_INET6: "IPv6",
322
                     socket.AF_UNSPEC: "Unspecified-IPv4/6"}
323
        msg = "connect over %s to %s:%s" % \
324
              (familystr.get(family, "Unknown"), host, port)
325
        sock = self._try_until_timeout_expires(
326
                self.action_timeout, self.action_timeout,
327
                msg, self._get_connected_tcp_socket,
328
                family, host, port)
329
        return sock
330

    
331
    def _insist_on_status_transition(self, current_status, new_status,
332
                                    fail_timeout, warn_timeout=None):
333
        msg = "Server %d: `%s', waiting for %s -> %s" % \
334
              (self.serverid, self.servername, current_status, new_status)
335
        if warn_timeout is None:
336
            warn_timeout = fail_timeout
337
        self._try_until_timeout_expires(warn_timeout, fail_timeout,
338
                                        msg, self._verify_server_status,
339
                                        current_status, new_status)
340
        # Ensure the status is actually the expected one
341
        server = self.client.get_server_details(self.serverid)
342
        self.assertEquals(server["status"], new_status)
343

    
344
    def _insist_on_ssh_hostname(self, hostip, username, password):
345
        msg = "SSH to %s, as %s/%s" % (hostip, username, password)
346
        hostname = self._try_until_timeout_expires(
347
                self.action_timeout, self.action_timeout,
348
                msg, self._get_hostname_over_ssh,
349
                hostip, username, password)
350

    
351
        # The hostname must be of the form 'prefix-id'
352
        self.assertTrue(hostname.endswith("-%d\n" % self.serverid))
353

    
354
    def _skipIf(self, condition, msg):
355
        if condition:
356
            self.skipTest(msg)
357

    
358
    def test_001_submit_create_server(self):
359
        """Test submit create server request"""
360
        server = self.client.create_server(self.servername, self.flavorid,
361
                                           self.imageid, self.personality)
362
        self.assertEqual(server["name"], self.servername)
363
        self.assertEqual(server["flavorRef"], self.flavorid)
364
        self.assertEqual(server["imageRef"], self.imageid)
365
        self.assertEqual(server["status"], "BUILD")
366

    
367
        # Update class attributes to reflect data on building server
368
        cls = type(self)
369
        cls.serverid = server["id"]
370
        cls.username = None
371
        cls.passwd = server["adminPass"]
372

    
373
    def test_002a_server_is_building_in_list(self):
374
        """Test server is in BUILD state, in server list"""
375
        servers = self.client.list_servers(detail=True)
376
        servers = filter(lambda x: x["name"] == self.servername, servers)
377
        self.assertEqual(len(servers), 1)
378
        server = servers[0]
379
        self.assertEqual(server["name"], self.servername)
380
        self.assertEqual(server["flavorRef"], self.flavorid)
381
        self.assertEqual(server["imageRef"], self.imageid)
382
        self.assertEqual(server["status"], "BUILD")
383

    
384
    def test_002b_server_is_building_in_details(self):
385
        """Test server is in BUILD state, in details"""
386
        server = self.client.get_server_details(self.serverid)
387
        self.assertEqual(server["name"], self.servername)
388
        self.assertEqual(server["flavorRef"], self.flavorid)
389
        self.assertEqual(server["imageRef"], self.imageid)
390
        self.assertEqual(server["status"], "BUILD")
391

    
392
    def test_002c_set_server_metadata(self):
393
        image = self.client.get_image_details(self.imageid)
394
        os = image["metadata"]["values"]["os"]
395
        loginname = image["metadata"]["values"].get("users", None)
396
        self.client.update_server_metadata(self.serverid, OS=os)
397

    
398
        # Determine the username to use for future connections
399
        # to this host
400
        cls = type(self)
401
        cls.username = loginname
402
        if not cls.username:
403
            cls.username = self._connect_loginname(os)
404
        self.assertIsNotNone(cls.username)
405

    
406
    def test_002d_verify_server_metadata(self):
407
        """Test server metadata keys are set based on image metadata"""
408
        servermeta = self.client.get_server_metadata(self.serverid)
409
        imagemeta = self.client.get_image_metadata(self.imageid)
410
        self.assertEqual(servermeta["os"], imagemeta["os"])
411

    
412
    def test_003_server_becomes_active(self):
413
        """Test server becomes ACTIVE"""
414
        self._insist_on_status_transition("BUILD", "ACTIVE",
415
                                         self.build_fail, self.build_warning)
416

    
417
    def test_003a_get_server_oob_console(self):
418
        """Test getting OOB server console over VNC
419

420
        Implementation of RFB protocol follows
421
        http://www.realvnc.com/docs/rfbproto.pdf.
422

423
        """
424
        
425
        console = self.cyclades.get_server_console(self.serverid)
426
        self.assertEquals(console['type'], "vnc")
427
        sock = self._insist_on_tcp_connection(socket.AF_UNSPEC,
428
                                        console["host"], console["port"])
429

    
430
        # Step 1. ProtocolVersion message (par. 6.1.1)
431
        version = sock.recv(1024)
432
        self.assertEquals(version, 'RFB 003.008\n')
433
        sock.send(version)
434

    
435
        # Step 2. Security (par 6.1.2): Only VNC Authentication supported
436
        sec = sock.recv(1024)
437
        self.assertEquals(list(sec), ['\x01', '\x02'])
438

    
439
        # Step 3. Request VNC Authentication (par 6.1.2)
440
        sock.send('\x02')
441

    
442
        # Step 4. Receive Challenge (par 6.2.2)
443
        challenge = sock.recv(1024)
444
        self.assertEquals(len(challenge), 16)
445

    
446
        # Step 5. DES-Encrypt challenge, use password as key (par 6.2.2)
447
        response = d3des_generate_response(
448
            (console["password"] + '\0' * 8)[:8], challenge)
449
        sock.send(response)
450

    
451
        # Step 6. SecurityResult (par 6.1.3)
452
        result = sock.recv(4)
453
        self.assertEquals(list(result), ['\x00', '\x00', '\x00', '\x00'])
454
        sock.close()
455

    
456
    def test_004_server_has_ipv4(self):
457
        """Test active server has a valid IPv4 address"""
458
        server = self.client.get_server_details(self.serverid)
459
        ipv4 = self._get_ipv4(server)
460
        self.assertEquals(IP(ipv4).version(), 4)
461

    
462
    def test_005_server_has_ipv6(self):
463
        """Test active server has a valid IPv6 address"""
464
        server = self.client.get_server_details(self.serverid)
465
        ipv6 = self._get_ipv6(server)
466
        self.assertEquals(IP(ipv6).version(), 6)
467

    
468
    def test_006_server_responds_to_ping_IPv4(self):
469
        """Test server responds to ping on IPv4 address"""
470
        server = self.client.get_server_details(self.serverid)
471
        ip = self._get_ipv4(server)
472
        self._try_until_timeout_expires(self.action_timeout,
473
                                        self.action_timeout,
474
                                        "PING IPv4 to %s" % ip,
475
                                        self._ping_once,
476
                                        False, ip)
477

    
478
    def test_007_server_responds_to_ping_IPv6(self):
479
        """Test server responds to ping on IPv6 address"""
480
        server = self.client.get_server_details(self.serverid)
481
        ip = self._get_ipv6(server)
482
        self._try_until_timeout_expires(self.action_timeout,
483
                                        self.action_timeout,
484
                                        "PING IPv6 to %s" % ip,
485
                                        self._ping_once,
486
                                        True, ip)
487

    
488
    def test_008_submit_shutdown_request(self):
489
        """Test submit request to shutdown server"""
490
        self.cyclades.shutdown_server(self.serverid)
491

    
492
    def test_009_server_becomes_stopped(self):
493
        """Test server becomes STOPPED"""
494
        self._insist_on_status_transition("ACTIVE", "STOPPED",
495
                                         self.action_timeout,
496
                                         self.action_timeout)
497

    
498
    def test_010_submit_start_request(self):
499
        """Test submit start server request"""
500
        self.cyclades.start_server(self.serverid)
501

    
502
    def test_011_server_becomes_active(self):
503
        """Test server becomes ACTIVE again"""
504
        self._insist_on_status_transition("STOPPED", "ACTIVE",
505
                                         self.action_timeout,
506
                                         self.action_timeout)
507

    
508
    def test_011a_server_responds_to_ping_IPv4(self):
509
        """Test server OS is actually up and running again"""
510
        self.test_006_server_responds_to_ping_IPv4()
511

    
512
    def test_012_ssh_to_server_IPv4(self):
513
        """Test SSH to server public IPv4 works, verify hostname"""
514
        self._skipIf(self.is_windows, "only valid for Linux servers")
515
        server = self.client.get_server_details(self.serverid)
516
        self._insist_on_ssh_hostname(self._get_ipv4(server),
517
                                     self.username, self.passwd)
518

    
519
    def test_013_ssh_to_server_IPv6(self):
520
        """Test SSH to server public IPv6 works, verify hostname"""
521
        self._skipIf(self.is_windows, "only valid for Linux servers")
522
        server = self.client.get_server_details(self.serverid)
523
        self._insist_on_ssh_hostname(self._get_ipv6(server),
524
                                     self.username, self.passwd)
525

    
526
    def test_014_rdp_to_server_IPv4(self):
527
        "Test RDP connection to server public IPv4 works"""
528
        self._skipIf(not self.is_windows, "only valid for Windows servers")
529
        server = self.client.get_server_details(self.serverid)
530
        ipv4 = self._get_ipv4(server)
531
        sock = _insist_on_tcp_connection(socket.AF_INET, ipv4, 3389)
532

    
533
        # No actual RDP processing done. We assume the RDP server is there
534
        # if the connection to the RDP port is successful.
535
        # FIXME: Use rdesktop, analyze exit code? see manpage [costasd]
536
        sock.close()
537

    
538
    def test_015_rdp_to_server_IPv6(self):
539
        "Test RDP connection to server public IPv6 works"""
540
        self._skipIf(not self.is_windows, "only valid for Windows servers")
541
        server = self.client.get_server_details(self.serverid)
542
        ipv6 = self._get_ipv6(server)
543
        sock = _get_tcp_connection(socket.AF_INET6, ipv6, 3389)
544

    
545
        # No actual RDP processing done. We assume the RDP server is there
546
        # if the connection to the RDP port is successful.
547
        sock.close()
548

    
549
    def test_016_personality_is_enforced(self):
550
        """Test file injection for personality enforcement"""
551
        self._skipIf(self.is_windows, "only implemented for Linux servers")
552
        self.assertTrue(False, "test not implemented, will fail")
553

    
554
    def test_017_submit_delete_request(self):
555
        """Test submit request to delete server"""
556
        self.client.delete_server(self.serverid)
557

    
558
    def test_018_server_becomes_deleted(self):
559
        """Test server becomes DELETED"""
560
        self._insist_on_status_transition("ACTIVE", "DELETED",
561
                                         self.action_timeout,
562
                                         self.action_timeout)
563

    
564
    def test_019_server_no_longer_in_server_list(self):
565
        """Test server is no longer in server list"""
566
        servers = self.client.list_servers()
567
        self.assertNotIn(self.serverid, [s["id"] for s in servers])
568

    
569

    
570
class TestRunnerProcess(Process):
571
    """A distinct process used to execute part of the tests in parallel"""
572
    def __init__(self, **kw):
573
        Process.__init__(self, **kw)
574
        kwargs = kw["kwargs"]
575
        self.testq = kwargs["testq"]
576
        self.runner = kwargs["runner"]
577

    
578
    def run(self):
579
        # Make sure this test runner process dies with the parent
580
        # and is not left behind.
581
        #
582
        # WARNING: This uses the prctl(2) call and is
583
        # Linux-specific.
584
        prctl.set_pdeathsig(signal.SIGHUP)
585

    
586
        while True:
587
            log.debug("I am process %d, GETting from queue is %s",
588
                     os.getpid(), self.testq)
589
            msg = self.testq.get()
590
            log.debug("Dequeued msg: %s", msg)
591

    
592
            if msg == "TEST_RUNNER_TERMINATE":
593
                raise SystemExit
594
            elif issubclass(msg, unittest.TestCase):
595
                # Assemble a TestSuite, and run it
596
                suite = unittest.TestLoader().loadTestsFromTestCase(msg)
597
                self.runner.run(suite)
598
            else:
599
                raise Exception("Cannot handle msg: %s" % msg)
600

    
601

    
602
def _run_cases_in_parallel(cases, fanout=1, runner=None):
603
    """Run instances of TestCase in parallel, in a number of distinct processes
604

605
    The cases iterable specifies the TestCases to be executed in parallel,
606
    by test runners running in distinct processes.
607
    The fanout parameter specifies the number of processes to spawn,
608
    and defaults to 1.
609
    The runner argument specifies the test runner class to use inside each
610
    runner process.
611

612
    """
613
    if runner is None:
614
        runner = unittest.TextTestRunner(verbosity=2, failfast=True)
615

    
616
    # testq: The master process enqueues TestCase objects into this queue,
617
    #        test runner processes pick them up for execution, in parallel.
618
    testq = Queue()
619
    runners = []
620
    for i in xrange(0, fanout):
621
        kwargs = dict(testq=testq, runner=runner)
622
        runners.append(TestRunnerProcess(kwargs=kwargs))
623

    
624
    log.info("Spawning %d test runner processes", len(runners))
625
    for p in runners:
626
        p.start()
627
    log.debug("Spawned %d test runners, PIDs are %s",
628
              len(runners), [p.pid for p in runners])
629

    
630
    # Enqueue test cases
631
    map(testq.put, cases)
632
    map(testq.put, ["TEST_RUNNER_TERMINATE"] * len(runners))
633

    
634
    log.debug("Joining %d processes", len(runners))
635
    for p in runners:
636
        p.join()
637
    log.debug("Done joining %d processes", len(runners))
638

    
639

    
640
def _spawn_server_test_case(**kwargs):
641
    """Construct a new unit test case class from SpawnServerTestCase"""
642

    
643
    name = "SpawnServerTestCase_%s" % kwargs["imageid"]
644
    cls = type(name, (SpawnServerTestCase,), kwargs)
645

    
646
    # Patch extra parameters into test names by manipulating method docstrings
647
    for (mname, m) in \
648
        inspect.getmembers(cls, lambda x: inspect.ismethod(x)):
649
            if hasattr(m, __doc__):
650
                m.__func__.__doc__ = "[%s] %s" % (imagename, m.__doc__)
651

    
652
    # Make sure the class can be pickled, by listing it among
653
    # the attributes of __main__. A PicklingError is raised otherwise.
654
    setattr(__main__, name, cls)
655
    return cls
656

    
657

    
658
def cleanup_servers(delete_stale=False):
659

    
660
    conf = Config()
661
    conf.set('compute_token', TOKEN)
662
    c = ComputeClient(conf)
663

    
664
    servers = c.list_servers()
665
    stale = [s for s in servers if s["name"].startswith(SNF_TEST_PREFIX)]
666

    
667
    if len(stale) == 0:
668
        return
669

    
670
    print >> sys.stderr, "Found these stale servers from previous runs:"
671
    print "    " + \
672
          "\n    ".join(["%d: %s" % (s["id"], s["name"]) for s in stale])
673

    
674
    if delete_stale:
675
        print >> sys.stderr, "Deleting %d stale servers:" % len(stale)
676
        for server in stale:
677
            c.delete_server(server["id"])
678
        print >> sys.stderr, "    ...done"
679
    else:
680
        print >> sys.stderr, "Use --delete-stale to delete them."
681

    
682

    
683
def parse_arguments(args):
684
    from optparse import OptionParser
685

    
686
    kw = {}
687
    kw["usage"] = "%prog [options]"
688
    kw["description"] = \
689
        "%prog runs a number of test scenarios on a " \
690
        "Synnefo deployment."
691

    
692
    parser = OptionParser(**kw)
693
    parser.disable_interspersed_args()
694
    parser.add_option("--api",
695
                      action="store", type="string", dest="api",
696
                      help="The API URI to use to reach the Synnefo API",
697
                      default=DEFAULT_API)
698
    parser.add_option("--token",
699
                      action="store", type="string", dest="token",
700
                      help="The token to use for authentication to the API")
701
    parser.add_option("--nofailfast",
702
                      action="store_true", dest="nofailfast",
703
                      help="Do not fail immediately if one of the tests " \
704
                           "fails (EXPERIMENTAL)",
705
                      default=False)
706
    parser.add_option("--action-timeout",
707
                      action="store", type="int", dest="action_timeout",
708
                      metavar="TIMEOUT",
709
                      help="Wait SECONDS seconds for a server action to " \
710
                           "complete, then the test is considered failed",
711
                      default=20)
712
    parser.add_option("--build-warning",
713
                      action="store", type="int", dest="build_warning",
714
                      metavar="TIMEOUT",
715
                      help="Warn if TIMEOUT seconds have passed and a " \
716
                           "build operation is still pending",
717
                      default=600)
718
    parser.add_option("--build-fail",
719
                      action="store", type="int", dest="build_fail",
720
                      metavar="BUILD_TIMEOUT",
721
                      help="Fail the test if TIMEOUT seconds have passed " \
722
                           "and a build operation is still incomplete",
723
                      default=900)
724
    parser.add_option("--query-interval",
725
                      action="store", type="int", dest="query_interval",
726
                      metavar="INTERVAL",
727
                      help="Query server status when requests are pending " \
728
                           "every INTERVAL seconds",
729
                      default=3)
730
    parser.add_option("--fanout",
731
                      action="store", type="int", dest="fanout",
732
                      metavar="COUNT",
733
                      help="Spawn up to COUNT child processes to execute " \
734
                           "in parallel, essentially have up to COUNT " \
735
                           "server build requests outstanding (EXPERIMENTAL)",
736
                      default=1)
737
    parser.add_option("--force-flavor",
738
                      action="store", type="int", dest="force_flavorid",
739
                      metavar="FLAVOR ID",
740
                      help="Force all server creations to use the specified "\
741
                           "FLAVOR ID instead of a randomly chosen one, " \
742
                           "useful if disk space is scarce",
743
                      default=None)
744
    parser.add_option("--image-id",
745
                      action="store", type="string", dest="force_imageid",
746
                      metavar="IMAGE ID",
747
                      help="Test the specified image id, use 'all' to test " \
748
                           "all available images (mandatory argument)",
749
                      default=None)
750
    parser.add_option("--show-stale",
751
                      action="store_true", dest="show_stale",
752
                      help="Show stale servers from previous runs, whose "\
753
                           "name starts with `%s'" % SNF_TEST_PREFIX,
754
                      default=False)
755
    parser.add_option("--delete-stale",
756
                      action="store_true", dest="delete_stale",
757
                      help="Delete stale servers from previous runs, whose "\
758
                           "name starts with `%s'" % SNF_TEST_PREFIX,
759
                      default=False)
760

    
761
    # FIXME: Change the default for build-fanout to 10
762
    # FIXME: Allow the user to specify a specific set of Images to test
763

    
764
    (opts, args) = parser.parse_args(args)
765

    
766
    # Verify arguments
767
    if opts.delete_stale:
768
        opts.show_stale = True
769

    
770
    if not opts.show_stale:
771
        if not opts.force_imageid:
772
            print >>sys.stderr, "The --image-id argument is mandatory."
773
            parser.print_help()
774
            sys.exit(1)
775

    
776
        if opts.force_imageid != 'all':
777
            try:
778
                opts.force_imageid = int(opts.force_imageid)
779
            except ValueError:
780
                print >>sys.stderr, "Invalid value specified for --image-id." \
781
                                    "Use a numeric id, or `all'."
782
                sys.exit(1)
783

    
784
    return (opts, args)
785

    
786

    
787
def main():
788
    """Assemble test cases into a test suite, and run it
789

790
    IMPORTANT: Tests have dependencies and have to be run in the specified
791
    order inside a single test case. They communicate through attributes of the
792
    corresponding TestCase class (shared fixtures). Distinct subclasses of
793
    TestCase MAY SHARE NO DATA, since they are run in parallel, in distinct
794
    test runner processes.
795

796
    """
797
    (opts, args) = parse_arguments(sys.argv[1:])
798

    
799
    global API, TOKEN
800
    API = opts.api
801
    TOKEN = opts.token
802

    
803
    # Cleanup stale servers from previous runs
804
    if opts.show_stale:
805
        cleanup_servers(delete_stale=opts.delete_stale)
806
        return 0
807

    
808
    # Initialize a kamaki instance, get flavors, images
809

    
810
    conf = Config()
811
    conf.set('compute_token', TOKEN)
812
    c = ComputeClient(conf)
813

    
814
    DIMAGES = c.list_images(detail=True)
815
    DFLAVORS = c.list_flavors(detail=True)
816

    
817
    # FIXME: logging, log, LOG PID, TEST_RUN_ID, arguments
818
    # FIXME: Network testing? Create, destroy, connect, ping, disconnect VMs?
819
    # Run them: FIXME: In parallel, FAILEARLY, catchbreak?
820
    #unittest.main(verbosity=2, catchbreak=True)
821

    
822
    runner = unittest.TextTestRunner(verbosity=2, failfast=not opts.nofailfast)
823
    # The following cases run sequentially
824
    seq_cases = [UnauthorizedTestCase, FlavorsTestCase, ImagesTestCase]
825
    _run_cases_in_parallel(seq_cases, fanout=3, runner=runner)
826

    
827
    # The following cases run in parallel
828
    par_cases = []
829

    
830
    if opts.force_imageid == 'all':
831
        test_images = DIMAGES
832
    else:
833
        test_images = filter(lambda x: x["id"] == opts.force_imageid, DIMAGES)
834

    
835
    for image in test_images:
836
        imageid = image["id"]
837
        imagename = image["name"]
838
        if opts.force_flavorid:
839
            flavorid = opts.force_flavorid
840
        else:
841
            flavorid = choice([f["id"] for f in DFLAVORS if f["disk"] >= 20])
842
        personality = None   # FIXME
843
        servername = "%s%s for %s" % (SNF_TEST_PREFIX, TEST_RUN_ID, imagename)
844
        is_windows = imagename.lower().find("windows") >= 0
845
        case = _spawn_server_test_case(imageid=str(imageid), flavorid=flavorid,
846
                                       imagename=imagename,
847
                                       personality=personality,
848
                                       servername=servername,
849
                                       is_windows=is_windows,
850
                                       action_timeout=opts.action_timeout,
851
                                       build_warning=opts.build_warning,
852
                                       build_fail=opts.build_fail,
853
                                       query_interval=opts.query_interval)
854
        par_cases.append(case)
855

    
856
    _run_cases_in_parallel(par_cases, fanout=opts.fanout, runner=runner)
857

    
858
if __name__ == "__main__":
859
    sys.exit(main())