Statistics
| Branch: | Tag: | Revision:

root / snf-tools / synnefo_tools / burnin.py @ 980c2592

History | View | Annotate | Download (82 kB)

1
#!/usr/bin/env python
2

    
3
# Copyright 2011 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35

    
36
"""Perform integration testing on a running Synnefo deployment"""
37

    
38
#import __main__
39
import datetime
40
import inspect
41
import logging
42
import os
43
import os.path
44
import paramiko
45
import prctl
46
import subprocess
47
import signal
48
import socket
49
import sys
50
import time
51
import tempfile
52
from base64 import b64encode
53
from IPy import IP
54
from multiprocessing import Process, Queue
55
from random import choice, randint
56
from optparse import OptionParser, OptionValueError
57

    
58
from kamaki.clients.compute import ComputeClient
59
from kamaki.clients.cyclades import CycladesClient
60
from kamaki.clients.image import ImageClient
61
from kamaki.clients.pithos import PithosClient
62
from kamaki.clients.astakos import AstakosClient
63
from kamaki.clients import ClientError
64

    
65
from vncauthproxy.d3des import generate_response as d3des_generate_response
66

    
67
# Use backported unittest functionality if Python < 2.7
68
try:
69
    import unittest2 as unittest
70
except ImportError:
71
    if sys.version_info < (2, 7):
72
        raise Exception("The unittest2 package is required for Python < 2.7")
73
    import unittest
74

    
75
# --------------------------------------------------------------------
76
# Global Variables
77
AUTH_URL = None
78
TOKEN = None
79
SYSTEM_IMAGES_USER = None
80
NO_IPV6 = None
81
NOFAILFAST = None
82
VERBOSE = None
83

    
84
# A unique id identifying this test run
85
TEST_RUN_ID = datetime.datetime.strftime(datetime.datetime.now(),
86
                                         "%Y%m%d%H%M%S")
87
SNF_TEST_PREFIX = "snf-test-"
88

    
89
red = '\x1b[31m'
90
yellow = '\x1b[33m'
91
green = '\x1b[32m'
92
normal = '\x1b[0m'
93

    
94

    
95
# --------------------------------------------------------------------
96
# Global functions
97
def _ssh_execute(hostip, username, password, command):
98
    """Execute a command via ssh"""
99
    ssh = paramiko.SSHClient()
100
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
101
    try:
102
        ssh.connect(hostip, username=username, password=password)
103
    except socket.error, err:
104
        raise AssertionError(err)
105
    try:
106
        stdin, stdout, stderr = ssh.exec_command(command)
107
    except paramiko.SSHException, err:
108
        raise AssertionError(err)
109
    status = stdout.channel.recv_exit_status()
110
    output = stdout.readlines()
111
    ssh.close()
112
    return output, status
113

    
114

    
115
def _get_user_id():
116
    """Authenticate to astakos and get unique users id"""
117
    astakos = AstakosClient(AUTH_URL, TOKEN)
118
    astakos.CONNECTION_RETRY_LIMIT = 2
119
    authenticate = astakos.authenticate()
120
    return authenticate['access']['user']['id']
121

    
122

    
123
# --------------------------------------------------------------------
124
# BurninTestReulst class
125
class BurninTestResult(unittest.TextTestResult):
126
    def addSuccess(self, test):
127
        super(BurninTestResult, self).addSuccess(test)
128
        if self.showAll:
129
            if hasattr(test, 'result_dict'):
130
                run_details = test.result_dict
131

    
132
                self.stream.write("\n")
133
                for i in run_details:
134
                    self.stream.write("%s : %s \n" % (i, run_details[i]))
135
                self.stream.write("\n")
136

    
137
        elif self.dots:
138
            self.stream.write('.')
139
            self.stream.flush()
140

    
141
    def addError(self, test, err):
142
        super(BurninTestResult, self).addError(test, err)
143
        if self.showAll:
144
            self.stream.writeln("ERROR")
145
            if hasattr(test, 'result_dict'):
146
                run_details = test.result_dict
147

    
148
                self.stream.write("\n")
149
                for i in run_details:
150
                    self.stream.write("%s : %s \n" % (i, run_details[i]))
151
                self.stream.write("\n")
152

    
153
        elif self.dots:
154
            self.stream.write('E')
155
            self.stream.flush()
156

    
157
    def addFailure(self, test, err):
158
        super(BurninTestResult, self).addFailure(test, err)
159
        if self.showAll:
160
            self.stream.writeln("FAIL")
161
            if hasattr(test, 'result_dict'):
162
                run_details = test.result_dict
163

    
164
                self.stream.write("\n")
165
                for i in run_details:
166
                    self.stream.write("%s : %s \n" % (i, run_details[i]))
167
                self.stream.write("\n")
168

    
169
        elif self.dots:
170
            self.stream.write('F')
171
            self.stream.flush()
172

    
173

    
174
# --------------------------------------------------------------------
175
# Format Results
176
class burninFormatter(logging.Formatter):
177
    err_fmt = red + "ERROR: %(msg)s" + normal
178
    dbg_fmt = green + "* %(msg)s" + normal
179
    info_fmt = "%(msg)s"
180

    
181
    def __init__(self, fmt="%(levelno)s: %(msg)s"):
182
        logging.Formatter.__init__(self, fmt)
183

    
184
    def format(self, record):
185
        format_orig = self._fmt
186
        # Replace the original format with one customized by logging level
187
        if record.levelno == 10:    # DEBUG
188
            self._fmt = burninFormatter.dbg_fmt
189
        elif record.levelno == 20:  # INFO
190
            self._fmt = burninFormatter.info_fmt
191
        elif record.levelno == 40:  # ERROR
192
            self._fmt = burninFormatter.err_fmt
193
        result = logging.Formatter.format(self, record)
194
        self._fmt = format_orig
195
        return result
196

    
197
log = logging.getLogger("burnin")
198
log.setLevel(logging.DEBUG)
199
handler = logging.StreamHandler()
200
handler.setFormatter(burninFormatter())
201
log.addHandler(handler)
202

    
203

    
204
# --------------------------------------------------------------------
205
# UnauthorizedTestCase class
206
class UnauthorizedTestCase(unittest.TestCase):
207
    """Test unauthorized access"""
208
    @classmethod
209
    def setUpClass(cls):
210
        cls.astakos = AstakosClient(AUTH_URL, TOKEN)
211
        cls.astakos.CONNECTION_RETRY_LIMIT = 2
212
        cls.compute_url = \
213
            cls.astakos.get_service_endpoints('compute')['publicURL']
214
        cls.result_dict = dict()
215

    
216
    def test_unauthorized_access(self):
217
        """Test access without a valid token fails"""
218
        log.info("Authentication test")
219
        falseToken = '12345'
220
        c = ComputeClient(self.compute_url, falseToken)
221
        c.CONNECTION_RETRY_LIMIT = 2
222

    
223
        with self.assertRaises(ClientError) as cm:
224
            c.list_servers()
225
            self.assertEqual(cm.exception.status, 401)
226

    
227

    
228
# --------------------------------------------------------------------
229
# This class gest replicated into Images TestCases dynamically
230
class ImagesTestCase(unittest.TestCase):
231
    """Test image lists for consistency"""
232
    @classmethod
233
    def setUpClass(cls):
234
        """Initialize kamaki, get (detailed) list of images"""
235
        log.info("Getting simple and detailed list of images")
236
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
237
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
238
        # Compute Client
239
        compute_url = \
240
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
241
        cls.compute_client = ComputeClient(compute_url, TOKEN)
242
        cls.compute_client.CONNECTION_RETRY_LIMIT = 2
243
        # Image Client
244
        image_url = \
245
            cls.astakos_client.get_service_endpoints('image')['publicURL']
246
        cls.image_client = ImageClient(image_url, TOKEN)
247
        cls.image_client.CONNECTION_RETRY_LIMIT = 2
248
        # Pithos Client
249
        pithos_url = cls.astakos_client.\
250
            get_service_endpoints('object-store')['publicURL']
251
        cls.pithos_client = PithosClient(pithos_url, TOKEN)
252
        cls.pithos_client.CONNECTION_RETRY_LIMIT = 2
253

    
254
        # Get images
255
        cls.images = \
256
            filter(lambda x: not x['name'].startswith(SNF_TEST_PREFIX),
257
                   cls.image_client.list_public())
258
        cls.dimages = \
259
            filter(lambda x: not x['name'].startswith(SNF_TEST_PREFIX),
260
                   cls.image_client.list_public(detail=True))
261
        cls.result_dict = dict()
262
        # Get uniq user id
263
        cls.uuid = _get_user_id()
264
        log.info("Uniq user id = %s" % cls.uuid)
265
        # Create temp directory and store it inside our class
266
        # XXX: In my machine /tmp has not enough space
267
        #      so use current directory to be sure.
268
        cls.temp_dir = tempfile.mkdtemp(dir=os.getcwd())
269
        cls.temp_image_name = \
270
            SNF_TEST_PREFIX + cls.imageid + ".diskdump"
271

    
272
    @classmethod
273
    def tearDownClass(cls):
274
        """Remove local files"""
275
        try:
276
            temp_file = os.path.join(cls.temp_dir, cls.temp_image_name)
277
            os.unlink(temp_file)
278
        except:
279
            pass
280
        try:
281
            os.rmdir(cls.temp_dir)
282
        except:
283
            pass
284

    
285
    def test_001_list_images(self):
286
        """Test image list actually returns images"""
287
        self.assertGreater(len(self.images), 0)
288

    
289
    def test_002_list_images_detailed(self):
290
        """Test detailed image list is the same length as list"""
291
        self.assertEqual(len(self.dimages), len(self.images))
292

    
293
    def test_003_same_image_names(self):
294
        """Test detailed and simple image list contain same names"""
295
        names = sorted(map(lambda x: x["name"], self.images))
296
        dnames = sorted(map(lambda x: x["name"], self.dimages))
297
        self.assertEqual(names, dnames)
298

    
299
    def test_004_unique_image_names(self):
300
        """Test system images have unique names"""
301
        sys_images = filter(lambda x: x['owner'] == SYSTEM_IMAGES_USER,
302
                            self.dimages)
303
        names = sorted(map(lambda x: x["name"], sys_images))
304
        self.assertEqual(sorted(list(set(names))), names)
305

    
306
    def test_005_image_metadata(self):
307
        """Test every image has specific metadata defined"""
308
        keys = frozenset(["osfamily", "root_partition"])
309
        details = self.compute_client.list_images(detail=True)
310
        sys_images = filter(lambda x: x['user_id'] == SYSTEM_IMAGES_USER,
311
                            details)
312
        for i in sys_images:
313
            self.assertTrue(keys.issubset(i["metadata"].keys()))
314

    
315
    def test_006_download_image(self):
316
        """Download image from pithos+"""
317
        # Get image location
318
        image = filter(
319
            lambda x: x['id'] == self.imageid, self.dimages)[0]
320
        image_location = \
321
            image['location'].replace("://", " ").replace("/", " ").split()
