Statistics
| Branch: | Tag: | Revision:

root / snf-tools / synnefo_tools / burnin.py @ 6d8fb26a

History | View | Annotate | Download (81.8 kB)

1
#!/usr/bin/env python
2

    
3
# Copyright 2011 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35

    
36
"""Perform integration testing on a running Synnefo deployment"""
37

    
38
#import __main__
39
import datetime
40
import inspect
41
import logging
42
import os
43
import os.path
44
import paramiko
45
import prctl
46
import subprocess
47
import signal
48
import socket
49
import sys
50
import time
51
import tempfile
52
from base64 import b64encode
53
from IPy import IP
54
from multiprocessing import Process, Queue
55
from random import choice, randint
56
from optparse import OptionParser, OptionValueError
57

    
58
from kamaki.clients.compute import ComputeClient
59
from kamaki.clients.cyclades import CycladesClient
60
from kamaki.clients.image import ImageClient
61
from kamaki.clients.pithos import PithosClient
62
from kamaki.clients.astakos import AstakosClient
63
from kamaki.clients import ClientError
64

    
65
from vncauthproxy.d3des import generate_response as d3des_generate_response
66

    
67
# Use backported unittest functionality if Python < 2.7
68
try:
69
    import unittest2 as unittest
70
except ImportError:
71
    if sys.version_info < (2, 7):
72
        raise Exception("The unittest2 package is required for Python < 2.7")
73
    import unittest
74

    
75
# --------------------------------------------------------------------
76
# Global Variables
77
AUTH_URL = None
78
TOKEN = None
79
SYSTEM_IMAGES_USER = None
80
NO_IPV6 = None
81
NOFAILFAST = None
82
VERBOSE = None
83

    
84
# A unique id identifying this test run
85
TEST_RUN_ID = datetime.datetime.strftime(datetime.datetime.now(),
86
                                         "%Y%m%d%H%M%S")
87
SNF_TEST_PREFIX = "snf-test-"
88

    
89
red = '\x1b[31m'
90
yellow = '\x1b[33m'
91
green = '\x1b[32m'
92
normal = '\x1b[0m'
93

    
94

    
95
# --------------------------------------------------------------------
96
# Global functions
97
def _ssh_execute(hostip, username, password, command):
98
    """Execute a command via ssh"""
99
    ssh = paramiko.SSHClient()
100
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
101
    try:
102
        ssh.connect(hostip, username=username, password=password)
103
    except socket.error, err:
104
        raise AssertionError(err)
105
    try:
106
        stdin, stdout, stderr = ssh.exec_command(command)
107
    except paramiko.SSHException, err:
108
        raise AssertionError(err)
109
    status = stdout.channel.recv_exit_status()
110
    output = stdout.readlines()
111
    ssh.close()
112
    return output, status
113

    
114

    
115
def _get_user_id():
116
    """Authenticate to astakos and get unique users id"""
117
    astakos = AstakosClient(AUTH_URL, TOKEN)
118
    astakos.CONNECTION_RETRY_LIMIT = 2
119
    authenticate = astakos.authenticate()
120
    return authenticate['access']['user']['id']
121

    
122

    
123
# --------------------------------------------------------------------
124
# BurninTestReulst class
125
class BurninTestResult(unittest.TextTestResult):
126
    def addSuccess(self, test):
127
        super(BurninTestResult, self).addSuccess(test)
128
        if self.showAll:
129
            if hasattr(test, 'result_dict'):
130
                run_details = test.result_dict
131

    
132
                self.stream.write("\n")
133
                for i in run_details:
134
                    self.stream.write("%s : %s \n" % (i, run_details[i]))
135
                self.stream.write("\n")
136

    
137
        elif self.dots:
138
            self.stream.write('.')
139
            self.stream.flush()
140

    
141
    def addError(self, test, err):
142
        super(BurninTestResult, self).addError(test, err)
143
        if self.showAll:
144
            self.stream.writeln("ERROR")
145
            if hasattr(test, 'result_dict'):
146
                run_details = test.result_dict
147

    
148
                self.stream.write("\n")
149
                for i in run_details:
150
                    self.stream.write("%s : %s \n" % (i, run_details[i]))
151
                self.stream.write("\n")
152

    
153
        elif self.dots:
154
            self.stream.write('E')
155
            self.stream.flush()
156

    
157
    def addFailure(self, test, err):
158
        super(BurninTestResult, self).addFailure(test, err)
159
        if self.showAll:
160
            self.stream.writeln("FAIL")
161
            if hasattr(test, 'result_dict'):
162
                run_details = test.result_dict
163

    
164
                self.stream.write("\n")
165
                for i in run_details:
166
                    self.stream.write("%s : %s \n" % (i, run_details[i]))
167
                self.stream.write("\n")
168

    
169
        elif self.dots:
170
            self.stream.write('F')
171
            self.stream.flush()
172

    
173

    
174
# --------------------------------------------------------------------
175
# Format Results
176
class burninFormatter(logging.Formatter):
177
    err_fmt = red + "ERROR: %(msg)s" + normal
178
    dbg_fmt = green + "* %(msg)s" + normal
179
    info_fmt = "%(msg)s"
180

    
181
    def __init__(self, fmt="%(levelno)s: %(msg)s"):
182
        logging.Formatter.__init__(self, fmt)
183

    
184
    def format(self, record):
185
        format_orig = self._fmt
186
        # Replace the original format with one customized by logging level
187
        if record.levelno == 10:    # DEBUG
188
            self._fmt = burninFormatter.dbg_fmt
189
        elif record.levelno == 20:  # INFO
190
            self._fmt = burninFormatter.info_fmt
191
        elif record.levelno == 40:  # ERROR
192
            self._fmt = burninFormatter.err_fmt
193
        result = logging.Formatter.format(self, record)
194
        self._fmt = format_orig
195
        return result
196

    
197
log = logging.getLogger("burnin")
198
log.setLevel(logging.DEBUG)
199
handler = logging.StreamHandler()
200
handler.setFormatter(burninFormatter())
201
log.addHandler(handler)
202

    
203

    
204
# --------------------------------------------------------------------
205
# UnauthorizedTestCase class
206
class UnauthorizedTestCase(unittest.TestCase):
207
    """Test unauthorized access"""
208
    @classmethod
209
    def setUpClass(cls):
210
        cls.astakos = AstakosClient(AUTH_URL, TOKEN)
211
        cls.astakos.CONNECTION_RETRY_LIMIT = 2
212
        cls.compute_url = \
213
            cls.astakos.get_service_endpoints('compute')['publicURL']
214
        cls.result_dict = dict()
215

    
216
    def test_unauthorized_access(self):
217
        """Test access without a valid token fails"""
218
        log.info("Authentication test")
219
        falseToken = '12345'
220
        c = ComputeClient(self.compute_url, falseToken)
221
        c.CONNECTION_RETRY_LIMIT = 2
222

    
223
        with self.assertRaises(ClientError) as cm:
224
            c.list_servers()
225
            self.assertEqual(cm.exception.status, 401)
226

    
227

    
228
# --------------------------------------------------------------------
229
# This class gest replicated into Images TestCases dynamically
230
class ImagesTestCase(unittest.TestCase):
231
    """Test image lists for consistency"""
232
    @classmethod
233
    def setUpClass(cls):
234
        """Initialize kamaki, get (detailed) list of images"""
235
        log.info("Getting simple and detailed list of images")
236
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
237
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
238
        # Compute Client
239
        compute_url = \
240
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
241
        cls.compute_client = ComputeClient(compute_url, TOKEN)
242
        cls.compute_client.CONNECTION_RETRY_LIMIT = 2
243
        # Image Client
244
        image_url = \
245
            cls.astakos_client.get_service_endpoints('image')['publicURL']
246
        cls.image_client = ImageClient(image_url, TOKEN)
247
        cls.image_client.CONNECTION_RETRY_LIMIT = 2
248
        # Pithos Client
249
        pithos_url = cls.astakos_client.\
250
            get_service_endpoints('object-store')['publicURL']
251
        cls.pithos_client = PithosClient(pithos_url, TOKEN)
252
        cls.pithos_client.CONNECTION_RETRY_LIMIT = 2
253

    
254
        # Get images
255
        cls.images = \
256
            filter(lambda x: not x['name'].startswith(SNF_TEST_PREFIX),
257
                   cls.image_client.list_public())
258
        cls.dimages = \
259
            filter(lambda x: not x['name'].startswith(SNF_TEST_PREFIX),
260
                   cls.image_client.list_public(detail=True))
261
        cls.result_dict = dict()
262
        # Get uniq user id
263
        cls.uuid = _get_user_id()
264
        log.info("Uniq user id = %s" % cls.uuid)
265
        # Create temp directory and store it inside our class
266
        # XXX: In my machine /tmp has not enough space
267
        #      so use current directory to be sure.
268
        cls.temp_dir = tempfile.mkdtemp(dir=os.getcwd())
269
        cls.temp_image_name = \
270
            SNF_TEST_PREFIX + cls.imageid + ".diskdump"
271

    
272
    @classmethod
273
    def tearDownClass(cls):
274
        """Remove local files"""
275
        try:
276
            temp_file = os.path.join(cls.temp_dir, cls.temp_image_name)
277
            os.unlink(temp_file)
278
        except:
279
            pass
280
        try:
281
            os.rmdir(cls.temp_dir)
282
        except:
283
            pass
284

    
285
    def test_001_list_images(self):
286
        """Test image list actually returns images"""
287
        self.assertGreater(len(self.images), 0)
288

    
289
    def test_002_list_images_detailed(self):
290
        """Test detailed image list is the same length as list"""
291
        self.assertEqual(len(self.dimages), len(self.images))
292

    
293
    def test_003_same_image_names(self):
294
        """Test detailed and simple image list contain same names"""
295
        names = sorted(map(lambda x: x["name"], self.images))
296
        dnames = sorted(map(lambda x: x["name"], self.dimages))
297
        self.assertEqual(names, dnames)
298

    
299
    def test_004_unique_image_names(self):
300
        """Test system images have unique names"""
301
        sys_images = filter(lambda x: x['owner'] == SYSTEM_IMAGES_USER,
302
                            self.dimages)
303
        names = sorted(map(lambda x: x["name"], sys_images))
304
        self.assertEqual(sorted(list(set(names))), names)
305

    
306
    def test_005_image_metadata(self):
307
        """Test every image has specific metadata defined"""
308
        keys = frozenset(["osfamily", "root_partition"])
309
        sys_images = filter(lambda x: x['owner'] == SYSTEM_IMAGES_USER,
310
                            self.dimages)
311
        for i in sys_images:
312
            self.assertTrue(keys.issubset(i["metadata"].keys()))
313

    
314
    def test_006_download_image(self):
315
        """Download image from pithos+"""
316
        # Get image location
317
        image = filter(
318
            lambda x: x['id'] == self.imageid, self.dimages)[0]
319
        image_location = \
320
            image['location'].replace("://", " ").replace("/", " ").split()
