Statistics
| Branch: | Tag: | Revision:

root / snf-tools / synnefo_tools / burnin.py @ 5ccbdf82

History | View | Annotate | Download (81.9 kB)

1
#!/usr/bin/env python
2

    
3
# Copyright 2011 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35

    
36
"""Perform integration testing on a running Synnefo deployment"""
37

    
38
#import __main__
39
import datetime
40
import inspect
41
import logging
42
import os
43
import os.path
44
import paramiko
45
import prctl
46
import subprocess
47
import signal
48
import socket
49
import sys
50
import time
51
import tempfile
52
from base64 import b64encode
53
from IPy import IP
54
from multiprocessing import Process, Queue
55
from random import choice, randint
56
from optparse import OptionParser, OptionValueError
57

    
58
from kamaki.clients.compute import ComputeClient
59
from kamaki.clients.cyclades import CycladesClient
60
from kamaki.clients.image import ImageClient
61
from kamaki.clients.pithos import PithosClient
62
from kamaki.clients.astakos import AstakosClient
63
from kamaki.clients import ClientError
64

    
65
from vncauthproxy.d3des import generate_response as d3des_generate_response
66

    
67
# Use backported unittest functionality if Python < 2.7
68
try:
69
    import unittest2 as unittest
70
except ImportError:
71
    if sys.version_info < (2, 7):
72
        raise Exception("The unittest2 package is required for Python < 2.7")
73
    import unittest
74

    
75
# --------------------------------------------------------------------
76
# Global Variables
77
AUTH_URL = None
78
TOKEN = None
79
SYSTEM_IMAGES_USER = None
80
NO_IPV6 = None
81
NOFAILFAST = None
82
VERBOSE = None
83

    
84
# A unique id identifying this test run
85
TEST_RUN_ID = datetime.datetime.strftime(datetime.datetime.now(),
86
                                         "%Y%m%d%H%M%S")
87
SNF_TEST_PREFIX = "snf-test-"
88

    
89
red = '\x1b[31m'
90
yellow = '\x1b[33m'
91
green = '\x1b[32m'
92
normal = '\x1b[0m'
93

    
94

    
95
# --------------------------------------------------------------------
96
# Global functions
97
def _ssh_execute(hostip, username, password, command):
98
    """Execute a command via ssh"""
99
    ssh = paramiko.SSHClient()
100
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
101
    try:
102
        ssh.connect(hostip, username=username, password=password)
103
    except socket.error, err:
104
        raise AssertionError(err)
105
    try:
106
        stdin, stdout, stderr = ssh.exec_command(command)
107
    except paramiko.SSHException, err:
108
        raise AssertionError(err)
109
    status = stdout.channel.recv_exit_status()
110
    output = stdout.readlines()
111
    ssh.close()
112
    return output, status
113

    
114

    
115
def _get_user_id():
116
    """Authenticate to astakos and get unique users id"""
117
    astakos = AstakosClient(AUTH_URL, TOKEN)
118
    astakos.CONNECTION_RETRY_LIMIT = 2
119
    authenticate = astakos.authenticate()
120
    return authenticate['access']['user']['id']
121

    
122

    
123
# --------------------------------------------------------------------
124
# BurninTestReulst class
125
class BurninTestResult(unittest.TextTestResult):
126
    def addSuccess(self, test):
127
        super(BurninTestResult, self).addSuccess(test)
128
        if self.showAll:
129
            if hasattr(test, 'result_dict'):
130
                run_details = test.result_dict
131

    
132
                self.stream.write("\n")
133
                for i in run_details:
134
                    self.stream.write("%s : %s \n" % (i, run_details[i]))
135
                self.stream.write("\n")
136

    
137
        elif self.dots:
138
            self.stream.write('.')
139
            self.stream.flush()
140

    
141
    def addError(self, test, err):
142
        super(BurninTestResult, self).addError(test, err)
143
        if self.showAll:
144
            self.stream.writeln("ERROR")
145
            if hasattr(test, 'result_dict'):
146
                run_details = test.result_dict
147

    
148
                self.stream.write("\n")
149
                for i in run_details:
150
                    self.stream.write("%s : %s \n" % (i, run_details[i]))
151
                self.stream.write("\n")
152

    
153
        elif self.dots:
154
            self.stream.write('E')
155
            self.stream.flush()
156

    
157
    def addFailure(self, test, err):
158
        super(BurninTestResult, self).addFailure(test, err)
159
        if self.showAll:
160
            self.stream.writeln("FAIL")
161
            if hasattr(test, 'result_dict'):
162
                run_details = test.result_dict
163

    
164
                self.stream.write("\n")
165
                for i in run_details:
166
                    self.stream.write("%s : %s \n" % (i, run_details[i]))
167
                self.stream.write("\n")
168

    
169
        elif self.dots:
170
            self.stream.write('F')
171
            self.stream.flush()
172

    
173

    
174
# --------------------------------------------------------------------
175
# Format Results
176
class burninFormatter(logging.Formatter):
177
    err_fmt = red + "ERROR: %(msg)s" + normal
178
    dbg_fmt = green + "* %(msg)s" + normal
179
    info_fmt = "%(msg)s"
180

    
181
    def __init__(self, fmt="%(levelno)s: %(msg)s"):
182
        logging.Formatter.__init__(self, fmt)
183

    
184
    def format(self, record):
185
        format_orig = self._fmt
186
        # Replace the original format with one customized by logging level
187
        if record.levelno == 10:    # DEBUG
188
            self._fmt = burninFormatter.dbg_fmt
189
        elif record.levelno == 20:  # INFO
190
            self._fmt = burninFormatter.info_fmt
191
        elif record.levelno == 40:  # ERROR
192
            self._fmt = burninFormatter.err_fmt
193
        result = logging.Formatter.format(self, record)
194
        self._fmt = format_orig
195
        return result
196

    
197
log = logging.getLogger("burnin")
198
log.setLevel(logging.DEBUG)
199
handler = logging.StreamHandler()
200
handler.setFormatter(burninFormatter())
201
log.addHandler(handler)
202

    
203

    
204
# --------------------------------------------------------------------
205
# UnauthorizedTestCase class
206
class UnauthorizedTestCase(unittest.TestCase):
207
    """Test unauthorized access"""
208
    @classmethod
209
    def setUpClass(cls):
210
        cls.astakos = AstakosClient(AUTH_URL, TOKEN)
211
        cls.astakos.CONNECTION_RETRY_LIMIT = 2
212
        cls.compute_url = \
213
            cls.astakos.get_service_endpoints('compute')['publicURL']
214
        cls.result_dict = dict()
215

    
216
    def test_unauthorized_access(self):
217
        """Test access without a valid token fails"""
218
        log.info("Authentication test")
219
        falseToken = '12345'
220
        c = ComputeClient(self.compute_url, falseToken)
221
        c.CONNECTION_RETRY_LIMIT = 2
222

    
223
        with self.assertRaises(ClientError) as cm:
224
            c.list_servers()
225
            self.assertEqual(cm.exception.status, 401)
226

    
227

    
228
# --------------------------------------------------------------------
229
# This class gest replicated into Images TestCases dynamically
230
class ImagesTestCase(unittest.TestCase):
231
    """Test image lists for consistency"""
232
    @classmethod
233
    def setUpClass(cls):
234
        """Initialize kamaki, get (detailed) list of images"""
235
        log.info("Getting simple and detailed list of images")
236
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
237
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
238
        # Compute Client
239
        compute_url = \
240
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
241
        cls.compute_client = ComputeClient(compute_url, TOKEN)
242
        cls.compute_client.CONNECTION_RETRY_LIMIT = 2
243
        # Image Client
244
        image_url = \
245
            cls.astakos_client.get_service_endpoints('image')['publicURL']
246
        cls.image_client = ImageClient(image_url, TOKEN)
247
        cls.image_client.CONNECTION_RETRY_LIMIT = 2
248
        # Pithos Client
249
        pithos_url = cls.astakos_client.\
250
            get_service_endpoints('object-store')['publicURL']
251
        cls.pithos_client = PithosClient(pithos_url, TOKEN)
252
        cls.pithos_client.CONNECTION_RETRY_LIMIT = 2
253

    
254
        # Get images
255
        cls.images = \
256
            filter(lambda x: not x['name'].startswith(SNF_TEST_PREFIX),
257
                   cls.image_client.list_public())
258
        cls.dimages = \
259
            filter(lambda x: not x['name'].startswith(SNF_TEST_PREFIX),
260
                   cls.image_client.list_public(detail=True))
261
        cls.result_dict = dict()
262
        # Get uniq user id
263
        cls.uuid = _get_user_id()
264
        log.info("Uniq user id = %s" % cls.uuid)
265
        # Create temp directory and store it inside our class
266
        # XXX: In my machine /tmp has not enough space
267
        #      so use current directory to be sure.
268
        cls.temp_dir = tempfile.mkdtemp(dir=os.getcwd())
269
        cls.temp_image_name = \
270
            SNF_TEST_PREFIX + cls.imageid + ".diskdump"
271

    
272
    @classmethod
273
    def tearDownClass(cls):
274
        """Remove local files"""
275
        try:
276
            temp_file = os.path.join(cls.temp_dir, cls.temp_image_name)
277
            os.unlink(temp_file)
278
        except:
279
            pass
280
        try:
281
            os.rmdir(cls.temp_dir)
282
        except:
283
            pass
284

    
285
    def test_001_list_images(self):
286
        """Test image list actually returns images"""
287
        self.assertGreater(len(self.images), 0)
288

    
289
    def test_002_list_images_detailed(self):
290
        """Test detailed image list is the same length as list"""
291
        self.assertEqual(len(self.dimages), len(self.images))
292

    
293
    def test_003_same_image_names(self):
294
        """Test detailed and simple image list contain same names"""
295
        names = sorted(map(lambda x: x["name"], self.images))
296
        dnames = sorted(map(lambda x: x["name"], self.dimages))
297
        self.assertEqual(names, dnames)
298

    
299
    def test_004_unique_image_names(self):
300
        """Test system images have unique names"""
301
        sys_images = filter(lambda x: x['owner'] == SYSTEM_IMAGES_USER,
302
                            self.dimages)
303
        names = sorted(map(lambda x: x["name"], sys_images))
304
        self.assertEqual(sorted(list(set(names))), names)
305

    
306
    def test_005_image_metadata(self):
307
        """Test every image has specific metadata defined"""
308
        keys = frozenset(["osfamily", "root_partition"])
309
        details = self.compute_client.list_images(detail=True)
310
        sys_images = filter(lambda x: x['user_id'] == SYSTEM_IMAGES_USER,
311
                            details)
312
        for i in sys_images:
313
            self.assertTrue(keys.issubset(i["metadata"].keys()))
314

    
315
    def test_006_download_image(self):
316
        """Download image from pithos+"""
317
        # Get image location
318
        image = filter(
319
            lambda x: x['id'] == self.imageid, self.dimages)[0]
320
        image_location = \
321
            image['location'].replace("://", " ").replace("/", " ").split()
