Statistics
| Branch: | Tag: | Revision:

root / snf-tools / synnefo_tools / burnin.py @ 83a3723e

History | View | Annotate | Download (80.7 kB)

1
#!/usr/bin/env python
2

    
3
# Copyright 2011 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35

    
36
"""Perform integration testing on a running Synnefo deployment"""
37

    
38
#import __main__
39
import datetime
40
import inspect
41
import logging
42
import os
43
import os.path
44
import paramiko
45
import prctl
46
import subprocess
47
import signal
48
import socket
49
import sys
50
import time
51
import tempfile
52
from base64 import b64encode
53
from IPy import IP
54
from multiprocessing import Process, Queue
55
from random import choice, randint
56
from optparse import OptionParser, OptionValueError
57

    
58
from kamaki.clients.compute import ComputeClient
59
from kamaki.clients.cyclades import CycladesClient
60
from kamaki.clients.image import ImageClient
61
from kamaki.clients.pithos import PithosClient
62
from kamaki.clients.astakos import AstakosClient
63
from kamaki.clients import ClientError
64

    
65
from vncauthproxy.d3des import generate_response as d3des_generate_response
66

    
67
# Use backported unittest functionality if Python < 2.7
68
try:
69
    import unittest2 as unittest
70
except ImportError:
71
    if sys.version_info < (2, 7):
72
        raise Exception("The unittest2 package is required for Python < 2.7")
73
    import unittest
74

    
75
# --------------------------------------------------------------------
76
# Global Variables
77
AUTH_URL = None
78
TOKEN = None
79
PLANKTON_USER = None
80
NO_IPV6 = None
81
DEFAULT_PLANKTON_USER = "images@okeanos.grnet.gr"
82
NOFAILFAST = None
83
VERBOSE = None
84

    
85
# A unique id identifying this test run
86
TEST_RUN_ID = datetime.datetime.strftime(datetime.datetime.now(),
87
                                         "%Y%m%d%H%M%S")
88
SNF_TEST_PREFIX = "snf-test-"
89

    
90
red = '\x1b[31m'
91
yellow = '\x1b[33m'
92
green = '\x1b[32m'
93
normal = '\x1b[0m'
94

    
95

    
96
# --------------------------------------------------------------------
97
# Global functions
98
def _ssh_execute(hostip, username, password, command):
99
    """Execute a command via ssh"""
100
    ssh = paramiko.SSHClient()
101
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
102
    try:
103
        ssh.connect(hostip, username=username, password=password)
104
    except socket.error, err:
105
        raise AssertionError(err)
106
    try:
107
        stdin, stdout, stderr = ssh.exec_command(command)
108
    except paramiko.SSHException, err:
109
        raise AssertionError(err)
110
    status = stdout.channel.recv_exit_status()
111
    output = stdout.readlines()
112
    ssh.close()
113
    return output, status
114

    
115

    
116
def _get_user_id():
117
    """Authenticate to astakos and get unique users id"""
118
    astakos = AstakosClient(AUTH_URL, TOKEN)
119
    authenticate = astakos.authenticate()
120
    return authenticate['access']['user']['id']
121

    
122

    
123
# --------------------------------------------------------------------
124
# BurninTestReulst class
125
class BurninTestResult(unittest.TextTestResult):
126
    def addSuccess(self, test):
127
        super(BurninTestResult, self).addSuccess(test)
128
        if self.showAll:
129
            if hasattr(test, 'result_dict'):
130
                run_details = test.result_dict
131

    
132
                self.stream.write("\n")
133
                for i in run_details:
134
                    self.stream.write("%s : %s \n" % (i, run_details[i]))
135
                self.stream.write("\n")
136

    
137
        elif self.dots:
138
            self.stream.write('.')
139
            self.stream.flush()
140

    
141
    def addError(self, test, err):
142
        super(BurninTestResult, self).addError(test, err)
143
        if self.showAll:
144
            self.stream.writeln("ERROR")
145
            if hasattr(test, 'result_dict'):
146
                run_details = test.result_dict
147

    
148
                self.stream.write("\n")
149
                for i in run_details:
150
                    self.stream.write("%s : %s \n" % (i, run_details[i]))
151
                self.stream.write("\n")
152

    
153
        elif self.dots:
154
            self.stream.write('E')
155
            self.stream.flush()
156

    
157
    def addFailure(self, test, err):
158
        super(BurninTestResult, self).addFailure(test, err)
159
        if self.showAll:
160
            self.stream.writeln("FAIL")
161
            if hasattr(test, 'result_dict'):
162
                run_details = test.result_dict
163

    
164
                self.stream.write("\n")
165
                for i in run_details:
166
                    self.stream.write("%s : %s \n" % (i, run_details[i]))
167
                self.stream.write("\n")
168

    
169
        elif self.dots:
170
            self.stream.write('F')
171
            self.stream.flush()
172

    
173

    
174
# --------------------------------------------------------------------
175
# Format Results
176
class burninFormatter(logging.Formatter):
177
    err_fmt = red + "ERROR: %(msg)s" + normal
178
    dbg_fmt = green + "* %(msg)s" + normal
179
    info_fmt = "%(msg)s"
180

    
181
    def __init__(self, fmt="%(levelno)s: %(msg)s"):
182
        logging.Formatter.__init__(self, fmt)
183

    
184
    def format(self, record):
185
        format_orig = self._fmt
186
        # Replace the original format with one customized by logging level
187
        if record.levelno == 10:    # DEBUG
188
            self._fmt = burninFormatter.dbg_fmt
189
        elif record.levelno == 20:  # INFO
190
            self._fmt = burninFormatter.info_fmt
191
        elif record.levelno == 40:  # ERROR
192
            self._fmt = burninFormatter.err_fmt
193
        result = logging.Formatter.format(self, record)
194
        self._fmt = format_orig
195
        return result
196

    
197
log = logging.getLogger("burnin")
198
log.setLevel(logging.DEBUG)
199
handler = logging.StreamHandler()
200
handler.setFormatter(burninFormatter())
201
log.addHandler(handler)
202

    
203

    
204
# --------------------------------------------------------------------
205
# UnauthorizedTestCase class
206
class UnauthorizedTestCase(unittest.TestCase):
207
    """Test unauthorized access"""
208
    @classmethod
209
    def setUpClass(cls):
210
        cls.astakos = AstakosClient(AUTH_URL, TOKEN)
211
        cls.compute_url = \
212
            cls.astakos.get_service_endpoints('compute')['publicURL']
213
        cls.result_dict = dict()
214

    
215
    def test_unauthorized_access(self):
216
        """Test access without a valid token fails"""
217
        log.info("Authentication test")
218
        falseToken = '12345'
219
        c = ComputeClient(self.compute_url, falseToken)
220

    
221
        with self.assertRaises(ClientError) as cm:
222
            c.list_servers()
223
            self.assertEqual(cm.exception.status, 401)
224

    
225

    
226
# --------------------------------------------------------------------
227
# This class gest replicated into Images TestCases dynamically
228
class ImagesTestCase(unittest.TestCase):
229
    """Test image lists for consistency"""
230
    @classmethod
231
    def setUpClass(cls):
232
        """Initialize kamaki, get (detailed) list of images"""
233
        log.info("Getting simple and detailed list of images")
234
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
235
        # Compute Client
236
        compute_url = \
237
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
238
        cls.compute_client = ComputeClient(compute_url, TOKEN)
239
        # Image Client
240
        image_url = \
241
            cls.astakos_client.get_service_endpoints('image')['publicURL']
242
        cls.image_client = ImageClient(image_url, TOKEN)
243
        # Pithos Client
244
        pithos_url = cls.astakos_client.\
245
            get_service_endpoints('object-store')['publicURL']
246
        cls.pithos_client = PithosClient(pithos_url, TOKEN)
247
        cls.pithos_client.CONNECTION_RETRY_LIMIT = 2
248

    
249
        # Get images
250
        cls.images = \
251
            filter(lambda x: not x['name'].startswith(SNF_TEST_PREFIX),
252
                   cls.image_client.list_public())
253
        cls.dimages = \
254
            filter(lambda x: not x['name'].startswith(SNF_TEST_PREFIX),
255
                   cls.image_client.list_public(detail=True))
256
        cls.result_dict = dict()
257
        # Get uniq user id
258
        cls.uuid = _get_user_id()
259
        log.info("Uniq user id = %s" % cls.uuid)
260
        # Create temp directory and store it inside our class
261
        # XXX: In my machine /tmp has not enough space
262
        #      so use current directory to be sure.
263
        cls.temp_dir = tempfile.mkdtemp(dir=os.getcwd())
264
        cls.temp_image_name = \
265
            SNF_TEST_PREFIX + cls.imageid + ".diskdump"
266

    
267
    @classmethod
268
    def tearDownClass(cls):
269
        """Remove local files"""
270
        try:
271
            temp_file = os.path.join(cls.temp_dir, cls.temp_image_name)
272
            os.unlink(temp_file)
273
        except:
274
            pass
275
        try:
276
            os.rmdir(cls.temp_dir)
277
        except:
278
            pass
279

    
280
    def test_001_list_images(self):
281
        """Test image list actually returns images"""
282
        self.assertGreater(len(self.images), 0)
283

    
284
    def test_002_list_images_detailed(self):
285
        """Test detailed image list is the same length as list"""
286
        self.assertEqual(len(self.dimages), len(self.images))
287

    
288
    def test_003_same_image_names(self):
289
        """Test detailed and simple image list contain same names"""
290
        names = sorted(map(lambda x: x["name"], self.images))
291
        dnames = sorted(map(lambda x: x["name"], self.dimages))
292
        self.assertEqual(names, dnames)
293

    
294
# XXX: Find a way to resolve owner's uuid to username.
295
#      (maybe use astakosclient)
296
#    def test_004_unique_image_names(self):
297
#        """Test system images have unique names"""
298
#        sys_images = filter(lambda x: x['owner'] == PLANKTON_USER,
299
#                            self.dimages)
300
#        names = sorted(map(lambda x: x["name"], sys_images))
301
#        self.assertEqual(sorted(list(set(names))), names)
302

    
303
    def test_005_image_metadata(self):
304
        """Test every image has specific metadata defined"""
305
        keys = frozenset(["osfamily", "root_partition"])
306
        details = self.compute_client.list_images(detail=True)
307
        for i in details:
308
            self.assertTrue(keys.issubset(i["metadata"].keys()))
309

    
310
    def test_006_download_image(self):
311
        """Download image from pithos+"""
312
        # Get image location
313
        image = filter(
314
            lambda x: x['id'] == self.imageid, self.dimages)[0]
315
        image_location = \
316
            image['location'].replace("://", " ").replace("/", " ").split()
317
        log.info("Download image, with owner %s\n\tcontainer %s, and name %s"
318
                 % (image_location[1], image_location[2], image_location[3]))
319
        self.pithos_client.account = image_location[1]