321
        log.info("Download image, with owner %s\n\tcontainer %s, and name %s"
322
                 % (image_location[1], image_location[2], image_location[3]))
323
        self.pithos_client.account = image_location[1]
324
        self.pithos_client.container = image_location[2]
325
        temp_file = os.path.join(self.temp_dir, self.temp_image_name)
326
        with open(temp_file, "wb+") as f:
327
            self.pithos_client.download_object(image_location[3], f)
328

    
329
    def test_007_upload_image(self):
330
        """Upload and register image"""
331
        temp_file = os.path.join(self.temp_dir, self.temp_image_name)
332
        log.info("Upload image to pithos+")
333
        # Create container `images'
334
        self.pithos_client.account = self.uuid
335
        self.pithos_client.container = "images"
336
        self.pithos_client.container_put()
337
        with open(temp_file, "rb+") as f:
338
            self.pithos_client.upload_object(self.temp_image_name, f)
339
        log.info("Register image to plankton")
340
        location = "pithos://" + self.uuid + \
341
            "/images/" + self.temp_image_name
342
        params = {'is_public': False}
343
        properties = {'OSFAMILY': "linux", 'ROOT_PARTITION': 1}
344
        self.image_client.register(
345
            self.temp_image_name, location, params, properties)
346
        # Get image id
347
        details = self.image_client.list_public(detail=True)
348
        detail = filter(lambda x: x['location'] == location, details)
349
        self.assertEqual(len(detail), 1)
350
        cls = type(self)
351
        cls.temp_image_id = detail[0]['id']
352
        log.info("Image registered with id %s" % detail[0]['id'])
353

    
354
    def test_008_cleanup_image(self):
355
        """Cleanup image test"""
356
        log.info("Cleanup image test")
357
        # Remove image from pithos+
358
        self.pithos_client.account = self.uuid
359
        self.pithos_client.container = "images"
360
        self.pithos_client.del_object(self.temp_image_name)
361

    
362

    
363
# --------------------------------------------------------------------
364
# FlavorsTestCase class
365
class FlavorsTestCase(unittest.TestCase):
366
    """Test flavor lists for consistency"""
367
    @classmethod
368
    def setUpClass(cls):
369
        """Initialize kamaki, get (detailed) list of flavors"""
370
        log.info("Getting simple and detailed list of flavors")
371
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
372
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
373
        # Compute Client
374
        compute_url = \
375
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
376
        cls.compute_client = ComputeClient(compute_url, TOKEN)
377
        cls.compute_client.CONNECTION_RETRY_LIMIT = 2
378
        cls.flavors = cls.compute_client.list_flavors()
379
        cls.dflavors = cls.compute_client.list_flavors(detail=True)
380
        cls.result_dict = dict()
381

    
382
    def test_001_list_flavors(self):
383
        """Test flavor list actually returns flavors"""
384
        self.assertGreater(len(self.flavors), 0)
385

    
386
    def test_002_list_flavors_detailed(self):
387
        """Test detailed flavor list is the same length as list"""
388
        self.assertEquals(len(self.dflavors), len(self.flavors))
389

    
390
    def test_003_same_flavor_names(self):
391
        """Test detailed and simple flavor list contain same names"""
392
        names = sorted(map(lambda x: x["name"], self.flavors))
393
        dnames = sorted(map(lambda x: x["name"], self.dflavors))
394
        self.assertEqual(names, dnames)
395

    
396
    def test_004_unique_flavor_names(self):
397
        """Test flavors have unique names"""
398
        names = sorted(map(lambda x: x["name"], self.flavors))
399
        self.assertEqual(sorted(list(set(names))), names)
400

    
401
    def test_005_well_formed_flavor_names(self):
402
        """Test flavors have names of the form CxxRyyDzz
403
        Where xx is vCPU count, yy is RAM in MiB, zz is Disk in GiB
404
        """
405
        for f in self.dflavors:
406
            flavor = (f["vcpus"], f["ram"], f["disk"], f["SNF:disk_template"])
407
            self.assertEqual("C%dR%dD%d%s" % flavor,
408
                             f["name"],
409
                             "Flavor %s does not match its specs." % f["name"])
410

    
411

    
412
# --------------------------------------------------------------------
413
# ServersTestCase class
414
class ServersTestCase(unittest.TestCase):
415
    """Test server lists for consistency"""
416
    @classmethod
417
    def setUpClass(cls):
418
        """Initialize kamaki, get (detailed) list of servers"""
419
        log.info("Getting simple and detailed list of servers")
420

    
421
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
422
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
423
        # Compute Client
424
        compute_url = \
425
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
426
        cls.compute_client = ComputeClient(compute_url, TOKEN)
427
        cls.compute_client.CONNECTION_RETRY_LIMIT = 2
428
        cls.servers = cls.compute_client.list_servers()
429
        cls.dservers = cls.compute_client.list_servers(detail=True)
430
        cls.result_dict = dict()
431

    
432
    # def test_001_list_servers(self):
433
    #     """Test server list actually returns servers"""
434
    #     self.assertGreater(len(self.servers), 0)
435

    
436
    def test_002_list_servers_detailed(self):
437
        """Test detailed server list is the same length as list"""
438
        self.assertEqual(len(self.dservers), len(self.servers))
439

    
440
    def test_003_same_server_names(self):
441
        """Test detailed and simple flavor list contain same names"""
442
        names = sorted(map(lambda x: x["name"], self.servers))
443
        dnames = sorted(map(lambda x: x["name"], self.dservers))
444
        self.assertEqual(names, dnames)
445

    
446

    
447
# --------------------------------------------------------------------
448
# Pithos Test Cases
449
class PithosTestCase(unittest.TestCase):
450
    """Test pithos functionality"""
451
    @classmethod
452
    def setUpClass(cls):
453
        """Initialize kamaki, get list of containers"""
454
        # Get uniq user id
455
        cls.uuid = _get_user_id()
456
        log.info("Uniq user id = %s" % cls.uuid)
457
        log.info("Getting list of containers")
458

    
459
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
460
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
461
        # Pithos Client
462
        pithos_url = cls.astakos_client.\
463
            get_service_endpoints('object-store')['publicURL']
464
        cls.pithos_client = PithosClient(pithos_url, TOKEN, cls.uuid)
465
        cls.pithos_client.CONNECTION_RETRY_LIMIT = 2
466

    
467
        cls.containers = cls.pithos_client.list_containers()
468
        cls.result_dict = dict()
469

    
470
    def test_001_list_containers(self):
471
        """Test container list actually returns containers"""
472
        self.assertGreater(len(self.containers), 0)
473

    
474
    def test_002_unique_containers(self):
475
        """Test if containers have unique names"""
476
        names = [n['name'] for n in self.containers]
477
        names = sorted(names)
478
        self.assertEqual(sorted(list(set(names))), names)
479

    
480
    def test_003_create_container(self):
481
        """Test create a container"""
482
        rand_num = randint(1000, 9999)
483
        rand_name = "%s%s" % (SNF_TEST_PREFIX, rand_num)
484
        names = [n['name'] for n in self.containers]
485
        while rand_name in names:
486
            rand_num = randint(1000, 9999)
487
            rand_name = "%s%s" % (SNF_TEST_PREFIX, rand_num)
488
        # Create container
489
        self.pithos_client.container = rand_name
490
        self.pithos_client.container_put()
491
        # Get list of containers
492
        new_containers = self.pithos_client.list_containers()
493
        new_container_names = [n['name'] for n in new_containers]
494
        self.assertIn(rand_name, new_container_names)
495

    
496
    def test_004_upload(self):
497
        """Test uploading something to pithos+"""
498
        # Create a tmp file
499
        with tempfile.TemporaryFile() as f:
500
            f.write("This is a temp file")
501
            f.seek(0, 0)
502
            # Where to save file
503
            self.pithos_client.upload_object("test.txt", f)
504

    
505
    def test_005_download(self):
506
        """Test download something from pithos+"""
507
        # Create tmp directory to save file
508
        tmp_dir = tempfile.mkdtemp()
509
        tmp_file = os.path.join(tmp_dir, "test.txt")
510
        with open(tmp_file, "wb+") as f:
511
            self.pithos_client.download_object("test.txt", f)
512
            # Read file
513
            f.seek(0, 0)
514
            content = f.read()
515
        # Remove files
516
        os.unlink(tmp_file)
517
        os.rmdir(tmp_dir)
518
        # Compare results
519
        self.assertEqual(content, "This is a temp file")
520

    
521
    def test_006_remove(self):
522
        """Test removing files and containers"""
523
        cont_name = self.pithos_client.container
524
        self.pithos_client.del_object("test.txt")
525
        self.pithos_client.purge_container()
526
        # List containers
527
        containers = self.pithos_client.list_containers()
528
        cont_names = [n['name'] for n in containers]
529
        self.assertNotIn(cont_name, cont_names)
530

    
531

    
532
# --------------------------------------------------------------------
533
# This class gets replicated into actual TestCases dynamically
534
class SpawnServerTestCase(unittest.TestCase):
535
    """Test scenario for server of the specified image"""
536
    @classmethod
537
    def setUpClass(cls):
538
        """Initialize a kamaki instance"""
539
        log.info("Spawning server for image `%s'" % cls.imagename)
540

    
541
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
542
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
543
        # Cyclades Client
544
        compute_url = \
545
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
546
        cls.cyclades_client = CycladesClient(compute_url, TOKEN)
547
        cls.cyclades_client.CONNECTION_RETRY_LIMIT = 2
548

    
549
        cls.result_dict = dict()
550

    
551
    def _get_ipv4(self, server):
552
        """Get the public IPv4 of a server from the detailed server info"""
553

    
554
        nics = server["attachments"]
555

    
556
        for nic in nics:
557
            net_id = nic["network_id"]
558
            if self.cyclades_client.get_network_details(net_id)["public"]:
559
                public_addrs = nic["ipv4"]
560

    
561
        self.assertTrue(public_addrs is not None)
562

    
563
        return public_addrs
564

    
565
    def _get_ipv6(self, server):
566
        """Get the public IPv6 of a server from the detailed server info"""
567

    
568
        nics = server["attachments"]
569

    
570
        for nic in nics:
571
            net_id = nic["network_id"]
572
            if self.cyclades_client.get_network_details(net_id)["public"]:
573
                public_addrs = nic["ipv6"]
574

    
575
        self.assertTrue(public_addrs is not None)
576

    
577
        return public_addrs
578

    
579
    def _connect_loginname(self, os_value):
580
        """Return the login name for connections based on the server OS"""
581
        if os_value in ("Ubuntu", "Kubuntu", "Fedora"):
582
            return "user"
583
        elif os_value in ("windows", "windows_alpha1"):
584
            return "Administrator"
585
        else:
586
            return "root"
587

    
588
    def _verify_server_status(self, current_status, new_status):
589
        """Verify a server has switched to a specified status"""
590
        server = self.cyclades_client.get_server_details(self.serverid)
591
        if server["status"] not in (current_status, new_status):
592
            return None  # Do not raise exception, return so the test fails
593
        self.assertEquals(server["status"], new_status)