322
        log.info("Download image, with owner %s\n\tcontainer %s, and name %s"
323
                 % (image_location[1], image_location[2], image_location[3]))
324
        self.pithos_client.account = image_location[1]
325
        self.pithos_client.container = image_location[2]
326
        temp_file = os.path.join(self.temp_dir, self.temp_image_name)
327
        with open(temp_file, "wb+") as f:
328
            self.pithos_client.download_object(image_location[3], f)
329

    
330
    def test_007_upload_image(self):
331
        """Upload and register image"""
332
        temp_file = os.path.join(self.temp_dir, self.temp_image_name)
333
        log.info("Upload image to pithos+")
334
        # Create container `images'
335
        self.pithos_client.account = self.uuid
336
        self.pithos_client.container = "images"
337
        self.pithos_client.container_put()
338
        with open(temp_file, "rb+") as f:
339
            self.pithos_client.upload_object(self.temp_image_name, f)
340
        log.info("Register image to plankton")
341
        location = "pithos://" + self.uuid + \
342
            "/images/" + self.temp_image_name
343
        params = {'is_public': False}
344
        properties = {'OSFAMILY': "linux", 'ROOT_PARTITION': 1}
345
        self.image_client.register(
346
            self.temp_image_name, location, params, properties)
347
        # Get image id
348
        details = self.image_client.list_public(detail=True)
349
        detail = filter(lambda x: x['location'] == location, details)
350
        self.assertEqual(len(detail), 1)
351
        cls = type(self)
352
        cls.temp_image_id = detail[0]['id']
353
        log.info("Image registered with id %s" % detail[0]['id'])
354

    
355
    def test_008_cleanup_image(self):
356
        """Cleanup image test"""
357
        log.info("Cleanup image test")
358
        # Remove image from pithos+
359
        self.pithos_client.account = self.uuid
360
        self.pithos_client.container = "images"
361
        self.pithos_client.del_object(self.temp_image_name)
362

    
363

    
364
# --------------------------------------------------------------------
365
# FlavorsTestCase class
366
class FlavorsTestCase(unittest.TestCase):
367
    """Test flavor lists for consistency"""
368
    @classmethod
369
    def setUpClass(cls):
370
        """Initialize kamaki, get (detailed) list of flavors"""
371
        log.info("Getting simple and detailed list of flavors")
372
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
373
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
374
        # Compute Client
375
        compute_url = \
376
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
377
        cls.compute_client = ComputeClient(compute_url, TOKEN)
378
        cls.compute_client.CONNECTION_RETRY_LIMIT = 2
379
        cls.flavors = cls.compute_client.list_flavors()
380
        cls.dflavors = cls.compute_client.list_flavors(detail=True)
381
        cls.result_dict = dict()
382

    
383
    def test_001_list_flavors(self):
384
        """Test flavor list actually returns flavors"""
385
        self.assertGreater(len(self.flavors), 0)
386

    
387
    def test_002_list_flavors_detailed(self):
388
        """Test detailed flavor list is the same length as list"""
389
        self.assertEquals(len(self.dflavors), len(self.flavors))
390

    
391
    def test_003_same_flavor_names(self):
392
        """Test detailed and simple flavor list contain same names"""
393
        names = sorted(map(lambda x: x["name"], self.flavors))
394
        dnames = sorted(map(lambda x: x["name"], self.dflavors))
395
        self.assertEqual(names, dnames)
396

    
397
    def test_004_unique_flavor_names(self):
398
        """Test flavors have unique names"""
399
        names = sorted(map(lambda x: x["name"], self.flavors))
400
        self.assertEqual(sorted(list(set(names))), names)
401

    
402
    def test_005_well_formed_flavor_names(self):
403
        """Test flavors have names of the form CxxRyyDzz
404
        Where xx is vCPU count, yy is RAM in MiB, zz is Disk in GiB
405
        """
406
        for f in self.dflavors:
407
            flavor = (f["vcpus"], f["ram"], f["disk"], f["SNF:disk_template"])
408
            self.assertEqual("C%dR%dD%d%s" % flavor,
409
                             f["name"],
410
                             "Flavor %s does not match its specs." % f["name"])
411

    
412

    
413
# --------------------------------------------------------------------
414
# ServersTestCase class
415
class ServersTestCase(unittest.TestCase):
416
    """Test server lists for consistency"""
417
    @classmethod
418
    def setUpClass(cls):
419
        """Initialize kamaki, get (detailed) list of servers"""
420
        log.info("Getting simple and detailed list of servers")
421

    
422
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
423
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
424
        # Compute Client
425
        compute_url = \
426
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
427
        cls.compute_client = ComputeClient(compute_url, TOKEN)
428
        cls.compute_client.CONNECTION_RETRY_LIMIT = 2
429
        cls.servers = cls.compute_client.list_servers()
430
        cls.dservers = cls.compute_client.list_servers(detail=True)
431
        cls.result_dict = dict()
432

    
433
    # def test_001_list_servers(self):
434
    #     """Test server list actually returns servers"""
435
    #     self.assertGreater(len(self.servers), 0)
436

    
437
    def test_002_list_servers_detailed(self):
438
        """Test detailed server list is the same length as list"""
439
        self.assertEqual(len(self.dservers), len(self.servers))
440

    
441
    def test_003_same_server_names(self):
442
        """Test detailed and simple flavor list contain same names"""
443
        names = sorted(map(lambda x: x["name"], self.servers))
444
        dnames = sorted(map(lambda x: x["name"], self.dservers))
445
        self.assertEqual(names, dnames)
446

    
447

    
448
# --------------------------------------------------------------------
449
# Pithos Test Cases
450
class PithosTestCase(unittest.TestCase):
451
    """Test pithos functionality"""
452
    @classmethod
453
    def setUpClass(cls):
454
        """Initialize kamaki, get list of containers"""
455
        # Get uniq user id
456
        cls.uuid = _get_user_id()
457
        log.info("Uniq user id = %s" % cls.uuid)
458
        log.info("Getting list of containers")
459

    
460
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
461
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
462
        # Pithos Client
463
        pithos_url = cls.astakos_client.\
464
            get_service_endpoints('object-store')['publicURL']
465
        cls.pithos_client = PithosClient(pithos_url, TOKEN, cls.uuid)
466
        cls.pithos_client.CONNECTION_RETRY_LIMIT = 2
467

    
468
        cls.containers = cls.pithos_client.list_containers()
469
        cls.result_dict = dict()
470

    
471
    def test_001_list_containers(self):
472
        """Test container list actually returns containers"""
473
        self.assertGreater(len(self.containers), 0)
474

    
475
    def test_002_unique_containers(self):
476
        """Test if containers have unique names"""
477
        names = [n['name'] for n in self.containers]
478
        names = sorted(names)
479
        self.assertEqual(sorted(list(set(names))), names)
480

    
481
    def test_003_create_container(self):
482
        """Test create a container"""
483
        rand_num = randint(1000, 9999)
484
        rand_name = "%s%s" % (SNF_TEST_PREFIX, rand_num)
485
        names = [n['name'] for n in self.containers]
486
        while rand_name in names:
487
            rand_num = randint(1000, 9999)
488
            rand_name = "%s%s" % (SNF_TEST_PREFIX, rand_num)
489
        # Create container
490
        self.pithos_client.container = rand_name
491
        self.pithos_client.container_put()
492
        # Get list of containers
493
        new_containers = self.pithos_client.list_containers()
494
        new_container_names = [n['name'] for n in new_containers]
495
        self.assertIn(rand_name, new_container_names)
496

    
497
    def test_004_upload(self):
498
        """Test uploading something to pithos+"""
499
        # Create a tmp file
500
        with tempfile.TemporaryFile() as f:
501
            f.write("This is a temp file")
502
            f.seek(0, 0)
503
            # Where to save file
504
            self.pithos_client.upload_object("test.txt", f)
505

    
506
    def test_005_download(self):
507
        """Test download something from pithos+"""
508
        # Create tmp directory to save file
509
        tmp_dir = tempfile.mkdtemp()
510
        tmp_file = os.path.join(tmp_dir, "test.txt")
511
        with open(tmp_file, "wb+") as f:
512
            self.pithos_client.download_object("test.txt", f)
513
            # Read file
514
            f.seek(0, 0)
515
            content = f.read()
516
        # Remove files
517
        os.unlink(tmp_file)
518
        os.rmdir(tmp_dir)
519
        # Compare results
520
        self.assertEqual(content, "This is a temp file")
521

    
522
    def test_006_remove(self):
523
        """Test removing files and containers"""
524
        cont_name = self.pithos_client.container
525
        self.pithos_client.del_object("test.txt")
526
        self.pithos_client.purge_container()
527
        # List containers
528
        containers = self.pithos_client.list_containers()
529
        cont_names = [n['name'] for n in containers]
530
        self.assertNotIn(cont_name, cont_names)
531

    
532

    
533
# --------------------------------------------------------------------
534
# This class gets replicated into actual TestCases dynamically
535
class SpawnServerTestCase(unittest.TestCase):
536
    """Test scenario for server of the specified image"""
537
    @classmethod
538
    def setUpClass(cls):
539
        """Initialize a kamaki instance"""
540
        log.info("Spawning server for image `%s'" % cls.imagename)
541

    
542
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
543
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
544
        # Cyclades Client
545
        compute_url = \
546
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
547
        cls.cyclades_client = CycladesClient(compute_url, TOKEN)
548
        cls.cyclades_client.CONNECTION_RETRY_LIMIT = 2
549

    
550
        cls.result_dict = dict()
551

    
552
    def _get_ipv4(self, server):
553
        """Get the public IPv4 of a server from the detailed server info"""
554

    
555
        nics = server["attachments"]
556

    
557
        for nic in nics:
558
            net_id = nic["network_id"]
559
            if self.cyclades_client.get_network_details(net_id)["public"]:
560
                public_addrs = nic["ipv4"]
561

    
562
        self.assertTrue(public_addrs is not None)
563

    
564
        return public_addrs
565

    
566
    def _get_ipv6(self, server):
567
        """Get the public IPv6 of a server from the detailed server info"""
568

    
569
        nics = server["attachments"]
570

    
571
        for nic in nics:
572
            net_id = nic["network_id"]
573
            if self.cyclades_client.get_network_details(net_id)["public"]:
574
                public_addrs = nic["ipv6"]
575

    
576
        self.assertTrue(public_addrs is not None)
577

    
578
        return public_addrs
579

    
580
    def _connect_loginname(self, os_value):
581
        """Return the login name for connections based on the server OS"""
582
        if os_value in ("Ubuntu", "Kubuntu", "Fedora"):
583
            return "user"
584
        elif os_value in ("windows", "windows_alpha1"):
585
            return "Administrator"
586
        else:
587
            return "root"
588

    
589
    def _verify_server_status(self, current_status, new_status):
590
        """Verify a server has switched to a specified status"""
591
        server = self.cyclades_client.get_server_details(self.serverid)
592
        if server["status"] not in (current_status, new_status):
593
            return None  # Do not raise exception, return so the test fails