322
        log.info("Download image, with owner %s\n\tcontainer %s, and name %s"
323
                 % (image_location[1], image_location[2], image_location[3]))
324
        self.pithos_client.account = image_location[1]
325
        self.pithos_client.container = image_location[2]
326
        temp_file = os.path.join(self.temp_dir, self.temp_image_name)
327
        with open(temp_file, "wb+") as f:
328
            self.pithos_client.download_object(image_location[3], f)
329

    
330
    def test_007_upload_image(self):
331
        """Upload and register image"""
332
        temp_file = os.path.join(self.temp_dir, self.temp_image_name)
333
        log.info("Upload image to pithos+")
334
        # Create container `images'
335
        self.pithos_client.account = self.uuid
336
        self.pithos_client.container = "images"
337
        self.pithos_client.container_put()
338
        with open(temp_file, "rb+") as f:
339
            self.pithos_client.upload_object(self.temp_image_name, f)
340
        log.info("Register image to plankton")
341
        location = "pithos://" + self.uuid + \
342
            "/images/" + self.temp_image_name
343
        params = {'is_public': False}
344
        properties = {'OSFAMILY': "linux", 'ROOT_PARTITION': 1}
345
        self.image_client.register(
346
            self.temp_image_name, location, params, properties)
347
        # Get image id
348
        details = self.image_client.list_public(detail=True)
349
        detail = filter(lambda x: x['location'] == location, details)
350
        self.assertEqual(len(detail), 1)
351
        cls = type(self)
352
        cls.temp_image_id = detail[0]['id']
353
        log.info("Image registered with id %s" % detail[0]['id'])
354

    
355
    def test_008_cleanup_image(self):
356
        """Cleanup image test"""
357
        log.info("Cleanup image test")
358
        # Remove image from pithos+
359
        self.pithos_client.account = self.uuid
360
        self.pithos_client.container = "images"
361
        self.pithos_client.del_object(self.temp_image_name)
362

    
363

    
364
# --------------------------------------------------------------------
365
# FlavorsTestCase class
366
class FlavorsTestCase(unittest.TestCase):
367
    """Test flavor lists for consistency"""
368
    @classmethod
369
    def setUpClass(cls):
370
        """Initialize kamaki, get (detailed) list of flavors"""
371
        log.info("Getting simple and detailed list of flavors")
372
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
373
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
374
        # Compute Client
375
        compute_url = \
376
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
377
        cls.compute_client = ComputeClient(compute_url, TOKEN)
378
        cls.compute_client.CONNECTION_RETRY_LIMIT = 2
379
        cls.flavors = cls.compute_client.list_flavors()
380
        cls.dflavors = cls.compute_client.list_flavors(detail=True)
381
        cls.result_dict = dict()
382

    
383
    def test_001_list_flavors(self):
384
        """Test flavor list actually returns flavors"""
385
        self.assertGreater(len(self.flavors), 0)
386

    
387
    def test_002_list_flavors_detailed(self):
388
        """Test detailed flavor list is the same length as list"""
389
        self.assertEquals(len(self.dflavors), len(self.flavors))
390

    
391
    def test_003_same_flavor_names(self):
392
        """Test detailed and simple flavor list contain same names"""
393
        names = sorted(map(lambda x: x["name"], self.flavors))
394
        dnames = sorted(map(lambda x: x["name"], self.dflavors))
395
        self.assertEqual(names, dnames)
396

    
397
    def test_004_unique_flavor_names(self):
398
        """Test flavors have unique names"""
399
        names = sorted(map(lambda x: x["name"], self.flavors))
400
        self.assertEqual(sorted(list(set(names))), names)
401

    
402
    def test_005_well_formed_flavor_names(self):
403
        """Test flavors have names of the form CxxRyyDzz
404
        Where xx is vCPU count, yy is RAM in MiB, zz is Disk in GiB
405
        """
406
        for f in self.dflavors:
407
            flavor = (f["vcpus"], f["ram"], f["disk"], f["SNF:disk_template"])
408
            self.assertEqual("C%dR%dD%d%s" % flavor,
409
                             f["name"],
410
                             "Flavor %s does not match its specs." % f["name"])
411

    
412

    
413
# --------------------------------------------------------------------
414
# ServersTestCase class
415
class ServersTestCase(unittest.TestCase):
416
    """Test server lists for consistency"""
417
    @classmethod
418
    def setUpClass(cls):
419
        """Initialize kamaki, get (detailed) list of servers"""
420
        log.info("Getting simple and detailed list of servers")
421

    
422
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
423
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
424
        # Compute Client
425
        compute_url = \
426
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
427
        cls.compute_client = ComputeClient(compute_url, TOKEN)
428
        cls.compute_client.CONNECTION_RETRY_LIMIT = 2
429
        cls.servers = cls.compute_client.list_servers()
430
        cls.dservers = cls.compute_client.list_servers(detail=True)
431
        cls.result_dict = dict()
432

    
433
    # def test_001_list_servers(self):
434
    #     """Test server list actually returns servers"""
435
    #     self.assertGreater(len(self.servers), 0)
436

    
437
    def test_002_list_servers_detailed(self):
438
        """Test detailed server list is the same length as list"""
439
        self.assertEqual(len(self.dservers), len(self.servers))
440

    
441
    def test_003_same_server_names(self):
442
        """Test detailed and simple flavor list contain same names"""
443
        names = sorted(map(lambda x: x["name"], self.servers))
444
        dnames = sorted(map(lambda x: x["name"], self.dservers))
445
        self.assertEqual(names, dnames)
446

    
447

    
448
# --------------------------------------------------------------------
449
# Pithos Test Cases
450
class PithosTestCase(unittest.TestCase):
451
    """Test pithos functionality"""
452
    @classmethod
453
    def setUpClass(cls):
454
        """Initialize kamaki, get list of containers"""
455
        # Get uniq user id
456
        cls.uuid = _get_user_id()
457
        log.info("Uniq user id = %s" % cls.uuid)
458
        log.info("Getting list of containers")
459

    
460
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
461
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
462
        # Pithos Client
463
        pithos_url = cls.astakos_client.\
464
            get_service_endpoints('object-store')['publicURL']
465
        cls.pithos_client = PithosClient(pithos_url, TOKEN, cls.uuid)
466
        cls.pithos_client.CONNECTION_RETRY_LIMIT = 2
467

    
468
        cls.containers = cls.pithos_client.list_containers()
469
        cls.result_dict = dict()
470

    
471
    def test_001_list_containers(self):
472
        """Test container list actually returns containers"""
473
        self.assertGreater(len(self.containers), 0)
474

    
475
    def test_002_unique_containers(self):
476
        """Test if containers have unique names"""
477
        names = [n['name'] for n in self.containers]
478
        names = sorted(names)
479
        self.assertEqual(sorted(list(set(names))), names)
480

    
481
    def test_003_create_container(self):
482
        """Test create a container"""
483
        rand_num = randint(1000, 9999)
484
        rand_name = "%s%s" % (SNF_TEST_PREFIX, rand_num)
485
        names = [n['name'] for n in self.containers]
486
        while rand_name in names:
487
            rand_num = randint(1000, 9999)
488
            rand_name = "%s%s" % (SNF_TEST_PREFIX, rand_num)
489
        # Create container
490
        self.pithos_client.container = rand_name
491
        self.pithos_client.container_put()
492
        # Get list of containers
493
        new_containers = self.pithos_client.list_containers()
494
        new_container_names = [n['name'] for n in new_containers]
495
        self.assertIn(rand_name, new_container_names)
496

    
497
    def test_004_upload(self):
498
        """Test uploading something to pithos+"""
499
        # Create a tmp file
500
        with tempfile.TemporaryFile() as f:
501
            f.write("This is a temp file")
502
            f.seek(0, 0)
503
            # Where to save file
504
            self.pithos_client.upload_object("test.txt", f)
505

    
506
    def test_005_download(self):
507
        """Test download something from pithos+"""
508
        # Create tmp directory to save file
509
        tmp_dir = tempfile.mkdtemp()
510
        tmp_file = os.path.join(tmp_dir, "test.txt")
511
        with open(tmp_file, "wb+") as f:
512
            self.pithos_client.download_object("test.txt", f)
513
            # Read file
514
            f.seek(0, 0)
515
            content = f.read()
516
        # Remove files
517
        os.unlink(tmp_file)
518
        os.rmdir(tmp_dir)
519
        # Compare results
520
        self.assertEqual(content, "This is a temp file")
521

    
522
    def test_006_remove(self):
523
        """Test removing files and containers"""
524
        cont_name = self.pithos_client.container
525
        self.pithos_client.del_object("test.txt")
526
        self.pithos_client.purge_container()
527
        # List containers
528
        containers = self.pithos_client.list_containers()
529
        cont_names = [n['name'] for n in containers]
530
        self.assertNotIn(cont_name, cont_names)
531

    
532

    
533
# --------------------------------------------------------------------
534
# This class gets replicated into actual TestCases dynamically
535
class SpawnServerTestCase(unittest.TestCase):
536
    """Test scenario for server of the specified image"""
537
    @classmethod
538
    def setUpClass(cls):
539
        """Initialize a kamaki instance"""
540
        log.info("Spawning server for image `%s'" % cls.imagename)
541

    
542
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
543
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
544
        # Cyclades Client
545
        compute_url = \
546
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
547
        cls.cyclades_client = CycladesClient(compute_url, TOKEN)
548
        cls.cyclades_client.CONNECTION_RETRY_LIMIT = 2
549

    
550
        cls.result_dict = dict()
551

    
552
    def _get_ipv4(self, server):
553
        """Get the public IPv4 of a server from the detailed server info"""
554

    
555
        nics = server["attachments"]
556

    
557
        for nic in nics:
558
            net_id = nic["network_id"]
559
            if self.cyclades_client.get_network_details(net_id)["public"]:
560
                public_addrs = nic["ipv4"]
561

    
562
        self.assertTrue(public_addrs is not None)
563

    
564
        return public_addrs
565

    
566
    def _get_ipv6(self, server):
567
        """Get the public IPv6 of a server from the detailed server info"""
568

    
569
        nics = server["attachments"]
570

    
571
        for nic in nics:
572
            net_id = nic["network_id"]
573
            if self.cyclades_client.get_network_details(net_id)["public"]:
574
                public_addrs = nic["ipv6"]
575

    
576
        self.assertTrue(public_addrs is not None)
577

    
578
        return public_addrs
579

    
580
    def _connect_loginname(self, os_value):
581
        """Return the login name for connections based on the server OS"""
582
        if os_value in ("Ubuntu", "Kubuntu", "Fedora"):
583
            return "user"
584
        elif os_value in ("windows", "windows_alpha1"):
585
            return "Administrator"
586
        else:
587
            return "root"
588

    
589
    def _verify_server_status(self, current_status, new_status):
590
        """Verify a server has switched to a specified status"""
591
        server = self.cyclades_client.get_server_details(self.serverid)
592
        if server["status"] not in (current_status, new_status):
593
            return None  # Do not raise exception, return so the test fails
594
        self.assertEquals(server["status"], new_status)