594

    
595
    def _get_connected_tcp_socket(self, family, host, port):
596
        """Get a connected socket from the specified family to host:port"""
597
        sock = None
598
        for res in \
599
            socket.getaddrinfo(host, port, family, socket.SOCK_STREAM, 0,
600
                               socket.AI_PASSIVE):
601
            af, socktype, proto, canonname, sa = res
602
            try:
603
                sock = socket.socket(af, socktype, proto)
604
            except socket.error:
605
                sock = None
606
                continue
607
            try:
608
                sock.connect(sa)
609
            except socket.error:
610
                sock.close()
611
                sock = None
612
                continue
613
        self.assertIsNotNone(sock)
614
        return sock
615

    
616
    def _ping_once(self, ipv6, ip):
617
        """Test server responds to a single IPv4 or IPv6 ping"""
618
        cmd = "ping%s -c 7 -w 20 %s" % ("6" if ipv6 else "", ip)
619
        ping = subprocess.Popen(cmd, shell=True,
620
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
621
        (stdout, stderr) = ping.communicate()
622
        ret = ping.wait()
623
        self.assertEquals(ret, 0)
624

    
625
    def _get_hostname_over_ssh(self, hostip, username, password):
626
        lines, status = _ssh_execute(
627
            hostip, username, password, "hostname")
628
        self.assertEqual(len(lines), 1)
629
        return lines[0]
630

    
631
    def _try_until_timeout_expires(self, warn_timeout, fail_timeout,
632
                                   opmsg, callable, *args, **kwargs):
633
        if warn_timeout == fail_timeout:
634
            warn_timeout = fail_timeout + 1
635
        warn_tmout = time.time() + warn_timeout
636
        fail_tmout = time.time() + fail_timeout
637
        while True:
638
            self.assertLess(time.time(), fail_tmout,
639
                            "operation `%s' timed out" % opmsg)
640
            if time.time() > warn_tmout:
641
                log.warning("Server %d: `%s' operation `%s' not done yet",
642
                            self.serverid, self.servername, opmsg)
643
            try:
644
                log.info("%s... " % opmsg)
645
                return callable(*args, **kwargs)
646
            except AssertionError:
647
                pass
648
            time.sleep(self.query_interval)
649

    
650
    def _insist_on_tcp_connection(self, family, host, port):
651
        familystr = {socket.AF_INET: "IPv4", socket.AF_INET6: "IPv6",
652
                     socket.AF_UNSPEC: "Unspecified-IPv4/6"}
653
        msg = "connect over %s to %s:%s" % \
654
              (familystr.get(family, "Unknown"), host, port)
655
        sock = self._try_until_timeout_expires(
656
            self.action_timeout, self.action_timeout,
657
            msg, self._get_connected_tcp_socket,
658
            family, host, port)
659
        return sock
660

    
661
    def _insist_on_status_transition(self, current_status, new_status,
662
                                     fail_timeout, warn_timeout=None):
663
        msg = "Server %d: `%s', waiting for %s -> %s" % \
664
              (self.serverid, self.servername, current_status, new_status)
665
        if warn_timeout is None:
666
            warn_timeout = fail_timeout
667
        self._try_until_timeout_expires(warn_timeout, fail_timeout,
668
                                        msg, self._verify_server_status,
669
                                        current_status, new_status)
670
        # Ensure the status is actually the expected one
671
        server = self.cyclades_client.get_server_details(self.serverid)
672
        self.assertEquals(server["status"], new_status)
673

    
674
    def _insist_on_ssh_hostname(self, hostip, username, password):
675
        msg = "SSH to %s, as %s/%s" % (hostip, username, password)
676
        hostname = self._try_until_timeout_expires(
677
            self.action_timeout, self.action_timeout,
678
            msg, self._get_hostname_over_ssh,
679
            hostip, username, password)
680

    
681
        # The hostname must be of the form 'prefix-id'
682
        self.assertTrue(hostname.endswith("-%d\n" % self.serverid))
683

    
684
    def _check_file_through_ssh(self, hostip, username, password,
685
                                remotepath, content):
686
        msg = "Trying file injection through SSH to %s, as %s/%s" % \
687
            (hostip, username, password)
688
        log.info(msg)
689
        try:
690
            ssh = paramiko.SSHClient()
691
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
692
            ssh.connect(hostip, username=username, password=password)
693
            ssh.close()
694
        except socket.error, err:
695
            raise AssertionError(err)
696

    
697
        transport = paramiko.Transport((hostip, 22))
698
        transport.connect(username=username, password=password)
699

    
700
        localpath = '/tmp/' + SNF_TEST_PREFIX + 'injection'
701
        sftp = paramiko.SFTPClient.from_transport(transport)
702
        sftp.get(remotepath, localpath)
703
        sftp.close()
704
        transport.close()
705

    
706
        f = open(localpath)
707
        remote_content = b64encode(f.read())
708

    
709
        # Check if files are the same
710
        return (remote_content == content)
711

    
712
    def _skipIf(self, condition, msg):
713
        if condition:
714
            self.skipTest(msg)
715

    
716
    def test_001_submit_create_server(self):
717
        """Test submit create server request"""
718

    
719
        log.info("Submit new server request")
720
        server = self.cyclades_client.create_server(
721
            self.servername, self.flavorid, self.imageid, self.personality)
722

    
723
        log.info("Server id: " + str(server["id"]))
724
        log.info("Server password: " + server["adminPass"])
725
        self.assertEqual(server["name"], self.servername)
726
        self.assertEqual(server["flavor"]["id"], self.flavorid)
727
        self.assertEqual(server["image"]["id"], self.imageid)
728
        self.assertEqual(server["status"], "BUILD")
729

    
730
        # Update class attributes to reflect data on building server
731
        cls = type(self)
732
        cls.serverid = server["id"]
733
        cls.username = None
734
        cls.passwd = server["adminPass"]
735

    
736
        self.result_dict["Server ID"] = str(server["id"])
737
        self.result_dict["Password"] = str(server["adminPass"])
738

    
739
    def test_002a_server_is_building_in_list(self):
740
        """Test server is in BUILD state, in server list"""
741
        log.info("Server in BUILD state in server list")
742

    
743
        self.result_dict.clear()
744

    
745
        servers = self.cyclades_client.list_servers(detail=True)
746
        servers = filter(lambda x: x["name"] == self.servername, servers)
747

    
748
        server = servers[0]
749
        self.assertEqual(server["name"], self.servername)
750
        self.assertEqual(server["flavor"]["id"], self.flavorid)
751
        self.assertEqual(server["image"]["id"], self.imageid)
752
        self.assertEqual(server["status"], "BUILD")
753

    
754
    def test_002b_server_is_building_in_details(self):
755
        """Test server is in BUILD state, in details"""
756

    
757
        log.info("Server in BUILD state in details")
758

    
759
        server = self.cyclades_client.get_server_details(self.serverid)
760
        self.assertEqual(server["name"], self.servername)
761
        self.assertEqual(server["flavor"]["id"], self.flavorid)
762
        self.assertEqual(server["image"]["id"], self.imageid)
763
        self.assertEqual(server["status"], "BUILD")
764

    
765
    def test_002c_set_server_metadata(self):
766

    
767
        log.info("Creating server metadata")
768

    
769
        image = self.cyclades_client.get_image_details(self.imageid)
770
        os_value = image["metadata"]["os"]
771
        users = image["metadata"].get("users", None)
772
        self.cyclades_client.update_server_metadata(self.serverid, OS=os_value)
773

    
774
        userlist = users.split()
775

    
776
        # Determine the username to use for future connections
777
        # to this host
778
        cls = type(self)
779

    
780
        if "root" in userlist:
781
            cls.username = "root"
782
        elif users is None:
783
            cls.username = self._connect_loginname(os_value)
784
        else:
785
            cls.username = choice(userlist)
786

    
787
        self.assertIsNotNone(cls.username)
788

    
789
    def test_002d_verify_server_metadata(self):
790
        """Test server metadata keys are set based on image metadata"""
791

    
792
        log.info("Verifying image metadata")
793

    
794
        servermeta = self.cyclades_client.get_server_metadata(self.serverid)
795
        imagemeta = self.cyclades_client.get_image_metadata(self.imageid)
796

    
797
        self.assertEqual(servermeta["OS"], imagemeta["os"])
798

    
799
    def test_003_server_becomes_active(self):
800
        """Test server becomes ACTIVE"""
801

    
802
        log.info("Waiting for server to become ACTIVE")
803

    
804
        self._insist_on_status_transition(
805
            "BUILD", "ACTIVE", self.build_fail, self.build_warning)
806

    
807
    def test_003a_get_server_oob_console(self):
808
        """Test getting OOB server console over VNC
809

810
        Implementation of RFB protocol follows
811
        http://www.realvnc.com/docs/rfbproto.pdf.
812

813
        """
814
        console = self.cyclades_client.get_server_console(self.serverid)
815
        self.assertEquals(console['type'], "vnc")
816
        sock = self._insist_on_tcp_connection(
817
            socket.AF_INET, console["host"], console["port"])
818

    
819
        # Step 1. ProtocolVersion message (par. 6.1.1)
820
        version = sock.recv(1024)
821
        self.assertEquals(version, 'RFB 003.008\n')
822
        sock.send(version)
823

    
824
        # Step 2. Security (par 6.1.2): Only VNC Authentication supported
825
        sec = sock.recv(1024)
826
        self.assertEquals(list(sec), ['\x01', '\x02'])
827

    
828
        # Step 3. Request VNC Authentication (par 6.1.2)
829
        sock.send('\x02')
830

    
831
        # Step 4. Receive Challenge (par 6.2.2)
832
        challenge = sock.recv(1024)
833
        self.assertEquals(len(challenge), 16)
834

    
835
        # Step 5. DES-Encrypt challenge, use password as key (par 6.2.2)
836
        response = d3des_generate_response(
837
            (console["password"] + '\0' * 8)[:8], challenge)
838
        sock.send(response)
839

    
840
        # Step 6. SecurityResult (par 6.1.3)
841
        result = sock.recv(4)
842
        self.assertEquals(list(result), ['\x00', '\x00', '\x00', '\x00'])
843
        sock.close()
844

    
845
    def test_004_server_has_ipv4(self):
846
        """Test active server has a valid IPv4 address"""
847

    
848
        log.info("Validate server's IPv4")
849

    
850
        server = self.cyclades_client.get_server_details(self.serverid)
851
        ipv4 = self._get_ipv4(server)
852

    
853
        self.result_dict.clear()
854
        self.result_dict["IPv4"] = str(ipv4)
855

    
856
        self.assertEquals(IP(ipv4).version(), 4)
857

    
858
    def test_005_server_has_ipv6(self):
859
        """Test active server has a valid IPv6 address"""
860
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
861

    
862
        log.info("Validate server's IPv6")
863

    
864
        server = self.cyclades_client.get_server_details(self.serverid)
865
        ipv6 = self._get_ipv6(server)
866

    
867
        self.result_dict.clear()
868
        self.result_dict["IPv6"] = str(ipv6)
869

    
870
        self.assertEquals(IP(ipv6).version(), 6)
871

    
872
    def test_006_server_responds_to_ping_IPv4(self):
873
        """Test server responds to ping on IPv4 address"""
874

    
875
        log.info("Testing if server responds to pings in IPv4")
876
        self.result_dict.clear()
877

    
878
        server = self.cyclades_client.get_server_details(self.serverid)
879
        ip = self._get_ipv4(server)
880
        self._try_until_timeout_expires(self.action_timeout,
881
                                        self.action_timeout,
882
                                        "PING IPv4 to %s" % ip,
883
                                        self._ping_once,
884
                                        False, ip)
885

    
886
    def test_007_server_responds_to_ping_IPv6(self):
887
        """Test server responds to ping on IPv6 address"""
888
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
889
        log.info("Testing if server responds to pings in IPv6")
890

    
891
        server = self.cyclades_client.get_server_details(self.serverid)
892
        ip = self._get_ipv6(server)
893
        self._try_until_timeout_expires(self.action_timeout,
894
                                        self.action_timeout,
895
                                        "PING IPv6 to %s" % ip,
896
                                        self._ping_once,
897
                                        True, ip)
898

    
899
    def test_008_submit_shutdown_request(self):
900
        """Test submit request to shutdown server"""
901

    
902
        log.info("Shutting down server")
903

    
904
        self.cyclades_client.shutdown_server(self.serverid)
905

    
906
    def test_009_server_becomes_stopped(self):
907
        """Test server becomes STOPPED"""
908

    
909
        log.info("Waiting until server becomes STOPPED")
910
        self._insist_on_status_transition(
911
            "ACTIVE", "STOPPED", self.action_timeout, self.action_timeout)
912

    
913
    def test_010_submit_start_request(self):
914
        """Test submit start server request"""
915

    
916
        log.info("Starting server")
917

    
918
        self.cyclades_client.start_server(self.serverid)
919

    
920
    def test_011_server_becomes_active(self):
921
        """Test server becomes ACTIVE again"""
922

    
923
        log.info("Waiting until server becomes ACTIVE")
924
        self._insist_on_status_transition(
925
            "STOPPED", "ACTIVE", self.action_timeout, self.action_timeout)
926

    
927
    def test_011a_server_responds_to_ping_IPv4(self):
928
        """Test server OS is actually up and running again"""
929

    
930
        log.info("Testing if server is actually up and running")
931

    
932
        self.test_006_server_responds_to_ping_IPv4()
933

    
934
    def test_012_ssh_to_server_IPv4(self):
935
        """Test SSH to server public IPv4 works, verify hostname"""
936

    
937
        self._skipIf(self.is_windows, "only valid for Linux servers")
938
        server = self.cyclades_client.get_server_details(self.serverid)
939
        self._insist_on_ssh_hostname(self._get_ipv4(server),
940
                                     self.username, self.passwd)
941

    
942
    def test_013_ssh_to_server_IPv6(self):
943
        """Test SSH to server public IPv6 works, verify hostname"""
944
        self._skipIf(self.is_windows, "only valid for Linux servers")
945
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
946

    
947
        server = self.cyclades_client.get_server_details(self.serverid)
948
        self._insist_on_ssh_hostname(self._get_ipv6(server),
949
                                     self.username, self.passwd)
950

    
951
    def test_014_rdp_to_server_IPv4(self):
952
        "Test RDP connection to server public IPv4 works"""
953
        self._skipIf(not self.is_windows, "only valid for Windows servers")
954
        server = self.cyclades_client.get_server_details(self.serverid)
955
        ipv4 = self._get_ipv4(server)
956
        sock = self._insist_on_tcp_connection(socket.AF_INET, ipv4, 3389)
957

    
958
        # No actual RDP processing done. We assume the RDP server is there
959
        # if the connection to the RDP port is successful.
960
        # FIXME: Use rdesktop, analyze exit code? see manpage [costasd]
961
        sock.close()
962

    
963
    def test_015_rdp_to_server_IPv6(self):
964
        "Test RDP connection to server public IPv6 works"""
965
        self._skipIf(not self.is_windows, "only valid for Windows servers")
966
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
967

    
968
        server = self.cyclades_client.get_server_details(self.serverid)
969
        ipv6 = self._get_ipv6(server)
970
        sock = self._get_tcp_connection(socket.AF_INET6, ipv6, 3389)
971

    
972
        # No actual RDP processing done. We assume the RDP server is there
973
        # if the connection to the RDP port is successful.
974
        sock.close()
975

    
976
    def test_016_personality_is_enforced(self):
977
        """Test file injection for personality enforcement"""
978
        self._skipIf(self.is_windows, "only implemented for Linux servers")
979
        self._skipIf(self.personality is None, "No personality file selected")
980

    
981
        log.info("Trying to inject file for personality enforcement")
982

    
983
        server = self.cyclades_client.get_server_details(self.serverid)
984

    
985
        for inj_file in self.personality:
986
            equal_files = self._check_file_through_ssh(self._get_ipv4(server),
987
                                                       inj_file['owner'],
988
                                                       self.passwd,
989
                                                       inj_file['path'],
990
                                                       inj_file['contents'])
991
            self.assertTrue(equal_files)
992

    
993
    def test_017_submit_delete_request(self):
994
        """Test submit request to delete server"""
995

    
996
        log.info("Deleting server")
997

    
998
        self.cyclades_client.delete_server(self.serverid)
999

    
1000
    def test_018_server_becomes_deleted(self):
1001
        """Test server becomes DELETED"""
1002

    
1003
        log.info("Testing if server becomes DELETED")
1004

    
1005
        self._insist_on_status_transition(
1006
            "ACTIVE", "DELETED", self.action_timeout, self.action_timeout)
1007

    
1008
    def test_019_server_no_longer_in_server_list(self):
1009
        """Test server is no longer in server list"""
1010

    
1011
        log.info("Test if server is no longer listed")
1012

    
1013
        servers = self.cyclades_client.list_servers()
1014
        self.assertNotIn(self.serverid, [s["id"] for s in servers])
1015

    
1016

    
1017
class NetworkTestCase(unittest.TestCase):
1018
    """ Testing networking in cyclades """
1019

    
1020
    @classmethod
1021
    def setUpClass(cls):
1022
        "Initialize kamaki, get list of current networks"
1023

    
1024
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
1025
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
1026
        # Cyclades Client
1027
        compute_url = \
1028
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
1029
        cls.cyclades_client = CycladesClient(compute_url, TOKEN)
1030
        cls.cyclades_client.CONNECTION_RETRY_LIMIT = 2
1031

    
1032
        cls.servername = "%s%s for %s" % (SNF_TEST_PREFIX,
1033
                                          TEST_RUN_ID,
1034
                                          cls.imagename)
1035

    
1036
        #Dictionary initialization for the vms credentials
1037
        cls.serverid = dict()
1038
        cls.username = dict()
1039
        cls.password = dict()
1040
        cls.is_windows = cls.imagename.lower().find("windows") >= 0
1041

    
1042
        cls.result_dict = dict()
1043

    
1044
    def _skipIf(self, condition, msg):
1045
        if condition:
1046
            self.skipTest(msg)
1047

    
1048
    def _get_ipv4(self, server):
1049
        """Get the public IPv4 of a server from the detailed server info"""
1050

    
1051
        nics = server["attachments"]
1052

    
1053
        for nic in nics:
1054
            net_id = nic["network_id"]
1055
            if self.cyclades_client.get_network_details(net_id)["public"]:
1056
                public_addrs = nic["ipv4"]
1057

    
1058
        self.assertTrue(public_addrs is not None)
1059

    
1060
        return public_addrs
1061

    
1062
    def _connect_loginname(self, os_value):
1063
        """Return the login name for connections based on the server OS"""
1064
        if os_value in ("Ubuntu", "Kubuntu", "Fedora"):
1065
            return "user"
1066
        elif os_value in ("windows", "windows_alpha1"):
1067
            return "Administrator"
1068
        else:
1069
            return "root"
1070

    
1071
    def _ping_once(self, ip):
1072

    
1073
        """Test server responds to a single IPv4 or IPv6 ping"""
1074
        cmd = "ping -c 7 -w 20 %s" % (ip)
1075
        ping = subprocess.Popen(cmd, shell=True,
1076
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1077
        (stdout, stderr) = ping.communicate()
1078
        ret = ping.wait()
1079

    
1080
        return (ret == 0)
1081

    
1082
    def test_00001a_submit_create_server_A(self):
1083
        """Test submit create server request"""
1084

    
1085
        log.info("Creating test server A")
1086

    
1087
        serverA = self.cyclades_client.create_server(
1088
            self.servername, self.flavorid, self.imageid, personality=None)
1089

    
1090
        self.assertEqual(serverA["name"], self.servername)
1091
        self.assertEqual(serverA["flavor"]["id"], self.flavorid)
1092
        self.assertEqual(serverA["image"]["id"], self.imageid)
1093
        self.assertEqual(serverA["status"], "BUILD")
1094

    
1095
        # Update class attributes to reflect data on building server
1096
        self.serverid['A'] = serverA["id"]
1097
        self.username['A'] = None
1098
        self.password['A'] = serverA["adminPass"]
1099

    
1100
        log.info("Server A id:" + str(serverA["id"]))
1101
        log.info("Server password " + (self.password['A']))
1102

    
1103
        self.result_dict["Server A ID"] = str(serverA["id"])
1104
        self.result_dict["Server A password"] = serverA["adminPass"]
1105

    
1106
    def test_00001b_serverA_becomes_active(self):
1107
        """Test server becomes ACTIVE"""
1108

    
1109
        log.info("Waiting until test server A becomes ACTIVE")
1110
        self.result_dict.clear()
1111

    
1112
        fail_tmout = time.time() + self.action_timeout
1113
        while True:
1114
            d = self.cyclades_client.get_server_details(self.serverid['A'])
1115
            status = d['status']
1116
            if status == 'ACTIVE':
1117
                active = True
1118
                break
1119
            elif time.time() > fail_tmout:
1120
                self.assertLess(time.time(), fail_tmout)
1121
            else:
1122
                time.sleep(self.query_interval)
1123

    
1124
        self.assertTrue(active)
1125

    
1126
    def test_00002a_submit_create_server_B(self):
1127
        """Test submit create server request"""
1128

    
1129
        log.info("Creating test server B")
1130

    
1131
        serverB = self.cyclades_client.create_server(
1132
            self.servername, self.flavorid, self.imageid, personality=None)
1133

    
1134
        self.assertEqual(serverB["name"], self.servername)
1135
        self.assertEqual(serverB["flavor"]["id"], self.flavorid)
1136
        self.assertEqual(serverB["image"]["id"], self.imageid)
1137
        self.assertEqual(serverB["status"], "BUILD")
1138

    
1139
        # Update class attributes to reflect data on building server
1140
        self.serverid['B'] = serverB["id"]
1141
        self.username['B'] = None
1142
        self.password['B'] = serverB["adminPass"]
1143

    
1144
        log.info("Server B id: " + str(serverB["id"]))
1145
        log.info("Password " + (self.password['B']))
1146

    
1147
        self.result_dict.clear()
1148
        self.result_dict["Server B ID"] = str(serverB["id"])
1149
        self.result_dict["Server B password"] = serverB["adminPass"]
1150

    
1151
    def test_00002b_serverB_becomes_active(self):
1152
        """Test server becomes ACTIVE"""
1153

    
1154
        log.info("Waiting until test server B becomes ACTIVE")
1155
        self.result_dict.clear()
1156

    
1157
        fail_tmout = time.time() + self.action_timeout
1158
        while True:
1159
            d = self.cyclades_client.get_server_details(self.serverid['B'])
1160
            status = d['status']
1161
            if status == 'ACTIVE':
1162
                active = True
1163
                break
1164
            elif time.time() > fail_tmout:
1165
                self.assertLess(time.time(), fail_tmout)
1166
            else:
1167
                time.sleep(self.query_interval)
1168

    
1169
        self.assertTrue(active)
1170

    
1171
    def test_001_create_network(self):
1172
        """Test submit create network request"""
1173

    
1174
        log.info("Submit new network request")
1175
        self.result_dict.clear()
1176

    
1177
        name = SNF_TEST_PREFIX + TEST_RUN_ID
1178
        #previous_num = len(self.client.list_networks())
1179
        network = self.cyclades_client.create_network(
1180
            name, cidr='10.0.1.0/28', dhcp=True)
1181

    
1182
        #Test if right name is assigned
1183
        self.assertEqual(network['name'], name)
1184

    
1185
        # Update class attributes
1186
        cls = type(self)
1187
        cls.networkid = network['id']
1188
        #networks = self.client.list_networks()
1189

    
1190
        fail_tmout = time.time() + self.action_timeout
1191

    
1192
        #Test if new network is created
1193
        while True:
1194
            d = self.cyclades_client.get_network_details(network['id'])
1195
            if d['status'] == 'ACTIVE':
1196
                connected = True
1197
                break
1198
            elif time.time() > fail_tmout:
1199
                self.assertLess(time.time(), fail_tmout)
1200
            else:
1201
                log.info("Waiting for network to become ACTIVE")
1202
                time.sleep(self.query_interval)
1203

    
1204
        self.assertTrue(connected)
1205

    
1206
        self.result_dict["Private network ID"] = str(network['id'])
1207

    
1208
    def test_002_connect_to_network(self):
1209
        """Test connect VMs to network"""
1210

    
1211
        log.info("Connect VMs to private network")
1212
        self.result_dict.clear()
1213

    
1214
        self.cyclades_client.connect_server(self.serverid['A'], self.networkid)
1215
        self.cyclades_client.connect_server(self.serverid['B'], self.networkid)
1216

    
1217
        #Insist on connecting until action timeout
1218
        fail_tmout = time.time() + self.action_timeout
1219

    
1220
        while True:
1221

    
1222
            netsA = [x['network_id']
1223
                     for x in self.cyclades_client.get_server_details(
1224
                         self.serverid['A'])['attachments']]
1225
            netsB = [x['network_id']
1226
                     for x in self.cyclades_client.get_server_details(
1227
                         self.serverid['B'])['attachments']]
1228

    
1229
            if (self.networkid in netsA) and (self.networkid in netsB):
1230
                conn_exists = True
1231
                break
1232
            elif time.time() > fail_tmout:
1233
                self.assertLess(time.time(), fail_tmout)
1234
            else:
1235
                time.sleep(self.query_interval)
1236

    
1237
        #Adding private IPs to class attributes
1238
        cls = type(self)
1239
        cls.priv_ip = dict()
1240

    
1241
        nicsA = self.cyclades_client.get_server_details(
1242
            self.serverid['A'])['attachments']
1243
        nicsB = self.cyclades_client.get_server_details(
1244
            self.serverid['B'])['attachments']
1245

    
1246
        if conn_exists:
1247
            for nic in nicsA:
1248
                if nic["network_id"] == self.networkid:
1249
                    cls.priv_ip["A"] = nic["ipv4"]
1250
            self.result_dict["Server A private IP"] = str(cls.priv_ip["A"])
1251

    
1252
            for nic in nicsB:
1253
                if nic["network_id"] == self.networkid:
1254
                    cls.priv_ip["B"] = nic["ipv4"]
1255
            self.result_dict["Server B private IP"] = str(cls.priv_ip["B"])
1256

    
1257
        self.assertTrue(conn_exists)
1258
        self.assertIsNot(cls.priv_ip["A"], None)
1259
        self.assertIsNot(cls.priv_ip["B"], None)
1260

    
1261
    def test_002a_reboot(self):
1262
        """Rebooting server A"""
1263

    
1264
        log.info("Rebooting server A")
1265

    
1266
        self.cyclades_client.shutdown_server(self.serverid['A'])
1267

    
1268
        fail_tmout = time.time() + self.action_timeout
1269
        while True:
1270
            d = self.cyclades_client.get_server_details(self.serverid['A'])
1271
            status = d['status']
1272
            if status == 'STOPPED':
1273
                break
1274
            elif time.time() > fail_tmout:
1275
                self.assertLess(time.time(), fail_tmout)
1276
            else:
1277
                time.sleep(self.query_interval)
1278

    
1279
        self.cyclades_client.start_server(self.serverid['A'])
1280

    
1281
        while True:
1282
            d = self.cyclades_client.get_server_details(self.serverid['A'])
1283
            status = d['status']
1284
            if status == 'ACTIVE':
1285
                active = True
1286
                break
1287
            elif time.time() > fail_tmout:
1288
                self.assertLess(time.time(), fail_tmout)
1289
            else:
1290
                time.sleep(self.query_interval)
1291

    
1292
        self.assertTrue(active)
1293

    
1294
    def test_002b_ping_server_A(self):
1295
        "Test if server A responds to IPv4 pings"
1296

    
1297
        log.info("Testing if server A responds to IPv4 pings ")
1298
        self.result_dict.clear()
1299

    
1300
        server = self.cyclades_client.get_server_details(self.serverid['A'])
1301
        ip = self._get_ipv4(server)
1302

    
1303
        fail_tmout = time.time() + self.action_timeout
1304

    
1305
        s = False
1306

    
1307
        self.result_dict["Server A public IP"] = str(ip)
1308

    
1309
        while True:
1310

    
1311
            if self._ping_once(ip):
1312
                s = True
1313
                break
1314

    
1315
            elif time.time() > fail_tmout:
1316
                self.assertLess(time.time(), fail_tmout)
1317

    
1318
            else:
1319
                time.sleep(self.query_interval)
1320

    
1321
        self.assertTrue(s)
1322

    
1323
    def test_002c_reboot(self):
1324
        """Reboot server B"""
1325

    
1326
        log.info("Rebooting server B")
1327
        self.result_dict.clear()
1328

    
1329
        self.cyclades_client.shutdown_server(self.serverid['B'])
1330

    
1331
        fail_tmout = time.time() + self.action_timeout
1332
        while True:
1333
            d = self.cyclades_client.get_server_details(self.serverid['B'])
1334
            status = d['status']
1335
            if status == 'STOPPED':
1336
                break
1337
            elif time.time() > fail_tmout:
1338
                self.assertLess(time.time(), fail_tmout)
1339
            else:
1340
                time.sleep(self.query_interval)
1341

    
1342
        self.cyclades_client.start_server(self.serverid['B'])
1343

    
1344
        while True:
1345
            d = self.cyclades_client.get_server_details(self.serverid['B'])
1346
            status = d['status']
1347
            if status == 'ACTIVE':
1348
                active = True
1349
                break
1350
            elif time.time() > fail_tmout:
1351
                self.assertLess(time.time(), fail_tmout)
1352
            else:
1353
                time.sleep(self.query_interval)
1354

    
1355
        self.assertTrue(active)
1356

    
1357
    def test_002d_ping_server_B(self):
1358
        """Test if server B responds to IPv4 pings"""
1359

    
1360
        log.info("Testing if server B responds to IPv4 pings")
1361
        self.result_dict.clear()
1362

    
1363
        server = self.cyclades_client.get_server_details(self.serverid['B'])
1364
        ip = self._get_ipv4(server)
1365

    
1366
        fail_tmout = time.time() + self.action_timeout
1367

    
1368
        s = False
1369

    
1370
        self.result_dict["Server B public IP"] = str(ip)
1371

    
1372
        while True:
1373
            if self._ping_once(ip):
1374
                s = True
1375
                break
1376

    
1377
            elif time.time() > fail_tmout:
1378
                self.assertLess(time.time(), fail_tmout)
1379

    
1380
            else:
1381
                time.sleep(self.query_interval)
1382

    
1383
        self.assertTrue(s)
1384

    
1385
    def test_003a_setup_interface_A(self):
1386
        """Setup eth1 for server A"""
1387

    
1388
        self._skipIf(self.is_windows, "only valid for Linux servers")
1389

    
1390
        log.info("Setting up interface eth1 for server A")
1391
        self.result_dict.clear()
1392

    
1393
        server = self.cyclades_client.get_server_details(self.serverid['A'])
1394
        image = self.cyclades_client.get_image_details(self.imageid)
1395
        os_value = image['metadata']['os']
1396

    
1397
        users = image["metadata"].get("users", None)
1398
        userlist = users.split()
1399

    
1400
        if "root" in userlist:
1401
            loginname = "root"
1402
        elif users is None:
1403
            loginname = self._connect_loginname(os_value)
1404
        else:
1405
            loginname = choice(userlist)
1406

    
1407
        hostip = self._get_ipv4(server)
1408
        myPass = self.password['A']
1409

    
1410
        log.info("SSH in server A as %s/%s" % (loginname, myPass))
1411
        command = "ifconfig eth1 %s && ifconfig eth1 | " \
1412
                  "grep 'inet addr:' | cut -d: -f2 | awk '{ print $1}'" \
1413
                  % self.priv_ip["A"]
1414
        output, status = _ssh_execute(
1415
            hostip, loginname, myPass, command)
1416

    
1417
        self.assertEquals(status, 0)
1418
        self.assertEquals(output[0].strip(), self.priv_ip["A"])
1419

    
1420
    def test_003b_setup_interface_B(self):
1421
        """Setup eth1 for server B"""
1422

    
1423
        self._skipIf(self.is_windows, "only valid for Linux servers")
1424

    
1425
        log.info("Setting up interface eth1 for server B")
1426

    
1427
        server = self.cyclades_client.get_server_details(self.serverid['B'])
1428
        image = self.cyclades_client.get_image_details(self.imageid)
1429
        os_value = image['metadata']['os']
1430

    
1431
        users = image["metadata"].get("users", None)
1432
        userlist = users.split()
1433

    
1434
        if "root" in userlist:
1435
            loginname = "root"
1436
        elif users is None:
1437
            loginname = self._connect_loginname(os_value)
1438
        else:
1439
            loginname = choice(userlist)
1440

    
1441
        hostip = self._get_ipv4(server)
1442
        myPass = self.password['B']
1443

    
1444
        log.info("SSH in server B as %s/%s" % (loginname, myPass))
1445
        command = "ifconfig eth1 %s && ifconfig eth1 | " \
1446
                  "grep 'inet addr:' | cut -d: -f2 | awk '{ print $1}'" \
1447
                  % self.priv_ip["B"]
1448
        output, status = _ssh_execute(
1449
            hostip, loginname, myPass, command)
1450

    
1451
        self.assertEquals(status, 0)
1452
        self.assertEquals(output[0].strip(), self.priv_ip["B"])
1453

    
1454
    def test_003c_test_connection_exists(self):
1455
        """Ping server B from server A to test if connection exists"""
1456

    
1457
        self._skipIf(self.is_windows, "only valid for Linux servers")
1458

    
1459
        log.info("Testing if server A is actually connected to server B")
1460

    
1461
        server = self.cyclades_client.get_server_details(self.serverid['A'])
1462
        image = self.cyclades_client.get_image_details(self.imageid)
1463
        os_value = image['metadata']['os']
1464
        hostip = self._get_ipv4(server)
1465

    
1466
        users = image["metadata"].get("users", None)
1467
        userlist = users.split()
1468

    
1469
        if "root" in userlist:
1470
            loginname = "root"
1471
        elif users is None:
1472
            loginname = self._connect_loginname(os_value)
1473
        else:
1474
            loginname = choice(userlist)
1475

    
1476
        myPass = self.password['A']
1477

    
1478
        cmd = "if ping -c 7 -w 20 %s >/dev/null; \
1479
               then echo \'True\'; fi;" % self.priv_ip["B"]
1480
        lines, status = _ssh_execute(
1481
            hostip, loginname, myPass, cmd)
1482

    
1483
        exists = False
1484

    
1485
        if 'True\n' in lines:
1486
            exists = True
1487

    
1488
        self.assertTrue(exists)
1489

    
1490
    def test_004_disconnect_from_network(self):
1491
        "Disconnecting server A and B from network"
1492

    
1493
        log.info("Disconnecting servers from private network")
1494

    
1495
        prev_state = self.cyclades_client.get_network_details(self.networkid)
1496
        prev_nics = prev_state['attachments']
1497
        #prev_conn = len(prev_nics)
1498

    
1499
        nicsA = [x['id']
1500
                 for x in self.cyclades_client.get_server_details(
1501
                     self.serverid['A'])['attachments']]
1502
        nicsB = [x['id']
1503
                 for x in self.cyclades_client.get_server_details(
1504
                     self.serverid['B'])['attachments']]
1505

    
1506
        for nic in prev_nics:
1507
            if nic in nicsA:
1508
                self.cyclades_client.disconnect_server(self.serverid['A'], nic)
1509
            if nic in nicsB:
1510
                self.cyclades_client.disconnect_server(self.serverid['B'], nic)
1511

    
1512
        #Insist on deleting until action timeout
1513
        fail_tmout = time.time() + self.action_timeout
1514

    
1515
        while True:
1516
            netsA = [x['network_id']
1517
                     for x in self.cyclades_client.get_server_details(
1518
                         self.serverid['A'])['attachments']]
1519
            netsB = [x['network_id']
1520
                     for x in self.cyclades_client.get_server_details(
1521
                         self.serverid['B'])['attachments']]
1522

    
1523
            #connected = (self.client.get_network_details(self.networkid))
1524
            #connections = connected['attachments']
1525
            if (self.networkid not in netsA) and (self.networkid not in netsB):
1526
                conn_exists = False
1527
                break
1528
            elif time.time() > fail_tmout:
1529
                self.assertLess(time.time(), fail_tmout)
1530
            else:
1531
                time.sleep(self.query_interval)
1532

    
1533
        self.assertFalse(conn_exists)
1534

    
1535
    def test_005_destroy_network(self):
1536
        """Test submit delete network request"""
1537

    
1538
        log.info("Submitting delete network request")
1539

    
1540
        self.cyclades_client.delete_network(self.networkid)
1541

    
1542
        fail_tmout = time.time() + self.action_timeout
1543

    
1544
        while True:
1545

    
1546
            curr_net = []
1547
            networks = self.cyclades_client.list_networks()
1548

    
1549
            for net in networks:
1550
                curr_net.append(net['id'])
1551

    
1552
            if self.networkid not in curr_net:
1553
                self.assertTrue(self.networkid not in curr_net)
1554
                break
1555

    
1556
            elif time.time() > fail_tmout:
1557
                self.assertLess(time.time(), fail_tmout)
1558

    
1559
            else:
1560
                time.sleep(self.query_interval)
1561

    
1562
    def test_006_cleanup_servers(self):
1563
        """Cleanup servers created for this test"""
1564

    
1565
        log.info("Delete servers created for this test")
1566

    
1567
        self.cyclades_client.delete_server(self.serverid['A'])
1568
        self.cyclades_client.delete_server(self.serverid['B'])
1569

    
1570
        fail_tmout = time.time() + self.action_timeout
1571

    
1572
        #Ensure server gets deleted
1573
        status = dict()
1574

    
1575
        while True:
1576
            details = \
1577
                self.cyclades_client.get_server_details(self.serverid['A'])
1578
            status['A'] = details['status']
1579
            details = \
1580
                self.cyclades_client.get_server_details(self.serverid['B'])
1581
            status['B'] = details['status']
1582
            if (status['A'] == 'DELETED') and (status['B'] == 'DELETED'):
1583
                deleted = True
1584
                break
1585
            elif time.time() > fail_tmout:
1586
                self.assertLess(time.time(), fail_tmout)
1587
            else:
1588
                time.sleep(self.query_interval)
1589

    
1590
        self.assertTrue(deleted)
1591

    
1592

    
1593
class TestRunnerProcess(Process):
1594
    """A distinct process used to execute part of the tests in parallel"""
1595
    def __init__(self, **kw):
1596
        Process.__init__(self, **kw)
1597
        kwargs = kw["kwargs"]
1598
        self.testq = kwargs["testq"]
1599
        self.worker_folder = kwargs["worker_folder"]
1600

    
1601
    def run(self):
1602
        # Make sure this test runner process dies with the parent
1603
        # and is not left behind.
1604
        #
1605
        # WARNING: This uses the prctl(2) call and is
1606
        # Linux-specific.
1607

    
1608
        prctl.set_pdeathsig(signal.SIGHUP)
1609

    
1610
        multi = logging.getLogger("multiprocess")
1611

    
1612
        while True:
1613
            multi.debug("I am process %d, GETting from queue is %s" %
1614
                        (os.getpid(), self.testq))
1615
            msg = self.testq.get()
1616

    
1617
            multi.debug("Dequeued msg: %s" % msg)
1618

    
1619
            if msg == "TEST_RUNNER_TERMINATE":
1620
                raise SystemExit
1621

    
1622
            elif issubclass(msg, unittest.TestCase):
1623
                # Assemble a TestSuite, and run it
1624

    
1625
                log_file = os.path.join(self.worker_folder, 'details_' +
1626
                                        (msg.__name__) + "_" +
1627
                                        TEST_RUN_ID + '.log')
1628

    
1629
                fail_file = os.path.join(self.worker_folder, 'failed_' +
1630
                                         (msg.__name__) + "_" +
1631
                                         TEST_RUN_ID + '.log')
1632
                error_file = os.path.join(self.worker_folder, 'error_' +
1633
                                          (msg.__name__) + "_" +
1634
                                          TEST_RUN_ID + '.log')
1635

    
1636
                f = open(log_file, 'w')
1637
                fail = open(fail_file, 'w')
1638
                error = open(error_file, 'w')
1639

    
1640
                log.info(yellow + '* Starting testcase: %s' % msg + normal)
1641

    
1642
                runner = unittest.TextTestRunner(
1643
                    f, verbosity=2, failfast=True,
1644
                    resultclass=BurninTestResult)
1645
                suite = unittest.TestLoader().loadTestsFromTestCase(msg)
1646
                result = runner.run(suite)
1647

    
1648
                for res in result.errors:
1649
                    log.error("snf-burnin encountered an error in "
1650
                              "testcase: %s" % msg)
1651
                    log.error("See log for details")
1652
                    error.write(str(res[0]) + '\n')
1653
                    error.write(str(res[0].shortDescription()) + '\n')
1654
                    error.write('\n')
1655

    
1656
                for res in result.failures:
1657
                    log.error("snf-burnin failed in testcase: %s" % msg)
1658
                    log.error("See log for details")
1659
                    fail.write(str(res[0]) + '\n')
1660
                    fail.write(str(res[0].shortDescription()) + '\n')
1661
                    fail.write('\n')
1662
                    if not NOFAILFAST:
1663
                        sys.exit()
1664

    
1665
                if (len(result.failures) == 0) and (len(result.errors) == 0):
1666
                    log.debug("Passed testcase: %s" % msg)
1667

    
1668
                f.close()
1669
                fail.close()
1670
                error.close()
1671

    
1672
            else:
1673
                raise Exception("Cannot handle msg: %s" % msg)
1674

    
1675

    
1676
def _run_cases_in_series(cases, image_folder):
1677
    """Run instances of TestCase in series"""
1678

    
1679
    for case in cases:
1680

    
1681
        test = case.__name__
1682

    
1683
        log.info(yellow + '* Starting testcase: %s' % test + normal)
1684
        log_file = os.path.join(image_folder, 'details_' +
1685
                                (case.__name__) + "_" +
1686
                                TEST_RUN_ID + '.log')
1687
        fail_file = os.path.join(image_folder, 'failed_' +
1688
                                 (case.__name__) + "_" +
1689
                                 TEST_RUN_ID + '.log')
1690
        error_file = os.path.join(image_folder, 'error_' +
1691
                                  (case.__name__) + "_" +
1692
                                  TEST_RUN_ID + '.log')
1693

    
1694
        f = open(log_file, "w")
1695
        fail = open(fail_file, "w")
1696
        error = open(error_file, "w")
1697

    
1698
        suite = unittest.TestLoader().loadTestsFromTestCase(case)
1699
        runner = unittest.TextTestRunner(
1700
            f, verbosity=2, failfast=True,
1701
            resultclass=BurninTestResult)
1702
        result = runner.run(suite)
1703

    
1704
        for res in result.errors:
1705
            log.error("snf-burnin encountered an error in "
1706
                      "testcase: %s" % test)
1707
            log.error("See log for details")
1708
            error.write(str(res[0]) + '\n')
1709
            error.write(str(res[0].shortDescription()) + '\n')
1710
            error.write('\n')
1711

    
1712
        for res in result.failures:
1713
            log.error("snf-burnin failed in testcase: %s" % test)
1714
            log.error("See log for details")
1715
            fail.write(str(res[0]) + '\n')
1716
            fail.write(str(res[0].shortDescription()) + '\n')
1717
            fail.write('\n')
1718
            if not NOFAILFAST:
1719
                sys.exit()
1720

    
1721
        if (len(result.failures) == 0) and (len(result.errors) == 0):
1722
            log.debug("Passed testcase: %s" % test)
1723

    
1724

    
1725
def _run_cases_in_parallel(cases, fanout, image_folder):
1726
    """Run instances of TestCase in parallel, in a number of distinct processes
1727

1728
    The cases iterable specifies the TestCases to be executed in parallel,
1729
    by test runners running in distinct processes.
1730
    The fanout parameter specifies the number of processes to spawn,
1731
    and defaults to 1.
1732
    The runner argument specifies the test runner class to use inside each
1733
    runner process.
1734

1735
    """
1736

    
1737
    multi = logging.getLogger("multiprocess")
1738
    handler = logging.StreamHandler()
1739
    multi.addHandler(handler)
1740

    
1741
    if VERBOSE:
1742
        multi.setLevel(logging.DEBUG)
1743
    else:
1744
        multi.setLevel(logging.INFO)
1745

    
1746
    testq = []
1747
    worker_folder = []
1748
    runners = []
1749

    
1750
    for i in xrange(0, fanout):
1751
        testq.append(Queue())
1752
        worker_folder.append(os.path.join(image_folder, 'process'+str(i)))
1753
        os.mkdir(worker_folder[i])
1754

    
1755
    for i in xrange(0, fanout):
1756
        kwargs = dict(testq=testq[i], worker_folder=worker_folder[i])
1757
        runners.append(TestRunnerProcess(kwargs=kwargs))
1758

    
1759
    multi.debug("Spawning %d test runner processes" % len(runners))
1760

    
1761
    for p in runners:
1762
        p.start()
1763

    
1764
    # Enqueue test cases
1765
    for i in xrange(0, fanout):
1766
        map(testq[i].put, cases)
1767
        testq[i].put("TEST_RUNNER_TERMINATE")
1768

    
1769
    multi.debug("Spawned %d test runners, PIDs are %s" %
1770
                (len(runners), [p.pid for p in runners]))
1771

    
1772
    multi.debug("Joining %d processes" % len(runners))
1773

    
1774
    for p in runners:
1775
        p.join()
1776

    
1777
    multi.debug("Done joining %d processes" % len(runners))
1778

    
1779

    
1780
def _images_test_case(**kwargs):
1781
    """Construct a new unit test case class from ImagesTestCase"""
1782
    name = "ImagesTestCase_%s" % kwargs["imageid"]
1783
    cls = type(name, (ImagesTestCase,), kwargs)
1784

    
1785
    #Patch extra parameters into test names by manipulating method docstrings
1786
    for (mname, m) in \
1787
            inspect.getmembers(cls, lambda x: inspect.ismethod(x)):
1788
        if hasattr(m, __doc__):
1789
            m.__func__.__doc__ = "[%s] %s" % (cls.imagename, m.__doc__)
1790

    
1791
    # Make sure the class can be pickled, by listing it among
1792
    # the attributes of __main__. A PicklingError is raised otherwise.
1793
    thismodule = sys.modules[__name__]
1794
    setattr(thismodule, name, cls)
1795
    return cls
1796

    
1797

    
1798
def _spawn_server_test_case(**kwargs):
1799
    """Construct a new unit test case class from SpawnServerTestCase"""
1800

    
1801
    name = "SpawnServerTestCase_%s" % kwargs["imageid"]
1802
    cls = type(name, (SpawnServerTestCase,), kwargs)
1803

    
1804
    # Patch extra parameters into test names by manipulating method docstrings
1805
    for (mname, m) in \
1806
            inspect.getmembers(cls, lambda x: inspect.ismethod(x)):
1807
        if hasattr(m, __doc__):
1808
            m.__func__.__doc__ = "[%s] %s" % (cls.imagename, m.__doc__)
1809

    
1810
    # Make sure the class can be pickled, by listing it among
1811
    # the attributes of __main__. A PicklingError is raised otherwise.
1812

    
1813
    thismodule = sys.modules[__name__]
1814
    setattr(thismodule, name, cls)
1815
    return cls
1816

    
1817

    
1818
def _spawn_network_test_case(**kwargs):
1819
    """Construct a new unit test case class from NetworkTestCase"""
1820

    
1821
    name = "NetworkTestCase" + TEST_RUN_ID
1822
    cls = type(name, (NetworkTestCase,), kwargs)
1823

    
1824
    # Make sure the class can be pickled, by listing it among
1825
    # the attributes of __main__. A PicklingError is raised otherwise.
1826

    
1827
    thismodule = sys.modules[__name__]
1828
    setattr(thismodule, name, cls)
1829
    return cls
1830

    
1831

    
1832
# --------------------------------------------------------------------
1833
# Clean up servers/networks functions
1834
def cleanup_servers(timeout, query_interval, delete_stale=False):
1835

    
1836
    astakos_client = AstakosClient(AUTH_URL, TOKEN)
1837
    astakos_client.CONNECTION_RETRY_LIMIT = 2
1838
    # Compute Client
1839
    compute_url = astakos_client.get_service_endpoints('compute')['publicURL']
1840
    compute_client = ComputeClient(compute_url, TOKEN)
1841
    compute_client.CONNECTION_RETRY_LIMIT = 2
1842

    
1843
    servers = compute_client.list_servers()
1844
    stale = [s for s in servers if s["name"].startswith(SNF_TEST_PREFIX)]
1845

    
1846
    if len(stale) == 0:
1847
        return
1848

    
1849
    # Show staled servers
1850
    print >>sys.stderr, yellow + \
1851
        "Found these stale servers from previous runs:" + \
1852
        normal
1853
    print >>sys.stderr, "    " + \
1854
        "\n    ".join(["%d: %s" % (s["id"], s["name"]) for s in stale])
1855

    
1856
    # Delete staled servers
1857
    if delete_stale:
1858
        print >> sys.stderr, "Deleting %d stale servers:" % len(stale)
1859
        fail_tmout = time.time() + timeout
1860
        for s in stale:
1861
            compute_client.delete_server(s["id"])
1862
        # Wait for all servers to be deleted
1863
        while True:
1864
            servers = compute_client.list_servers()
1865
            stale = [s for s in servers
1866
                     if s["name"].startswith(SNF_TEST_PREFIX)]
1867
            if len(stale) == 0:
1868
                print >> sys.stderr, green + "    ...done" + normal
1869
                break
1870
            elif time.time() > fail_tmout:
1871
                print >> sys.stderr, red + \
1872
                    "Not all stale servers deleted. Action timed out." + \
1873
                    normal
1874
                sys.exit(1)
1875
            else:
1876
                time.sleep(query_interval)
1877
    else:
1878
        print >> sys.stderr, "Use --delete-stale to delete them."
1879

    
1880

    
1881
def cleanup_networks(action_timeout, query_interval, delete_stale=False):
1882

    
1883
    astakos_client = AstakosClient(AUTH_URL, TOKEN)
1884
    astakos_client.CONNECTION_RETRY_LIMIT = 2
1885
    # Cyclades Client
1886
    compute_url = astakos_client.get_service_endpoints('compute')['publicURL']
1887
    cyclades_client = CycladesClient(compute_url, TOKEN)
1888
    cyclades_client.CONNECTION_RETRY_LIMIT = 2
1889

    
1890
    networks = cyclades_client.list_networks()
1891
    stale = [n for n in networks if n["name"].startswith(SNF_TEST_PREFIX)]
1892

    
1893
    if len(stale) == 0:
1894
        return
1895

    
1896
    # Show staled networks
1897
    print >> sys.stderr, yellow + \
1898
        "Found these stale networks from previous runs:" + \
1899
        normal
1900
    print "    " + \
1901
        "\n    ".join(["%s: %s" % (str(n["id"]), n["name"]) for n in stale])
1902

    
1903
    # Delete staled networks
1904
    if delete_stale:
1905
        print >> sys.stderr, "Deleting %d stale networks:" % len(stale)
1906
        fail_tmout = time.time() + action_timeout
1907
        for n in stale:
1908
            cyclades_client.delete_network(n["id"])
1909
        # Wait for all networks to be deleted
1910
        while True:
1911
            networks = cyclades_client.list_networks()
1912
            stale = [n for n in networks
1913
                     if n["name"].startswith(SNF_TEST_PREFIX)]
1914
            if len(stale) == 0:
1915
                print >> sys.stderr, green + "    ...done" + normal
1916
                break
1917
            elif time.time() > fail_tmout:
1918
                print >> sys.stderr, red + \
1919
                    "Not all stale networks deleted. Action timed out." + \
1920
                    normal
1921
                sys.exit(1)
1922
            else:
1923
                time.sleep(query_interval)
1924
    else:
1925
        print >> sys.stderr, "Use --delete-stale to delete them."
1926

    
1927

    
1928
# --------------------------------------------------------------------
1929
# Parse arguments functions
1930
def parse_comma(option, opt, value, parser):
1931
    tests = set(['all', 'auth', 'images', 'flavors',
1932
                 'pithos', 'servers', 'server_spawn',
1933
                 'network_spawn'])
1934
    parse_input = value.split(',')
1935

    
1936
    if not (set(parse_input)).issubset(tests):
1937
        raise OptionValueError("The selected set of tests is invalid")
1938

    
1939
    setattr(parser.values, option.dest, value.split(','))
1940

    
1941

    
1942
def parse_arguments(args):
1943

    
1944
    kw = {}
1945
    kw["usage"] = "%prog [options]"
1946
    kw["description"] = \
1947
        "%prog runs a number of test scenarios on a " \
1948
        "Synnefo deployment."
1949

    
1950
    parser = OptionParser(**kw)
1951
    parser.disable_interspersed_args()
1952

    
1953
    parser.add_option("--auth-url",
1954
                      action="store", type="string", dest="auth_url",
1955
                      help="The AUTH URI to use to reach the Synnefo API",
1956
                      default=None)
1957
    parser.add_option("--system-images-user",
1958
                      action="store", type="string", dest="system_images_user",
1959
                      help="Owner of system images",
1960
                      default=None)
1961
    parser.add_option("--token",
1962
                      action="store", type="string", dest="token",
1963
                      help="The token to use for authentication to the API")
1964
    parser.add_option("--nofailfast",
1965
                      action="store_true", dest="nofailfast",
1966
                      help="Do not fail immediately if one of the tests "
1967
                           "fails (EXPERIMENTAL)",
1968
                      default=False)
1969
    parser.add_option("--no-ipv6",
1970
                      action="store_true", dest="no_ipv6",
1971
                      help="Disables ipv6 related tests",
1972
                      default=False)
1973
    parser.add_option("--action-timeout",
1974
                      action="store", type="int", dest="action_timeout",
1975
                      metavar="TIMEOUT",
1976
                      help="Wait SECONDS seconds for a server action to "
1977
                           "complete, then the test is considered failed",
1978
                      default=100)
1979
    parser.add_option("--build-warning",
1980
                      action="store", type="int", dest="build_warning",
1981
                      metavar="TIMEOUT",
1982
                      help="Warn if TIMEOUT seconds have passed and a "
1983
                           "build operation is still pending",
1984
                      default=600)
1985
    parser.add_option("--build-fail",
1986
                      action="store", type="int", dest="build_fail",
1987
                      metavar="BUILD_TIMEOUT",
1988
                      help="Fail the test if TIMEOUT seconds have passed "
1989
                           "and a build operation is still incomplete",
1990
                      default=900)
1991
    parser.add_option("--query-interval",
1992
                      action="store", type="int", dest="query_interval",
1993
                      metavar="INTERVAL",
1994
                      help="Query server status when requests are pending "
1995
                           "every INTERVAL seconds",
1996
                      default=3)
1997
    parser.add_option("--fanout",
1998
                      action="store", type="int", dest="fanout",
1999
                      metavar="COUNT",
2000
                      help="Spawn up to COUNT child processes to execute "
2001
                           "in parallel, essentially have up to COUNT "
2002
                           "server build requests outstanding (EXPERIMENTAL)",
2003
                      default=1)
2004
    parser.add_option("--force-flavor",
2005
                      action="store", type="int", dest="force_flavorid",
2006
                      metavar="FLAVOR ID",
2007
                      help="Force all server creations to use the specified "
2008
                           "FLAVOR ID instead of a randomly chosen one, "
2009
                           "useful if disk space is scarce",
2010
                      default=None)
2011
    parser.add_option("--image-id",
2012
                      action="store", type="string", dest="force_imageid",
2013
                      metavar="IMAGE ID",
2014
                      help="Test the specified image id, use 'all' to test "
2015
                           "all available images (mandatory argument)",
2016
                      default=None)
2017
    parser.add_option("--show-stale",
2018
                      action="store_true", dest="show_stale",
2019
                      help="Show stale servers from previous runs, whose "
2020
                           "name starts with `%s'" % SNF_TEST_PREFIX,
2021
                      default=False)
2022
    parser.add_option("--delete-stale",
2023
                      action="store_true", dest="delete_stale",
2024
                      help="Delete stale servers from previous runs, whose "
2025
                           "name starts with `%s'" % SNF_TEST_PREFIX,
2026
                      default=False)
2027
    parser.add_option("--force-personality",
2028
                      action="store", type="string", dest="personality_path",
2029
                      help="Force a personality file injection.\
2030
                            File path required. ",
2031
                      default=None)
2032
    parser.add_option("--log-folder",
2033
                      action="store", type="string", dest="log_folder",
2034
                      help="Define the absolute path where the output \
2035
                            log is stored. ",
2036
                      default="/var/log/burnin/")
2037
    parser.add_option("--verbose", "-V",
2038
                      action="store_true", dest="verbose",
2039
                      help="Print detailed output about multiple "
2040
                           "processes spawning",
2041
                      default=False)
2042
    parser.add_option("--set-tests",
2043
                      action="callback",
2044
                      dest="tests",
2045
                      type="string",
2046
                      help='Set comma seperated tests for this run. \
2047
                            Available tests: auth, images, flavors, \
2048
                                             servers, server_spawn, \
2049
                                             network_spawn, pithos. \
2050
                            Default = all',
2051
                      default='all',
2052
                      callback=parse_comma)
2053

    
2054
    (opts, args) = parser.parse_args(args)
2055

    
2056
    # -----------------------
2057
    # Verify arguments
2058

    
2059
    # `delete_stale' implies `show_stale'
2060
    if opts.delete_stale:
2061
        opts.show_stale = True
2062

    
2063
    # `token' is mandatory
2064
    _mandatory_argument(opts.token, "--token")
2065
    # `auth_url' is mandatory
2066
    _mandatory_argument(opts.auth_url, "--auth-url")
2067
    # `system_images_user' is mandatory
2068
    _mandatory_argument(opts.system_images_user, "--system-images-user")
2069

    
2070
    if not opts.show_stale:
2071
        # `image-id' is mandatory
2072
        _mandatory_argument(opts.force_imageid, "--image-id")
2073
        if opts.force_imageid != 'all':
2074
            try:
2075
                opts.force_imageid = str(opts.force_imageid)
2076
            except ValueError:
2077
                print >>sys.stderr, red + \
2078
                    "Invalid value specified for" + \
2079
                    "--image-id. Use a valid id, or `all'." + \
2080
                    normal
2081
                sys.exit(1)
2082

    
2083
    return (opts, args)
2084

    
2085

    
2086
def _mandatory_argument(Arg, Str):
2087
    if (Arg is None) or (Arg == ""):
2088
        print >>sys.stderr, red + \
2089
            "The " + Str + " argument is mandatory.\n" + \
2090
            normal
2091
        sys.exit(1)
2092

    
2093

    
2094
# --------------------------------------------------------------------
2095
# Burnin main function
2096
def main():
2097
    """Assemble test cases into a test suite, and run it
2098

2099
    IMPORTANT: Tests have dependencies and have to be run in the specified
2100
    order inside a single test case. They communicate through attributes of the
2101
    corresponding TestCase class (shared fixtures). Distinct subclasses of
2102
    TestCase MAY SHARE NO DATA, since they are run in parallel, in distinct
2103
    test runner processes.
2104

2105
    """
2106

    
2107
    # Parse arguments using `optparse'
2108
    (opts, args) = parse_arguments(sys.argv[1:])
2109

    
2110
    # Some global variables
2111
    global AUTH_URL, TOKEN, SYSTEM_IMAGES_USER
2112
    global NO_IPV6, VERBOSE, NOFAILFAST
2113
    AUTH_URL = opts.auth_url
2114
    TOKEN = opts.token
2115
    SYSTEM_IMAGES_USER = opts.system_images_user
2116
    NO_IPV6 = opts.no_ipv6
2117
    VERBOSE = opts.verbose
2118
    NOFAILFAST = opts.nofailfast
2119

    
2120
    # If `show_stale', cleanup stale servers
2121
    # from previous runs and exit
2122
    if opts.show_stale:
2123
        # We must clean the servers first
2124
        cleanup_servers(opts.action_timeout, opts.query_interval,
2125
                        delete_stale=opts.delete_stale)
2126
        cleanup_networks(opts.action_timeout, opts.query_interval,
2127
                         delete_stale=opts.delete_stale)
2128
        return 0
2129

    
2130
    # Initialize a kamaki instance, get flavors, images
2131
    astakos_client = AstakosClient(AUTH_URL, TOKEN)
2132
    astakos_client.CONNECTION_RETRY_LIMIT = 2
2133
    # Compute Client
2134
    compute_url = astakos_client.get_service_endpoints('compute')['publicURL']
2135
    compute_client = ComputeClient(compute_url, TOKEN)
2136
    compute_client.CONNECTION_RETRY_LIMIT = 2
2137
    DIMAGES = compute_client.list_images(detail=True)
2138
    DFLAVORS = compute_client.list_flavors(detail=True)
2139

    
2140
    # FIXME: logging, log, LOG PID, TEST_RUN_ID, arguments
2141
    # Run them: FIXME: In parallel, FAILEARLY, catchbreak?
2142
    #unittest.main(verbosity=2, catchbreak=True)
2143

    
2144
    # Get a list of images we are going to test
2145
    if opts.force_imageid == 'all':
2146
        test_images = DIMAGES
2147
    else:
2148
        test_images = filter(lambda x: x["id"] == opts.force_imageid, DIMAGES)
2149

    
2150
    # Create output (logging) folder
2151
    if not os.path.exists(opts.log_folder):
2152
        os.mkdir(opts.log_folder)
2153
    test_folder = os.path.join(opts.log_folder, TEST_RUN_ID)
2154
    os.mkdir(test_folder)
2155

    
2156
    for image in test_images:
2157
        imageid = str(image["id"])
2158
        imagename = image["name"]
2159
        # Choose a flavor (given from user or random)
2160
        if opts.force_flavorid:
2161
            flavorid = opts.force_flavorid
2162
        else:
2163
            flavorid = choice([f["id"] for f in DFLAVORS if f["disk"] >= 20])
2164
        # Personality dictionary for file injection test
2165
        if opts.personality_path is not None:
2166
            f = open(opts.personality_path)
2167
            content = b64encode(f.read())
2168
            personality = []
2169
            st = os.stat(opts.personality_path)
2170
            personality.append({
2171
                'path': '/root/test_inj_file',
2172
                'owner': 'root',
2173
                'group': 'root',
2174
                'mode': 0x7777 & st.st_mode,
2175
                'contents': content})
2176
        else:
2177
            personality = None
2178
        # Give a name to our test servers
2179
        servername = "%s%s for %s" % (SNF_TEST_PREFIX, TEST_RUN_ID, imagename)
2180
        is_windows = imagename.lower().find("windows") >= 0
2181

    
2182
        # Create Server TestCases
2183
        ServerTestCase = _spawn_server_test_case(
2184
            imageid=imageid,
2185
            flavorid=flavorid,
2186
            imagename=imagename,
2187
            personality=personality,
2188
            servername=servername,
2189
            is_windows=is_windows,
2190
            action_timeout=opts.action_timeout,
2191
            build_warning=opts.build_warning,
2192
            build_fail=opts.build_fail,
2193
            query_interval=opts.query_interval)
2194
        # Create Network TestCases
2195
        NetworkTestCase = _spawn_network_test_case(
2196
            action_timeout=opts.action_timeout,
2197
            imageid=imageid,
2198
            flavorid=flavorid,
2199
            imagename=imagename,
2200
            query_interval=opts.query_interval)
2201
        # Create Images TestCase
2202
        CImagesTestCase = _images_test_case(
2203
            action_timeout=opts.action_timeout,
2204
            imageid=imageid,
2205
            flavorid=flavorid,
2206
            imagename=imagename,
2207
            query_interval=opts.query_interval)
2208

    
2209
        # Choose the tests we are going to run
2210
        test_dict = {'auth': UnauthorizedTestCase,
2211
                     'images': CImagesTestCase,
2212
                     'flavors': FlavorsTestCase,
2213
                     'servers': ServersTestCase,
2214
                     'pithos': PithosTestCase,
2215
                     'server_spawn': ServerTestCase,
2216
                     'network_spawn': NetworkTestCase}
2217
        seq_cases = []
2218
        if 'all' in opts.tests:
2219
            seq_cases = [UnauthorizedTestCase, CImagesTestCase,
2220
                         FlavorsTestCase, ServersTestCase,
2221
                         PithosTestCase, ServerTestCase,
2222
                         NetworkTestCase]
2223
        else:
2224
            for test in opts.tests:
2225
                seq_cases.append(test_dict[test])
2226

    
2227
        # Folder for each image
2228
        image_folder = os.path.join(test_folder, imageid)
2229
        os.mkdir(image_folder)
2230

    
2231
        # Run each test
2232
        if opts.fanout > 1:
2233
            _run_cases_in_parallel(seq_cases, opts.fanout, image_folder)
2234
        else:
2235
            _run_cases_in_series(seq_cases, image_folder)
2236

    
2237

    
2238
# --------------------------------------------------------------------
2239
# Call main
2240
if __name__ == "__main__":
2241
    sys.exit(main())