594
        self.assertEquals(server["status"], new_status)
595

    
596
    def _get_connected_tcp_socket(self, family, host, port):
597
        """Get a connected socket from the specified family to host:port"""
598
        sock = None
599
        for res in \
600
            socket.getaddrinfo(host, port, family, socket.SOCK_STREAM, 0,
601
                               socket.AI_PASSIVE):
602
            af, socktype, proto, canonname, sa = res
603
            try:
604
                sock = socket.socket(af, socktype, proto)
605
            except socket.error:
606
                sock = None
607
                continue
608
            try:
609
                sock.connect(sa)
610
            except socket.error:
611
                sock.close()
612
                sock = None
613
                continue
614
        self.assertIsNotNone(sock)
615
        return sock
616

    
617
    def _ping_once(self, ipv6, ip):
618
        """Test server responds to a single IPv4 or IPv6 ping"""
619
        cmd = "ping%s -c 7 -w 20 %s" % ("6" if ipv6 else "", ip)
620
        ping = subprocess.Popen(cmd, shell=True,
621
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
622
        (stdout, stderr) = ping.communicate()
623
        ret = ping.wait()
624
        self.assertEquals(ret, 0)
625

    
626
    def _get_hostname_over_ssh(self, hostip, username, password):
627
        lines, status = _ssh_execute(
628
            hostip, username, password, "hostname")
629
        self.assertEqual(len(lines), 1)
630
        return lines[0]
631

    
632
    def _try_until_timeout_expires(self, warn_timeout, fail_timeout,
633
                                   opmsg, callable, *args, **kwargs):
634
        if warn_timeout == fail_timeout:
635
            warn_timeout = fail_timeout + 1
636
        warn_tmout = time.time() + warn_timeout
637
        fail_tmout = time.time() + fail_timeout
638
        while True:
639
            self.assertLess(time.time(), fail_tmout,
640
                            "operation `%s' timed out" % opmsg)
641
            if time.time() > warn_tmout:
642
                log.warning("Server %d: `%s' operation `%s' not done yet",
643
                            self.serverid, self.servername, opmsg)
644
            try:
645
                log.info("%s... " % opmsg)
646
                return callable(*args, **kwargs)
647
            except AssertionError:
648
                pass
649
            time.sleep(self.query_interval)
650

    
651
    def _insist_on_tcp_connection(self, family, host, port):
652
        familystr = {socket.AF_INET: "IPv4", socket.AF_INET6: "IPv6",
653
                     socket.AF_UNSPEC: "Unspecified-IPv4/6"}
654
        msg = "connect over %s to %s:%s" % \
655
              (familystr.get(family, "Unknown"), host, port)
656
        sock = self._try_until_timeout_expires(
657
            self.action_timeout, self.action_timeout,
658
            msg, self._get_connected_tcp_socket,
659
            family, host, port)
660
        return sock
661

    
662
    def _insist_on_status_transition(self, current_status, new_status,
663
                                     fail_timeout, warn_timeout=None):
664
        msg = "Server %d: `%s', waiting for %s -> %s" % \
665
              (self.serverid, self.servername, current_status, new_status)
666
        if warn_timeout is None:
667
            warn_timeout = fail_timeout
668
        self._try_until_timeout_expires(warn_timeout, fail_timeout,
669
                                        msg, self._verify_server_status,
670
                                        current_status, new_status)
671
        # Ensure the status is actually the expected one
672
        server = self.cyclades_client.get_server_details(self.serverid)
673
        self.assertEquals(server["status"], new_status)
674

    
675
    def _insist_on_ssh_hostname(self, hostip, username, password):
676
        msg = "SSH to %s, as %s/%s" % (hostip, username, password)
677
        hostname = self._try_until_timeout_expires(
678
            self.action_timeout, self.action_timeout,
679
            msg, self._get_hostname_over_ssh,
680
            hostip, username, password)
681

    
682
        # The hostname must be of the form 'prefix-id'
683
        self.assertTrue(hostname.endswith("-%d\n" % self.serverid))
684

    
685
    def _check_file_through_ssh(self, hostip, username, password,
686
                                remotepath, content):
687
        msg = "Trying file injection through SSH to %s, as %s/%s" % \
688
            (hostip, username, password)
689
        log.info(msg)
690
        try:
691
            ssh = paramiko.SSHClient()
692
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
693
            ssh.connect(hostip, username=username, password=password)
694
            ssh.close()
695
        except socket.error, err:
696
            raise AssertionError(err)
697

    
698
        transport = paramiko.Transport((hostip, 22))
699
        transport.connect(username=username, password=password)
700

    
701
        localpath = '/tmp/' + SNF_TEST_PREFIX + 'injection'
702
        sftp = paramiko.SFTPClient.from_transport(transport)
703
        sftp.get(remotepath, localpath)
704
        sftp.close()
705
        transport.close()
706

    
707
        f = open(localpath)
708
        remote_content = b64encode(f.read())
709

    
710
        # Check if files are the same
711
        return (remote_content == content)
712

    
713
    def _skipIf(self, condition, msg):
714
        if condition:
715
            self.skipTest(msg)
716

    
717
    def test_001_submit_create_server(self):
718
        """Test submit create server request"""
719

    
720
        log.info("Submit new server request")
721
        server = self.cyclades_client.create_server(
722
            self.servername, self.flavorid, self.imageid, self.personality)
723

    
724
        log.info("Server id: " + str(server["id"]))
725
        log.info("Server password: " + server["adminPass"])
726
        self.assertEqual(server["name"], self.servername)
727
        self.assertEqual(server["flavor"]["id"], self.flavorid)
728
        self.assertEqual(server["image"]["id"], self.imageid)
729
        self.assertEqual(server["status"], "BUILD")
730

    
731
        # Update class attributes to reflect data on building server
732
        cls = type(self)
733
        cls.serverid = server["id"]
734
        cls.username = None
735
        cls.passwd = server["adminPass"]
736

    
737
        self.result_dict["Server ID"] = str(server["id"])
738
        self.result_dict["Password"] = str(server["adminPass"])
739

    
740
    def test_002a_server_is_building_in_list(self):
741
        """Test server is in BUILD state, in server list"""
742
        log.info("Server in BUILD state in server list")
743

    
744
        self.result_dict.clear()
745

    
746
        servers = self.cyclades_client.list_servers(detail=True)
747
        servers = filter(lambda x: x["name"] == self.servername, servers)
748

    
749
        server = servers[0]
750
        self.assertEqual(server["name"], self.servername)
751
        self.assertEqual(server["flavor"]["id"], self.flavorid)
752
        self.assertEqual(server["image"]["id"], self.imageid)
753
        self.assertEqual(server["status"], "BUILD")
754

    
755
    def test_002b_server_is_building_in_details(self):
756
        """Test server is in BUILD state, in details"""
757

    
758
        log.info("Server in BUILD state in details")
759

    
760
        server = self.cyclades_client.get_server_details(self.serverid)
761
        self.assertEqual(server["name"], self.servername)
762
        self.assertEqual(server["flavor"]["id"], self.flavorid)
763
        self.assertEqual(server["image"]["id"], self.imageid)
764
        self.assertEqual(server["status"], "BUILD")
765

    
766
    def test_002c_set_server_metadata(self):
767

    
768
        log.info("Creating server metadata")
769

    
770
        image = self.cyclades_client.get_image_details(self.imageid)
771
        os_value = image["metadata"]["os"]
772
        users = image["metadata"].get("users", None)
773
        self.cyclades_client.update_server_metadata(self.serverid, OS=os_value)
774

    
775
        userlist = users.split()
776

    
777
        # Determine the username to use for future connections
778
        # to this host
779
        cls = type(self)
780

    
781
        if "root" in userlist:
782
            cls.username = "root"
783
        elif users is None:
784
            cls.username = self._connect_loginname(os_value)
785
        else:
786
            cls.username = choice(userlist)
787

    
788
        self.assertIsNotNone(cls.username)
789

    
790
    def test_002d_verify_server_metadata(self):
791
        """Test server metadata keys are set based on image metadata"""
792

    
793
        log.info("Verifying image metadata")
794

    
795
        servermeta = self.cyclades_client.get_server_metadata(self.serverid)
796
        imagemeta = self.cyclades_client.get_image_metadata(self.imageid)
797

    
798
        self.assertEqual(servermeta["OS"], imagemeta["os"])
799

    
800
    def test_003_server_becomes_active(self):
801
        """Test server becomes ACTIVE"""
802

    
803
        log.info("Waiting for server to become ACTIVE")
804

    
805
        self._insist_on_status_transition(
806
            "BUILD", "ACTIVE", self.build_fail, self.build_warning)
807

    
808
    def test_003a_get_server_oob_console(self):
809
        """Test getting OOB server console over VNC
810

811
        Implementation of RFB protocol follows
812
        http://www.realvnc.com/docs/rfbproto.pdf.
813

814
        """
815
        console = self.cyclades_client.get_server_console(self.serverid)
816
        self.assertEquals(console['type'], "vnc")
817
        sock = self._insist_on_tcp_connection(
818
            socket.AF_INET, console["host"], console["port"])
819

    
820
        # Step 1. ProtocolVersion message (par. 6.1.1)
821
        version = sock.recv(1024)
822
        self.assertEquals(version, 'RFB 003.008\n')
823
        sock.send(version)
824

    
825
        # Step 2. Security (par 6.1.2): Only VNC Authentication supported
826
        sec = sock.recv(1024)
827
        self.assertEquals(list(sec), ['\x01', '\x02'])
828

    
829
        # Step 3. Request VNC Authentication (par 6.1.2)
830
        sock.send('\x02')
831

    
832
        # Step 4. Receive Challenge (par 6.2.2)
833
        challenge = sock.recv(1024)
834
        self.assertEquals(len(challenge), 16)
835

    
836
        # Step 5. DES-Encrypt challenge, use password as key (par 6.2.2)
837
        response = d3des_generate_response(
838
            (console["password"] + '\0' * 8)[:8], challenge)
839
        sock.send(response)
840

    
841
        # Step 6. SecurityResult (par 6.1.3)
842
        result = sock.recv(4)
843
        self.assertEquals(list(result), ['\x00', '\x00', '\x00', '\x00'])
844
        sock.close()
845

    
846
    def test_004_server_has_ipv4(self):
847
        """Test active server has a valid IPv4 address"""
848

    
849
        log.info("Validate server's IPv4")
850

    
851
        server = self.cyclades_client.get_server_details(self.serverid)
852
        ipv4 = self._get_ipv4(server)
853

    
854
        self.result_dict.clear()
855
        self.result_dict["IPv4"] = str(ipv4)
856

    
857
        self.assertEquals(IP(ipv4).version(), 4)
858

    
859
    def test_005_server_has_ipv6(self):
860
        """Test active server has a valid IPv6 address"""
861
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
862

    
863
        log.info("Validate server's IPv6")
864

    
865
        server = self.cyclades_client.get_server_details(self.serverid)
866
        ipv6 = self._get_ipv6(server)
867

    
868
        self.result_dict.clear()
869
        self.result_dict["IPv6"] = str(ipv6)
870

    
871
        self.assertEquals(IP(ipv6).version(), 6)
872

    
873
    def test_006_server_responds_to_ping_IPv4(self):
874
        """Test server responds to ping on IPv4 address"""
875

    
876
        log.info("Testing if server responds to pings in IPv4")
877
        self.result_dict.clear()
878

    
879
        server = self.cyclades_client.get_server_details(self.serverid)
880
        ip = self._get_ipv4(server)
881
        self._try_until_timeout_expires(self.action_timeout,
882
                                        self.action_timeout,
883
                                        "PING IPv4 to %s" % ip,
884
                                        self._ping_once,
885
                                        False, ip)
886

    
887
    def test_007_server_responds_to_ping_IPv6(self):
888
        """Test server responds to ping on IPv6 address"""
889
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
890
        log.info("Testing if server responds to pings in IPv6")
891

    
892
        server = self.cyclades_client.get_server_details(self.serverid)
893
        ip = self._get_ipv6(server)
894
        self._try_until_timeout_expires(self.action_timeout,
895
                                        self.action_timeout,
896
                                        "PING IPv6 to %s" % ip,
897
                                        self._ping_once,
898
                                        True, ip)
899

    
900
    def test_008_submit_shutdown_request(self):
901
        """Test submit request to shutdown server"""
902

    
903
        log.info("Shutting down server")
904

    
905
        self.cyclades_client.shutdown_server(self.serverid)
906

    
907
    def test_009_server_becomes_stopped(self):
908
        """Test server becomes STOPPED"""
909

    
910
        log.info("Waiting until server becomes STOPPED")
911
        self._insist_on_status_transition(
912
            "ACTIVE", "STOPPED", self.action_timeout, self.action_timeout)
913

    
914
    def test_010_submit_start_request(self):
915
        """Test submit start server request"""
916

    
917
        log.info("Starting server")
918

    
919
        self.cyclades_client.start_server(self.serverid)
920

    
921
    def test_011_server_becomes_active(self):
922
        """Test server becomes ACTIVE again"""
923

    
924
        log.info("Waiting until server becomes ACTIVE")
925
        self._insist_on_status_transition(
926
            "STOPPED", "ACTIVE", self.action_timeout, self.action_timeout)
927

    
928
    def test_011a_server_responds_to_ping_IPv4(self):
929
        """Test server OS is actually up and running again"""
930

    
931
        log.info("Testing if server is actually up and running")
932

    
933
        self.test_006_server_responds_to_ping_IPv4()
934

    
935
    def test_012_ssh_to_server_IPv4(self):
936
        """Test SSH to server public IPv4 works, verify hostname"""
937

    
938
        self._skipIf(self.is_windows, "only valid for Linux servers")
939
        server = self.cyclades_client.get_server_details(self.serverid)
940
        self._insist_on_ssh_hostname(self._get_ipv4(server),
941
                                     self.username, self.passwd)
942

    
943
    def test_013_ssh_to_server_IPv6(self):
944
        """Test SSH to server public IPv6 works, verify hostname"""
945
        self._skipIf(self.is_windows, "only valid for Linux servers")
946
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
947

    
948
        server = self.cyclades_client.get_server_details(self.serverid)
949
        self._insist_on_ssh_hostname(self._get_ipv6(server),
950
                                     self.username, self.passwd)
951

    
952
    def test_014_rdp_to_server_IPv4(self):
953
        "Test RDP connection to server public IPv4 works"""
954
        self._skipIf(not self.is_windows, "only valid for Windows servers")
955
        server = self.cyclades_client.get_server_details(self.serverid)
956
        ipv4 = self._get_ipv4(server)
957
        sock = self._insist_on_tcp_connection(socket.AF_INET, ipv4, 3389)
958

    
959
        # No actual RDP processing done. We assume the RDP server is there
960
        # if the connection to the RDP port is successful.
961
        # FIXME: Use rdesktop, analyze exit code? see manpage [costasd]
962
        sock.close()
963

    
964
    def test_015_rdp_to_server_IPv6(self):
965
        "Test RDP connection to server public IPv6 works"""
966
        self._skipIf(not self.is_windows, "only valid for Windows servers")
967
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
968

    
969
        server = self.cyclades_client.get_server_details(self.serverid)
970
        ipv6 = self._get_ipv6(server)
971
        sock = self._get_tcp_connection(socket.AF_INET6, ipv6, 3389)
972

    
973
        # No actual RDP processing done. We assume the RDP server is there
974
        # if the connection to the RDP port is successful.
975
        sock.close()
976

    
977
    def test_016_personality_is_enforced(self):
978
        """Test file injection for personality enforcement"""
979
        self._skipIf(self.is_windows, "only implemented for Linux servers")
980
        self._skipIf(self.personality is None, "No personality file selected")
981

    
982
        log.info("Trying to inject file for personality enforcement")
983

    
984
        server = self.cyclades_client.get_server_details(self.serverid)
985

    
986
        for inj_file in self.personality:
987
            equal_files = self._check_file_through_ssh(self._get_ipv4(server),
988
                                                       inj_file['owner'],
989
                                                       self.passwd,
990
                                                       inj_file['path'],
991
                                                       inj_file['contents'])
992
            self.assertTrue(equal_files)
993

    
994
    def test_017_submit_delete_request(self):
995
        """Test submit request to delete server"""
996

    
997
        log.info("Deleting server")
998

    
999
        self.cyclades_client.delete_server(self.serverid)
1000

    
1001
    def test_018_server_becomes_deleted(self):
1002
        """Test server becomes DELETED"""
1003

    
1004
        log.info("Testing if server becomes DELETED")
1005

    
1006
        self._insist_on_status_transition(
1007
            "ACTIVE", "DELETED", self.action_timeout, self.action_timeout)
1008

    
1009
    def test_019_server_no_longer_in_server_list(self):
1010
        """Test server is no longer in server list"""
1011

    
1012
        log.info("Test if server is no longer listed")
1013

    
1014
        servers = self.cyclades_client.list_servers()
1015
        self.assertNotIn(self.serverid, [s["id"] for s in servers])
1016

    
1017

    
1018
class NetworkTestCase(unittest.TestCase):
1019
    """ Testing networking in cyclades """
1020

    
1021
    @classmethod
1022
    def setUpClass(cls):
1023
        "Initialize kamaki, get list of current networks"
1024

    
1025
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
1026
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
1027
        # Cyclades Client
1028
        compute_url = \
1029
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
1030
        cls.cyclades_client = CycladesClient(compute_url, TOKEN)
1031
        cls.cyclades_client.CONNECTION_RETRY_LIMIT = 2
1032

    
1033
        cls.servername = "%s%s for %s" % (SNF_TEST_PREFIX,
1034
                                          TEST_RUN_ID,
1035
                                          cls.imagename)
1036

    
1037
        #Dictionary initialization for the vms credentials
1038
        cls.serverid = dict()
1039
        cls.username = dict()
1040
        cls.password = dict()
1041
        cls.is_windows = cls.imagename.lower().find("windows") >= 0
1042

    
1043
        cls.result_dict = dict()
1044

    
1045
    def _skipIf(self, condition, msg):
1046
        if condition:
1047
            self.skipTest(msg)
1048

    
1049
    def _get_ipv4(self, server):
1050
        """Get the public IPv4 of a server from the detailed server info"""
1051

    
1052
        nics = server["attachments"]
1053

    
1054
        for nic in nics:
1055
            net_id = nic["network_id"]
1056
            if self.cyclades_client.get_network_details(net_id)["public"]:
1057
                public_addrs = nic["ipv4"]
1058

    
1059
        self.assertTrue(public_addrs is not None)
1060

    
1061
        return public_addrs
1062

    
1063
    def _connect_loginname(self, os_value):
1064
        """Return the login name for connections based on the server OS"""
1065
        if os_value in ("Ubuntu", "Kubuntu", "Fedora"):
1066
            return "user"
1067
        elif os_value in ("windows", "windows_alpha1"):
1068
            return "Administrator"
1069
        else:
1070
            return "root"
1071

    
1072
    def _ping_once(self, ip):
1073

    
1074
        """Test server responds to a single IPv4 or IPv6 ping"""
1075
        cmd = "ping -c 7 -w 20 %s" % (ip)
1076
        ping = subprocess.Popen(cmd, shell=True,
1077
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1078
        (stdout, stderr) = ping.communicate()
1079
        ret = ping.wait()
1080

    
1081
        return (ret == 0)
1082

    
1083
    def test_00001a_submit_create_server_A(self):
1084
        """Test submit create server request"""
1085

    
1086
        log.info("Creating test server A")
1087

    
1088
        serverA = self.cyclades_client.create_server(
1089
            self.servername, self.flavorid, self.imageid, personality=None)
1090

    
1091
        self.assertEqual(serverA["name"], self.servername)
1092
        self.assertEqual(serverA["flavor"]["id"], self.flavorid)
1093
        self.assertEqual(serverA["image"]["id"], self.imageid)
1094
        self.assertEqual(serverA["status"], "BUILD")
1095

    
1096
        # Update class attributes to reflect data on building server
1097
        self.serverid['A'] = serverA["id"]
1098
        self.username['A'] = None
1099
        self.password['A'] = serverA["adminPass"]
1100

    
1101
        log.info("Server A id:" + str(serverA["id"]))
1102
        log.info("Server password " + (self.password['A']))
1103

    
1104
        self.result_dict["Server A ID"] = str(serverA["id"])
1105
        self.result_dict["Server A password"] = serverA["adminPass"]
1106

    
1107
    def test_00001b_serverA_becomes_active(self):
1108
        """Test server becomes ACTIVE"""
1109

    
1110
        log.info("Waiting until test server A becomes ACTIVE")
1111
        self.result_dict.clear()
1112

    
1113
        fail_tmout = time.time() + self.action_timeout
1114
        while True:
1115
            d = self.cyclades_client.get_server_details(self.serverid['A'])
1116
            status = d['status']
1117
            if status == 'ACTIVE':
1118
                active = True
1119
                break
1120
            elif time.time() > fail_tmout:
1121
                self.assertLess(time.time(), fail_tmout)
1122
            else:
1123
                time.sleep(self.query_interval)
1124

    
1125
        self.assertTrue(active)
1126

    
1127
    def test_00002a_submit_create_server_B(self):
1128
        """Test submit create server request"""
1129

    
1130
        log.info("Creating test server B")
1131

    
1132
        serverB = self.cyclades_client.create_server(
1133
            self.servername, self.flavorid, self.imageid, personality=None)
1134

    
1135
        self.assertEqual(serverB["name"], self.servername)
1136
        self.assertEqual(serverB["flavor"]["id"], self.flavorid)
1137
        self.assertEqual(serverB["image"]["id"], self.imageid)
1138
        self.assertEqual(serverB["status"], "BUILD")
1139

    
1140
        # Update class attributes to reflect data on building server
1141
        self.serverid['B'] = serverB["id"]
1142
        self.username['B'] = None
1143
        self.password['B'] = serverB["adminPass"]
1144

    
1145
        log.info("Server B id: " + str(serverB["id"]))
1146
        log.info("Password " + (self.password['B']))
1147

    
1148
        self.result_dict.clear()
1149
        self.result_dict["Server B ID"] = str(serverB["id"])
1150
        self.result_dict["Server B password"] = serverB["adminPass"]
1151

    
1152
    def test_00002b_serverB_becomes_active(self):
1153
        """Test server becomes ACTIVE"""
1154

    
1155
        log.info("Waiting until test server B becomes ACTIVE")
1156
        self.result_dict.clear()
1157

    
1158
        fail_tmout = time.time() + self.action_timeout
1159
        while True:
1160
            d = self.cyclades_client.get_server_details(self.serverid['B'])
1161
            status = d['status']
1162
            if status == 'ACTIVE':
1163
                active = True
1164
                break
1165
            elif time.time() > fail_tmout:
1166
                self.assertLess(time.time(), fail_tmout)
1167
            else:
1168
                time.sleep(self.query_interval)
1169

    
1170
        self.assertTrue(active)
1171

    
1172
    def test_001_create_network(self):
1173
        """Test submit create network request"""
1174

    
1175
        log.info("Submit new network request")
1176
        self.result_dict.clear()
1177

    
1178
        name = SNF_TEST_PREFIX + TEST_RUN_ID
1179
        #previous_num = len(self.client.list_networks())
1180
        network = self.cyclades_client.create_network(
1181
            name, cidr='10.0.1.0/28', dhcp=True)
1182

    
1183
        #Test if right name is assigned
1184
        self.assertEqual(network['name'], name)
1185

    
1186
        # Update class attributes
1187
        cls = type(self)
1188
        cls.networkid = network['id']
1189
        #networks = self.client.list_networks()
1190

    
1191
        fail_tmout = time.time() + self.action_timeout
1192

    
1193
        #Test if new network is created
1194
        while True:
1195
            d = self.cyclades_client.get_network_details(network['id'])
1196
            if d['status'] == 'ACTIVE':
1197
                connected = True
1198
                break
1199
            elif time.time() > fail_tmout:
1200
                self.assertLess(time.time(), fail_tmout)
1201
            else:
1202
                log.info("Waiting for network to become ACTIVE")
1203
                time.sleep(self.query_interval)
1204

    
1205
        self.assertTrue(connected)
1206

    
1207
        self.result_dict["Private network ID"] = str(network['id'])
1208

    
1209
    def test_002_connect_to_network(self):
1210
        """Test connect VMs to network"""
1211

    
1212
        log.info("Connect VMs to private network")
1213
        self.result_dict.clear()
1214

    
1215
        self.cyclades_client.connect_server(self.serverid['A'], self.networkid)
1216
        self.cyclades_client.connect_server(self.serverid['B'], self.networkid)
1217

    
1218
        #Insist on connecting until action timeout
1219
        fail_tmout = time.time() + self.action_timeout
1220

    
1221
        while True:
1222

    
1223
            netsA = [x['network_id']
1224
                     for x in self.cyclades_client.get_server_details(
1225
                         self.serverid['A'])['attachments']]
1226
            netsB = [x['network_id']
1227
                     for x in self.cyclades_client.get_server_details(
1228
                         self.serverid['B'])['attachments']]
1229

    
1230
            if (self.networkid in netsA) and (self.networkid in netsB):
1231
                conn_exists = True
1232
                break
1233
            elif time.time() > fail_tmout:
1234
                self.assertLess(time.time(), fail_tmout)
1235
            else:
1236
                time.sleep(self.query_interval)
1237

    
1238
        #Adding private IPs to class attributes
1239
        cls = type(self)
1240
        cls.priv_ip = dict()
1241

    
1242
        nicsA = self.cyclades_client.get_server_details(
1243
            self.serverid['A'])['attachments']
1244
        nicsB = self.cyclades_client.get_server_details(
1245
            self.serverid['B'])['attachments']
1246

    
1247
        if conn_exists:
1248
            for nic in nicsA:
1249
                if nic["network_id"] == self.networkid:
1250
                    cls.priv_ip["A"] = nic["ipv4"]
1251
            self.result_dict["Server A private IP"] = str(cls.priv_ip["A"])
1252

    
1253
            for nic in nicsB:
1254
                if nic["network_id"] == self.networkid:
1255
                    cls.priv_ip["B"] = nic["ipv4"]
1256
            self.result_dict["Server B private IP"] = str(cls.priv_ip["B"])
1257

    
1258
        self.assertTrue(conn_exists)
1259
        self.assertIsNot(cls.priv_ip["A"], None)
1260
        self.assertIsNot(cls.priv_ip["B"], None)
1261

    
1262
    def test_002a_reboot(self):
1263
        """Rebooting server A"""
1264

    
1265
        log.info("Rebooting server A")
1266

    
1267
        self.cyclades_client.shutdown_server(self.serverid['A'])
1268

    
1269
        fail_tmout = time.time() + self.action_timeout
1270
        while True:
1271
            d = self.cyclades_client.get_server_details(self.serverid['A'])
1272
            status = d['status']
1273
            if status == 'STOPPED':
1274
                break
1275
            elif time.time() > fail_tmout:
1276
                self.assertLess(time.time(), fail_tmout)
1277
            else:
1278
                time.sleep(self.query_interval)
1279

    
1280
        self.cyclades_client.start_server(self.serverid['A'])
1281

    
1282
        while True:
1283
            d = self.cyclades_client.get_server_details(self.serverid['A'])
1284
            status = d['status']
1285
            if status == 'ACTIVE':
1286
                active = True
1287
                break
1288
            elif time.time() > fail_tmout:
1289
                self.assertLess(time.time(), fail_tmout)
1290
            else:
1291
                time.sleep(self.query_interval)
1292

    
1293
        self.assertTrue(active)
1294

    
1295
    def test_002b_ping_server_A(self):
1296
        "Test if server A responds to IPv4 pings"
1297

    
1298
        log.info("Testing if server A responds to IPv4 pings ")
1299
        self.result_dict.clear()
1300

    
1301
        server = self.cyclades_client.get_server_details(self.serverid['A'])
1302
        ip = self._get_ipv4(server)
1303

    
1304
        fail_tmout = time.time() + self.action_timeout
1305

    
1306
        s = False
1307

    
1308
        self.result_dict["Server A public IP"] = str(ip)
1309

    
1310
        while True:
1311

    
1312
            if self._ping_once(ip):
1313
                s = True
1314
                break
1315

    
1316
            elif time.time() > fail_tmout:
1317
                self.assertLess(time.time(), fail_tmout)
1318

    
1319
            else:
1320
                time.sleep(self.query_interval)
1321

    
1322
        self.assertTrue(s)
1323

    
1324
    def test_002c_reboot(self):
1325
        """Reboot server B"""
1326

    
1327
        log.info("Rebooting server B")
1328
        self.result_dict.clear()
1329

    
1330
        self.cyclades_client.shutdown_server(self.serverid['B'])
1331

    
1332
        fail_tmout = time.time() + self.action_timeout
1333
        while True:
1334
            d = self.cyclades_client.get_server_details(self.serverid['B'])
1335
            status = d['status']
1336
            if status == 'STOPPED':
1337
                break
1338
            elif time.time() > fail_tmout:
1339
                self.assertLess(time.time(), fail_tmout)
1340
            else:
1341
                time.sleep(self.query_interval)
1342

    
1343
        self.cyclades_client.start_server(self.serverid['B'])
1344

    
1345
        while True:
1346
            d = self.cyclades_client.get_server_details(self.serverid['B'])
1347
            status = d['status']
1348
            if status == 'ACTIVE':
1349
                active = True
1350
                break
1351
            elif time.time() > fail_tmout:
1352
                self.assertLess(time.time(), fail_tmout)
1353
            else:
1354
                time.sleep(self.query_interval)
1355

    
1356
        self.assertTrue(active)
1357

    
1358
    def test_002d_ping_server_B(self):
1359
        """Test if server B responds to IPv4 pings"""
1360

    
1361
        log.info("Testing if server B responds to IPv4 pings")
1362
        self.result_dict.clear()
1363

    
1364
        server = self.cyclades_client.get_server_details(self.serverid['B'])
1365
        ip = self._get_ipv4(server)
1366

    
1367
        fail_tmout = time.time() + self.action_timeout
1368

    
1369
        s = False
1370

    
1371
        self.result_dict["Server B public IP"] = str(ip)
1372

    
1373
        while True:
1374
            if self._ping_once(ip):
1375
                s = True
1376
                break
1377

    
1378
            elif time.time() > fail_tmout:
1379
                self.assertLess(time.time(), fail_tmout)
1380

    
1381
            else:
1382
                time.sleep(self.query_interval)
1383

    
1384
        self.assertTrue(s)
1385

    
1386
    def test_003a_setup_interface_A(self):
1387
        """Setup eth1 for server A"""
1388

    
1389
        self._skipIf(self.is_windows, "only valid for Linux servers")
1390

    
1391
        log.info("Setting up interface eth1 for server A")
1392
        self.result_dict.clear()
1393

    
1394
        server = self.cyclades_client.get_server_details(self.serverid['A'])
1395
        image = self.cyclades_client.get_image_details(self.imageid)
1396
        os_value = image['metadata']['os']
1397

    
1398
        users = image["metadata"].get("users", None)
1399
        userlist = users.split()
1400

    
1401
        if "root" in userlist:
1402
            loginname = "root"
1403
        elif users is None:
1404
            loginname = self._connect_loginname(os_value)
1405
        else:
1406
            loginname = choice(userlist)
1407

    
1408
        hostip = self._get_ipv4(server)
1409
        myPass = self.password['A']
1410

    
1411
        log.info("SSH in server A as %s/%s" % (loginname, myPass))
1412
        command = "ifconfig eth1 %s && ifconfig eth1 | " \
1413
                  "grep 'inet addr:' | cut -d: -f2 | awk '{ print $1}'" \
1414
                  % self.priv_ip["A"]
1415
        output, status = _ssh_execute(
1416
            hostip, loginname, myPass, command)
1417

    
1418
        self.assertEquals(status, 0)
1419
        self.assertEquals(output[0].strip(), self.priv_ip["A"])
1420

    
1421
    def test_003b_setup_interface_B(self):
1422
        """Setup eth1 for server B"""
1423

    
1424
        self._skipIf(self.is_windows, "only valid for Linux servers")
1425

    
1426
        log.info("Setting up interface eth1 for server B")
1427

    
1428
        server = self.cyclades_client.get_server_details(self.serverid['B'])
1429
        image = self.cyclades_client.get_image_details(self.imageid)
1430
        os_value = image['metadata']['os']
1431

    
1432
        users = image["metadata"].get("users", None)
1433
        userlist = users.split()
1434

    
1435
        if "root" in userlist:
1436
            loginname = "root"
1437
        elif users is None:
1438
            loginname = self._connect_loginname(os_value)
1439
        else:
1440
            loginname = choice(userlist)
1441

    
1442
        hostip = self._get_ipv4(server)
1443
        myPass = self.password['B']
1444

    
1445
        log.info("SSH in server B as %s/%s" % (loginname, myPass))
1446
        command = "ifconfig eth1 %s && ifconfig eth1 | " \
1447
                  "grep 'inet addr:' | cut -d: -f2 | awk '{ print $1}'" \
1448
                  % self.priv_ip["B"]
1449
        output, status = _ssh_execute(
1450
            hostip, loginname, myPass, command)
1451

    
1452
        self.assertEquals(status, 0)
1453
        self.assertEquals(output[0].strip(), self.priv_ip["B"])
1454

    
1455
    def test_003c_test_connection_exists(self):
1456
        """Ping server B from server A to test if connection exists"""
1457

    
1458
        self._skipIf(self.is_windows, "only valid for Linux servers")
1459

    
1460
        log.info("Testing if server A is actually connected to server B")
1461

    
1462
        server = self.cyclades_client.get_server_details(self.serverid['A'])
1463
        image = self.cyclades_client.get_image_details(self.imageid)
1464
        os_value = image['metadata']['os']
1465
        hostip = self._get_ipv4(server)
1466

    
1467
        users = image["metadata"].get("users", None)
1468
        userlist = users.split()
1469

    
1470
        if "root" in userlist:
1471
            loginname = "root"
1472
        elif users is None:
1473
            loginname = self._connect_loginname(os_value)
1474
        else:
1475
            loginname = choice(userlist)
1476

    
1477
        myPass = self.password['A']
1478

    
1479
        cmd = "if ping -c 7 -w 20 %s >/dev/null; \
1480
               then echo \'True\'; fi;" % self.priv_ip["B"]
1481
        lines, status = _ssh_execute(
1482
            hostip, loginname, myPass, cmd)
1483

    
1484
        exists = False
1485

    
1486
        if 'True\n' in lines:
1487
            exists = True
1488

    
1489
        self.assertTrue(exists)
1490

    
1491
    def test_004_disconnect_from_network(self):
1492
        "Disconnecting server A and B from network"
1493

    
1494
        log.info("Disconnecting servers from private network")
1495

    
1496
        prev_state = self.cyclades_client.get_network_details(self.networkid)
1497
        prev_nics = prev_state['attachments']
1498
        #prev_conn = len(prev_nics)
1499

    
1500
        nicsA = [x['id']
1501
                 for x in self.cyclades_client.get_server_details(
1502
                     self.serverid['A'])['attachments']]
1503
        nicsB = [x['id']
1504
                 for x in self.cyclades_client.get_server_details(
1505
                     self.serverid['B'])['attachments']]
1506

    
1507
        for nic in prev_nics:
1508
            if nic in nicsA:
1509
                self.cyclades_client.disconnect_server(self.serverid['A'], nic)
1510
            if nic in nicsB:
1511
                self.cyclades_client.disconnect_server(self.serverid['B'], nic)
1512

    
1513
        #Insist on deleting until action timeout
1514
        fail_tmout = time.time() + self.action_timeout
1515

    
1516
        while True:
1517
            netsA = [x['network_id']
1518
                     for x in self.cyclades_client.get_server_details(
1519
                         self.serverid['A'])['attachments']]
1520
            netsB = [x['network_id']
1521
                     for x in self.cyclades_client.get_server_details(
1522
                         self.serverid['B'])['attachments']]
1523

    
1524
            #connected = (self.client.get_network_details(self.networkid))
1525
            #connections = connected['attachments']
1526
            if (self.networkid not in netsA) and (self.networkid not in netsB):
1527
                conn_exists = False
1528
                break
1529
            elif time.time() > fail_tmout:
1530
                self.assertLess(time.time(), fail_tmout)
1531
            else:
1532
                time.sleep(self.query_interval)
1533

    
1534
        self.assertFalse(conn_exists)
1535

    
1536
    def test_005_destroy_network(self):
1537
        """Test submit delete network request"""
1538

    
1539
        log.info("Submitting delete network request")
1540

    
1541
        self.cyclades_client.delete_network(self.networkid)
1542

    
1543
        fail_tmout = time.time() + self.action_timeout
1544

    
1545
        while True:
1546

    
1547
            curr_net = []
1548
            networks = self.cyclades_client.list_networks()
1549

    
1550
            for net in networks:
1551
                curr_net.append(net['id'])
1552

    
1553
            if self.networkid not in curr_net:
1554
                self.assertTrue(self.networkid not in curr_net)
1555
                break
1556

    
1557
            elif time.time() > fail_tmout:
1558
                self.assertLess(time.time(), fail_tmout)
1559

    
1560
            else:
1561
                time.sleep(self.query_interval)
1562

    
1563
    def test_006_cleanup_servers(self):
1564
        """Cleanup servers created for this test"""
1565

    
1566
        log.info("Delete servers created for this test")
1567

    
1568
        self.cyclades_client.delete_server(self.serverid['A'])
1569
        self.cyclades_client.delete_server(self.serverid['B'])
1570

    
1571
        fail_tmout = time.time() + self.action_timeout
1572

    
1573
        #Ensure server gets deleted
1574
        status = dict()
1575

    
1576
        while True:
1577
            details = \
1578
                self.cyclades_client.get_server_details(self.serverid['A'])
1579
            status['A'] = details['status']
1580
            details = \
1581
                self.cyclades_client.get_server_details(self.serverid['B'])
1582
            status['B'] = details['status']
1583
            if (status['A'] == 'DELETED') and (status['B'] == 'DELETED'):
1584
                deleted = True
1585
                break
1586
            elif time.time() > fail_tmout:
1587
                self.assertLess(time.time(), fail_tmout)
1588
            else:
1589
                time.sleep(self.query_interval)
1590

    
1591
        self.assertTrue(deleted)
1592

    
1593

    
1594
class TestRunnerProcess(Process):
1595
    """A distinct process used to execute part of the tests in parallel"""
1596
    def __init__(self, **kw):
1597
        Process.__init__(self, **kw)
1598
        kwargs = kw["kwargs"]
1599
        self.testq = kwargs["testq"]
1600
        self.worker_folder = kwargs["worker_folder"]
1601

    
1602
    def run(self):
1603
        # Make sure this test runner process dies with the parent
1604
        # and is not left behind.
1605
        #
1606
        # WARNING: This uses the prctl(2) call and is
1607
        # Linux-specific.
1608

    
1609
        prctl.set_pdeathsig(signal.SIGHUP)
1610

    
1611
        multi = logging.getLogger("multiprocess")
1612

    
1613
        while True:
1614
            multi.debug("I am process %d, GETting from queue is %s" %
1615
                        (os.getpid(), self.testq))
1616
            msg = self.testq.get()
1617

    
1618
            multi.debug("Dequeued msg: %s" % msg)
1619

    
1620
            if msg == "TEST_RUNNER_TERMINATE":
1621
                raise SystemExit
1622

    
1623
            elif issubclass(msg, unittest.TestCase):
1624
                # Assemble a TestSuite, and run it
1625

    
1626
                log_file = os.path.join(self.worker_folder, 'details_' +
1627
                                        (msg.__name__) + "_" +
1628
                                        TEST_RUN_ID + '.log')
1629

    
1630
                fail_file = os.path.join(self.worker_folder, 'failed_' +
1631
                                         (msg.__name__) + "_" +
1632
                                         TEST_RUN_ID + '.log')
1633
                error_file = os.path.join(self.worker_folder, 'error_' +
1634
                                          (msg.__name__) + "_" +
1635
                                          TEST_RUN_ID + '.log')
1636

    
1637
                f = open(log_file, 'w')
1638
                fail = open(fail_file, 'w')
1639
                error = open(error_file, 'w')
1640

    
1641
                log.info(yellow + '* Starting testcase: %s' % msg + normal)
1642

    
1643
                runner = unittest.TextTestRunner(
1644
                    f, verbosity=2, failfast=True,
1645
                    resultclass=BurninTestResult)
1646
                suite = unittest.TestLoader().loadTestsFromTestCase(msg)
1647
                result = runner.run(suite)
1648

    
1649
                for res in result.errors:
1650
                    log.error("snf-burnin encountered an error in "
1651
                              "testcase: %s" % msg)
1652
                    log.error("See log for details")
1653
                    error.write(str(res[0]) + '\n')
1654
                    error.write(str(res[0].shortDescription()) + '\n')
1655
                    error.write('\n')
1656

    
1657
                for res in result.failures:
1658
                    log.error("snf-burnin failed in testcase: %s" % msg)
1659
                    log.error("See log for details")
1660
                    fail.write(str(res[0]) + '\n')
1661
                    fail.write(str(res[0].shortDescription()) + '\n')
1662
                    fail.write('\n')
1663
                    if not NOFAILFAST:
1664
                        sys.exit()
1665

    
1666
                if (len(result.failures) == 0) and (len(result.errors) == 0):
1667
                    log.debug("Passed testcase: %s" % msg)
1668

    
1669
                f.close()
1670
                fail.close()
1671
                error.close()
1672

    
1673
            else:
1674
                raise Exception("Cannot handle msg: %s" % msg)
1675

    
1676

    
1677
def _run_cases_in_series(cases, image_folder):
1678
    """Run instances of TestCase in series"""
1679

    
1680
    for case in cases:
1681

    
1682
        test = case.__name__
1683

    
1684
        log.info(yellow + '* Starting testcase: %s' % test + normal)
1685
        log_file = os.path.join(image_folder, 'details_' +
1686
                                (case.__name__) + "_" +
1687
                                TEST_RUN_ID + '.log')
1688
        fail_file = os.path.join(image_folder, 'failed_' +
1689
                                 (case.__name__) + "_" +
1690
                                 TEST_RUN_ID + '.log')
1691
        error_file = os.path.join(image_folder, 'error_' +
1692
                                  (case.__name__) + "_" +
1693
                                  TEST_RUN_ID + '.log')
1694

    
1695
        f = open(log_file, "w")
1696
        fail = open(fail_file, "w")
1697
        error = open(error_file, "w")
1698

    
1699
        suite = unittest.TestLoader().loadTestsFromTestCase(case)
1700
        runner = unittest.TextTestRunner(
1701
            f, verbosity=2, failfast=True,
1702
            resultclass=BurninTestResult)
1703
        result = runner.run(suite)
1704

    
1705
        for res in result.errors:
1706
            log.error("snf-burnin encountered an error in "
1707
                      "testcase: %s" % test)
1708
            log.error("See log for details")
1709
            error.write(str(res[0]) + '\n')
1710
            error.write(str(res[0].shortDescription()) + '\n')
1711
            error.write('\n')
1712

    
1713
        for res in result.failures:
1714
            log.error("snf-burnin failed in testcase: %s" % test)
1715
            log.error("See log for details")
1716
            fail.write(str(res[0]) + '\n')
1717
            fail.write(str(res[0].shortDescription()) + '\n')
1718
            fail.write('\n')
1719
            if not NOFAILFAST:
1720
                sys.exit()
1721

    
1722
        if (len(result.failures) == 0) and (len(result.errors) == 0):
1723
            log.debug("Passed testcase: %s" % test)
1724

    
1725

    
1726
def _run_cases_in_parallel(cases, fanout, image_folder):
1727
    """Run instances of TestCase in parallel, in a number of distinct processes
1728

1729
    The cases iterable specifies the TestCases to be executed in parallel,
1730
    by test runners running in distinct processes.
1731
    The fanout parameter specifies the number of processes to spawn,
1732
    and defaults to 1.
1733
    The runner argument specifies the test runner class to use inside each
1734
    runner process.
1735

1736
    """
1737

    
1738
    multi = logging.getLogger("multiprocess")
1739
    handler = logging.StreamHandler()
1740
    multi.addHandler(handler)
1741

    
1742
    if VERBOSE:
1743
        multi.setLevel(logging.DEBUG)
1744
    else:
1745
        multi.setLevel(logging.INFO)
1746

    
1747
    testq = []
1748
    worker_folder = []
1749
    runners = []
1750

    
1751
    for i in xrange(0, fanout):
1752
        testq.append(Queue())
1753
        worker_folder.append(os.path.join(image_folder, 'process'+str(i)))
1754
        os.mkdir(worker_folder[i])
1755

    
1756
    for i in xrange(0, fanout):
1757
        kwargs = dict(testq=testq[i], worker_folder=worker_folder[i])
1758
        runners.append(TestRunnerProcess(kwargs=kwargs))
1759

    
1760
    multi.debug("Spawning %d test runner processes" % len(runners))
1761

    
1762
    for p in runners:
1763
        p.start()
1764

    
1765
    # Enqueue test cases
1766
    for i in xrange(0, fanout):
1767
        map(testq[i].put, cases)
1768
        testq[i].put("TEST_RUNNER_TERMINATE")
1769

    
1770
    multi.debug("Spawned %d test runners, PIDs are %s" %
1771
                (len(runners), [p.pid for p in runners]))
1772

    
1773
    multi.debug("Joining %d processes" % len(runners))
1774

    
1775
    for p in runners:
1776
        p.join()
1777

    
1778
    multi.debug("Done joining %d processes" % len(runners))
1779

    
1780

    
1781
def _images_test_case(**kwargs):
1782
    """Construct a new unit test case class from ImagesTestCase"""
1783
    name = "ImagesTestCase_%s" % kwargs["imageid"]
1784
    cls = type(name, (ImagesTestCase,), kwargs)
1785

    
1786
    #Patch extra parameters into test names by manipulating method docstrings
1787
    for (mname, m) in \
1788
            inspect.getmembers(cls, lambda x: inspect.ismethod(x)):
1789
        if hasattr(m, __doc__):
1790
            m.__func__.__doc__ = "[%s] %s" % (cls.imagename, m.__doc__)
1791

    
1792
    # Make sure the class can be pickled, by listing it among
1793
    # the attributes of __main__. A PicklingError is raised otherwise.
1794
    thismodule = sys.modules[__name__]
1795
    setattr(thismodule, name, cls)
1796
    return cls
1797

    
1798

    
1799
def _spawn_server_test_case(**kwargs):
1800
    """Construct a new unit test case class from SpawnServerTestCase"""
1801

    
1802
    name = "SpawnServerTestCase_%s" % kwargs["imageid"]
1803
    cls = type(name, (SpawnServerTestCase,), kwargs)
1804

    
1805
    # Patch extra parameters into test names by manipulating method docstrings
1806
    for (mname, m) in \
1807
            inspect.getmembers(cls, lambda x: inspect.ismethod(x)):
1808
        if hasattr(m, __doc__):
1809
            m.__func__.__doc__ = "[%s] %s" % (cls.imagename, m.__doc__)
1810

    
1811
    # Make sure the class can be pickled, by listing it among
1812
    # the attributes of __main__. A PicklingError is raised otherwise.
1813

    
1814
    thismodule = sys.modules[__name__]
1815
    setattr(thismodule, name, cls)
1816
    return cls
1817

    
1818

    
1819
def _spawn_network_test_case(**kwargs):
1820
    """Construct a new unit test case class from NetworkTestCase"""
1821

    
1822
    name = "NetworkTestCase" + TEST_RUN_ID
1823
    cls = type(name, (NetworkTestCase,), kwargs)
1824

    
1825
    # Make sure the class can be pickled, by listing it among
1826
    # the attributes of __main__. A PicklingError is raised otherwise.
1827

    
1828
    thismodule = sys.modules[__name__]
1829
    setattr(thismodule, name, cls)
1830
    return cls
1831

    
1832

    
1833
# --------------------------------------------------------------------
1834
# Clean up servers/networks functions
1835
def cleanup_servers(timeout, query_interval, delete_stale=False):
1836

    
1837
    astakos_client = AstakosClient(AUTH_URL, TOKEN)
1838
    astakos_client.CONNECTION_RETRY_LIMIT = 2
1839
    # Compute Client
1840
    compute_url = astakos_client.get_service_endpoints('compute')['publicURL']
1841
    compute_client = ComputeClient(compute_url, TOKEN)
1842
    compute_client.CONNECTION_RETRY_LIMIT = 2
1843

    
1844
    servers = compute_client.list_servers()
1845
    stale = [s for s in servers if s["name"].startswith(SNF_TEST_PREFIX)]
1846

    
1847
    if len(stale) == 0:
1848
        return
1849

    
1850
    # Show staled servers
1851
    print >>sys.stderr, yellow + \
1852
        "Found these stale servers from previous runs:" + \
1853
        normal
1854
    print >>sys.stderr, "    " + \
1855
        "\n    ".join(["%d: %s" % (s["id"], s["name"]) for s in stale])
1856

    
1857
    # Delete staled servers
1858
    if delete_stale:
1859
        print >> sys.stderr, "Deleting %d stale servers:" % len(stale)
1860
        fail_tmout = time.time() + timeout
1861
        for s in stale:
1862
            compute_client.delete_server(s["id"])
1863
        # Wait for all servers to be deleted
1864
        while True:
1865
            servers = compute_client.list_servers()
1866
            stale = [s for s in servers
1867
                     if s["name"].startswith(SNF_TEST_PREFIX)]
1868
            if len(stale) == 0:
1869
                print >> sys.stderr, green + "    ...done" + normal
1870
                break
1871
            elif time.time() > fail_tmout:
1872
                print >> sys.stderr, red + \
1873
                    "Not all stale servers deleted. Action timed out." + \
1874
                    normal
1875
                sys.exit(1)
1876
            else:
1877
                time.sleep(query_interval)
1878
    else:
1879
        print >> sys.stderr, "Use --delete-stale to delete them."
1880

    
1881

    
1882
def cleanup_networks(action_timeout, query_interval, delete_stale=False):
1883

    
1884
    astakos_client = AstakosClient(AUTH_URL, TOKEN)
1885
    astakos_client.CONNECTION_RETRY_LIMIT = 2
1886
    # Cyclades Client
1887
    compute_url = astakos_client.get_service_endpoints('compute')['publicURL']
1888
    cyclades_client = CycladesClient(compute_url, TOKEN)
1889
    cyclades_client.CONNECTION_RETRY_LIMIT = 2
1890

    
1891
    networks = cyclades_client.list_networks()
1892
    stale = [n for n in networks if n["name"].startswith(SNF_TEST_PREFIX)]
1893

    
1894
    if len(stale) == 0:
1895
        return
1896

    
1897
    # Show staled networks
1898
    print >> sys.stderr, yellow + \
1899
        "Found these stale networks from previous runs:" + \
1900
        normal
1901
    print "    " + \
1902
        "\n    ".join(["%s: %s" % (str(n["id"]), n["name"]) for n in stale])
1903

    
1904
    # Delete staled networks
1905
    if delete_stale:
1906
        print >> sys.stderr, "Deleting %d stale networks:" % len(stale)
1907
        fail_tmout = time.time() + action_timeout
1908
        for n in stale:
1909
            cyclades_client.delete_network(n["id"])
1910
        # Wait for all networks to be deleted
1911
        while True:
1912
            networks = cyclades_client.list_networks()
1913
            stale = [n for n in networks
1914
                     if n["name"].startswith(SNF_TEST_PREFIX)]
1915
            if len(stale) == 0:
1916
                print >> sys.stderr, green + "    ...done" + normal
1917
                break
1918
            elif time.time() > fail_tmout:
1919
                print >> sys.stderr, red + \
1920
                    "Not all stale networks deleted. Action timed out." + \
1921
                    normal
1922
                sys.exit(1)
1923
            else:
1924
                time.sleep(query_interval)
1925
    else:
1926
        print >> sys.stderr, "Use --delete-stale to delete them."
1927

    
1928

    
1929
# --------------------------------------------------------------------
1930
# Parse arguments functions
1931
def parse_comma(option, opt, value, parser):
1932
    tests = set(['all', 'auth', 'images', 'flavors',
1933
                 'pithos', 'servers', 'server_spawn',
1934
                 'network_spawn'])
1935
    parse_input = value.split(',')
1936

    
1937
    if not (set(parse_input)).issubset(tests):
1938
        raise OptionValueError("The selected set of tests is invalid")
1939

    
1940
    setattr(parser.values, option.dest, value.split(','))
1941

    
1942

    
1943
def parse_arguments(args):
1944

    
1945
    kw = {}
1946
    kw["usage"] = "%prog [options]"
1947
    kw["description"] = \
1948
        "%prog runs a number of test scenarios on a " \
1949
        "Synnefo deployment."
1950

    
1951
    parser = OptionParser(**kw)
1952
    parser.disable_interspersed_args()
1953

    
1954
    parser.add_option("--auth-url",
1955
                      action="store", type="string", dest="auth_url",
1956
                      help="The AUTH URI to use to reach the Synnefo API",
1957
                      default=None)
1958
    parser.add_option("--system-images-user",
1959
                      action="store", type="string", dest="system_images_user",
1960
                      help="Owner of system images",
1961
                      default=None)
1962
    parser.add_option("--token",
1963
                      action="store", type="string", dest="token",
1964
                      help="The token to use for authentication to the API")
1965
    parser.add_option("--nofailfast",
1966
                      action="store_true", dest="nofailfast",
1967
                      help="Do not fail immediately if one of the tests "
1968
                           "fails (EXPERIMENTAL)",
1969
                      default=False)
1970
    parser.add_option("--no-ipv6",
1971
                      action="store_true", dest="no_ipv6",
1972
                      help="Disables ipv6 related tests",
1973
                      default=False)
1974
    parser.add_option("--action-timeout",
1975
                      action="store", type="int", dest="action_timeout",
1976
                      metavar="TIMEOUT",
1977
                      help="Wait SECONDS seconds for a server action to "
1978
                           "complete, then the test is considered failed",
1979
                      default=100)
1980
    parser.add_option("--build-warning",
1981
                      action="store", type="int", dest="build_warning",
1982
                      metavar="TIMEOUT",
1983
                      help="Warn if TIMEOUT seconds have passed and a "
1984
                           "build operation is still pending",
1985
                      default=600)
1986
    parser.add_option("--build-fail",
1987
                      action="store", type="int", dest="build_fail",
1988
                      metavar="BUILD_TIMEOUT",
1989
                      help="Fail the test if TIMEOUT seconds have passed "
1990
                           "and a build operation is still incomplete",
1991
                      default=900)
1992
    parser.add_option("--query-interval",
1993
                      action="store", type="int", dest="query_interval",
1994
                      metavar="INTERVAL",
1995
                      help="Query server status when requests are pending "
1996
                           "every INTERVAL seconds",
1997
                      default=3)
1998
    parser.add_option("--fanout",
1999
                      action="store", type="int", dest="fanout",
2000
                      metavar="COUNT",
2001
                      help="Spawn up to COUNT child processes to execute "
2002
                           "in parallel, essentially have up to COUNT "
2003
                           "server build requests outstanding (EXPERIMENTAL)",
2004
                      default=1)
2005
    parser.add_option("--force-flavor",
2006
                      action="store", type="int", dest="force_flavorid",
2007
                      metavar="FLAVOR ID",
2008
                      help="Force all server creations to use the specified "
2009
                           "FLAVOR ID instead of a randomly chosen one, "
2010
                           "useful if disk space is scarce",
2011
                      default=None)
2012
    parser.add_option("--image-id",
2013
                      action="store", type="string", dest="force_imageid",
2014
                      metavar="IMAGE ID",
2015
                      help="Test the specified image id, use 'all' to test "
2016
                           "all available images (mandatory argument)",
2017
                      default=None)
2018
    parser.add_option("--show-stale",
2019
                      action="store_true", dest="show_stale",
2020
                      help="Show stale servers from previous runs, whose "
2021
                           "name starts with `%s'" % SNF_TEST_PREFIX,
2022
                      default=False)
2023
    parser.add_option("--delete-stale",
2024
                      action="store_true", dest="delete_stale",
2025
                      help="Delete stale servers from previous runs, whose "
2026
                           "name starts with `%s'" % SNF_TEST_PREFIX,
2027
                      default=False)
2028
    parser.add_option("--force-personality",
2029
                      action="store", type="string", dest="personality_path",
2030
                      help="Force a personality file injection.\
2031
                            File path required. ",
2032
                      default=None)
2033
    parser.add_option("--log-folder",
2034
                      action="store", type="string", dest="log_folder",
2035
                      help="Define the absolute path where the output \
2036
                            log is stored. ",
2037
                      default="/var/log/burnin/")
2038
    parser.add_option("--verbose", "-V",
2039
                      action="store_true", dest="verbose",
2040
                      help="Print detailed output about multiple "
2041
                           "processes spawning",
2042
                      default=False)
2043
    parser.add_option("--set-tests",
2044
                      action="callback",
2045
                      dest="tests",
2046
                      type="string",
2047
                      help='Set comma seperated tests for this run. \
2048
                            Available tests: auth, images, flavors, \
2049
                                             servers, server_spawn, \
2050
                                             network_spawn, pithos. \
2051
                            Default = all',
2052
                      default='all',
2053
                      callback=parse_comma)
2054

    
2055
    (opts, args) = parser.parse_args(args)
2056

    
2057
    # -----------------------
2058
    # Verify arguments
2059

    
2060
    # `delete_stale' implies `show_stale'
2061
    if opts.delete_stale:
2062
        opts.show_stale = True
2063

    
2064
    # `token' is mandatory
2065
    _mandatory_argument(opts.token, "--token")
2066
    # `auth_url' is mandatory
2067
    _mandatory_argument(opts.auth_url, "--auth-url")
2068
    # `system_images_user' is mandatory
2069
    _mandatory_argument(opts.system_images_user, "--system-images-user")
2070

    
2071
    if not opts.show_stale:
2072
        # `image-id' is mandatory
2073
        _mandatory_argument(opts.force_imageid, "--image-id")
2074
        if opts.force_imageid != 'all':
2075
            try:
2076
                opts.force_imageid = str(opts.force_imageid)
2077
            except ValueError:
2078
                print >>sys.stderr, red + \
2079
                    "Invalid value specified for" + \
2080
                    "--image-id. Use a valid id, or `all'." + \
2081
                    normal
2082
                sys.exit(1)
2083

    
2084
    return (opts, args)
2085

    
2086

    
2087
def _mandatory_argument(Arg, Str):
2088
    if (Arg is None) or (Arg == ""):
2089
        print >>sys.stderr, red + \
2090
            "The " + Str + " argument is mandatory.\n" + \
2091
            normal
2092
        sys.exit(1)
2093

    
2094

    
2095
# --------------------------------------------------------------------
2096
# Burnin main function
2097
def main():
2098
    """Assemble test cases into a test suite, and run it
2099

2100
    IMPORTANT: Tests have dependencies and have to be run in the specified
2101
    order inside a single test case. They communicate through attributes of the
2102
    corresponding TestCase class (shared fixtures). Distinct subclasses of
2103
    TestCase MAY SHARE NO DATA, since they are run in parallel, in distinct
2104
    test runner processes.
2105

2106
    """
2107

    
2108
    # Parse arguments using `optparse'
2109
    (opts, args) = parse_arguments(sys.argv[1:])
2110

    
2111
    # Some global variables
2112
    global AUTH_URL, TOKEN, SYSTEM_IMAGES_USER
2113
    global NO_IPV6, VERBOSE, NOFAILFAST
2114
    AUTH_URL = opts.auth_url
2115
    TOKEN = opts.token
2116
    SYSTEM_IMAGES_USER = opts.system_images_user
2117
    NO_IPV6 = opts.no_ipv6
2118
    VERBOSE = opts.verbose
2119
    NOFAILFAST = opts.nofailfast
2120

    
2121
    # If `show_stale', cleanup stale servers
2122
    # from previous runs and exit
2123
    if opts.show_stale:
2124
        # We must clean the servers first
2125
        cleanup_servers(opts.action_timeout, opts.query_interval,
2126
                        delete_stale=opts.delete_stale)
2127
        cleanup_networks(opts.action_timeout, opts.query_interval,
2128
                         delete_stale=opts.delete_stale)
2129
        return 0
2130

    
2131
    # Initialize a kamaki instance, get flavors, images
2132
    astakos_client = AstakosClient(AUTH_URL, TOKEN)
2133
    astakos_client.CONNECTION_RETRY_LIMIT = 2
2134
    # Compute Client
2135
    compute_url = astakos_client.get_service_endpoints('compute')['publicURL']
2136
    compute_client = ComputeClient(compute_url, TOKEN)
2137
    compute_client.CONNECTION_RETRY_LIMIT = 2
2138
    DIMAGES = compute_client.list_images(detail=True)
2139
    DFLAVORS = compute_client.list_flavors(detail=True)
2140

    
2141
    # FIXME: logging, log, LOG PID, TEST_RUN_ID, arguments
2142
    # Run them: FIXME: In parallel, FAILEARLY, catchbreak?
2143
    #unittest.main(verbosity=2, catchbreak=True)
2144

    
2145
    # Get a list of images we are going to test
2146
    if opts.force_imageid == 'all':
2147
        test_images = DIMAGES
2148
    else:
2149
        test_images = filter(lambda x: x["id"] == opts.force_imageid, DIMAGES)
2150

    
2151
    # Create output (logging) folder
2152
    if not os.path.exists(opts.log_folder):
2153
        os.mkdir(opts.log_folder)
2154
    test_folder = os.path.join(opts.log_folder, TEST_RUN_ID)
2155
    os.mkdir(test_folder)
2156

    
2157
    for image in test_images:
2158
        imageid = str(image["id"])
2159
        imagename = image["name"]
2160
        # Choose a flavor (given from user or random)
2161
        if opts.force_flavorid:
2162
            flavorid = opts.force_flavorid
2163
        else:
2164
            flavorid = choice([f["id"] for f in DFLAVORS if f["disk"] >= 20])
2165
        # Personality dictionary for file injection test
2166
        if opts.personality_path is not None:
2167
            f = open(opts.personality_path)
2168
            content = b64encode(f.read())
2169
            personality = []
2170
            st = os.stat(opts.personality_path)
2171
            personality.append({
2172
                'path': '/root/test_inj_file',
2173
                'owner': 'root',
2174
                'group': 'root',
2175
                'mode': 0x7777 & st.st_mode,
2176
                'contents': content})
2177
        else:
2178
            personality = None
2179
        # Give a name to our test servers
2180
        servername = "%s%s for %s" % (SNF_TEST_PREFIX, TEST_RUN_ID, imagename)
2181
        is_windows = imagename.lower().find("windows") >= 0
2182

    
2183
        # Create Server TestCases
2184
        ServerTestCase = _spawn_server_test_case(
2185
            imageid=imageid,
2186
            flavorid=flavorid,
2187
            imagename=imagename,
2188
            personality=personality,
2189
            servername=servername,
2190
            is_windows=is_windows,
2191
            action_timeout=opts.action_timeout,
2192
            build_warning=opts.build_warning,
2193
            build_fail=opts.build_fail,
2194
            query_interval=opts.query_interval)
2195
        # Create Network TestCases
2196
        NetworkTestCase = _spawn_network_test_case(
2197
            action_timeout=opts.action_timeout,
2198
            imageid=imageid,
2199
            flavorid=flavorid,
2200
            imagename=imagename,
2201
            query_interval=opts.query_interval)
2202
        # Create Images TestCase
2203
        CImagesTestCase = _images_test_case(
2204
            action_timeout=opts.action_timeout,
2205
            imageid=imageid,
2206
            flavorid=flavorid,
2207
            imagename=imagename,
2208
            query_interval=opts.query_interval)
2209

    
2210
        # Choose the tests we are going to run
2211
        test_dict = {'auth': UnauthorizedTestCase,
2212
                     'images': CImagesTestCase,
2213
                     'flavors': FlavorsTestCase,
2214
                     'servers': ServersTestCase,
2215
                     'pithos': PithosTestCase,
2216
                     'server_spawn': ServerTestCase,
2217
                     'network_spawn': NetworkTestCase}
2218
        seq_cases = []
2219
        if 'all' in opts.tests:
2220
            seq_cases = [UnauthorizedTestCase, CImagesTestCase,
2221
                         FlavorsTestCase, ServersTestCase,
2222
                         PithosTestCase, ServerTestCase,
2223
                         NetworkTestCase]
2224
        else:
2225
            for test in opts.tests:
2226
                seq_cases.append(test_dict[test])
2227

    
2228
        # Folder for each image
2229
        image_folder = os.path.join(test_folder, imageid)
2230
        os.mkdir(image_folder)
2231

    
2232
        # Run each test
2233
        if opts.fanout > 1:
2234
            _run_cases_in_parallel(seq_cases, opts.fanout, image_folder)
2235
        else:
2236
            _run_cases_in_series(seq_cases, image_folder)
2237

    
2238

    
2239
# --------------------------------------------------------------------
2240
# Call main
2241
if __name__ == "__main__":
2242
    sys.exit(main())