595

    
596
    def _get_connected_tcp_socket(self, family, host, port):
597
        """Get a connected socket from the specified family to host:port"""
598
        sock = None
599
        for res in \
600
            socket.getaddrinfo(host, port, family, socket.SOCK_STREAM, 0,
601
                               socket.AI_PASSIVE):
602
            af, socktype, proto, canonname, sa = res
603
            try:
604
                sock = socket.socket(af, socktype, proto)
605
            except socket.error:
606
                sock = None
607
                continue
608
            try:
609
                sock.connect(sa)
610
            except socket.error:
611
                sock.close()
612
                sock = None
613
                continue
614
        self.assertIsNotNone(sock)
615
        return sock
616

    
617
    def _ping_once(self, ipv6, ip):
618
        """Test server responds to a single IPv4 or IPv6 ping"""
619
        cmd = "ping%s -c 7 -w 20 %s" % ("6" if ipv6 else "", ip)
620
        ping = subprocess.Popen(cmd, shell=True,
621
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
622
        (stdout, stderr) = ping.communicate()
623
        ret = ping.wait()
624
        self.assertEquals(ret, 0)
625

    
626
    def _get_hostname_over_ssh(self, hostip, username, password):
627
        lines, status = _ssh_execute(
628
            hostip, username, password, "hostname")
629
        self.assertEqual(len(lines), 1)
630
        return lines[0]
631

    
632
    def _try_until_timeout_expires(self, warn_timeout, fail_timeout,
633
                                   opmsg, callable, *args, **kwargs):
634
        if warn_timeout == fail_timeout:
635
            warn_timeout = fail_timeout + 1
636
        warn_tmout = time.time() + warn_timeout
637
        fail_tmout = time.time() + fail_timeout
638
        while True:
639
            self.assertLess(time.time(), fail_tmout,
640
                            "operation `%s' timed out" % opmsg)
641
            if time.time() > warn_tmout:
642
                log.warning("Server %d: `%s' operation `%s' not done yet",
643
                            self.serverid, self.servername, opmsg)
644
            try:
645
                log.info("%s... " % opmsg)
646
                return callable(*args, **kwargs)
647
            except AssertionError:
648
                pass
649
            time.sleep(self.query_interval)
650

    
651
    def _insist_on_tcp_connection(self, family, host, port):
652
        familystr = {socket.AF_INET: "IPv4", socket.AF_INET6: "IPv6",
653
                     socket.AF_UNSPEC: "Unspecified-IPv4/6"}
654
        msg = "connect over %s to %s:%s" % \
655
              (familystr.get(family, "Unknown"), host, port)
656
        sock = self._try_until_timeout_expires(
657
            self.action_timeout, self.action_timeout,
658
            msg, self._get_connected_tcp_socket,
659
            family, host, port)
660
        return sock
661

    
662
    def _insist_on_status_transition(self, current_status, new_status,
663
                                     fail_timeout, warn_timeout=None):
664
        msg = "Server %d: `%s', waiting for %s -> %s" % \
665
              (self.serverid, self.servername, current_status, new_status)
666
        if warn_timeout is None:
667
            warn_timeout = fail_timeout
668
        self._try_until_timeout_expires(warn_timeout, fail_timeout,
669
                                        msg, self._verify_server_status,
670
                                        current_status, new_status)
671
        # Ensure the status is actually the expected one
672
        server = self.cyclades_client.get_server_details(self.serverid)
673
        self.assertEquals(server["status"], new_status)
674

    
675
    def _insist_on_ssh_hostname(self, hostip, username, password):
676
        msg = "SSH to %s, as %s/%s" % (hostip, username, password)
677
        hostname = self._try_until_timeout_expires(
678
            self.action_timeout, self.action_timeout,
679
            msg, self._get_hostname_over_ssh,
680
            hostip, username, password)
681

    
682
        # The hostname must be of the form 'prefix-id'
683
        self.assertTrue(hostname.endswith("-%d\n" % self.serverid))
684

    
685
    def _check_file_through_ssh(self, hostip, username, password,
686
                                remotepath, content):
687
        msg = "Trying file injection through SSH to %s, as %s/%s" % \
688
            (hostip, username, password)
689
        log.info(msg)
690
        try:
691
            ssh = paramiko.SSHClient()
692
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
693
            ssh.connect(hostip, username=username, password=password)
694
            ssh.close()
695
        except socket.error, err:
696
            raise AssertionError(err)
697

    
698
        transport = paramiko.Transport((hostip, 22))
699
        transport.connect(username=username, password=password)
700

    
701
        localpath = '/tmp/' + SNF_TEST_PREFIX + 'injection'
702
        sftp = paramiko.SFTPClient.from_transport(transport)
703
        sftp.get(remotepath, localpath)
704
        sftp.close()
705
        transport.close()
706

    
707
        f = open(localpath)
708
        remote_content = b64encode(f.read())
709

    
710
        # Check if files are the same
711
        return (remote_content == content)
712

    
713
    def _skipIf(self, condition, msg):
714
        if condition:
715
            self.skipTest(msg)
716

    
717
    def test_001_submit_create_server(self):
718
        """Test submit create server request"""
719

    
720
        log.info("Submit new server request")
721
        server = self.cyclades_client.create_server(
722
            self.servername, self.flavorid, self.imageid, self.personality)
723

    
724
        log.info("Server id: " + str(server["id"]))
725
        log.info("Server password: " + server["adminPass"])
726
        self.assertEqual(server["name"], self.servername)
727
        self.assertEqual(server["flavor"]["id"], self.flavorid)
728
        self.assertEqual(server["image"]["id"], self.imageid)
729
        self.assertEqual(server["status"], "BUILD")
730

    
731
        # Update class attributes to reflect data on building server
732
        cls = type(self)
733
        cls.serverid = server["id"]
734
        cls.username = None
735
        cls.passwd = server["adminPass"]
736

    
737
        self.result_dict["Server ID"] = str(server["id"])
738
        self.result_dict["Password"] = str(server["adminPass"])
739

    
740
    def test_002a_server_is_building_in_list(self):
741
        """Test server is in BUILD state, in server list"""
742
        log.info("Server in BUILD state in server list")
743

    
744
        self.result_dict.clear()
745

    
746
        servers = self.cyclades_client.list_servers(detail=True)
747
        servers = filter(lambda x: x["name"] == self.servername, servers)
748

    
749
        server = servers[0]
750
        self.assertEqual(server["name"], self.servername)
751
        self.assertEqual(server["flavor"]["id"], self.flavorid)
752
        self.assertEqual(server["image"]["id"], self.imageid)
753
        self.assertEqual(server["status"], "BUILD")
754

    
755
    def test_002b_server_is_building_in_details(self):
756
        """Test server is in BUILD state, in details"""
757

    
758
        log.info("Server in BUILD state in details")
759

    
760
        server = self.cyclades_client.get_server_details(self.serverid)
761
        self.assertEqual(server["name"], self.servername)
762
        self.assertEqual(server["flavor"]["id"], self.flavorid)
763
        self.assertEqual(server["image"]["id"], self.imageid)
764
        self.assertEqual(server["status"], "BUILD")
765

    
766
    def test_002c_set_server_metadata(self):
767

    
768
        log.info("Creating server metadata")
769

    
770
        image = self.cyclades_client.get_image_details(self.imageid)
771
        os_value = image["metadata"]["os"]
772
        users = image["metadata"].get("users", None)
773
        self.cyclades_client.update_server_metadata(self.serverid, OS=os_value)
774

    
775
        userlist = users.split()
776

    
777
        # Determine the username to use for future connections
778
        # to this host
779
        cls = type(self)
780

    
781
        if "root" in userlist:
782
            cls.username = "root"
783
        elif users is None:
784
            cls.username = self._connect_loginname(os_value)
785
        else:
786
            cls.username = choice(userlist)
787

    
788
        self.assertIsNotNone(cls.username)
789

    
790
    def test_002d_verify_server_metadata(self):
791
        """Test server metadata keys are set based on image metadata"""
792

    
793
        log.info("Verifying image metadata")
794

    
795
        servermeta = self.cyclades_client.get_server_metadata(self.serverid)
796
        imagemeta = self.cyclades_client.get_image_metadata(self.imageid)
797

    
798
        self.assertEqual(servermeta["OS"], imagemeta["os"])
799

    
800
    def test_003_server_becomes_active(self):
801
        """Test server becomes ACTIVE"""
802

    
803
        log.info("Waiting for server to become ACTIVE")
804

    
805
        self._insist_on_status_transition(
806
            "BUILD", "ACTIVE", self.build_fail, self.build_warning)
807

    
808
    def test_003a_get_server_oob_console(self):
809
        """Test getting OOB server console over VNC
810

811
        Implementation of RFB protocol follows
812
        http://www.realvnc.com/docs/rfbproto.pdf.
813

814
        """
815
        console = self.cyclades_client.get_server_console(self.serverid)
816
        self.assertEquals(console['type'], "vnc")
817
        sock = self._insist_on_tcp_connection(
818
            socket.AF_INET, console["host"], console["port"])
819

    
820
        # Step 1. ProtocolVersion message (par. 6.1.1)
821
        version = sock.recv(1024)
822
        self.assertEquals(version, 'RFB 003.008\n')
823
        sock.send(version)
824

    
825
        # Step 2. Security (par 6.1.2): Only VNC Authentication supported
826
        sec = sock.recv(1024)
827
        self.assertEquals(list(sec), ['\x01', '\x02'])
828

    
829
        # Step 3. Request VNC Authentication (par 6.1.2)
830
        sock.send('\x02')
831

    
832
        # Step 4. Receive Challenge (par 6.2.2)
833
        challenge = sock.recv(1024)
834
        self.assertEquals(len(challenge), 16)
835

    
836
        # Step 5. DES-Encrypt challenge, use password as key (par 6.2.2)
837
        response = d3des_generate_response(
838
            (console["password"] + '\0' * 8)[:8], challenge)
839
        sock.send(response)
840

    
841
        # Step 6. SecurityResult (par 6.1.3)
842
        result = sock.recv(4)
843
        self.assertEquals(list(result), ['\x00', '\x00', '\x00', '\x00'])
844
        sock.close()
845

    
846
    def test_004_server_has_ipv4(self):
847
        """Test active server has a valid IPv4 address"""
848

    
849
        log.info("Validate server's IPv4")
850

    
851
        server = self.cyclades_client.get_server_details(self.serverid)
852
        ipv4 = self._get_ipv4(server)
853

    
854
        self.result_dict.clear()
855
        self.result_dict["IPv4"] = str(ipv4)
856

    
857
        self.assertEquals(IP(ipv4).version(), 4)
858

    
859
    def test_005_server_has_ipv6(self):
860
        """Test active server has a valid IPv6 address"""
861
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
862

    
863
        log.info("Validate server's IPv6")
864

    
865
        server = self.cyclades_client.get_server_details(self.serverid)
866
        ipv6 = self._get_ipv6(server)
867

    
868
        self.result_dict.clear()
869
        self.result_dict["IPv6"] = str(ipv6)
870

    
871
        self.assertEquals(IP(ipv6).version(), 6)
872

    
873
    def test_006_server_responds_to_ping_IPv4(self):
874
        """Test server responds to ping on IPv4 address"""
875

    
876
        log.info("Testing if server responds to pings in IPv4")
877
        self.result_dict.clear()
878

    
879
        server = self.cyclades_client.get_server_details(self.serverid)
880
        ip = self._get_ipv4(server)
881
        self._try_until_timeout_expires(self.action_timeout,
882
                                        self.action_timeout,
883
                                        "PING IPv4 to %s" % ip,
884
                                        self._ping_once,
885
                                        False, ip)
886

    
887
    def test_007_server_responds_to_ping_IPv6(self):
888
        """Test server responds to ping on IPv6 address"""
889
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
890
        log.info("Testing if server responds to pings in IPv6")
891

    
892
        server = self.cyclades_client.get_server_details(self.serverid)
893
        ip = self._get_ipv6(server)
894
        self._try_until_timeout_expires(self.action_timeout,
895
                                        self.action_timeout,
896
                                        "PING IPv6 to %s" % ip,
897
                                        self._ping_once,
898
                                        True, ip)
899

    
900
    def test_008_submit_shutdown_request(self):
901
        """Test submit request to shutdown server"""
902

    
903
        log.info("Shutting down server")
904

    
905
        self.cyclades_client.shutdown_server(self.serverid)
906

    
907
    def test_009_server_becomes_stopped(self):
908
        """Test server becomes STOPPED"""
909

    
910
        log.info("Waiting until server becomes STOPPED")
911
        self._insist_on_status_transition(
912
            "ACTIVE", "STOPPED", self.action_timeout, self.action_timeout)
913

    
914
    def test_010_submit_start_request(self):
915
        """Test submit start server request"""
916

    
917
        log.info("Starting server")
918

    
919
        self.cyclades_client.start_server(self.serverid)
920

    
921
    def test_011_server_becomes_active(self):
922
        """Test server becomes ACTIVE again"""
923

    
924
        log.info("Waiting until server becomes ACTIVE")
925
        self._insist_on_status_transition(
926
            "STOPPED", "ACTIVE", self.action_timeout, self.action_timeout)
927

    
928
    def test_011a_server_responds_to_ping_IPv4(self):
929
        """Test server OS is actually up and running again"""
930

    
931
        log.info("Testing if server is actually up and running")
932

    
933
        self.test_006_server_responds_to_ping_IPv4()
934

    
935
    def test_012_ssh_to_server_IPv4(self):
936
        """Test SSH to server public IPv4 works, verify hostname"""
937

    
938
        self._skipIf(self.is_windows, "only valid for Linux servers")
939
        server = self.cyclades_client.get_server_details(self.serverid)
940
        self._insist_on_ssh_hostname(self._get_ipv4(server),
941
                                     self.username, self.passwd)
942

    
943
    def test_013_ssh_to_server_IPv6(self):
944
        """Test SSH to server public IPv6 works, verify hostname"""
945
        self._skipIf(self.is_windows, "only valid for Linux servers")
946
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
947

    
948
        server = self.cyclades_client.get_server_details(self.serverid)
949
        self._insist_on_ssh_hostname(self._get_ipv6(server),
950
                                     self.username, self.passwd)
951

    
952
    def test_014_rdp_to_server_IPv4(self):
953
        "Test RDP connection to server public IPv4 works"""
954
        self._skipIf(not self.is_windows, "only valid for Windows servers")
955
        server = self.cyclades_client.get_server_details(self.serverid)
956
        ipv4 = self._get_ipv4(server)
957
        sock = self._insist_on_tcp_connection(socket.AF_INET, ipv4, 3389)
958

    
959
        # No actual RDP processing done. We assume the RDP server is there
960
        # if the connection to the RDP port is successful.
961
        # FIXME: Use rdesktop, analyze exit code? see manpage [costasd]
962
        sock.close()
963

    
964
    def test_015_rdp_to_server_IPv6(self):
965
        "Test RDP connection to server public IPv6 works"""
966
        self._skipIf(not self.is_windows, "only valid for Windows servers")
967
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
968

    
969
        server = self.cyclades_client.get_server_details(self.serverid)
970
        ipv6 = self._get_ipv6(server)
971
        sock = self._get_tcp_connection(socket.AF_INET6, ipv6, 3389)
972

    
973
        # No actual RDP processing done. We assume the RDP server is there
974
        # if the connection to the RDP port is successful.
975
        sock.close()
976

    
977
    def test_016_personality_is_enforced(self):
978
        """Test file injection for personality enforcement"""
979
        self._skipIf(self.is_windows, "only implemented for Linux servers")
980
        self._skipIf(self.personality is None, "No personality file selected")
981

    
982
        log.info("Trying to inject file for personality enforcement")
983

    
984
        server = self.cyclades_client.get_server_details(self.serverid)
985

    
986
        for inj_file in self.personality:
987
            equal_files = self._check_file_through_ssh(self._get_ipv4(server),
988
                                                       inj_file['owner'],
989
                                                       self.passwd,
990
                                                       inj_file['path'],
991
                                                       inj_file['contents'])
992
            self.assertTrue(equal_files)
993

    
994
    def test_017_submit_delete_request(self):
995
        """Test submit request to delete server"""
996

    
997
        log.info("Deleting server")
998

    
999
        self.cyclades_client.delete_server(self.serverid)
1000

    
1001
    def test_018_server_becomes_deleted(self):
1002
        """Test server becomes DELETED"""
1003

    
1004
        log.info("Testing if server becomes DELETED")
1005

    
1006
        self._insist_on_status_transition(
1007
            "ACTIVE", "DELETED", self.action_timeout, self.action_timeout)
1008

    
1009
    def test_019_server_no_longer_in_server_list(self):
1010
        """Test server is no longer in server list"""
1011

    
1012
        log.info("Test if server is no longer listed")
1013

    
1014
        servers = self.cyclades_client.list_servers()
1015
        self.assertNotIn(self.serverid, [s["id"] for s in servers])
1016

    
1017

    
1018
class NetworkTestCase(unittest.TestCase):
1019
    """ Testing networking in cyclades """
1020

    
1021
    @classmethod
1022
    def setUpClass(cls):
1023
        "Initialize kamaki, get list of current networks"
1024

    
1025
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
1026
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
1027
        # Cyclades Client
1028
        compute_url = \
1029
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
1030
        cls.cyclades_client = CycladesClient(compute_url, TOKEN)
1031
        cls.cyclades_client.CONNECTION_RETRY_LIMIT = 2
1032

    
1033
        cls.servername = "%s%s for %s" % (SNF_TEST_PREFIX,
1034
                                          TEST_RUN_ID,
1035
                                          cls.imagename)
1036

    
1037
        #Dictionary initialization for the vms credentials
1038
        cls.serverid = dict()
1039
        cls.username = dict()
1040
        cls.password = dict()
1041
        cls.is_windows = cls.imagename.lower().find("windows") >= 0
1042

    
1043
        cls.result_dict = dict()
1044

    
1045
    def _skipIf(self, condition, msg):
1046
        if condition:
1047
            self.skipTest(msg)
1048

    
1049
    def _get_ipv4(self, server):
1050
        """Get the public IPv4 of a server from the detailed server info"""
1051

    
1052
        nics = server["attachments"]
1053

    
1054
        for nic in nics:
1055
            net_id = nic["network_id"]
1056
            if self.cyclades_client.get_network_details(net_id)["public"]:
1057
                public_addrs = nic["ipv4"]
1058

    
1059
        self.assertTrue(public_addrs is not None)
1060

    
1061
        return public_addrs
1062

    
1063
    def _connect_loginname(self, os_value):
1064
        """Return the login name for connections based on the server OS"""
1065
        if os_value in ("Ubuntu", "Kubuntu", "Fedora"):
1066
            return "user"
1067
        elif os_value in ("windows", "windows_alpha1"):
1068
            return "Administrator"
1069
        else:
1070
            return "root"
1071

    
1072
    def _ping_once(self, ip):
1073

    
1074
        """Test server responds to a single IPv4 or IPv6 ping"""
1075
        cmd = "ping -c 7 -w 20 %s" % (ip)
1076
        ping = subprocess.Popen(cmd, shell=True,
1077
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1078
        (stdout, stderr) = ping.communicate()
1079
        ret = ping.wait()
1080

    
1081
        return (ret == 0)
1082

    
1083
    def test_00001a_submit_create_server_A(self):
1084
        """Test submit create server request"""
1085

    
1086
        log.info("Creating test server A")
1087

    
1088
        serverA = self.cyclades_client.create_server(
1089
            self.servername, self.flavorid, self.imageid, personality=None)
1090

    
1091
        self.assertEqual(serverA["name"], self.servername)
1092
        self.assertEqual(serverA["flavor"]["id"], self.flavorid)
1093
        self.assertEqual(serverA["image"]["id"], self.imageid)
1094
        self.assertEqual(serverA["status"], "BUILD")
1095

    
1096
        # Update class attributes to reflect data on building server
1097
        self.serverid['A'] = serverA["id"]
1098
        self.username['A'] = None
1099
        self.password['A'] = serverA["adminPass"]
1100

    
1101
        log.info("Server A id:" + str(serverA["id"]))
1102
        log.info("Server password " + (self.password['A']))
1103

    
1104
        self.result_dict["Server A ID"] = str(serverA["id"])
1105
        self.result_dict["Server A password"] = serverA["adminPass"]
1106

    
1107
    def test_00001b_serverA_becomes_active(self):
1108
        """Test server becomes ACTIVE"""
1109

    
1110
        log.info("Waiting until test server A becomes ACTIVE")
1111
        self.result_dict.clear()
1112

    
1113
        fail_tmout = time.time() + self.action_timeout
1114
        while True:
1115
            d = self.cyclades_client.get_server_details(self.serverid['A'])
1116
            status = d['status']
1117
            if status == 'ACTIVE':
1118
                active = True
1119
                break
1120
            elif time.time() > fail_tmout:
1121
                self.assertLess(time.time(), fail_tmout)
1122
            else:
1123
                time.sleep(self.query_interval)
1124

    
1125
        self.assertTrue(active)
1126

    
1127
    def test_00002a_submit_create_server_B(self):
1128
        """Test submit create server request"""
1129

    
1130
        log.info("Creating test server B")
1131

    
1132
        serverB = self.cyclades_client.create_server(
1133
            self.servername, self.flavorid, self.imageid, personality=None)
1134

    
1135
        self.assertEqual(serverB["name"], self.servername)
1136
        self.assertEqual(serverB["flavor"]["id"], self.flavorid)
1137
        self.assertEqual(serverB["image"]["id"], self.imageid)
1138
        self.assertEqual(serverB["status"], "BUILD")
1139

    
1140
        # Update class attributes to reflect data on building server
1141
        self.serverid['B'] = serverB["id"]
1142
        self.username['B'] = None
1143
        self.password['B'] = serverB["adminPass"]
1144

    
1145
        log.info("Server B id: " + str(serverB["id"]))
1146
        log.info("Password " + (self.password['B']))
1147

    
1148
        self.result_dict.clear()
1149
        self.result_dict["Server B ID"] = str(serverB["id"])
1150
        self.result_dict["Server B password"] = serverB["adminPass"]
1151

    
1152
    def test_00002b_serverB_becomes_active(self):
1153
        """Test server becomes ACTIVE"""
1154

    
1155
        log.info("Waiting until test server B becomes ACTIVE")
1156
        self.result_dict.clear()
1157

    
1158
        fail_tmout = time.time() + self.action_timeout
1159
        while True:
1160
            d = self.cyclades_client.get_server_details(self.serverid['B'])
1161
            status = d['status']
1162
            if status == 'ACTIVE':
1163
                active = True
1164
                break
1165
            elif time.time() > fail_tmout:
1166
                self.assertLess(time.time(), fail_tmout)
1167
            else:
1168
                time.sleep(self.query_interval)
1169

    
1170
        self.assertTrue(active)
1171

    
1172
    def test_001_create_network(self):
1173
        """Test submit create network request"""
1174

    
1175
        log.info("Submit new network request")
1176
        self.result_dict.clear()
1177

    
1178
        name = SNF_TEST_PREFIX + TEST_RUN_ID
1179
        #previous_num = len(self.client.list_networks())
1180
        network = self.cyclades_client.create_network(
1181
            name, cidr='10.0.1.0/28', dhcp=True)
1182

    
1183
        #Test if right name is assigned
1184
        self.assertEqual(network['name'], name)
1185

    
1186
        # Update class attributes
1187
        cls = type(self)
1188
        cls.networkid = network['id']
1189
        #networks = self.client.list_networks()
1190

    
1191
        fail_tmout = time.time() + self.action_timeout
1192

    
1193
        #Test if new network is created
1194
        while True:
1195
            d = self.cyclades_client.get_network_details(network['id'])
1196
            if d['status'] == 'ACTIVE':
1197
                connected = True
1198
                break
1199
            elif time.time() > fail_tmout:
1200
                self.assertLess(time.time(), fail_tmout)
1201
            else:
1202
                log.info("Waiting for network to become ACTIVE")
1203
                time.sleep(self.query_interval)
1204

    
1205
        self.assertTrue(connected)
1206

    
1207
        self.result_dict["Private network ID"] = str(network['id'])
1208

    
1209
    def test_002_connect_to_network(self):
1210
        """Test connect VMs to network"""
1211

    
1212
        log.info("Connect VMs to private network")
1213
        self.result_dict.clear()
1214

    
1215
        self.cyclades_client.connect_server(self.serverid['A'], self.networkid)
1216
        self.cyclades_client.connect_server(self.serverid['B'], self.networkid)
1217

    
1218
        #Insist on connecting until action timeout
1219
        fail_tmout = time.time() + self.action_timeout
1220

    
1221
        while True:
1222

    
1223
            netsA = [x['network_id']
1224
                     for x in self.cyclades_client.get_server_details(
1225
                         self.serverid['A'])['attachments']]
1226
            netsB = [x['network_id']
1227
                     for x in self.cyclades_client.get_server_details(
1228
                         self.serverid['B'])['attachments']]
1229

    
1230
            if (self.networkid in netsA) and (self.networkid in netsB):
1231
                conn_exists = True
1232
                break
1233
            elif time.time() > fail_tmout:
1234
                self.assertLess(time.time(), fail_tmout)
1235
            else:
1236
                time.sleep(self.query_interval)
1237

    
1238
        #Adding private IPs to class attributes
1239
        cls = type(self)
1240
        cls.priv_ip = dict()
1241

    
1242
        nicsA = self.cyclades_client.get_server_details(
1243
            self.serverid['A'])['attachments']
1244
        nicsB = self.cyclades_client.get_server_details(
1245
            self.serverid['B'])['attachments']
1246

    
1247
        if conn_exists:
1248
            for nic in nicsA:
1249
                if nic["network_id"] == self.networkid:
1250
                    cls.priv_ip["A"] = nic["ipv4"]
1251
            self.result_dict["Server A private IP"] = str(cls.priv_ip["A"])
1252

    
1253
            for nic in nicsB:
1254
                if nic["network_id"] == self.networkid:
1255
                    cls.priv_ip["B"] = nic["ipv4"]
1256
            self.result_dict["Server B private IP"] = str(cls.priv_ip["B"])
1257

    
1258
        self.assertTrue(conn_exists)
1259
        self.assertIsNot(cls.priv_ip["A"], None)
1260
        self.assertIsNot(cls.priv_ip["B"], None)
1261

    
1262
    def test_002a_reboot(self):
1263
        """Rebooting server A"""
1264

    
1265
        log.info("Rebooting server A")
1266

    
1267
        self.cyclades_client.shutdown_server(self.serverid['A'])
1268

    
1269
        fail_tmout = time.time() + self.action_timeout
1270
        while True:
1271
            d = self.cyclades_client.get_server_details(self.serverid['A'])
1272
            status = d['status']
1273
            if status == 'STOPPED':
1274
                break
1275
            elif time.time() > fail_tmout:
1276
                self.assertLess(time.time(), fail_tmout)
1277
            else:
1278
                time.sleep(self.query_interval)
1279

    
1280
        self.cyclades_client.start_server(self.serverid['A'])
1281

    
1282
        while True:
1283
            d = self.cyclades_client.get_server_details(self.serverid['A'])
1284
            status = d['status']
1285
            if status == 'ACTIVE':
1286
                active = True
1287
                break
1288
            elif time.time() > fail_tmout:
1289
                self.assertLess(time.time(), fail_tmout)
1290
            else:
1291
                time.sleep(self.query_interval)
1292

    
1293
        self.assertTrue(active)
1294

    
1295
    def test_002b_ping_server_A(self):
1296
        "Test if server A responds to IPv4 pings"
1297

    
1298
        log.info("Testing if server A responds to IPv4 pings ")
1299
        self.result_dict.clear()
1300

    
1301
        server = self.cyclades_client.get_server_details(self.serverid['A'])
1302
        ip = self._get_ipv4(server)
1303

    
1304
        fail_tmout = time.time() + self.action_timeout
1305

    
1306
        s = False
1307

    
1308
        self.result_dict["Server A public IP"] = str(ip)
1309

    
1310
        while True:
1311

    
1312
            if self._ping_once(ip):
1313
                s = True
1314
                break
1315

    
1316
            elif time.time() > fail_tmout:
1317
                self.assertLess(time.time(), fail_tmout)
1318

    
1319
            else:
1320
                time.sleep(self.query_interval)
1321

    
1322
        self.assertTrue(s)
1323

    
1324
    def test_002c_reboot(self):
1325
        """Reboot server B"""
1326

    
1327
        log.info("Rebooting server B")
1328
        self.result_dict.clear()
1329

    
1330
        self.cyclades_client.shutdown_server(self.serverid['B'])
1331

    
1332
        fail_tmout = time.time() + self.action_timeout
1333
        while True:
1334
            d = self.cyclades_client.get_server_details(self.serverid['B'])
1335
            status = d['status']
1336
            if status == 'STOPPED':
1337
                break
1338
            elif time.time() > fail_tmout:
1339
                self.assertLess(time.time(), fail_tmout)
1340
            else:
1341
                time.sleep(self.query_interval)
1342

    
1343
        self.cyclades_client.start_server(self.serverid['B'])
1344

    
1345
        while True:
1346
            d = self.cyclades_client.get_server_details(self.serverid['B'])
1347
            status = d['status']
1348
            if status == 'ACTIVE':
1349
                active = True
1350
                break
1351
            elif time.time() > fail_tmout:
1352
                self.assertLess(time.time(), fail_tmout)
1353
            else:
1354
                time.sleep(self.query_interval)
1355

    
1356
        self.assertTrue(active)
1357

    
1358
    def test_002d_ping_server_B(self):
1359
        """Test if server B responds to IPv4 pings"""
1360

    
1361
        log.info("Testing if server B responds to IPv4 pings")
1362
        self.result_dict.clear()
1363

    
1364
        server = self.cyclades_client.get_server_details(self.serverid['B'])
1365
        ip = self._get_ipv4(server)
1366

    
1367
        fail_tmout = time.time() + self.action_timeout
1368

    
1369
        s = False
1370

    
1371
        self.result_dict["Server B public IP"] = str(ip)
1372

    
1373
        while True:
1374
            if self._ping_once(ip):
1375
                s = True
1376
                break
1377

    
1378
            elif time.time() > fail_tmout:
1379
                self.assertLess(time.time(), fail_tmout)
1380

    
1381
            else:
1382
                time.sleep(self.query_interval)
1383

    
1384
        self.assertTrue(s)
1385

    
1386
    def test_003a_setup_interface_A(self):
1387
        """Setup eth1 for server A"""
1388

    
1389
        self._skipIf(self.is_windows, "only valid for Linux servers")
1390

    
1391
        log.info("Setting up interface eth1 for server A")
1392
        self.result_dict.clear()
1393

    
1394
        server = self.cyclades_client.get_server_details(self.serverid['A'])
1395
        image = self.cyclades_client.get_image_details(self.imageid)
1396
        os_value = image['metadata']['os']
1397

    
1398
        users = image["metadata"].get("users", None)
1399
        userlist = users.split()
1400

    
1401
        if "root" in userlist:
1402
            loginname = "root"
1403
        elif users is None:
1404
            loginname = self._connect_loginname(os_value)
1405
        else:
1406
            loginname = choice(userlist)
1407

    
1408
        hostip = self._get_ipv4(server)
1409
        myPass = self.password['A']
1410

    
1411
        log.info("SSH in server A as %s/%s" % (loginname, myPass))
1412
        command = "ifconfig eth1 %s && ifconfig eth1 | " \
1413
                  "grep 'inet addr:' | cut -d: -f2 | awk '{ print $1}'" \
1414
                  % self.priv_ip["A"]
1415
        output, status = _ssh_execute(
1416
            hostip, loginname, myPass, command)
1417

    
1418
        self.assertEquals(status, 0)
1419
        self.assertEquals(output[0].strip(), self.priv_ip["A"])
1420

    
1421
    def test_003b_setup_interface_B(self):
1422
        """Setup eth1 for server B"""
1423

    
1424
        self._skipIf(self.is_windows, "only valid for Linux servers")
1425

    
1426
        log.info("Setting up interface eth1 for server B")
1427

    
1428
        server = self.cyclades_client.get_server_details(self.serverid['B'])
1429
        image = self.cyclades_client.get_image_details(self.imageid)
1430
        os_value = image['metadata']['os']
1431

    
1432
        users = image["metadata"].get("users", None)
1433
        userlist = users.split()
1434

    
1435
        if "root" in userlist:
1436
            loginname = "root"
1437
        elif users is None:
1438
            loginname = self._connect_loginname(os_value)
1439
        else:
1440
            loginname = choice(userlist)
1441

    
1442
        hostip = self._get_ipv4(server)
1443
        myPass = self.password['B']
1444

    
1445
        log.info("SSH in server B as %s/%s" % (loginname, myPass))
1446
        command = "ifconfig eth1 %s && ifconfig eth1 | " \
1447
                  "grep 'inet addr:' | cut -d: -f2 | awk '{ print $1}'" \
1448
                  % self.priv_ip["B"]
1449
        output, status = _ssh_execute(
1450
            hostip, loginname, myPass, command)
1451

    
1452
        self.assertEquals(status, 0)
1453
        self.assertEquals(output[0].strip(), self.priv_ip["B"])
1454

    
1455
    def test_003c_test_connection_exists(self):
1456
        """Ping server B from server A to test if connection exists"""
1457

    
1458
        self._skipIf(self.is_windows, "only valid for Linux servers")
1459

    
1460
        log.info("Testing if server A is actually connected to server B")
1461

    
1462
        server = self.cyclades_client.get_server_details(self.serverid['A'])
1463
        image = self.cyclades_client.get_image_details(self.imageid)
1464
        os_value = image['metadata']['os']
1465
        hostip = self._get_ipv4(server)
1466

    
1467
        users = image["metadata"].get("users", None)
1468
        userlist = users.split()
1469

    
1470
        if "root" in userlist:
1471
            loginname = "root"
1472
        elif users is None:
1473
            loginname = self._connect_loginname(os_value)
1474
        else:
1475
            loginname = choice(userlist)
1476

    
1477
        myPass = self.password['A']
1478

    
1479
        cmd = "if ping -c 7 -w 20 %s >/dev/null; \
1480
               then echo \'True\'; fi;" % self.priv_ip["B"]
1481
        lines, status = _ssh_execute(
1482
            hostip, loginname, myPass, cmd)
1483

    
1484
        exists = False
1485

    
1486
        if 'True\n' in lines:
1487
            exists = True
1488

    
1489
        self.assertTrue(exists)
1490

    
1491
    def test_004_disconnect_from_network(self):
1492
        "Disconnecting server A and B from network"
1493

    
1494
        log.info("Disconnecting servers from private network")
1495

    
1496
        prev_state = self.cyclades_client.get_network_details(self.networkid)
1497
        prev_nics = prev_state['attachments']
1498
        #prev_conn = len(prev_nics)
1499

    
1500
        nicsA = [x['id']
1501
                 for x in self.cyclades_client.get_server_details(
1502
                     self.serverid['A'])['attachments']]
1503
        nicsB = [x['id']
1504
                 for x in self.cyclades_client.get_server_details(
1505
                     self.serverid['B'])['attachments']]
1506

    
1507
        for nic in prev_nics:
1508
            if nic in nicsA:
1509
                self.cyclades_client.disconnect_server(self.serverid['A'], nic)
1510
            if nic in nicsB:
1511
                self.cyclades_client.disconnect_server(self.serverid['B'], nic)
1512

    
1513
        #Insist on deleting until action timeout
1514
        fail_tmout = time.time() + self.action_timeout
1515

    
1516
        while True:
1517
            netsA = [x['network_id']
1518
                     for x in self.cyclades_client.get_server_details(
1519
                         self.serverid['A'])['attachments']]
1520
            netsB = [x['network_id']
1521
                     for x in self.cyclades_client.get_server_details(
1522
                         self.serverid['B'])['attachments']]
1523

    
1524
            #connected = (self.client.get_network_details(self.networkid))
1525
            #connections = connected['attachments']
1526
            if (self.networkid not in netsA) and (self.networkid not in netsB):
1527
                conn_exists = False
1528
                break
1529
            elif time.time() > fail_tmout:
1530
                self.assertLess(time.time(), fail_tmout)
1531
            else:
1532
                time.sleep(self.query_interval)
1533

    
1534
        self.assertFalse(conn_exists)
1535

    
1536
    def test_005_destroy_network(self):
1537
        """Test submit delete network request"""
1538

    
1539
        log.info("Submitting delete network request")
1540

    
1541
        self.cyclades_client.delete_network(self.networkid)
1542

    
1543
        fail_tmout = time.time() + self.action_timeout
1544

    
1545
        while True:
1546

    
1547
            curr_net = []
1548
            networks = self.cyclades_client.list_networks()
1549

    
1550
            for net in networks:
1551
                curr_net.append(net['id'])
1552

    
1553
            if self.networkid not in curr_net:
1554
                self.assertTrue(self.networkid not in curr_net)
1555
                break
1556

    
1557
            elif time.time() > fail_tmout:
1558
                self.assertLess(time.time(), fail_tmout)
1559

    
1560
            else:
1561
                time.sleep(self.query_interval)
1562

    
1563
    def test_006_cleanup_servers(self):
1564
        """Cleanup servers created for this test"""
1565

    
1566
        log.info("Delete servers created for this test")
1567

    
1568
        self.cyclades_client.delete_server(self.serverid['A'])
1569
        self.cyclades_client.delete_server(self.serverid['B'])
1570

    
1571
        fail_tmout = time.time() + self.action_timeout
1572

    
1573
        #Ensure server gets deleted
1574
        status = dict()
1575

    
1576
        while True:
1577
            details = \
1578
                self.cyclades_client.get_server_details(self.serverid['A'])
1579
            status['A'] = details['status']
1580
            details = \
1581
                self.cyclades_client.get_server_details(self.serverid['B'])
1582
            status['B'] = details['status']
1583
            if (status['A'] == 'DELETED') and (status['B'] == 'DELETED'):
1584
                deleted = True
1585
                break
1586
            elif time.time() > fail_tmout:
1587
                self.assertLess(time.time(), fail_tmout)
1588
            else:
1589
                time.sleep(self.query_interval)
1590

    
1591
        self.assertTrue(deleted)
1592

    
1593

    
1594
class TestRunnerProcess(Process):
1595
    """A distinct process used to execute part of the tests in parallel"""
1596
    def __init__(self, **kw):
1597
        Process.__init__(self, **kw)
1598
        kwargs = kw["kwargs"]
1599
        self.testq = kwargs["testq"]
1600
        self.worker_folder = kwargs["worker_folder"]
1601

    
1602
    def run(self):
1603
        # Make sure this test runner process dies with the parent
1604
        # and is not left behind.
1605
        #
1606
        # WARNING: This uses the prctl(2) call and is
1607
        # Linux-specific.
1608

    
1609
        prctl.set_pdeathsig(signal.SIGHUP)
1610

    
1611
        multi = logging.getLogger("multiprocess")
1612

    
1613
        while True:
1614
            multi.debug("I am process %d, GETting from queue is %s" %
1615
                        (os.getpid(), self.testq))
1616
            msg = self.testq.get()
1617

    
1618
            multi.debug("Dequeued msg: %s" % msg)
1619

    
1620
            if msg == "TEST_RUNNER_TERMINATE":
1621
                raise SystemExit
1622

    
1623
            elif issubclass(msg, unittest.TestCase):
1624
                # Assemble a TestSuite, and run it
1625

    
1626
                log_file = os.path.join(self.worker_folder, 'details_' +
1627
                                        (msg.__name__) + "_" +
1628
                                        TEST_RUN_ID + '.log')
1629

    
1630
                fail_file = os.path.join(self.worker_folder, 'failed_' +
1631
                                         (msg.__name__) + "_" +
1632
                                         TEST_RUN_ID + '.log')
1633
                error_file = os.path.join(self.worker_folder, 'error_' +
1634
                                          (msg.__name__) + "_" +
1635
                                          TEST_RUN_ID + '.log')
1636

    
1637
                f = open(log_file, 'w')
1638
                fail = open(fail_file, 'w')
1639
                error = open(error_file, 'w')
1640

    
1641
                log.info(yellow + '* Starting testcase: %s' % msg + normal)
1642

    
1643
                runner = unittest.TextTestRunner(
1644
                    f, verbosity=2, failfast=True,
1645
                    resultclass=BurninTestResult)
1646
                suite = unittest.TestLoader().loadTestsFromTestCase(msg)
1647
                result = runner.run(suite)
1648

    
1649
                for res in result.errors:
1650
                    log.error("snf-burnin encountered an error in "
1651
                              "testcase: %s" % msg)
1652
                    log.error("See log for details")
1653
                    error.write(str(res[0]) + '\n')
1654
                    error.write(str(res[0].shortDescription()) + '\n')
1655
                    error.write('\n')
1656

    
1657
                for res in result.failures:
1658
                    log.error("snf-burnin failed in testcase: %s" % msg)
1659
                    log.error("See log for details")
1660
                    fail.write(str(res[0]) + '\n')
1661
                    fail.write(str(res[0].shortDescription()) + '\n')
1662
                    fail.write('\n')
1663
                    if not NOFAILFAST:
1664
                        sys.exit(1)
1665

    
1666
                if (len(result.failures) == 0) and (len(result.errors) == 0):
1667
                    log.debug("Passed testcase: %s" % msg)
1668

    
1669
                f.close()
1670
                fail.close()
1671
                error.close()
1672

    
1673
            else:
1674
                raise Exception("Cannot handle msg: %s" % msg)
1675

    
1676

    
1677
def _run_cases_in_series(cases, image_folder):
1678
    """Run instances of TestCase in series"""
1679

    
1680
    error_found = False
1681

    
1682
    for case in cases:
1683

    
1684
        test = case.__name__
1685

    
1686
        log.info(yellow + '* Starting testcase: %s' % test + normal)
1687
        log_file = os.path.join(image_folder, 'details_' +
1688
                                (case.__name__) + "_" +
1689
                                TEST_RUN_ID + '.log')
1690
        fail_file = os.path.join(image_folder, 'failed_' +
1691
                                 (case.__name__) + "_" +
1692
                                 TEST_RUN_ID + '.log')
1693
        error_file = os.path.join(image_folder, 'error_' +
1694
                                  (case.__name__) + "_" +
1695
                                  TEST_RUN_ID + '.log')
1696

    
1697
        f = open(log_file, "w")
1698
        fail = open(fail_file, "w")
1699
        error = open(error_file, "w")
1700

    
1701
        suite = unittest.TestLoader().loadTestsFromTestCase(case)
1702
        runner = unittest.TextTestRunner(
1703
            f, verbosity=2, failfast=True,
1704
            resultclass=BurninTestResult)
1705
        result = runner.run(suite)
1706

    
1707
        for res in result.errors:
1708
            log.error("snf-burnin encountered an error in "
1709
                      "testcase: %s" % test)
1710
            log.error("See log for details")
1711
            error.write(str(res[0]) + '\n')
1712
            error.write(str(res[0].shortDescription()) + '\n')
1713
            error.write('\n')
1714
            error_found = True
1715

    
1716
        for res in result.failures:
1717
            log.error("snf-burnin failed in testcase: %s" % test)
1718
            log.error("See log for details")
1719
            fail.write(str(res[0]) + '\n')
1720
            fail.write(str(res[0].shortDescription()) + '\n')
1721
            fail.write('\n')
1722
            error_found = True
1723
            if not NOFAILFAST:
1724
                sys.exit(1)
1725

    
1726
        if (len(result.failures) == 0) and (len(result.errors) == 0):
1727
            log.debug("Passed testcase: %s" % test)
1728

    
1729
    # Return
1730
    if error_found:
1731
        return 1
1732
    else:
1733
        return 0
1734

    
1735

    
1736
def _run_cases_in_parallel(cases, fanout, image_folder):
1737
    """Run instances of TestCase in parallel, in a number of distinct processes
1738

1739
    The cases iterable specifies the TestCases to be executed in parallel,
1740
    by test runners running in distinct processes.
1741
    The fanout parameter specifies the number of processes to spawn,
1742
    and defaults to 1.
1743
    The runner argument specifies the test runner class to use inside each
1744
    runner process.
1745

1746
    """
1747

    
1748
    multi = logging.getLogger("multiprocess")
1749
    handler = logging.StreamHandler()
1750
    multi.addHandler(handler)
1751

    
1752
    if VERBOSE:
1753
        multi.setLevel(logging.DEBUG)
1754
    else:
1755
        multi.setLevel(logging.INFO)
1756

    
1757
    testq = []
1758
    worker_folder = []
1759
    runners = []
1760

    
1761
    for i in xrange(0, fanout):
1762
        testq.append(Queue())
1763
        worker_folder.append(os.path.join(image_folder, 'process'+str(i)))
1764
        os.mkdir(worker_folder[i])
1765

    
1766
    for i in xrange(0, fanout):
1767
        kwargs = dict(testq=testq[i], worker_folder=worker_folder[i])
1768
        runners.append(TestRunnerProcess(kwargs=kwargs))
1769

    
1770
    multi.debug("Spawning %d test runner processes" % len(runners))
1771

    
1772
    for p in runners:
1773
        p.start()
1774

    
1775
    # Enqueue test cases
1776
    for i in xrange(0, fanout):
1777
        map(testq[i].put, cases)
1778
        testq[i].put("TEST_RUNNER_TERMINATE")
1779

    
1780
    multi.debug("Spawned %d test runners, PIDs are %s" %
1781
                (len(runners), [p.pid for p in runners]))
1782

    
1783
    multi.debug("Joining %d processes" % len(runners))
1784

    
1785
    for p in runners:
1786
        p.join()
1787

    
1788
    multi.debug("Done joining %d processes" % len(runners))
1789

    
1790

    
1791
def _images_test_case(**kwargs):
1792
    """Construct a new unit test case class from ImagesTestCase"""
1793
    name = "ImagesTestCase_%s" % kwargs["imageid"]
1794
    cls = type(name, (ImagesTestCase,), kwargs)
1795

    
1796
    #Patch extra parameters into test names by manipulating method docstrings
1797
    for (mname, m) in \
1798
            inspect.getmembers(cls, lambda x: inspect.ismethod(x)):
1799
        if hasattr(m, __doc__):
1800
            m.__func__.__doc__ = "[%s] %s" % (cls.imagename, m.__doc__)
1801

    
1802
    # Make sure the class can be pickled, by listing it among
1803
    # the attributes of __main__. A PicklingError is raised otherwise.
1804
    thismodule = sys.modules[__name__]
1805
    setattr(thismodule, name, cls)
1806
    return cls
1807

    
1808

    
1809
def _spawn_server_test_case(**kwargs):
1810
    """Construct a new unit test case class from SpawnServerTestCase"""
1811

    
1812
    name = "SpawnServerTestCase_%s" % kwargs["imageid"]
1813
    cls = type(name, (SpawnServerTestCase,), kwargs)
1814

    
1815
    # Patch extra parameters into test names by manipulating method docstrings
1816
    for (mname, m) in \
1817
            inspect.getmembers(cls, lambda x: inspect.ismethod(x)):
1818
        if hasattr(m, __doc__):
1819
            m.__func__.__doc__ = "[%s] %s" % (cls.imagename, m.__doc__)
1820

    
1821
    # Make sure the class can be pickled, by listing it among
1822
    # the attributes of __main__. A PicklingError is raised otherwise.
1823

    
1824
    thismodule = sys.modules[__name__]
1825
    setattr(thismodule, name, cls)
1826
    return cls
1827

    
1828

    
1829
def _spawn_network_test_case(**kwargs):
1830
    """Construct a new unit test case class from NetworkTestCase"""
1831

    
1832
    name = "NetworkTestCase" + TEST_RUN_ID
1833
    cls = type(name, (NetworkTestCase,), kwargs)
1834

    
1835
    # Make sure the class can be pickled, by listing it among
1836
    # the attributes of __main__. A PicklingError is raised otherwise.
1837

    
1838
    thismodule = sys.modules[__name__]
1839
    setattr(thismodule, name, cls)
1840
    return cls
1841

    
1842

    
1843
# --------------------------------------------------------------------
1844
# Clean up servers/networks functions
1845
def cleanup_servers(timeout, query_interval, delete_stale=False):
1846

    
1847
    astakos_client = AstakosClient(AUTH_URL, TOKEN)
1848
    astakos_client.CONNECTION_RETRY_LIMIT = 2
1849
    # Compute Client
1850
    compute_url = astakos_client.get_service_endpoints('compute')['publicURL']
1851
    compute_client = ComputeClient(compute_url, TOKEN)
1852
    compute_client.CONNECTION_RETRY_LIMIT = 2
1853

    
1854
    servers = compute_client.list_servers()
1855
    stale = [s for s in servers if s["name"].startswith(SNF_TEST_PREFIX)]
1856

    
1857
    if len(stale) == 0:
1858
        return
1859

    
1860
    # Show staled servers
1861
    print >>sys.stderr, yellow + \
1862
        "Found these stale servers from previous runs:" + \
1863
        normal
1864
    print >>sys.stderr, "    " + \
1865
        "\n    ".join(["%d: %s" % (s["id"], s["name"]) for s in stale])
1866

    
1867
    # Delete staled servers
1868
    if delete_stale:
1869
        print >> sys.stderr, "Deleting %d stale servers:" % len(stale)
1870
        fail_tmout = time.time() + timeout
1871
        for s in stale:
1872
            compute_client.delete_server(s["id"])
1873
        # Wait for all servers to be deleted
1874
        while True:
1875
            servers = compute_client.list_servers()
1876
            stale = [s for s in servers
1877
                     if s["name"].startswith(SNF_TEST_PREFIX)]
1878
            if len(stale) == 0:
1879
                print >> sys.stderr, green + "    ...done" + normal
1880
                break
1881
            elif time.time() > fail_tmout:
1882
                print >> sys.stderr, red + \
1883
                    "Not all stale servers deleted. Action timed out." + \
1884
                    normal
1885
                sys.exit(1)
1886
            else:
1887
                time.sleep(query_interval)
1888
    else:
1889
        print >> sys.stderr, "Use --delete-stale to delete them."
1890

    
1891

    
1892
def cleanup_networks(action_timeout, query_interval, delete_stale=False):
1893

    
1894
    astakos_client = AstakosClient(AUTH_URL, TOKEN)
1895
    astakos_client.CONNECTION_RETRY_LIMIT = 2
1896
    # Cyclades Client
1897
    compute_url = astakos_client.get_service_endpoints('compute')['publicURL']
1898
    cyclades_client = CycladesClient(compute_url, TOKEN)
1899
    cyclades_client.CONNECTION_RETRY_LIMIT = 2
1900

    
1901
    networks = cyclades_client.list_networks()
1902
    stale = [n for n in networks if n["name"].startswith(SNF_TEST_PREFIX)]
1903

    
1904
    if len(stale) == 0:
1905
        return
1906

    
1907
    # Show staled networks
1908
    print >> sys.stderr, yellow + \
1909
        "Found these stale networks from previous runs:" + \
1910
        normal
1911
    print "    " + \
1912
        "\n    ".join(["%s: %s" % (str(n["id"]), n["name"]) for n in stale])
1913

    
1914
    # Delete staled networks
1915
    if delete_stale:
1916
        print >> sys.stderr, "Deleting %d stale networks:" % len(stale)
1917
        fail_tmout = time.time() + action_timeout
1918
        for n in stale:
1919
            cyclades_client.delete_network(n["id"])
1920
        # Wait for all networks to be deleted
1921
        while True:
1922
            networks = cyclades_client.list_networks()
1923
            stale = [n for n in networks
1924
                     if n["name"].startswith(SNF_TEST_PREFIX)]
1925
            if len(stale) == 0:
1926
                print >> sys.stderr, green + "    ...done" + normal
1927
                break
1928
            elif time.time() > fail_tmout:
1929
                print >> sys.stderr, red + \
1930
                    "Not all stale networks deleted. Action timed out." + \
1931
                    normal
1932
                sys.exit(1)
1933
            else:
1934
                time.sleep(query_interval)
1935
    else:
1936
        print >> sys.stderr, "Use --delete-stale to delete them."
1937

    
1938

    
1939
# --------------------------------------------------------------------
1940
# Parse arguments functions
1941
def parse_comma(option, opt, value, parser):
1942
    tests = set(['all', 'auth', 'images', 'flavors',
1943
                 'pithos', 'servers', 'server_spawn',
1944
                 'network_spawn'])
1945
    parse_input = value.split(',')
1946

    
1947
    if not (set(parse_input)).issubset(tests):
1948
        raise OptionValueError("The selected set of tests is invalid")
1949

    
1950
    setattr(parser.values, option.dest, value.split(','))
1951

    
1952

    
1953
def parse_arguments(args):
1954

    
1955
    kw = {}
1956
    kw["usage"] = "%prog [options]"
1957
    kw["description"] = \
1958
        "%prog runs a number of test scenarios on a " \
1959
        "Synnefo deployment."
1960

    
1961
    parser = OptionParser(**kw)
1962
    parser.disable_interspersed_args()
1963

    
1964
    parser.add_option("--auth-url",
1965
                      action="store", type="string", dest="auth_url",
1966
                      help="The AUTH URI to use to reach the Synnefo API",
1967
                      default=None)
1968
    parser.add_option("--system-images-user",
1969
                      action="store", type="string", dest="system_images_user",
1970
                      help="Owner of system images",
1971
                      default=None)
1972
    parser.add_option("--token",
1973
                      action="store", type="string", dest="token",
1974
                      help="The token to use for authentication to the API")
1975
    parser.add_option("--nofailfast",
1976
                      action="store_true", dest="nofailfast",
1977
                      help="Do not fail immediately if one of the tests "
1978
                           "fails (EXPERIMENTAL)",
1979
                      default=False)
1980
    parser.add_option("--no-ipv6",
1981
                      action="store_true", dest="no_ipv6",
1982
                      help="Disables ipv6 related tests",
1983
                      default=False)
1984
    parser.add_option("--action-timeout",
1985
                      action="store", type="int", dest="action_timeout",
1986
                      metavar="TIMEOUT",
1987
                      help="Wait SECONDS seconds for a server action to "
1988
                           "complete, then the test is considered failed",
1989
                      default=100)
1990
    parser.add_option("--build-warning",
1991
                      action="store", type="int", dest="build_warning",
1992
                      metavar="TIMEOUT",
1993
                      help="Warn if TIMEOUT seconds have passed and a "
1994
                           "build operation is still pending",
1995
                      default=600)
1996
    parser.add_option("--build-fail",
1997
                      action="store", type="int", dest="build_fail",
1998
                      metavar="BUILD_TIMEOUT",
1999
                      help="Fail the test if TIMEOUT seconds have passed "
2000
                           "and a build operation is still incomplete",
2001
                      default=900)
2002
    parser.add_option("--query-interval",
2003
                      action="store", type="int", dest="query_interval",
2004
                      metavar="INTERVAL",
2005
                      help="Query server status when requests are pending "
2006
                           "every INTERVAL seconds",
2007
                      default=3)
2008
    parser.add_option("--fanout",
2009
                      action="store", type="int", dest="fanout",
2010
                      metavar="COUNT",
2011
                      help="Spawn up to COUNT child processes to execute "
2012
                           "in parallel, essentially have up to COUNT "
2013
                           "server build requests outstanding (EXPERIMENTAL)",
2014
                      default=1)
2015
    parser.add_option("--force-flavor",
2016
                      action="store", type="int", dest="force_flavorid",
2017
                      metavar="FLAVOR ID",
2018
                      help="Force all server creations to use the specified "
2019
                           "FLAVOR ID instead of a randomly chosen one, "
2020
                           "useful if disk space is scarce",
2021
                      default=None)
2022
    parser.add_option("--image-id",
2023
                      action="store", type="string", dest="force_imageid",
2024
                      metavar="IMAGE ID",
2025
                      help="Test the specified image id, use 'all' to test "
2026
                           "all available images (mandatory argument)",
2027
                      default=None)
2028
    parser.add_option("--show-stale",
2029
                      action="store_true", dest="show_stale",
2030
                      help="Show stale servers from previous runs, whose "
2031
                           "name starts with `%s'" % SNF_TEST_PREFIX,
2032
                      default=False)
2033
    parser.add_option("--delete-stale",
2034
                      action="store_true", dest="delete_stale",
2035
                      help="Delete stale servers from previous runs, whose "
2036
                           "name starts with `%s'" % SNF_TEST_PREFIX,
2037
                      default=False)
2038
    parser.add_option("--force-personality",
2039
                      action="store", type="string", dest="personality_path",
2040
                      help="Force a personality file injection.\
2041
                            File path required. ",
2042
                      default=None)
2043
    parser.add_option("--log-folder",
2044
                      action="store", type="string", dest="log_folder",
2045
                      help="Define the absolute path where the output \
2046
                            log is stored. ",
2047
                      default="/var/log/burnin/")
2048
    parser.add_option("--verbose", "-V",
2049
                      action="store_true", dest="verbose",
2050
                      help="Print detailed output about multiple "
2051
                           "processes spawning",
2052
                      default=False)
2053
    parser.add_option("--set-tests",
2054
                      action="callback",
2055
                      dest="tests",
2056
                      type="string",
2057
                      help='Set comma seperated tests for this run. \
2058
                            Available tests: auth, images, flavors, \
2059
                                             servers, server_spawn, \
2060
                                             network_spawn, pithos. \
2061
                            Default = all',
2062
                      default='all',
2063
                      callback=parse_comma)
2064

    
2065
    (opts, args) = parser.parse_args(args)
2066

    
2067
    # -----------------------
2068
    # Verify arguments
2069

    
2070
    # `delete_stale' implies `show_stale'
2071
    if opts.delete_stale:
2072
        opts.show_stale = True
2073

    
2074
    # `token' is mandatory
2075
    _mandatory_argument(opts.token, "--token")
2076
    # `auth_url' is mandatory
2077
    _mandatory_argument(opts.auth_url, "--auth-url")
2078
    # `system_images_user' is mandatory
2079
    _mandatory_argument(opts.system_images_user, "--system-images-user")
2080

    
2081
    if not opts.show_stale:
2082
        # `image-id' is mandatory
2083
        _mandatory_argument(opts.force_imageid, "--image-id")
2084
        if opts.force_imageid != 'all':
2085
            try:
2086
                opts.force_imageid = str(opts.force_imageid)
2087
            except ValueError:
2088
                print >>sys.stderr, red + \
2089
                    "Invalid value specified for" + \
2090
                    "--image-id. Use a valid id, or `all'." + \
2091
                    normal
2092
                sys.exit(1)
2093

    
2094
    return (opts, args)
2095

    
2096

    
2097
def _mandatory_argument(Arg, Str):
2098
    if (Arg is None) or (Arg == ""):
2099
        print >>sys.stderr, red + \
2100
            "The " + Str + " argument is mandatory.\n" + \
2101
            normal
2102
        sys.exit(1)
2103

    
2104

    
2105
# --------------------------------------------------------------------
2106
# Burnin main function
2107
def main():
2108
    """Assemble test cases into a test suite, and run it
2109

2110
    IMPORTANT: Tests have dependencies and have to be run in the specified
2111
    order inside a single test case. They communicate through attributes of the
2112
    corresponding TestCase class (shared fixtures). Distinct subclasses of
2113
    TestCase MAY SHARE NO DATA, since they are run in parallel, in distinct
2114
    test runner processes.
2115

2116
    """
2117

    
2118
    # Parse arguments using `optparse'
2119
    (opts, args) = parse_arguments(sys.argv[1:])
2120

    
2121
    # Some global variables
2122
    global AUTH_URL, TOKEN, SYSTEM_IMAGES_USER
2123
    global NO_IPV6, VERBOSE, NOFAILFAST
2124
    AUTH_URL = opts.auth_url
2125
    TOKEN = opts.token
2126
    SYSTEM_IMAGES_USER = opts.system_images_user
2127
    NO_IPV6 = opts.no_ipv6
2128
    VERBOSE = opts.verbose
2129
    NOFAILFAST = opts.nofailfast
2130

    
2131
    # If `show_stale', cleanup stale servers
2132
    # from previous runs and exit
2133
    if opts.show_stale:
2134
        # We must clean the servers first
2135
        cleanup_servers(opts.action_timeout, opts.query_interval,
2136
                        delete_stale=opts.delete_stale)
2137
        cleanup_networks(opts.action_timeout, opts.query_interval,
2138
                         delete_stale=opts.delete_stale)
2139
        return 0
2140

    
2141
    # Initialize a kamaki instance, get flavors, images
2142
    astakos_client = AstakosClient(AUTH_URL, TOKEN)
2143
    astakos_client.CONNECTION_RETRY_LIMIT = 2
2144
    # Compute Client
2145
    compute_url = astakos_client.get_service_endpoints('compute')['publicURL']
2146
    compute_client = ComputeClient(compute_url, TOKEN)
2147
    compute_client.CONNECTION_RETRY_LIMIT = 2
2148
    DIMAGES = compute_client.list_images(detail=True)
2149
    DFLAVORS = compute_client.list_flavors(detail=True)
2150

    
2151
    # FIXME: logging, log, LOG PID, TEST_RUN_ID, arguments
2152
    # Run them: FIXME: In parallel, FAILEARLY, catchbreak?
2153
    #unittest.main(verbosity=2, catchbreak=True)
2154

    
2155
    # Get a list of images we are going to test
2156
    if opts.force_imageid == 'all':
2157
        test_images = DIMAGES
2158
    else:
2159
        test_images = filter(lambda x: x["id"] == opts.force_imageid, DIMAGES)
2160

    
2161
    # Create output (logging) folder
2162
    if not os.path.exists(opts.log_folder):
2163
        os.mkdir(opts.log_folder)
2164
    test_folder = os.path.join(opts.log_folder, TEST_RUN_ID)
2165
    os.mkdir(test_folder)
2166

    
2167
    for image in test_images:
2168
        imageid = str(image["id"])
2169
        imagename = image["name"]
2170
        # Choose a flavor (given from user or random)
2171
        if opts.force_flavorid:
2172
            flavorid = opts.force_flavorid
2173
        else:
2174
            flavorid = choice([f["id"] for f in DFLAVORS if f["disk"] >= 20])
2175
        # Personality dictionary for file injection test
2176
        if opts.personality_path is not None:
2177
            f = open(opts.personality_path)
2178
            content = b64encode(f.read())
2179
            personality = []
2180
            st = os.stat(opts.personality_path)
2181
            personality.append({
2182
                'path': '/root/test_inj_file',
2183
                'owner': 'root',
2184
                'group': 'root',
2185
                'mode': 0x7777 & st.st_mode,
2186
                'contents': content})
2187
        else:
2188
            personality = None
2189
        # Give a name to our test servers
2190
        servername = "%s%s for %s" % (SNF_TEST_PREFIX, TEST_RUN_ID, imagename)
2191
        is_windows = imagename.lower().find("windows") >= 0
2192

    
2193
        # Create Server TestCases
2194
        ServerTestCase = _spawn_server_test_case(
2195
            imageid=imageid,
2196
            flavorid=flavorid,
2197
            imagename=imagename,
2198
            personality=personality,
2199
            servername=servername,
2200
            is_windows=is_windows,
2201
            action_timeout=opts.action_timeout,
2202
            build_warning=opts.build_warning,
2203
            build_fail=opts.build_fail,
2204
            query_interval=opts.query_interval)
2205
        # Create Network TestCases
2206
        NetworkTestCase = _spawn_network_test_case(
2207
            action_timeout=opts.action_timeout,
2208
            imageid=imageid,
2209
            flavorid=flavorid,
2210
            imagename=imagename,
2211
            query_interval=opts.query_interval)
2212
        # Create Images TestCase
2213
        CImagesTestCase = _images_test_case(
2214
            action_timeout=opts.action_timeout,
2215
            imageid=imageid,
2216
            flavorid=flavorid,
2217
            imagename=imagename,
2218
            query_interval=opts.query_interval)
2219

    
2220
        # Choose the tests we are going to run
2221
        test_dict = {'auth': UnauthorizedTestCase,
2222
                     'images': CImagesTestCase,
2223
                     'flavors': FlavorsTestCase,
2224
                     'servers': ServersTestCase,
2225
                     'pithos': PithosTestCase,
2226
                     'server_spawn': ServerTestCase,
2227
                     'network_spawn': NetworkTestCase}
2228
        seq_cases = []
2229
        if 'all' in opts.tests:
2230
            seq_cases = [UnauthorizedTestCase, CImagesTestCase,
2231
                         FlavorsTestCase, ServersTestCase,
2232
                         PithosTestCase, ServerTestCase,
2233
                         NetworkTestCase]
2234
        else:
2235
            for test in opts.tests:
2236
                seq_cases.append(test_dict[test])
2237

    
2238
        # Folder for each image
2239
        image_folder = os.path.join(test_folder, imageid)
2240
        os.mkdir(image_folder)
2241

    
2242
        # Run each test
2243
        if opts.fanout > 1:
2244
            _run_cases_in_parallel(seq_cases, opts.fanout, image_folder)
2245
        else:
2246
            _run_cases_in_series(seq_cases, image_folder)
2247

    
2248

    
2249
# --------------------------------------------------------------------
2250
# Call main
2251
if __name__ == "__main__":
2252
    sys.exit(main())