320
        self.pithos_client.container = image_location[2]
321
        temp_file = os.path.join(self.temp_dir, self.temp_image_name)
322
        with open(temp_file, "wb+") as f:
323
            self.pithos_client.download_object(image_location[3], f)
324

    
325
    def test_007_upload_image(self):
326
        """Upload and register image"""
327
        temp_file = os.path.join(self.temp_dir, self.temp_image_name)
328
        log.info("Upload image to pithos+")
329
        # Create container `images'
330
        self.pithos_client.account = self.uuid
331
        self.pithos_client.container = "images"
332
        self.pithos_client.container_put()
333
        with open(temp_file, "rb+") as f:
334
            self.pithos_client.upload_object(self.temp_image_name, f)
335
        log.info("Register image to plankton")
336
        location = "pithos://" + self.uuid + \
337
            "/images/" + self.temp_image_name
338
        params = {'is_public': True}
339
        properties = {'OSFAMILY': "linux", 'ROOT_PARTITION': 1}
340
        self.image_client.register(
341
            self.temp_image_name, location, params, properties)
342
        # Get image id
343
        details = self.image_client.list_public(detail=True)
344
        detail = filter(lambda x: x['location'] == location, details)
345
        self.assertEqual(len(detail), 1)
346
        cls = type(self)
347
        cls.temp_image_id = detail[0]['id']
348
        log.info("Image registered with id %s" % detail[0]['id'])
349

    
350
    def test_008_cleanup_image(self):
351
        """Cleanup image test"""
352
        log.info("Cleanup image test")
353
        # Remove image from pithos+
354
        self.pithos_client.account = self.uuid
355
        self.pithos_client.container = "images"
356
        self.pithos_client.del_object(self.temp_image_name)
357

    
358

    
359
# --------------------------------------------------------------------
360
# FlavorsTestCase class
361
class FlavorsTestCase(unittest.TestCase):
362
    """Test flavor lists for consistency"""
363
    @classmethod
364
    def setUpClass(cls):
365
        """Initialize kamaki, get (detailed) list of flavors"""
366
        log.info("Getting simple and detailed list of flavors")
367
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
368
        # Compute Client
369
        compute_url = \
370
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
371
        cls.compute_client = ComputeClient(compute_url, TOKEN)
372
        cls.flavors = cls.compute_client.list_flavors()
373
        cls.dflavors = cls.compute_client.list_flavors(detail=True)
374
        cls.result_dict = dict()
375

    
376
    def test_001_list_flavors(self):
377
        """Test flavor list actually returns flavors"""
378
        self.assertGreater(len(self.flavors), 0)
379

    
380
    def test_002_list_flavors_detailed(self):
381
        """Test detailed flavor list is the same length as list"""
382
        self.assertEquals(len(self.dflavors), len(self.flavors))
383

    
384
    def test_003_same_flavor_names(self):
385
        """Test detailed and simple flavor list contain same names"""
386
        names = sorted(map(lambda x: x["name"], self.flavors))
387
        dnames = sorted(map(lambda x: x["name"], self.dflavors))
388
        self.assertEqual(names, dnames)
389

    
390
    def test_004_unique_flavor_names(self):
391
        """Test flavors have unique names"""
392
        names = sorted(map(lambda x: x["name"], self.flavors))
393
        self.assertEqual(sorted(list(set(names))), names)
394

    
395
    def test_005_well_formed_flavor_names(self):
396
        """Test flavors have names of the form CxxRyyDzz
397
        Where xx is vCPU count, yy is RAM in MiB, zz is Disk in GiB
398
        """
399
        for f in self.dflavors:
400
            flavor = (f["vcpus"], f["ram"], f["disk"], f["SNF:disk_template"])
401
            self.assertEqual("C%dR%dD%d%s" % flavor,
402
                             f["name"],
403
                             "Flavor %s does not match its specs." % f["name"])
404

    
405

    
406
# --------------------------------------------------------------------
407
# ServersTestCase class
408
class ServersTestCase(unittest.TestCase):
409
    """Test server lists for consistency"""
410
    @classmethod
411
    def setUpClass(cls):
412
        """Initialize kamaki, get (detailed) list of servers"""
413
        log.info("Getting simple and detailed list of servers")
414

    
415
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
416
        # Compute Client
417
        compute_url = \
418
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
419
        cls.compute_client = ComputeClient(compute_url, TOKEN)
420
        cls.servers = cls.compute_client.list_servers()
421
        cls.dservers = cls.compute_client.list_servers(detail=True)
422
        cls.result_dict = dict()
423

    
424
    # def test_001_list_servers(self):
425
    #     """Test server list actually returns servers"""
426
    #     self.assertGreater(len(self.servers), 0)
427

    
428
    def test_002_list_servers_detailed(self):
429
        """Test detailed server list is the same length as list"""
430
        self.assertEqual(len(self.dservers), len(self.servers))
431

    
432
    def test_003_same_server_names(self):
433
        """Test detailed and simple flavor list contain same names"""
434
        names = sorted(map(lambda x: x["name"], self.servers))
435
        dnames = sorted(map(lambda x: x["name"], self.dservers))
436
        self.assertEqual(names, dnames)
437

    
438

    
439
# --------------------------------------------------------------------
440
# Pithos Test Cases
441
class PithosTestCase(unittest.TestCase):
442
    """Test pithos functionality"""
443
    @classmethod
444
    def setUpClass(cls):
445
        """Initialize kamaki, get list of containers"""
446
        # Get uniq user id
447
        cls.uuid = _get_user_id()
448
        log.info("Uniq user id = %s" % cls.uuid)
449
        log.info("Getting list of containers")
450

    
451
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
452
        # Pithos Client
453
        pithos_url = cls.astakos_client.\
454
            get_service_endpoints('object-store')['publicURL']
455
        cls.pithos_client = PithosClient(pithos_url, TOKEN, cls.uuid)
456
        cls.pithos_client.CONNECTION_RETRY_LIMIT = 2
457

    
458
        cls.containers = cls.pithos_client.list_containers()
459
        cls.result_dict = dict()
460

    
461
    def test_001_list_containers(self):
462
        """Test container list actually returns containers"""
463
        self.assertGreater(len(self.containers), 0)
464

    
465
    def test_002_unique_containers(self):
466
        """Test if containers have unique names"""
467
        names = [n['name'] for n in self.containers]
468
        names = sorted(names)
469
        self.assertEqual(sorted(list(set(names))), names)
470

    
471
    def test_003_create_container(self):
472
        """Test create a container"""
473
        rand_num = randint(1000, 9999)
474
        rand_name = "%s%s" % (SNF_TEST_PREFIX, rand_num)
475
        names = [n['name'] for n in self.containers]
476
        while rand_name in names:
477
            rand_num = randint(1000, 9999)
478
            rand_name = "%s%s" % (SNF_TEST_PREFIX, rand_num)
479
        # Create container
480
        self.pithos_client.container = rand_name
481
        self.pithos_client.container_put()
482
        # Get list of containers
483
        new_containers = self.pithos_client.list_containers()
484
        new_container_names = [n['name'] for n in new_containers]
485
        self.assertIn(rand_name, new_container_names)
486

    
487
    def test_004_upload(self):
488
        """Test uploading something to pithos+"""
489
        # Create a tmp file
490
        with tempfile.TemporaryFile() as f:
491
            f.write("This is a temp file")
492
            f.seek(0, 0)
493
            # Where to save file
494
            self.pithos_client.upload_object("test.txt", f)
495

    
496
    def test_005_download(self):
497
        """Test download something from pithos+"""
498
        # Create tmp directory to save file
499
        tmp_dir = tempfile.mkdtemp()
500
        tmp_file = os.path.join(tmp_dir, "test.txt")
501
        with open(tmp_file, "wb+") as f:
502
            self.pithos_client.download_object("test.txt", f)
503
            # Read file
504
            f.seek(0, 0)
505
            content = f.read()
506
        # Remove files
507
        os.unlink(tmp_file)
508
        os.rmdir(tmp_dir)
509
        # Compare results
510
        self.assertEqual(content, "This is a temp file")
511

    
512
    def test_006_remove(self):
513
        """Test removing files and containers"""
514
        cont_name = self.pithos_client.container
515
        self.pithos_client.del_object("test.txt")
516
        self.pithos_client.purge_container()
517
        # List containers
518
        containers = self.pithos_client.list_containers()
519
        cont_names = [n['name'] for n in containers]
520
        self.assertNotIn(cont_name, cont_names)
521

    
522

    
523
# --------------------------------------------------------------------
524
# This class gets replicated into actual TestCases dynamically
525
class SpawnServerTestCase(unittest.TestCase):
526
    """Test scenario for server of the specified image"""
527
    @classmethod
528
    def setUpClass(cls):
529
        """Initialize a kamaki instance"""
530
        log.info("Spawning server for image `%s'" % cls.imagename)
531

    
532
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
533
        # Cyclades Client
534
        compute_url = \
535
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
536
        cls.cyclades_client = CycladesClient(compute_url, TOKEN)
537

    
538
        cls.result_dict = dict()
539

    
540
    def _get_ipv4(self, server):
541
        """Get the public IPv4 of a server from the detailed server info"""
542

    
543
        nics = server["attachments"]
544

    
545
        for nic in nics:
546
            net_id = nic["network_id"]
547
            if self.cyclades_client.get_network_details(net_id)["public"]:
548
                public_addrs = nic["ipv4"]
549

    
550
        self.assertTrue(public_addrs is not None)
551

    
552
        return public_addrs
553

    
554
    def _get_ipv6(self, server):
555
        """Get the public IPv6 of a server from the detailed server info"""
556

    
557
        nics = server["attachments"]
558

    
559
        for nic in nics:
560
            net_id = nic["network_id"]
561
            if self.cyclades_client.get_network_details(net_id)["public"]:
562
                public_addrs = nic["ipv6"]
563

    
564
        self.assertTrue(public_addrs is not None)
565

    
566
        return public_addrs
567

    
568
    def _connect_loginname(self, os_value):
569
        """Return the login name for connections based on the server OS"""
570
        if os_value in ("Ubuntu", "Kubuntu", "Fedora"):
571
            return "user"
572
        elif os_value in ("windows", "windows_alpha1"):
573
            return "Administrator"
574
        else:
575
            return "root"
576

    
577
    def _verify_server_status(self, current_status, new_status):
578
        """Verify a server has switched to a specified status"""
579
        server = self.cyclades_client.get_server_details(self.serverid)
580
        if server["status"] not in (current_status, new_status):
581
            return None  # Do not raise exception, return so the test fails
582
        self.assertEquals(server["status"], new_status)
583

    
584
    def _get_connected_tcp_socket(self, family, host, port):
585
        """Get a connected socket from the specified family to host:port"""
586
        sock = None
587
        for res in \
588
            socket.getaddrinfo(host, port, family, socket.SOCK_STREAM, 0,
589
                               socket.AI_PASSIVE):
590
            af, socktype, proto, canonname, sa = res
591
            try:
592
                sock = socket.socket(af, socktype, proto)
593
            except socket.error:
594
                sock = None
595
                continue
596
            try:
597
                sock.connect(sa)
598
            except socket.error:
599
                sock.close()
600
                sock = None
601
                continue
602
        self.assertIsNotNone(sock)
603
        return sock
604

    
605
    def _ping_once(self, ipv6, ip):
606
        """Test server responds to a single IPv4 or IPv6 ping"""
607
        cmd = "ping%s -c 7 -w 20 %s" % ("6" if ipv6 else "", ip)
608
        ping = subprocess.Popen(cmd, shell=True,
609
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
610
        (stdout, stderr) = ping.communicate()
611
        ret = ping.wait()
612
        self.assertEquals(ret, 0)
613

    
614
    def _get_hostname_over_ssh(self, hostip, username, password):
615
        lines, status = _ssh_execute(
616
            hostip, username, password, "hostname")
617
        self.assertEqual(len(lines), 1)
618
        return lines[0]
619

    
620
    def _try_until_timeout_expires(self, warn_timeout, fail_timeout,
621
                                   opmsg, callable, *args, **kwargs):
622
        if warn_timeout == fail_timeout:
623
            warn_timeout = fail_timeout + 1
624
        warn_tmout = time.time() + warn_timeout
625
        fail_tmout = time.time() + fail_timeout
626
        while True:
627
            self.assertLess(time.time(), fail_tmout,
628
                            "operation `%s' timed out" % opmsg)
629
            if time.time() > warn_tmout:
630
                log.warning("Server %d: `%s' operation `%s' not done yet",
631
                            self.serverid, self.servername, opmsg)
632
            try:
633
                log.info("%s... " % opmsg)
634
                return callable(*args, **kwargs)
635
            except AssertionError:
636
                pass
637
            time.sleep(self.query_interval)
638

    
639
    def _insist_on_tcp_connection(self, family, host, port):
640
        familystr = {socket.AF_INET: "IPv4", socket.AF_INET6: "IPv6",
641
                     socket.AF_UNSPEC: "Unspecified-IPv4/6"}
642
        msg = "connect over %s to %s:%s" % \
643
              (familystr.get(family, "Unknown"), host, port)
644
        sock = self._try_until_timeout_expires(
645
            self.action_timeout, self.action_timeout,
646
            msg, self._get_connected_tcp_socket,
647
            family, host, port)
648
        return sock
649

    
650
    def _insist_on_status_transition(self, current_status, new_status,
651
                                     fail_timeout, warn_timeout=None):
652
        msg = "Server %d: `%s', waiting for %s -> %s" % \
653
              (self.serverid, self.servername, current_status, new_status)
654
        if warn_timeout is None:
655
            warn_timeout = fail_timeout
656
        self._try_until_timeout_expires(warn_timeout, fail_timeout,
657
                                        msg, self._verify_server_status,
658
                                        current_status, new_status)
659
        # Ensure the status is actually the expected one
660
        server = self.cyclades_client.get_server_details(self.serverid)
661
        self.assertEquals(server["status"], new_status)
662

    
663
    def _insist_on_ssh_hostname(self, hostip, username, password):
664
        msg = "SSH to %s, as %s/%s" % (hostip, username, password)
665
        hostname = self._try_until_timeout_expires(
666
            self.action_timeout, self.action_timeout,
667
            msg, self._get_hostname_over_ssh,
668
            hostip, username, password)
669

    
670
        # The hostname must be of the form 'prefix-id'
671
        self.assertTrue(hostname.endswith("-%d\n" % self.serverid))
672

    
673
    def _check_file_through_ssh(self, hostip, username, password,
674
                                remotepath, content):
675
        msg = "Trying file injection through SSH to %s, as %s/%s" % \
676
            (hostip, username, password)
677
        log.info(msg)
678
        try:
679
            ssh = paramiko.SSHClient()
680
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
681
            ssh.connect(hostip, username=username, password=password)
682
            ssh.close()
683
        except socket.error, err:
684
            raise AssertionError(err)
685

    
686
        transport = paramiko.Transport((hostip, 22))
687
        transport.connect(username=username, password=password)
688

    
689
        localpath = '/tmp/' + SNF_TEST_PREFIX + 'injection'
690
        sftp = paramiko.SFTPClient.from_transport(transport)
691
        sftp.get(remotepath, localpath)
692
        sftp.close()
693
        transport.close()
694

    
695
        f = open(localpath)
696
        remote_content = b64encode(f.read())
697

    
698
        # Check if files are the same
699
        return (remote_content == content)
700

    
701
    def _skipIf(self, condition, msg):
702
        if condition:
703
            self.skipTest(msg)
704

    
705
    def test_001_submit_create_server(self):
706
        """Test submit create server request"""
707

    
708
        log.info("Submit new server request")
709
        server = self.cyclades_client.create_server(
710
            self.servername, self.flavorid, self.imageid, self.personality)
711

    
712
        log.info("Server id: " + str(server["id"]))
713
        log.info("Server password: " + server["adminPass"])
714
        self.assertEqual(server["name"], self.servername)
715
        self.assertEqual(server["flavor"]["id"], self.flavorid)
716
        self.assertEqual(server["image"]["id"], self.imageid)
717
        self.assertEqual(server["status"], "BUILD")
718

    
719
        # Update class attributes to reflect data on building server
720
        cls = type(self)
721
        cls.serverid = server["id"]
722
        cls.username = None
723
        cls.passwd = server["adminPass"]
724

    
725
        self.result_dict["Server ID"] = str(server["id"])
726
        self.result_dict["Password"] = str(server["adminPass"])
727

    
728
    def test_002a_server_is_building_in_list(self):
729
        """Test server is in BUILD state, in server list"""
730
        log.info("Server in BUILD state in server list")
731

    
732
        self.result_dict.clear()
733

    
734
        servers = self.cyclades_client.list_servers(detail=True)
735
        servers = filter(lambda x: x["name"] == self.servername, servers)
736

    
737
        server = servers[0]
738
        self.assertEqual(server["name"], self.servername)
739
        self.assertEqual(server["flavor"]["id"], self.flavorid)
740
        self.assertEqual(server["image"]["id"], self.imageid)
741
        self.assertEqual(server["status"], "BUILD")
742

    
743
    def test_002b_server_is_building_in_details(self):
744
        """Test server is in BUILD state, in details"""
745

    
746
        log.info("Server in BUILD state in details")
747

    
748
        server = self.cyclades_client.get_server_details(self.serverid)
749
        self.assertEqual(server["name"], self.servername)
750
        self.assertEqual(server["flavor"]["id"], self.flavorid)
751
        self.assertEqual(server["image"]["id"], self.imageid)
752
        self.assertEqual(server["status"], "BUILD")
753

    
754
    def test_002c_set_server_metadata(self):
755

    
756
        log.info("Creating server metadata")
757

    
758
        image = self.cyclades_client.get_image_details(self.imageid)
759
        os_value = image["metadata"]["os"]
760
        users = image["metadata"].get("users", None)
761
        self.cyclades_client.update_server_metadata(self.serverid, OS=os_value)
762

    
763
        userlist = users.split()
764

    
765
        # Determine the username to use for future connections
766
        # to this host
767
        cls = type(self)
768

    
769
        if "root" in userlist:
770
            cls.username = "root"
771
        elif users is None:
772
            cls.username = self._connect_loginname(os_value)
773
        else:
774
            cls.username = choice(userlist)
775

    
776
        self.assertIsNotNone(cls.username)
777

    
778
    def test_002d_verify_server_metadata(self):
779
        """Test server metadata keys are set based on image metadata"""
780

    
781
        log.info("Verifying image metadata")
782

    
783
        servermeta = self.cyclades_client.get_server_metadata(self.serverid)
784
        imagemeta = self.cyclades_client.get_image_metadata(self.imageid)
785

    
786
        self.assertEqual(servermeta["OS"], imagemeta["os"])
787

    
788
    def test_003_server_becomes_active(self):
789
        """Test server becomes ACTIVE"""
790

    
791
        log.info("Waiting for server to become ACTIVE")
792

    
793
        self._insist_on_status_transition(
794
            "BUILD", "ACTIVE", self.build_fail, self.build_warning)
795

    
796
    def test_003a_get_server_oob_console(self):
797
        """Test getting OOB server console over VNC
798

799
        Implementation of RFB protocol follows
800
        http://www.realvnc.com/docs/rfbproto.pdf.
801

802
        """
803
        console = self.cyclades_client.get_server_console(self.serverid)
804
        self.assertEquals(console['type'], "vnc")
805
        sock = self._insist_on_tcp_connection(
806
            socket.AF_INET, console["host"], console["port"])
807

    
808
        # Step 1. ProtocolVersion message (par. 6.1.1)
809
        version = sock.recv(1024)
810
        self.assertEquals(version, 'RFB 003.008\n')
811
        sock.send(version)
812

    
813
        # Step 2. Security (par 6.1.2): Only VNC Authentication supported
814
        sec = sock.recv(1024)
815
        self.assertEquals(list(sec), ['\x01', '\x02'])
816

    
817
        # Step 3. Request VNC Authentication (par 6.1.2)
818
        sock.send('\x02')
819

    
820
        # Step 4. Receive Challenge (par 6.2.2)
821
        challenge = sock.recv(1024)
822
        self.assertEquals(len(challenge), 16)
823

    
824
        # Step 5. DES-Encrypt challenge, use password as key (par 6.2.2)
825
        response = d3des_generate_response(
826
            (console["password"] + '\0' * 8)[:8], challenge)
827
        sock.send(response)
828

    
829
        # Step 6. SecurityResult (par 6.1.3)
830
        result = sock.recv(4)
831
        self.assertEquals(list(result), ['\x00', '\x00', '\x00', '\x00'])
832
        sock.close()
833

    
834
    def test_004_server_has_ipv4(self):
835
        """Test active server has a valid IPv4 address"""
836

    
837
        log.info("Validate server's IPv4")
838

    
839
        server = self.cyclades_client.get_server_details(self.serverid)
840
        ipv4 = self._get_ipv4(server)
841

    
842
        self.result_dict.clear()
843
        self.result_dict["IPv4"] = str(ipv4)
844

    
845
        self.assertEquals(IP(ipv4).version(), 4)
846

    
847
    def test_005_server_has_ipv6(self):
848
        """Test active server has a valid IPv6 address"""
849
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
850

    
851
        log.info("Validate server's IPv6")
852

    
853
        server = self.cyclades_client.get_server_details(self.serverid)
854
        ipv6 = self._get_ipv6(server)
855

    
856
        self.result_dict.clear()
857
        self.result_dict["IPv6"] = str(ipv6)
858

    
859
        self.assertEquals(IP(ipv6).version(), 6)
860

    
861
    def test_006_server_responds_to_ping_IPv4(self):
862
        """Test server responds to ping on IPv4 address"""
863

    
864
        log.info("Testing if server responds to pings in IPv4")
865
        self.result_dict.clear()
866

    
867
        server = self.cyclades_client.get_server_details(self.serverid)
868
        ip = self._get_ipv4(server)
869
        self._try_until_timeout_expires(self.action_timeout,
870
                                        self.action_timeout,
871
                                        "PING IPv4 to %s" % ip,
872
                                        self._ping_once,
873
                                        False, ip)
874

    
875
    def test_007_server_responds_to_ping_IPv6(self):
876
        """Test server responds to ping on IPv6 address"""
877
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
878
        log.info("Testing if server responds to pings in IPv6")
879

    
880
        server = self.cyclades_client.get_server_details(self.serverid)
881
        ip = self._get_ipv6(server)
882
        self._try_until_timeout_expires(self.action_timeout,
883
                                        self.action_timeout,
884
                                        "PING IPv6 to %s" % ip,
885
                                        self._ping_once,
886
                                        True, ip)
887

    
888
    def test_008_submit_shutdown_request(self):
889
        """Test submit request to shutdown server"""
890

    
891
        log.info("Shutting down server")
892

    
893
        self.cyclades_client.shutdown_server(self.serverid)
894

    
895
    def test_009_server_becomes_stopped(self):
896
        """Test server becomes STOPPED"""
897

    
898
        log.info("Waiting until server becomes STOPPED")
899
        self._insist_on_status_transition(
900
            "ACTIVE", "STOPPED", self.action_timeout, self.action_timeout)
901

    
902
    def test_010_submit_start_request(self):
903
        """Test submit start server request"""
904

    
905
        log.info("Starting server")
906

    
907
        self.cyclades_client.start_server(self.serverid)
908

    
909
    def test_011_server_becomes_active(self):
910
        """Test server becomes ACTIVE again"""
911

    
912
        log.info("Waiting until server becomes ACTIVE")
913
        self._insist_on_status_transition(
914
            "STOPPED", "ACTIVE", self.action_timeout, self.action_timeout)
915

    
916
    def test_011a_server_responds_to_ping_IPv4(self):
917
        """Test server OS is actually up and running again"""
918

    
919
        log.info("Testing if server is actually up and running")
920

    
921
        self.test_006_server_responds_to_ping_IPv4()
922

    
923
    def test_012_ssh_to_server_IPv4(self):
924
        """Test SSH to server public IPv4 works, verify hostname"""
925

    
926
        self._skipIf(self.is_windows, "only valid for Linux servers")
927
        server = self.cyclades_client.get_server_details(self.serverid)
928
        self._insist_on_ssh_hostname(self._get_ipv4(server),
929
                                     self.username, self.passwd)
930

    
931
    def test_013_ssh_to_server_IPv6(self):
932
        """Test SSH to server public IPv6 works, verify hostname"""
933
        self._skipIf(self.is_windows, "only valid for Linux servers")
934
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
935

    
936
        server = self.cyclades_client.get_server_details(self.serverid)
937
        self._insist_on_ssh_hostname(self._get_ipv6(server),
938
                                     self.username, self.passwd)
939

    
940
    def test_014_rdp_to_server_IPv4(self):
941
        "Test RDP connection to server public IPv4 works"""
942
        self._skipIf(not self.is_windows, "only valid for Windows servers")
943
        server = self.cyclades_client.get_server_details(self.serverid)
944
        ipv4 = self._get_ipv4(server)
945
        sock = self._insist_on_tcp_connection(socket.AF_INET, ipv4, 3389)
946

    
947
        # No actual RDP processing done. We assume the RDP server is there
948
        # if the connection to the RDP port is successful.
949
        # FIXME: Use rdesktop, analyze exit code? see manpage [costasd]
950
        sock.close()
951

    
952
    def test_015_rdp_to_server_IPv6(self):
953
        "Test RDP connection to server public IPv6 works"""
954
        self._skipIf(not self.is_windows, "only valid for Windows servers")
955
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
956

    
957
        server = self.cyclades_client.get_server_details(self.serverid)
958
        ipv6 = self._get_ipv6(server)
959
        sock = self._get_tcp_connection(socket.AF_INET6, ipv6, 3389)
960

    
961
        # No actual RDP processing done. We assume the RDP server is there
962
        # if the connection to the RDP port is successful.
963
        sock.close()
964

    
965
    def test_016_personality_is_enforced(self):
966
        """Test file injection for personality enforcement"""
967
        self._skipIf(self.is_windows, "only implemented for Linux servers")
968
        self._skipIf(self.personality is None, "No personality file selected")
969

    
970
        log.info("Trying to inject file for personality enforcement")
971

    
972
        server = self.cyclades_client.get_server_details(self.serverid)
973

    
974
        for inj_file in self.personality:
975
            equal_files = self._check_file_through_ssh(self._get_ipv4(server),
976
                                                       inj_file['owner'],
977
                                                       self.passwd,
978
                                                       inj_file['path'],
979
                                                       inj_file['contents'])
980
            self.assertTrue(equal_files)
981

    
982
    def test_017_submit_delete_request(self):
983
        """Test submit request to delete server"""
984

    
985
        log.info("Deleting server")
986

    
987
        self.cyclades_client.delete_server(self.serverid)
988

    
989
    def test_018_server_becomes_deleted(self):
990
        """Test server becomes DELETED"""
991

    
992
        log.info("Testing if server becomes DELETED")
993

    
994
        self._insist_on_status_transition(
995
            "ACTIVE", "DELETED", self.action_timeout, self.action_timeout)
996

    
997
    def test_019_server_no_longer_in_server_list(self):
998
        """Test server is no longer in server list"""
999

    
1000
        log.info("Test if server is no longer listed")
1001

    
1002
        servers = self.cyclades_client.list_servers()
1003
        self.assertNotIn(self.serverid, [s["id"] for s in servers])
1004

    
1005

    
1006
class NetworkTestCase(unittest.TestCase):
1007
    """ Testing networking in cyclades """
1008

    
1009
    @classmethod
1010
    def setUpClass(cls):
1011
        "Initialize kamaki, get list of current networks"
1012

    
1013
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
1014
        # Cyclades Client
1015
        compute_url = \
1016
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
1017
        cls.cyclades_client = CycladesClient(compute_url, TOKEN)
1018

    
1019
        cls.servername = "%s%s for %s" % (SNF_TEST_PREFIX,
1020
                                          TEST_RUN_ID,
1021
                                          cls.imagename)
1022

    
1023
        #Dictionary initialization for the vms credentials
1024
        cls.serverid = dict()
1025
        cls.username = dict()
1026
        cls.password = dict()
1027
        cls.is_windows = cls.imagename.lower().find("windows") >= 0
1028

    
1029
        cls.result_dict = dict()
1030

    
1031
    def _skipIf(self, condition, msg):
1032
        if condition:
1033
            self.skipTest(msg)
1034

    
1035
    def _get_ipv4(self, server):
1036
        """Get the public IPv4 of a server from the detailed server info"""
1037

    
1038
        nics = server["attachments"]
1039

    
1040
        for nic in nics:
1041
            net_id = nic["network_id"]
1042
            if self.cyclades_client.get_network_details(net_id)["public"]:
1043
                public_addrs = nic["ipv4"]
1044

    
1045
        self.assertTrue(public_addrs is not None)
1046

    
1047
        return public_addrs
1048

    
1049
    def _connect_loginname(self, os_value):
1050
        """Return the login name for connections based on the server OS"""
1051
        if os_value in ("Ubuntu", "Kubuntu", "Fedora"):
1052
            return "user"
1053
        elif os_value in ("windows", "windows_alpha1"):
1054
            return "Administrator"
1055
        else:
1056
            return "root"
1057

    
1058
    def _ping_once(self, ip):
1059

    
1060
        """Test server responds to a single IPv4 or IPv6 ping"""
1061
        cmd = "ping -c 7 -w 20 %s" % (ip)
1062
        ping = subprocess.Popen(cmd, shell=True,
1063
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1064
        (stdout, stderr) = ping.communicate()
1065
        ret = ping.wait()
1066

    
1067
        return (ret == 0)
1068

    
1069
    def test_00001a_submit_create_server_A(self):
1070
        """Test submit create server request"""
1071

    
1072
        log.info("Creating test server A")
1073

    
1074
        serverA = self.cyclades_client.create_server(
1075
            self.servername, self.flavorid, self.imageid, personality=None)
1076

    
1077
        self.assertEqual(serverA["name"], self.servername)
1078
        self.assertEqual(serverA["flavor"]["id"], self.flavorid)
1079
        self.assertEqual(serverA["image"]["id"], self.imageid)
1080
        self.assertEqual(serverA["status"], "BUILD")
1081

    
1082
        # Update class attributes to reflect data on building server
1083
        self.serverid['A'] = serverA["id"]
1084
        self.username['A'] = None
1085
        self.password['A'] = serverA["adminPass"]
1086

    
1087
        log.info("Server A id:" + str(serverA["id"]))
1088
        log.info("Server password " + (self.password['A']))
1089

    
1090
        self.result_dict["Server A ID"] = str(serverA["id"])
1091
        self.result_dict["Server A password"] = serverA["adminPass"]
1092

    
1093
    def test_00001b_serverA_becomes_active(self):
1094
        """Test server becomes ACTIVE"""
1095

    
1096
        log.info("Waiting until test server A becomes ACTIVE")
1097
        self.result_dict.clear()
1098

    
1099
        fail_tmout = time.time() + self.action_timeout
1100
        while True:
1101
            d = self.cyclades_client.get_server_details(self.serverid['A'])
1102
            status = d['status']
1103
            if status == 'ACTIVE':
1104
                active = True
1105
                break
1106
            elif time.time() > fail_tmout:
1107
                self.assertLess(time.time(), fail_tmout)
1108
            else:
1109
                time.sleep(self.query_interval)
1110

    
1111
        self.assertTrue(active)
1112

    
1113
    def test_00002a_submit_create_server_B(self):
1114
        """Test submit create server request"""
1115

    
1116
        log.info("Creating test server B")
1117

    
1118
        serverB = self.cyclades_client.create_server(
1119
            self.servername, self.flavorid, self.imageid, personality=None)
1120

    
1121
        self.assertEqual(serverB["name"], self.servername)
1122
        self.assertEqual(serverB["flavor"]["id"], self.flavorid)
1123
        self.assertEqual(serverB["image"]["id"], self.imageid)
1124
        self.assertEqual(serverB["status"], "BUILD")
1125

    
1126
        # Update class attributes to reflect data on building server
1127
        self.serverid['B'] = serverB["id"]
1128
        self.username['B'] = None
1129
        self.password['B'] = serverB["adminPass"]
1130

    
1131
        log.info("Server B id: " + str(serverB["id"]))
1132
        log.info("Password " + (self.password['B']))
1133

    
1134
        self.result_dict.clear()
1135
        self.result_dict["Server B ID"] = str(serverB["id"])
1136
        self.result_dict["Server B password"] = serverB["adminPass"]
1137

    
1138
    def test_00002b_serverB_becomes_active(self):
1139
        """Test server becomes ACTIVE"""
1140

    
1141
        log.info("Waiting until test server B becomes ACTIVE")
1142
        self.result_dict.clear()
1143

    
1144
        fail_tmout = time.time() + self.action_timeout
1145
        while True:
1146
            d = self.cyclades_client.get_server_details(self.serverid['B'])
1147
            status = d['status']
1148
            if status == 'ACTIVE':
1149
                active = True
1150
                break
1151
            elif time.time() > fail_tmout:
1152
                self.assertLess(time.time(), fail_tmout)
1153
            else:
1154
                time.sleep(self.query_interval)
1155

    
1156
        self.assertTrue(active)
1157

    
1158
    def test_001_create_network(self):
1159
        """Test submit create network request"""
1160

    
1161
        log.info("Submit new network request")
1162
        self.result_dict.clear()
1163

    
1164
        name = SNF_TEST_PREFIX + TEST_RUN_ID
1165
        #previous_num = len(self.client.list_networks())
1166
        network = self.cyclades_client.create_network(
1167
            name, cidr='10.0.1.0/28', dhcp=True)
1168

    
1169
        #Test if right name is assigned
1170
        self.assertEqual(network['name'], name)
1171

    
1172
        # Update class attributes
1173
        cls = type(self)
1174
        cls.networkid = network['id']
1175
        #networks = self.client.list_networks()
1176

    
1177
        fail_tmout = time.time() + self.action_timeout
1178

    
1179
        #Test if new network is created
1180
        while True:
1181
            d = self.cyclades_client.get_network_details(network['id'])
1182
            if d['status'] == 'ACTIVE':
1183
                connected = True
1184
                break
1185
            elif time.time() > fail_tmout:
1186
                self.assertLess(time.time(), fail_tmout)
1187
            else:
1188
                log.info("Waiting for network to become ACTIVE")
1189
                time.sleep(self.query_interval)
1190

    
1191
        self.assertTrue(connected)
1192

    
1193
        self.result_dict["Private network ID"] = str(network['id'])
1194

    
1195
    def test_002_connect_to_network(self):
1196
        """Test connect VMs to network"""
1197

    
1198
        log.info("Connect VMs to private network")
1199
        self.result_dict.clear()
1200

    
1201
        self.cyclades_client.connect_server(self.serverid['A'], self.networkid)
1202
        self.cyclades_client.connect_server(self.serverid['B'], self.networkid)
1203

    
1204
        #Insist on connecting until action timeout
1205
        fail_tmout = time.time() + self.action_timeout
1206

    
1207
        while True:
1208

    
1209
            netsA = [x['network_id']
1210
                     for x in self.cyclades_client.get_server_details(
1211
                         self.serverid['A'])['attachments']]
1212
            netsB = [x['network_id']
1213
                     for x in self.cyclades_client.get_server_details(
1214
                         self.serverid['B'])['attachments']]
1215

    
1216
            if (self.networkid in netsA) and (self.networkid in netsB):
1217
                conn_exists = True
1218
                break
1219
            elif time.time() > fail_tmout:
1220
                self.assertLess(time.time(), fail_tmout)
1221
            else:
1222
                time.sleep(self.query_interval)
1223

    
1224
        #Adding private IPs to class attributes
1225
        cls = type(self)
1226
        cls.priv_ip = dict()
1227

    
1228
        nicsA = self.cyclades_client.get_server_details(
1229
            self.serverid['A'])['attachments']
1230
        nicsB = self.cyclades_client.get_server_details(
1231
            self.serverid['B'])['attachments']
1232

    
1233
        if conn_exists:
1234
            for nic in nicsA:
1235
                if nic["network_id"] == self.networkid:
1236
                    cls.priv_ip["A"] = nic["ipv4"]
1237
            self.result_dict["Server A private IP"] = str(cls.priv_ip["A"])
1238

    
1239
            for nic in nicsB:
1240
                if nic["network_id"] == self.networkid:
1241
                    cls.priv_ip["B"] = nic["ipv4"]
1242
            self.result_dict["Server B private IP"] = str(cls.priv_ip["B"])
1243

    
1244
        self.assertTrue(conn_exists)
1245
        self.assertIsNot(cls.priv_ip["A"], None)
1246
        self.assertIsNot(cls.priv_ip["B"], None)
1247

    
1248
    def test_002a_reboot(self):
1249
        """Rebooting server A"""
1250

    
1251
        log.info("Rebooting server A")
1252

    
1253
        self.cyclades_client.shutdown_server(self.serverid['A'])
1254

    
1255
        fail_tmout = time.time() + self.action_timeout
1256
        while True:
1257
            d = self.cyclades_client.get_server_details(self.serverid['A'])
1258
            status = d['status']
1259
            if status == 'STOPPED':
1260
                break
1261
            elif time.time() > fail_tmout:
1262
                self.assertLess(time.time(), fail_tmout)
1263
            else:
1264
                time.sleep(self.query_interval)
1265

    
1266
        self.cyclades_client.start_server(self.serverid['A'])
1267

    
1268
        while True:
1269
            d = self.cyclades_client.get_server_details(self.serverid['A'])
1270
            status = d['status']
1271
            if status == 'ACTIVE':
1272
                active = True
1273
                break
1274
            elif time.time() > fail_tmout:
1275
                self.assertLess(time.time(), fail_tmout)
1276
            else:
1277
                time.sleep(self.query_interval)
1278

    
1279
        self.assertTrue(active)
1280

    
1281
    def test_002b_ping_server_A(self):
1282
        "Test if server A responds to IPv4 pings"
1283

    
1284
        log.info("Testing if server A responds to IPv4 pings ")
1285
        self.result_dict.clear()
1286

    
1287
        server = self.cyclades_client.get_server_details(self.serverid['A'])
1288
        ip = self._get_ipv4(server)
1289

    
1290
        fail_tmout = time.time() + self.action_timeout
1291

    
1292
        s = False
1293

    
1294
        self.result_dict["Server A public IP"] = str(ip)
1295

    
1296
        while True:
1297

    
1298
            if self._ping_once(ip):
1299
                s = True
1300
                break
1301

    
1302
            elif time.time() > fail_tmout:
1303
                self.assertLess(time.time(), fail_tmout)
1304

    
1305
            else:
1306
                time.sleep(self.query_interval)
1307

    
1308
        self.assertTrue(s)
1309

    
1310
    def test_002c_reboot(self):
1311
        """Reboot server B"""
1312

    
1313
        log.info("Rebooting server B")
1314
        self.result_dict.clear()
1315

    
1316
        self.cyclades_client.shutdown_server(self.serverid['B'])
1317

    
1318
        fail_tmout = time.time() + self.action_timeout
1319
        while True:
1320
            d = self.cyclades_client.get_server_details(self.serverid['B'])
1321
            status = d['status']
1322
            if status == 'STOPPED':
1323
                break
1324
            elif time.time() > fail_tmout:
1325
                self.assertLess(time.time(), fail_tmout)
1326
            else:
1327
                time.sleep(self.query_interval)
1328

    
1329
        self.cyclades_client.start_server(self.serverid['B'])
1330

    
1331
        while True:
1332
            d = self.cyclades_client.get_server_details(self.serverid['B'])
1333
            status = d['status']
1334
            if status == 'ACTIVE':
1335
                active = True
1336
                break
1337
            elif time.time() > fail_tmout:
1338
                self.assertLess(time.time(), fail_tmout)
1339
            else:
1340
                time.sleep(self.query_interval)
1341

    
1342
        self.assertTrue(active)
1343

    
1344
    def test_002d_ping_server_B(self):
1345
        """Test if server B responds to IPv4 pings"""
1346

    
1347
        log.info("Testing if server B responds to IPv4 pings")
1348
        self.result_dict.clear()
1349

    
1350
        server = self.cyclades_client.get_server_details(self.serverid['B'])
1351
        ip = self._get_ipv4(server)
1352

    
1353
        fail_tmout = time.time() + self.action_timeout
1354

    
1355
        s = False
1356

    
1357
        self.result_dict["Server B public IP"] = str(ip)
1358

    
1359
        while True:
1360
            if self._ping_once(ip):
1361
                s = True
1362
                break
1363

    
1364
            elif time.time() > fail_tmout:
1365
                self.assertLess(time.time(), fail_tmout)
1366

    
1367
            else:
1368
                time.sleep(self.query_interval)
1369

    
1370
        self.assertTrue(s)
1371

    
1372
    def test_003a_setup_interface_A(self):
1373
        """Setup eth1 for server A"""
1374

    
1375
        self._skipIf(self.is_windows, "only valid for Linux servers")
1376

    
1377
        log.info("Setting up interface eth1 for server A")
1378
        self.result_dict.clear()
1379

    
1380
        server = self.cyclades_client.get_server_details(self.serverid['A'])
1381
        image = self.cyclades_client.get_image_details(self.imageid)
1382
        os_value = image['metadata']['os']
1383

    
1384
        users = image["metadata"].get("users", None)
1385
        userlist = users.split()
1386

    
1387
        if "root" in userlist:
1388
            loginname = "root"
1389
        elif users is None:
1390
            loginname = self._connect_loginname(os_value)
1391
        else:
1392
            loginname = choice(userlist)
1393

    
1394
        hostip = self._get_ipv4(server)
1395
        myPass = self.password['A']
1396

    
1397
        log.info("SSH in server A as %s/%s" % (loginname, myPass))
1398
        command = "ifconfig eth1 %s && ifconfig eth1 | " \
1399
                  "grep 'inet addr:' | cut -d: -f2 | awk '{ print $1}'" \
1400
                  % self.priv_ip["A"]
1401
        output, status = _ssh_execute(
1402
            hostip, loginname, myPass, command)
1403

    
1404
        self.assertEquals(status, 0)
1405
        self.assertEquals(output[0].strip(), self.priv_ip["A"])
1406

    
1407
    def test_003b_setup_interface_B(self):
1408
        """Setup eth1 for server B"""
1409

    
1410
        self._skipIf(self.is_windows, "only valid for Linux servers")
1411

    
1412
        log.info("Setting up interface eth1 for server B")
1413

    
1414
        server = self.cyclades_client.get_server_details(self.serverid['B'])
1415
        image = self.cyclades_client.get_image_details(self.imageid)
1416
        os_value = image['metadata']['os']
1417

    
1418
        users = image["metadata"].get("users", None)
1419
        userlist = users.split()
1420

    
1421
        if "root" in userlist:
1422
            loginname = "root"
1423
        elif users is None:
1424
            loginname = self._connect_loginname(os_value)
1425
        else:
1426
            loginname = choice(userlist)
1427

    
1428
        hostip = self._get_ipv4(server)
1429
        myPass = self.password['B']
1430

    
1431
        log.info("SSH in server B as %s/%s" % (loginname, myPass))
1432
        command = "ifconfig eth1 %s && ifconfig eth1 | " \
1433
                  "grep 'inet addr:' | cut -d: -f2 | awk '{ print $1}'" \
1434
                  % self.priv_ip["B"]
1435
        output, status = _ssh_execute(
1436
            hostip, loginname, myPass, command)
1437

    
1438
        self.assertEquals(status, 0)
1439
        self.assertEquals(output[0].strip(), self.priv_ip["B"])
1440

    
1441
    def test_003c_test_connection_exists(self):
1442
        """Ping server B from server A to test if connection exists"""
1443

    
1444
        self._skipIf(self.is_windows, "only valid for Linux servers")
1445

    
1446
        log.info("Testing if server A is actually connected to server B")
1447

    
1448
        server = self.cyclades_client.get_server_details(self.serverid['A'])
1449
        image = self.cyclades_client.get_image_details(self.imageid)
1450
        os_value = image['metadata']['os']
1451
        hostip = self._get_ipv4(server)
1452

    
1453
        users = image["metadata"].get("users", None)
1454
        userlist = users.split()
1455

    
1456
        if "root" in userlist:
1457
            loginname = "root"
1458
        elif users is None:
1459
            loginname = self._connect_loginname(os_value)
1460
        else:
1461
            loginname = choice(userlist)
1462

    
1463
        myPass = self.password['A']
1464

    
1465
        cmd = "if ping -c 7 -w 20 %s >/dev/null; \
1466
               then echo \'True\'; fi;" % self.priv_ip["B"]
1467
        lines, status = _ssh_execute(
1468
            hostip, loginname, myPass, cmd)
1469

    
1470
        exists = False
1471

    
1472
        if 'True\n' in lines:
1473
            exists = True
1474

    
1475
        self.assertTrue(exists)
1476

    
1477
    def test_004_disconnect_from_network(self):
1478
        "Disconnecting server A and B from network"
1479

    
1480
        log.info("Disconnecting servers from private network")
1481

    
1482
        prev_state = self.cyclades_client.get_network_details(self.networkid)
1483
        prev_nics = prev_state['attachments']
1484
        #prev_conn = len(prev_nics)
1485

    
1486
        nicsA = [x['id']
1487
                 for x in self.cyclades_client.get_server_details(
1488
                     self.serverid['A'])['attachments']]
1489
        nicsB = [x['id']
1490
                 for x in self.cyclades_client.get_server_details(
1491
                     self.serverid['B'])['attachments']]
1492

    
1493
        for nic in prev_nics:
1494
            if nic in nicsA:
1495
                self.cyclades_client.disconnect_server(self.serverid['A'], nic)
1496
            if nic in nicsB:
1497
                self.cyclades_client.disconnect_server(self.serverid['B'], nic)
1498

    
1499
        #Insist on deleting until action timeout
1500
        fail_tmout = time.time() + self.action_timeout
1501

    
1502
        while True:
1503
            netsA = [x['network_id']
1504
                     for x in self.cyclades_client.get_server_details(
1505
                         self.serverid['A'])['attachments']]
1506
            netsB = [x['network_id']
1507
                     for x in self.cyclades_client.get_server_details(
1508
                         self.serverid['B'])['attachments']]
1509

    
1510
            #connected = (self.client.get_network_details(self.networkid))
1511
            #connections = connected['attachments']
1512
            if (self.networkid not in netsA) and (self.networkid not in netsB):
1513
                conn_exists = False
1514
                break
1515
            elif time.time() > fail_tmout:
1516
                self.assertLess(time.time(), fail_tmout)
1517
            else:
1518
                time.sleep(self.query_interval)
1519

    
1520
        self.assertFalse(conn_exists)
1521

    
1522
    def test_005_destroy_network(self):
1523
        """Test submit delete network request"""
1524

    
1525
        log.info("Submitting delete network request")
1526

    
1527
        self.cyclades_client.delete_network(self.networkid)
1528

    
1529
        fail_tmout = time.time() + self.action_timeout
1530

    
1531
        while True:
1532

    
1533
            curr_net = []
1534
            networks = self.cyclades_client.list_networks()
1535

    
1536
            for net in networks:
1537
                curr_net.append(net['id'])
1538

    
1539
            if self.networkid not in curr_net:
1540
                self.assertTrue(self.networkid not in curr_net)
1541
                break
1542

    
1543
            elif time.time() > fail_tmout:
1544
                self.assertLess(time.time(), fail_tmout)
1545

    
1546
            else:
1547
                time.sleep(self.query_interval)
1548

    
1549
    def test_006_cleanup_servers(self):
1550
        """Cleanup servers created for this test"""
1551

    
1552
        log.info("Delete servers created for this test")
1553

    
1554
        self.cyclades_client.delete_server(self.serverid['A'])
1555
        self.cyclades_client.delete_server(self.serverid['B'])
1556

    
1557
        fail_tmout = time.time() + self.action_timeout
1558

    
1559
        #Ensure server gets deleted
1560
        status = dict()
1561

    
1562
        while True:
1563
            details = \
1564
                self.cyclades_client.get_server_details(self.serverid['A'])
1565
            status['A'] = details['status']
1566
            details = \
1567
                self.cyclades_client.get_server_details(self.serverid['B'])
1568
            status['B'] = details['status']
1569
            if (status['A'] == 'DELETED') and (status['B'] == 'DELETED'):
1570
                deleted = True
1571
                break
1572
            elif time.time() > fail_tmout:
1573
                self.assertLess(time.time(), fail_tmout)
1574
            else:
1575
                time.sleep(self.query_interval)
1576

    
1577
        self.assertTrue(deleted)
1578

    
1579

    
1580
class TestRunnerProcess(Process):
1581
    """A distinct process used to execute part of the tests in parallel"""
1582
    def __init__(self, **kw):
1583
        Process.__init__(self, **kw)
1584
        kwargs = kw["kwargs"]
1585
        self.testq = kwargs["testq"]
1586
        self.worker_folder = kwargs["worker_folder"]
1587

    
1588
    def run(self):
1589
        # Make sure this test runner process dies with the parent
1590
        # and is not left behind.
1591
        #
1592
        # WARNING: This uses the prctl(2) call and is
1593
        # Linux-specific.
1594

    
1595
        prctl.set_pdeathsig(signal.SIGHUP)
1596

    
1597
        multi = logging.getLogger("multiprocess")
1598

    
1599
        while True:
1600
            multi.debug("I am process %d, GETting from queue is %s" %
1601
                        (os.getpid(), self.testq))
1602
            msg = self.testq.get()
1603

    
1604
            multi.debug("Dequeued msg: %s" % msg)
1605

    
1606
            if msg == "TEST_RUNNER_TERMINATE":
1607
                raise SystemExit
1608

    
1609
            elif issubclass(msg, unittest.TestCase):
1610
                # Assemble a TestSuite, and run it
1611

    
1612
                log_file = os.path.join(self.worker_folder, 'details_' +
1613
                                        (msg.__name__) + "_" +
1614
                                        TEST_RUN_ID + '.log')
1615

    
1616
                fail_file = os.path.join(self.worker_folder, 'failed_' +
1617
                                         (msg.__name__) + "_" +
1618
                                         TEST_RUN_ID + '.log')
1619
                error_file = os.path.join(self.worker_folder, 'error_' +
1620
                                          (msg.__name__) + "_" +
1621
                                          TEST_RUN_ID + '.log')
1622

    
1623
                f = open(log_file, 'w')
1624
                fail = open(fail_file, 'w')
1625
                error = open(error_file, 'w')
1626

    
1627
                log.info(yellow + '* Starting testcase: %s' % msg + normal)
1628

    
1629
                runner = unittest.TextTestRunner(
1630
                    f, verbosity=2, failfast=True,
1631
                    resultclass=BurninTestResult)
1632
                suite = unittest.TestLoader().loadTestsFromTestCase(msg)
1633
                result = runner.run(suite)
1634

    
1635
                for res in result.errors:
1636
                    log.error("snf-burnin encountered an error in "
1637
                              "testcase: %s" % msg)
1638
                    log.error("See log for details")
1639
                    error.write(str(res[0]) + '\n')
1640
                    error.write(str(res[0].shortDescription()) + '\n')
1641
                    error.write('\n')
1642

    
1643
                for res in result.failures:
1644
                    log.error("snf-burnin failed in testcase: %s" % msg)
1645
                    log.error("See log for details")
1646
                    fail.write(str(res[0]) + '\n')
1647
                    fail.write(str(res[0].shortDescription()) + '\n')
1648
                    fail.write('\n')
1649
                    if not NOFAILFAST:
1650
                        sys.exit()
1651

    
1652
                if (len(result.failures) == 0) and (len(result.errors) == 0):
1653
                    log.debug("Passed testcase: %s" % msg)
1654

    
1655
                f.close()
1656
                fail.close()
1657
                error.close()
1658

    
1659
            else:
1660
                raise Exception("Cannot handle msg: %s" % msg)
1661

    
1662

    
1663
def _run_cases_in_series(cases, image_folder):
1664
    """Run instances of TestCase in series"""
1665

    
1666
    for case in cases:
1667

    
1668
        test = case.__name__
1669

    
1670
        log.info(yellow + '* Starting testcase: %s' % test + normal)
1671
        log_file = os.path.join(image_folder, 'details_' +
1672
                                (case.__name__) + "_" +
1673
                                TEST_RUN_ID + '.log')
1674
        fail_file = os.path.join(image_folder, 'failed_' +
1675
                                 (case.__name__) + "_" +
1676
                                 TEST_RUN_ID + '.log')
1677
        error_file = os.path.join(image_folder, 'error_' +
1678
                                  (case.__name__) + "_" +
1679
                                  TEST_RUN_ID + '.log')
1680

    
1681
        f = open(log_file, "w")
1682
        fail = open(fail_file, "w")
1683
        error = open(error_file, "w")
1684

    
1685
        suite = unittest.TestLoader().loadTestsFromTestCase(case)
1686
        runner = unittest.TextTestRunner(
1687
            f, verbosity=2, failfast=True,
1688
            resultclass=BurninTestResult)
1689
        result = runner.run(suite)
1690

    
1691
        for res in result.errors:
1692
            log.error("snf-burnin encountered an error in "
1693
                      "testcase: %s" % test)
1694
            log.error("See log for details")
1695
            error.write(str(res[0]) + '\n')
1696
            error.write(str(res[0].shortDescription()) + '\n')
1697
            error.write('\n')
1698

    
1699
        for res in result.failures:
1700
            log.error("snf-burnin failed in testcase: %s" % test)
1701
            log.error("See log for details")
1702
            fail.write(str(res[0]) + '\n')
1703
            fail.write(str(res[0].shortDescription()) + '\n')
1704
            fail.write('\n')
1705
            if not NOFAILFAST:
1706
                sys.exit()
1707

    
1708
        if (len(result.failures) == 0) and (len(result.errors) == 0):
1709
            log.debug("Passed testcase: %s" % test)
1710

    
1711

    
1712
def _run_cases_in_parallel(cases, fanout, image_folder):
1713
    """Run instances of TestCase in parallel, in a number of distinct processes
1714

1715
    The cases iterable specifies the TestCases to be executed in parallel,
1716
    by test runners running in distinct processes.
1717
    The fanout parameter specifies the number of processes to spawn,
1718
    and defaults to 1.
1719
    The runner argument specifies the test runner class to use inside each
1720
    runner process.
1721

1722
    """
1723

    
1724
    multi = logging.getLogger("multiprocess")
1725
    handler = logging.StreamHandler()
1726
    multi.addHandler(handler)
1727

    
1728
    if VERBOSE:
1729
        multi.setLevel(logging.DEBUG)
1730
    else:
1731
        multi.setLevel(logging.INFO)
1732

    
1733
    testq = []
1734
    worker_folder = []
1735
    runners = []
1736

    
1737
    for i in xrange(0, fanout):
1738
        testq.append(Queue())
1739
        worker_folder.append(os.path.join(image_folder, 'process'+str(i)))
1740
        os.mkdir(worker_folder[i])
1741

    
1742
    for i in xrange(0, fanout):
1743
        kwargs = dict(testq=testq[i], worker_folder=worker_folder[i])
1744
        runners.append(TestRunnerProcess(kwargs=kwargs))
1745

    
1746
    multi.debug("Spawning %d test runner processes" % len(runners))
1747

    
1748
    for p in runners:
1749
        p.start()
1750

    
1751
    # Enqueue test cases
1752
    for i in xrange(0, fanout):
1753
        map(testq[i].put, cases)
1754
        testq[i].put("TEST_RUNNER_TERMINATE")
1755

    
1756
    multi.debug("Spawned %d test runners, PIDs are %s" %
1757
                (len(runners), [p.pid for p in runners]))
1758

    
1759
    multi.debug("Joining %d processes" % len(runners))
1760

    
1761
    for p in runners:
1762
        p.join()
1763

    
1764
    multi.debug("Done joining %d processes" % len(runners))
1765

    
1766

    
1767
def _images_test_case(**kwargs):
1768
    """Construct a new unit test case class from ImagesTestCase"""
1769
    name = "ImagesTestCase_%s" % kwargs["imageid"]
1770
    cls = type(name, (ImagesTestCase,), kwargs)
1771

    
1772
    #Patch extra parameters into test names by manipulating method docstrings
1773
    for (mname, m) in \
1774
            inspect.getmembers(cls, lambda x: inspect.ismethod(x)):
1775
        if hasattr(m, __doc__):
1776
            m.__func__.__doc__ = "[%s] %s" % (cls.imagename, m.__doc__)
1777

    
1778
    # Make sure the class can be pickled, by listing it among
1779
    # the attributes of __main__. A PicklingError is raised otherwise.
1780
    thismodule = sys.modules[__name__]
1781
    setattr(thismodule, name, cls)
1782
    return cls
1783

    
1784

    
1785
def _spawn_server_test_case(**kwargs):
1786
    """Construct a new unit test case class from SpawnServerTestCase"""
1787

    
1788
    name = "SpawnServerTestCase_%s" % kwargs["imageid"]
1789
    cls = type(name, (SpawnServerTestCase,), kwargs)
1790

    
1791
    # Patch extra parameters into test names by manipulating method docstrings
1792
    for (mname, m) in \
1793
            inspect.getmembers(cls, lambda x: inspect.ismethod(x)):
1794
        if hasattr(m, __doc__):
1795
            m.__func__.__doc__ = "[%s] %s" % (cls.imagename, m.__doc__)
1796

    
1797
    # Make sure the class can be pickled, by listing it among
1798
    # the attributes of __main__. A PicklingError is raised otherwise.
1799

    
1800
    thismodule = sys.modules[__name__]
1801
    setattr(thismodule, name, cls)
1802
    return cls
1803

    
1804

    
1805
def _spawn_network_test_case(**kwargs):
1806
    """Construct a new unit test case class from NetworkTestCase"""
1807

    
1808
    name = "NetworkTestCase" + TEST_RUN_ID
1809
    cls = type(name, (NetworkTestCase,), kwargs)
1810

    
1811
    # Make sure the class can be pickled, by listing it among
1812
    # the attributes of __main__. A PicklingError is raised otherwise.
1813

    
1814
    thismodule = sys.modules[__name__]
1815
    setattr(thismodule, name, cls)
1816
    return cls
1817

    
1818

    
1819
# --------------------------------------------------------------------
1820
# Clean up servers/networks functions
1821
def cleanup_servers(timeout, query_interval, delete_stale=False):
1822

    
1823
    astakos_client = AstakosClient(AUTH_URL, TOKEN)
1824
    # Compute Client
1825
    compute_url = astakos_client.get_service_endpoints('compute')['publicURL']
1826
    compute_client = ComputeClient(compute_url, TOKEN)
1827

    
1828
    servers = compute_client.list_servers()
1829
    stale = [s for s in servers if s["name"].startswith(SNF_TEST_PREFIX)]
1830

    
1831
    if len(stale) == 0:
1832
        return
1833

    
1834
    # Show staled servers
1835
    print >>sys.stderr, yellow + \
1836
        "Found these stale servers from previous runs:" + \
1837
        normal
1838
    print >>sys.stderr, "    " + \
1839
        "\n    ".join(["%d: %s" % (s["id"], s["name"]) for s in stale])
1840

    
1841
    # Delete staled servers
1842
    if delete_stale:
1843
        print >> sys.stderr, "Deleting %d stale servers:" % len(stale)
1844
        fail_tmout = time.time() + timeout
1845
        for s in stale:
1846
            compute_client.delete_server(s["id"])
1847
        # Wait for all servers to be deleted
1848
        while True:
1849
            servers = compute_client.list_servers()
1850
            stale = [s for s in servers
1851
                     if s["name"].startswith(SNF_TEST_PREFIX)]
1852
            if len(stale) == 0:
1853
                print >> sys.stderr, green + "    ...done" + normal
1854
                break
1855
            elif time.time() > fail_tmout:
1856
                print >> sys.stderr, red + \
1857
                    "Not all stale servers deleted. Action timed out." + \
1858
                    normal
1859
                sys.exit(1)
1860
            else:
1861
                time.sleep(query_interval)
1862
    else:
1863
        print >> sys.stderr, "Use --delete-stale to delete them."
1864

    
1865

    
1866
def cleanup_networks(action_timeout, query_interval, delete_stale=False):
1867

    
1868
    astakos_client = AstakosClient(AUTH_URL, TOKEN)
1869
    # Cyclades Client
1870
    compute_url = astakos_client.get_service_endpoints('compute')['publicURL']
1871
    cyclades_client = CycladesClient(compute_url, TOKEN)
1872

    
1873
    networks = cyclades_client.list_networks()
1874
    stale = [n for n in networks if n["name"].startswith(SNF_TEST_PREFIX)]
1875

    
1876
    if len(stale) == 0:
1877
        return
1878

    
1879
    # Show staled networks
1880
    print >> sys.stderr, yellow + \
1881
        "Found these stale networks from previous runs:" + \
1882
        normal
1883
    print "    " + \
1884
        "\n    ".join(["%s: %s" % (str(n["id"]), n["name"]) for n in stale])
1885

    
1886
    # Delete staled networks
1887
    if delete_stale:
1888
        print >> sys.stderr, "Deleting %d stale networks:" % len(stale)
1889
        fail_tmout = time.time() + action_timeout
1890
        for n in stale:
1891
            cyclades_client.delete_network(n["id"])
1892
        # Wait for all networks to be deleted
1893
        while True:
1894
            networks = cyclades_client.list_networks()
1895
            stale = [n for n in networks
1896
                     if n["name"].startswith(SNF_TEST_PREFIX)]
1897
            if len(stale) == 0:
1898
                print >> sys.stderr, green + "    ...done" + normal
1899
                break
1900
            elif time.time() > fail_tmout:
1901
                print >> sys.stderr, red + \
1902
                    "Not all stale networks deleted. Action timed out." + \
1903
                    normal
1904
                sys.exit(1)
1905
            else:
1906
                time.sleep(query_interval)
1907
    else:
1908
        print >> sys.stderr, "Use --delete-stale to delete them."
1909

    
1910

    
1911
# --------------------------------------------------------------------
1912
# Parse arguments functions
1913
def parse_comma(option, opt, value, parser):
1914
    tests = set(['all', 'auth', 'images', 'flavors',
1915
                 'pithos', 'servers', 'server_spawn',
1916
                 'network_spawn'])
1917
    parse_input = value.split(',')
1918

    
1919
    if not (set(parse_input)).issubset(tests):
1920
        raise OptionValueError("The selected set of tests is invalid")
1921

    
1922
    setattr(parser.values, option.dest, value.split(','))
1923

    
1924

    
1925
def parse_arguments(args):
1926

    
1927
    kw = {}
1928
    kw["usage"] = "%prog [options]"
1929
    kw["description"] = \
1930
        "%prog runs a number of test scenarios on a " \
1931
        "Synnefo deployment."
1932

    
1933
    parser = OptionParser(**kw)
1934
    parser.disable_interspersed_args()
1935

    
1936
    parser.add_option("--auth-url",
1937
                      action="store", type="string", dest="auth_url",
1938
                      help="The AUTH URI to use to reach the Synnefo API",
1939
                      default=None)
1940
    parser.add_option("--plankton-user",
1941
                      action="store", type="string", dest="plankton_user",
1942
                      help="Owner of system images",
1943
                      default=DEFAULT_PLANKTON_USER)
1944
    parser.add_option("--token",
1945
                      action="store", type="string", dest="token",
1946
                      help="The token to use for authentication to the API")
1947
    parser.add_option("--nofailfast",
1948
                      action="store_true", dest="nofailfast",
1949
                      help="Do not fail immediately if one of the tests "
1950
                           "fails (EXPERIMENTAL)",
1951
                      default=False)
1952
    parser.add_option("--no-ipv6",
1953
                      action="store_true", dest="no_ipv6",
1954
                      help="Disables ipv6 related tests",
1955
                      default=False)
1956
    parser.add_option("--action-timeout",
1957
                      action="store", type="int", dest="action_timeout",
1958
                      metavar="TIMEOUT",
1959
                      help="Wait SECONDS seconds for a server action to "
1960
                           "complete, then the test is considered failed",
1961
                      default=100)
1962
    parser.add_option("--build-warning",
1963
                      action="store", type="int", dest="build_warning",
1964
                      metavar="TIMEOUT",
1965
                      help="Warn if TIMEOUT seconds have passed and a "
1966
                           "build operation is still pending",
1967
                      default=600)
1968
    parser.add_option("--build-fail",
1969
                      action="store", type="int", dest="build_fail",
1970
                      metavar="BUILD_TIMEOUT",
1971
                      help="Fail the test if TIMEOUT seconds have passed "
1972
                           "and a build operation is still incomplete",
1973
                      default=900)
1974
    parser.add_option("--query-interval",
1975
                      action="store", type="int", dest="query_interval",
1976
                      metavar="INTERVAL",
1977
                      help="Query server status when requests are pending "
1978
                           "every INTERVAL seconds",
1979
                      default=3)
1980
    parser.add_option("--fanout",
1981
                      action="store", type="int", dest="fanout",
1982
                      metavar="COUNT",
1983
                      help="Spawn up to COUNT child processes to execute "
1984
                           "in parallel, essentially have up to COUNT "
1985
                           "server build requests outstanding (EXPERIMENTAL)",
1986
                      default=1)
1987
    parser.add_option("--force-flavor",
1988
                      action="store", type="int", dest="force_flavorid",
1989
                      metavar="FLAVOR ID",
1990
                      help="Force all server creations to use the specified "
1991
                           "FLAVOR ID instead of a randomly chosen one, "
1992
                           "useful if disk space is scarce",
1993
                      default=None)
1994
    parser.add_option("--image-id",
1995
                      action="store", type="string", dest="force_imageid",
1996
                      metavar="IMAGE ID",
1997
                      help="Test the specified image id, use 'all' to test "
1998
                           "all available images (mandatory argument)",
1999
                      default=None)
2000
    parser.add_option("--show-stale",
2001
                      action="store_true", dest="show_stale",
2002
                      help="Show stale servers from previous runs, whose "
2003
                           "name starts with `%s'" % SNF_TEST_PREFIX,
2004
                      default=False)
2005
    parser.add_option("--delete-stale",
2006
                      action="store_true", dest="delete_stale",
2007
                      help="Delete stale servers from previous runs, whose "
2008
                           "name starts with `%s'" % SNF_TEST_PREFIX,
2009
                      default=False)
2010
    parser.add_option("--force-personality",
2011
                      action="store", type="string", dest="personality_path",
2012
                      help="Force a personality file injection.\
2013
                            File path required. ",
2014
                      default=None)
2015
    parser.add_option("--log-folder",
2016
                      action="store", type="string", dest="log_folder",
2017
                      help="Define the absolute path where the output \
2018
                            log is stored. ",
2019
                      default="/var/log/burnin/")
2020
    parser.add_option("--verbose", "-V",
2021
                      action="store_true", dest="verbose",
2022
                      help="Print detailed output about multiple "
2023
                           "processes spawning",
2024
                      default=False)
2025
    parser.add_option("--set-tests",
2026
                      action="callback",
2027
                      dest="tests",
2028
                      type="string",
2029
                      help='Set comma seperated tests for this run. \
2030
                            Available tests: auth, images, flavors, \
2031
                                             servers, server_spawn, \
2032
                                             network_spawn, pithos. \
2033
                            Default = all',
2034
                      default='all',
2035
                      callback=parse_comma)
2036

    
2037
    (opts, args) = parser.parse_args(args)
2038

    
2039
    # -----------------------
2040
    # Verify arguments
2041

    
2042
    # `delete_stale' implies `show_stale'
2043
    if opts.delete_stale:
2044
        opts.show_stale = True
2045

    
2046
    # `token' is mandatory
2047
    _mandatory_argument(opts.token, "--token")
2048
    # `auth_url' is mandatory
2049
    _mandatory_argument(opts.auth_url, "--auth-url")
2050

    
2051
    if not opts.show_stale:
2052
        # `image-id' is mandatory
2053
        _mandatory_argument(opts.force_imageid, "--image-id")
2054
        if opts.force_imageid != 'all':
2055
            try:
2056
                opts.force_imageid = str(opts.force_imageid)
2057
            except ValueError:
2058
                print >>sys.stderr, red + \
2059
                    "Invalid value specified for" + \
2060
                    "--image-id. Use a valid id, or `all'." + \
2061
                    normal
2062
                sys.exit(1)
2063

    
2064
    return (opts, args)
2065

    
2066

    
2067
def _mandatory_argument(Arg, Str):
2068
    if (Arg is None) or (Arg == ""):
2069
        print >>sys.stderr, red + \
2070
            "The " + Str + " argument is mandatory.\n" + \
2071
            normal
2072
        sys.exit(1)
2073

    
2074

    
2075
# --------------------------------------------------------------------
2076
# Burnin main function
2077
def main():
2078
    """Assemble test cases into a test suite, and run it
2079

2080
    IMPORTANT: Tests have dependencies and have to be run in the specified
2081
    order inside a single test case. They communicate through attributes of the
2082
    corresponding TestCase class (shared fixtures). Distinct subclasses of
2083
    TestCase MAY SHARE NO DATA, since they are run in parallel, in distinct
2084
    test runner processes.
2085

2086
    """
2087

    
2088
    # Parse arguments using `optparse'
2089
    (opts, args) = parse_arguments(sys.argv[1:])
2090

    
2091
    # Some global variables
2092
    global AUTH_URL, TOKEN, PLANKTON_USER
2093
    global NO_IPV6, VERBOSE, NOFAILFAST
2094
    AUTH_URL = opts.auth_url
2095
    TOKEN = opts.token
2096
    PLANKTON_USER = opts.plankton_user
2097
    NO_IPV6 = opts.no_ipv6
2098
    VERBOSE = opts.verbose
2099
    NOFAILFAST = opts.nofailfast
2100

    
2101
    # If `show_stale', cleanup stale servers
2102
    # from previous runs and exit
2103
    if opts.show_stale:
2104
        # We must clean the servers first
2105
        cleanup_servers(opts.action_timeout, opts.query_interval,
2106
                        delete_stale=opts.delete_stale)
2107
        cleanup_networks(opts.action_timeout, opts.query_interval,
2108
                         delete_stale=opts.delete_stale)
2109
        return 0
2110

    
2111
    # Initialize a kamaki instance, get flavors, images
2112
    astakos_client = AstakosClient(AUTH_URL, TOKEN)
2113
    # Compute Client
2114
    compute_url = astakos_client.get_service_endpoints('compute')['publicURL']
2115
    compute_client = ComputeClient(compute_url, TOKEN)
2116
    DIMAGES = compute_client.list_images(detail=True)
2117
    DFLAVORS = compute_client.list_flavors(detail=True)
2118

    
2119
    # FIXME: logging, log, LOG PID, TEST_RUN_ID, arguments
2120
    # Run them: FIXME: In parallel, FAILEARLY, catchbreak?
2121
    #unittest.main(verbosity=2, catchbreak=True)
2122

    
2123
    # Get a list of images we are going to test
2124
    if opts.force_imageid == 'all':
2125
        test_images = DIMAGES
2126
    else:
2127
        test_images = filter(lambda x: x["id"] == opts.force_imageid, DIMAGES)
2128

    
2129
    # Create output (logging) folder
2130
    if not os.path.exists(opts.log_folder):
2131
        os.mkdir(opts.log_folder)
2132
    test_folder = os.path.join(opts.log_folder, TEST_RUN_ID)
2133
    os.mkdir(test_folder)
2134

    
2135
    for image in test_images:
2136
        imageid = str(image["id"])
2137
        imagename = image["name"]
2138
        # Choose a flavor (given from user or random)
2139
        if opts.force_flavorid:
2140
            flavorid = opts.force_flavorid
2141
        else:
2142
            flavorid = choice([f["id"] for f in DFLAVORS if f["disk"] >= 20])
2143
        # Personality dictionary for file injection test
2144
        if opts.personality_path is not None:
2145
            f = open(opts.personality_path)
2146
            content = b64encode(f.read())
2147
            personality = []
2148
            st = os.stat(opts.personality_path)
2149
            personality.append({
2150
                'path': '/root/test_inj_file',
2151
                'owner': 'root',
2152
                'group': 'root',
2153
                'mode': 0x7777 & st.st_mode,
2154
                'contents': content})
2155
        else:
2156
            personality = None
2157
        # Give a name to our test servers
2158
        servername = "%s%s for %s" % (SNF_TEST_PREFIX, TEST_RUN_ID, imagename)
2159
        is_windows = imagename.lower().find("windows") >= 0
2160

    
2161
        # Create Server TestCases
2162
        ServerTestCase = _spawn_server_test_case(
2163
            imageid=imageid,
2164
            flavorid=flavorid,
2165
            imagename=imagename,
2166
            personality=personality,
2167
            servername=servername,
2168
            is_windows=is_windows,
2169
            action_timeout=opts.action_timeout,
2170
            build_warning=opts.build_warning,
2171
            build_fail=opts.build_fail,
2172
            query_interval=opts.query_interval)
2173
        # Create Network TestCases
2174
        NetworkTestCase = _spawn_network_test_case(
2175
            action_timeout=opts.action_timeout,
2176
            imageid=imageid,
2177
            flavorid=flavorid,
2178
            imagename=imagename,
2179
            query_interval=opts.query_interval)
2180
        # Create Images TestCase
2181
        CImagesTestCase = _images_test_case(
2182
            action_timeout=opts.action_timeout,
2183
            imageid=imageid,
2184
            flavorid=flavorid,
2185
            imagename=imagename,
2186
            query_interval=opts.query_interval)
2187

    
2188
        # Choose the tests we are going to run
2189
        test_dict = {'auth': UnauthorizedTestCase,
2190
                     'images': CImagesTestCase,
2191
                     'flavors': FlavorsTestCase,
2192
                     'servers': ServersTestCase,
2193
                     'pithos': PithosTestCase,
2194
                     'server_spawn': ServerTestCase,
2195
                     'network_spawn': NetworkTestCase}
2196
        seq_cases = []
2197
        if 'all' in opts.tests:
2198
            seq_cases = [UnauthorizedTestCase, CImagesTestCase,
2199
                         FlavorsTestCase, ServersTestCase,
2200
                         PithosTestCase, ServerTestCase,
2201
                         NetworkTestCase]
2202
        else:
2203
            for test in opts.tests:
2204
                seq_cases.append(test_dict[test])
2205

    
2206
        # Folder for each image
2207
        image_folder = os.path.join(test_folder, imageid)
2208
        os.mkdir(image_folder)
2209

    
2210
        # Run each test
2211
        if opts.fanout > 1:
2212
            _run_cases_in_parallel(seq_cases, opts.fanout, image_folder)
2213
        else:
2214
            _run_cases_in_series(seq_cases, image_folder)
2215

    
2216

    
2217
# --------------------------------------------------------------------
2218
# Call main
2219
if __name__ == "__main__":
2220
    sys.exit(main())