Statistics
| Branch: | Tag: | Revision:

root / snf-tools / synnefo_tools / burnin.py @ 89af2bbd

History | View | Annotate | Download (81.8 kB)

1
#!/usr/bin/env python
2

    
3
# Copyright 2011 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35

    
36
"""Perform integration testing on a running Synnefo deployment"""
37

    
38
#import __main__
39
import datetime
40
import inspect
41
import logging
42
import os
43
import os.path
44
import paramiko
45
import prctl
46
import subprocess
47
import signal
48
import socket
49
import sys
50
import time
51
import tempfile
52
from base64 import b64encode
53
from IPy import IP
54
from multiprocessing import Process, Queue
55
from random import choice, randint
56
from optparse import OptionParser, OptionValueError
57

    
58
from kamaki.clients.compute import ComputeClient
59
from kamaki.clients.cyclades import CycladesClient
60
from kamaki.clients.image import ImageClient
61
from kamaki.clients.pithos import PithosClient
62
from kamaki.clients.astakos import AstakosClient
63
from kamaki.clients import ClientError
64

    
65
from vncauthproxy.d3des import generate_response as d3des_generate_response
66

    
67
# Use backported unittest functionality if Python < 2.7
68
try:
69
    import unittest2 as unittest
70
except ImportError:
71
    if sys.version_info < (2, 7):
72
        raise Exception("The unittest2 package is required for Python < 2.7")
73
    import unittest
74

    
75
# --------------------------------------------------------------------
76
# Global Variables
77
AUTH_URL = None
78
TOKEN = None
79
PLANKTON_USER = None
80
NO_IPV6 = None
81
DEFAULT_PLANKTON_USER = "images@okeanos.grnet.gr"
82
NOFAILFAST = None
83
VERBOSE = None
84

    
85
# A unique id identifying this test run
86
TEST_RUN_ID = datetime.datetime.strftime(datetime.datetime.now(),
87
                                         "%Y%m%d%H%M%S")
88
SNF_TEST_PREFIX = "snf-test-"
89

    
90
red = '\x1b[31m'
91
yellow = '\x1b[33m'
92
green = '\x1b[32m'
93
normal = '\x1b[0m'
94

    
95

    
96
# --------------------------------------------------------------------
97
# Global functions
98
def _ssh_execute(hostip, username, password, command):
99
    """Execute a command via ssh"""
100
    ssh = paramiko.SSHClient()
101
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
102
    try:
103
        ssh.connect(hostip, username=username, password=password)
104
    except socket.error, err:
105
        raise AssertionError(err)
106
    try:
107
        stdin, stdout, stderr = ssh.exec_command(command)
108
    except paramiko.SSHException, err:
109
        raise AssertionError(err)
110
    status = stdout.channel.recv_exit_status()
111
    output = stdout.readlines()
112
    ssh.close()
113
    return output, status
114

    
115

    
116
def _get_user_id():
117
    """Authenticate to astakos and get unique users id"""
118
    astakos = AstakosClient(AUTH_URL, TOKEN)
119
    astakos.CONNECTION_RETRY_LIMIT = 2
120
    authenticate = astakos.authenticate()
121
    return authenticate['access']['user']['id']
122

    
123

    
124
# --------------------------------------------------------------------
125
# BurninTestReulst class
126
class BurninTestResult(unittest.TextTestResult):
127
    def addSuccess(self, test):
128
        super(BurninTestResult, self).addSuccess(test)
129
        if self.showAll:
130
            if hasattr(test, 'result_dict'):
131
                run_details = test.result_dict
132

    
133
                self.stream.write("\n")
134
                for i in run_details:
135
                    self.stream.write("%s : %s \n" % (i, run_details[i]))
136
                self.stream.write("\n")
137

    
138
        elif self.dots:
139
            self.stream.write('.')
140
            self.stream.flush()
141

    
142
    def addError(self, test, err):
143
        super(BurninTestResult, self).addError(test, err)
144
        if self.showAll:
145
            self.stream.writeln("ERROR")
146
            if hasattr(test, 'result_dict'):
147
                run_details = test.result_dict
148

    
149
                self.stream.write("\n")
150
                for i in run_details:
151
                    self.stream.write("%s : %s \n" % (i, run_details[i]))
152
                self.stream.write("\n")
153

    
154
        elif self.dots:
155
            self.stream.write('E')
156
            self.stream.flush()
157

    
158
    def addFailure(self, test, err):
159
        super(BurninTestResult, self).addFailure(test, err)
160
        if self.showAll:
161
            self.stream.writeln("FAIL")
162
            if hasattr(test, 'result_dict'):
163
                run_details = test.result_dict
164

    
165
                self.stream.write("\n")
166
                for i in run_details:
167
                    self.stream.write("%s : %s \n" % (i, run_details[i]))
168
                self.stream.write("\n")
169

    
170
        elif self.dots:
171
            self.stream.write('F')
172
            self.stream.flush()
173

    
174

    
175
# --------------------------------------------------------------------
176
# Format Results
177
class burninFormatter(logging.Formatter):
178
    err_fmt = red + "ERROR: %(msg)s" + normal
179
    dbg_fmt = green + "* %(msg)s" + normal
180
    info_fmt = "%(msg)s"
181

    
182
    def __init__(self, fmt="%(levelno)s: %(msg)s"):
183
        logging.Formatter.__init__(self, fmt)
184

    
185
    def format(self, record):
186
        format_orig = self._fmt
187
        # Replace the original format with one customized by logging level
188
        if record.levelno == 10:    # DEBUG
189
            self._fmt = burninFormatter.dbg_fmt
190
        elif record.levelno == 20:  # INFO
191
            self._fmt = burninFormatter.info_fmt
192
        elif record.levelno == 40:  # ERROR
193
            self._fmt = burninFormatter.err_fmt
194
        result = logging.Formatter.format(self, record)
195
        self._fmt = format_orig
196
        return result
197

    
198
log = logging.getLogger("burnin")
199
log.setLevel(logging.DEBUG)
200
handler = logging.StreamHandler()
201
handler.setFormatter(burninFormatter())
202
log.addHandler(handler)
203

    
204

    
205
# --------------------------------------------------------------------
206
# UnauthorizedTestCase class
207
class UnauthorizedTestCase(unittest.TestCase):
208
    """Test unauthorized access"""
209
    @classmethod
210
    def setUpClass(cls):
211
        cls.astakos = AstakosClient(AUTH_URL, TOKEN)
212
        cls.astakos.CONNECTION_RETRY_LIMIT = 2
213
        cls.compute_url = \
214
            cls.astakos.get_service_endpoints('compute')['publicURL']
215
        cls.result_dict = dict()
216

    
217
    def test_unauthorized_access(self):
218
        """Test access without a valid token fails"""
219
        log.info("Authentication test")
220
        falseToken = '12345'
221
        c = ComputeClient(self.compute_url, falseToken)
222
        c.CONNECTION_RETRY_LIMIT = 2
223

    
224
        with self.assertRaises(ClientError) as cm:
225
            c.list_servers()
226
            self.assertEqual(cm.exception.status, 401)
227

    
228

    
229
# --------------------------------------------------------------------
230
# This class gest replicated into Images TestCases dynamically
231
class ImagesTestCase(unittest.TestCase):
232
    """Test image lists for consistency"""
233
    @classmethod
234
    def setUpClass(cls):
235
        """Initialize kamaki, get (detailed) list of images"""
236
        log.info("Getting simple and detailed list of images")
237
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
238
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
239
        # Compute Client
240
        compute_url = \
241
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
242
        cls.compute_client = ComputeClient(compute_url, TOKEN)
243
        cls.compute_client.CONNECTION_RETRY_LIMIT = 2
244
        # Image Client
245
        image_url = \
246
            cls.astakos_client.get_service_endpoints('image')['publicURL']
247
        cls.image_client = ImageClient(image_url, TOKEN)
248
        cls.image_client.CONNECTION_RETRY_LIMIT = 2
249
        # Pithos Client
250
        pithos_url = cls.astakos_client.\
251
            get_service_endpoints('object-store')['publicURL']
252
        cls.pithos_client = PithosClient(pithos_url, TOKEN)
253
        cls.pithos_client.CONNECTION_RETRY_LIMIT = 2
254

    
255
        # Get images
256
        cls.images = \
257
            filter(lambda x: not x['name'].startswith(SNF_TEST_PREFIX),
258
                   cls.image_client.list_public())
259
        cls.dimages = \
260
            filter(lambda x: not x['name'].startswith(SNF_TEST_PREFIX),
261
                   cls.image_client.list_public(detail=True))
262
        cls.result_dict = dict()
263
        # Get uniq user id
264
        cls.uuid = _get_user_id()
265
        log.info("Uniq user id = %s" % cls.uuid)
266
        # Create temp directory and store it inside our class
267
        # XXX: In my machine /tmp has not enough space
268
        #      so use current directory to be sure.
269
        cls.temp_dir = tempfile.mkdtemp(dir=os.getcwd())
270
        cls.temp_image_name = \
271
            SNF_TEST_PREFIX + cls.imageid + ".diskdump"
272

    
273
    @classmethod
274
    def tearDownClass(cls):
275
        """Remove local files"""
276
        try:
277
            temp_file = os.path.join(cls.temp_dir, cls.temp_image_name)
278
            os.unlink(temp_file)
279
        except:
280
            pass
281
        try:
282
            os.rmdir(cls.temp_dir)
283
        except:
284
            pass
285

    
286
    def test_001_list_images(self):
287
        """Test image list actually returns images"""
288
        self.assertGreater(len(self.images), 0)
289

    
290
    def test_002_list_images_detailed(self):
291
        """Test detailed image list is the same length as list"""
292
        self.assertEqual(len(self.dimages), len(self.images))
293

    
294
    def test_003_same_image_names(self):
295
        """Test detailed and simple image list contain same names"""
296
        names = sorted(map(lambda x: x["name"], self.images))
297
        dnames = sorted(map(lambda x: x["name"], self.dimages))
298
        self.assertEqual(names, dnames)
299

    
300
# XXX: Find a way to resolve owner's uuid to username.
301
#      (maybe use astakosclient)
302
#    def test_004_unique_image_names(self):
303
#        """Test system images have unique names"""
304
#        sys_images = filter(lambda x: x['owner'] == PLANKTON_USER,
305
#                            self.dimages)
306
#        names = sorted(map(lambda x: x["name"], sys_images))
307
#        self.assertEqual(sorted(list(set(names))), names)
308

    
309
    def test_005_image_metadata(self):
310
        """Test every image has specific metadata defined"""
311
        keys = frozenset(["osfamily", "root_partition"])
312
        details = self.compute_client.list_images(detail=True)
313
        for i in details:
314
            self.assertTrue(keys.issubset(i["metadata"].keys()))
315

    
316
    def test_006_download_image(self):
317
        """Download image from pithos+"""
318
        # Get image location
319
        image = filter(
320
            lambda x: x['id'] == self.imageid, self.dimages)[0]
321
        image_location = \
322
            image['location'].replace("://", " ").replace("/", " ").split()
323
        log.info("Download image, with owner %s\n\tcontainer %s, and name %s"
324
                 % (image_location[1], image_location[2], image_location[3]))
325
        self.pithos_client.account = image_location[1]
326
        self.pithos_client.container = image_location[2]
327
        temp_file = os.path.join(self.temp_dir, self.temp_image_name)
328
        with open(temp_file, "wb+") as f:
329
            self.pithos_client.download_object(image_location[3], f)
330

    
331
    def test_007_upload_image(self):
332
        """Upload and register image"""
333
        temp_file = os.path.join(self.temp_dir, self.temp_image_name)
334
        log.info("Upload image to pithos+")
335
        # Create container `images'
336
        self.pithos_client.account = self.uuid
337
        self.pithos_client.container = "images"
338
        self.pithos_client.container_put()
339
        with open(temp_file, "rb+") as f:
340
            self.pithos_client.upload_object(self.temp_image_name, f)
341
        log.info("Register image to plankton")
342
        location = "pithos://" + self.uuid + \
343
            "/images/" + self.temp_image_name
344
        params = {'is_public': True}
345
        properties = {'OSFAMILY': "linux", 'ROOT_PARTITION': 1}
346
        self.image_client.register(
347
            self.temp_image_name, location, params, properties)
348
        # Get image id
349
        details = self.image_client.list_public(detail=True)
350
        detail = filter(lambda x: x['location'] == location, details)
351
        self.assertEqual(len(detail), 1)
352
        cls = type(self)
353
        cls.temp_image_id = detail[0]['id']
354
        log.info("Image registered with id %s" % detail[0]['id'])
355

    
356
    def test_008_cleanup_image(self):
357
        """Cleanup image test"""
358
        log.info("Cleanup image test")
359
        # Remove image from pithos+
360
        self.pithos_client.account = self.uuid
361
        self.pithos_client.container = "images"
362
        self.pithos_client.del_object(self.temp_image_name)
363

    
364

    
365
# --------------------------------------------------------------------
366
# FlavorsTestCase class
367
class FlavorsTestCase(unittest.TestCase):
368
    """Test flavor lists for consistency"""
369
    @classmethod
370
    def setUpClass(cls):
371
        """Initialize kamaki, get (detailed) list of flavors"""
372
        log.info("Getting simple and detailed list of flavors")
373
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
374
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
375
        # Compute Client
376
        compute_url = \
377
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
378
        cls.compute_client = ComputeClient(compute_url, TOKEN)
379
        cls.compute_client.CONNECTION_RETRY_LIMIT = 2
380
        cls.flavors = cls.compute_client.list_flavors()
381
        cls.dflavors = cls.compute_client.list_flavors(detail=True)
382
        cls.result_dict = dict()
383

    
384
    def test_001_list_flavors(self):
385
        """Test flavor list actually returns flavors"""
386
        self.assertGreater(len(self.flavors), 0)
387

    
388
    def test_002_list_flavors_detailed(self):
389
        """Test detailed flavor list is the same length as list"""
390
        self.assertEquals(len(self.dflavors), len(self.flavors))
391

    
392
    def test_003_same_flavor_names(self):
393
        """Test detailed and simple flavor list contain same names"""
394
        names = sorted(map(lambda x: x["name"], self.flavors))
395
        dnames = sorted(map(lambda x: x["name"], self.dflavors))
396
        self.assertEqual(names, dnames)
397

    
398
    def test_004_unique_flavor_names(self):
399
        """Test flavors have unique names"""
400
        names = sorted(map(lambda x: x["name"], self.flavors))
401
        self.assertEqual(sorted(list(set(names))), names)
402

    
403
    def test_005_well_formed_flavor_names(self):
404
        """Test flavors have names of the form CxxRyyDzz
405
        Where xx is vCPU count, yy is RAM in MiB, zz is Disk in GiB
406
        """
407
        for f in self.dflavors:
408
            flavor = (f["vcpus"], f["ram"], f["disk"], f["SNF:disk_template"])
409
            self.assertEqual("C%dR%dD%d%s" % flavor,
410
                             f["name"],
411
                             "Flavor %s does not match its specs." % f["name"])
412

    
413

    
414
# --------------------------------------------------------------------
415
# ServersTestCase class
416
class ServersTestCase(unittest.TestCase):
417
    """Test server lists for consistency"""
418
    @classmethod
419
    def setUpClass(cls):
420
        """Initialize kamaki, get (detailed) list of servers"""
421
        log.info("Getting simple and detailed list of servers")
422

    
423
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
424
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
425
        # Compute Client
426
        compute_url = \
427
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
428
        cls.compute_client = ComputeClient(compute_url, TOKEN)
429
        cls.compute_client.CONNECTION_RETRY_LIMIT = 2
430
        cls.servers = cls.compute_client.list_servers()
431
        cls.dservers = cls.compute_client.list_servers(detail=True)
432
        cls.result_dict = dict()
433

    
434
    # def test_001_list_servers(self):
435
    #     """Test server list actually returns servers"""
436
    #     self.assertGreater(len(self.servers), 0)
437

    
438
    def test_002_list_servers_detailed(self):
439
        """Test detailed server list is the same length as list"""
440
        self.assertEqual(len(self.dservers), len(self.servers))
441

    
442
    def test_003_same_server_names(self):
443
        """Test detailed and simple flavor list contain same names"""
444
        names = sorted(map(lambda x: x["name"], self.servers))
445
        dnames = sorted(map(lambda x: x["name"], self.dservers))
446
        self.assertEqual(names, dnames)
447

    
448

    
449
# --------------------------------------------------------------------
450
# Pithos Test Cases
451
class PithosTestCase(unittest.TestCase):
452
    """Test pithos functionality"""
453
    @classmethod
454
    def setUpClass(cls):
455
        """Initialize kamaki, get list of containers"""
456
        # Get uniq user id
457
        cls.uuid = _get_user_id()
458
        log.info("Uniq user id = %s" % cls.uuid)
459
        log.info("Getting list of containers")
460

    
461
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
462
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
463
        # Pithos Client
464
        pithos_url = cls.astakos_client.\
465
            get_service_endpoints('object-store')['publicURL']
466
        cls.pithos_client = PithosClient(pithos_url, TOKEN, cls.uuid)
467
        cls.pithos_client.CONNECTION_RETRY_LIMIT = 2
468

    
469
        cls.containers = cls.pithos_client.list_containers()
470
        cls.result_dict = dict()
471

    
472
    def test_001_list_containers(self):
473
        """Test container list actually returns containers"""
474
        self.assertGreater(len(self.containers), 0)
475

    
476
    def test_002_unique_containers(self):
477
        """Test if containers have unique names"""
478
        names = [n['name'] for n in self.containers]
479
        names = sorted(names)
480
        self.assertEqual(sorted(list(set(names))), names)
481

    
482
    def test_003_create_container(self):
483
        """Test create a container"""
484
        rand_num = randint(1000, 9999)
485
        rand_name = "%s%s" % (SNF_TEST_PREFIX, rand_num)
486
        names = [n['name'] for n in self.containers]
487
        while rand_name in names:
488
            rand_num = randint(1000, 9999)
489
            rand_name = "%s%s" % (SNF_TEST_PREFIX, rand_num)
490
        # Create container
491
        self.pithos_client.container = rand_name
492
        self.pithos_client.container_put()
493
        # Get list of containers
494
        new_containers = self.pithos_client.list_containers()
495
        new_container_names = [n['name'] for n in new_containers]
496
        self.assertIn(rand_name, new_container_names)
497

    
498
    def test_004_upload(self):
499
        """Test uploading something to pithos+"""
500
        # Create a tmp file
501
        with tempfile.TemporaryFile() as f:
502
            f.write("This is a temp file")
503
            f.seek(0, 0)
504
            # Where to save file
505
            self.pithos_client.upload_object("test.txt", f)
506

    
507
    def test_005_download(self):
508
        """Test download something from pithos+"""
509
        # Create tmp directory to save file
510
        tmp_dir = tempfile.mkdtemp()
511
        tmp_file = os.path.join(tmp_dir, "test.txt")
512
        with open(tmp_file, "wb+") as f:
513
            self.pithos_client.download_object("test.txt", f)
514
            # Read file
515
            f.seek(0, 0)
516
            content = f.read()
517
        # Remove files
518
        os.unlink(tmp_file)
519
        os.rmdir(tmp_dir)
520
        # Compare results
521
        self.assertEqual(content, "This is a temp file")
522

    
523
    def test_006_remove(self):
524
        """Test removing files and containers"""
525
        cont_name = self.pithos_client.container
526
        self.pithos_client.del_object("test.txt")
527
        self.pithos_client.purge_container()
528
        # List containers
529
        containers = self.pithos_client.list_containers()
530
        cont_names = [n['name'] for n in containers]
531
        self.assertNotIn(cont_name, cont_names)
532

    
533

    
534
# --------------------------------------------------------------------
535
# This class gets replicated into actual TestCases dynamically
536
class SpawnServerTestCase(unittest.TestCase):
537
    """Test scenario for server of the specified image"""
538
    @classmethod
539
    def setUpClass(cls):
540
        """Initialize a kamaki instance"""
541
        log.info("Spawning server for image `%s'" % cls.imagename)
542

    
543
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
544
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
545
        # Cyclades Client
546
        compute_url = \
547
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
548
        cls.cyclades_client = CycladesClient(compute_url, TOKEN)
549
        cls.cyclades_client.CONNECTION_RETRY_LIMIT = 2
550

    
551
        cls.result_dict = dict()
552

    
553
    def _get_ipv4(self, server):
554
        """Get the public IPv4 of a server from the detailed server info"""
555

    
556
        nics = server["attachments"]
557

    
558
        for nic in nics:
559
            net_id = nic["network_id"]
560
            if self.cyclades_client.get_network_details(net_id)["public"]:
561
                public_addrs = nic["ipv4"]
562

    
563
        self.assertTrue(public_addrs is not None)
564

    
565
        return public_addrs
566

    
567
    def _get_ipv6(self, server):
568
        """Get the public IPv6 of a server from the detailed server info"""
569

    
570
        nics = server["attachments"]
571

    
572
        for nic in nics:
573
            net_id = nic["network_id"]
574
            if self.cyclades_client.get_network_details(net_id)["public"]:
575
                public_addrs = nic["ipv6"]
576

    
577
        self.assertTrue(public_addrs is not None)
578

    
579
        return public_addrs
580

    
581
    def _connect_loginname(self, os_value):
582
        """Return the login name for connections based on the server OS"""
583
        if os_value in ("Ubuntu", "Kubuntu", "Fedora"):
584
            return "user"
585
        elif os_value in ("windows", "windows_alpha1"):
586
            return "Administrator"
587
        else:
588
            return "root"
589

    
590
    def _verify_server_status(self, current_status, new_status):
591
        """Verify a server has switched to a specified status"""
592
        server = self.cyclades_client.get_server_details(self.serverid)
593
        if server["status"] not in (current_status, new_status):
594
            return None  # Do not raise exception, return so the test fails
595
        self.assertEquals(server["status"], new_status)
596

    
597
    def _get_connected_tcp_socket(self, family, host, port):
598
        """Get a connected socket from the specified family to host:port"""
599
        sock = None
600
        for res in \
601
            socket.getaddrinfo(host, port, family, socket.SOCK_STREAM, 0,
602
                               socket.AI_PASSIVE):
603
            af, socktype, proto, canonname, sa = res
604
            try:
605
                sock = socket.socket(af, socktype, proto)
606
            except socket.error:
607
                sock = None
608
                continue
609
            try:
610
                sock.connect(sa)
611
            except socket.error:
612
                sock.close()
613
                sock = None
614
                continue
615
        self.assertIsNotNone(sock)
616
        return sock
617

    
618
    def _ping_once(self, ipv6, ip):
619
        """Test server responds to a single IPv4 or IPv6 ping"""
620
        cmd = "ping%s -c 2 -w 3 %s" % ("6" if ipv6 else "", ip)
621
        ping = subprocess.Popen(cmd, shell=True,
622
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
623
        (stdout, stderr) = ping.communicate()
624
        ret = ping.wait()
625
        self.assertEquals(ret, 0)
626

    
627
    def _get_hostname_over_ssh(self, hostip, username, password):
628
        lines, status = _ssh_execute(
629
            hostip, username, password, "hostname")
630
        self.assertEqual(len(lines), 1)
631
        return lines[0]
632

    
633
    def _try_until_timeout_expires(self, warn_timeout, fail_timeout,
634
                                   opmsg, callable, *args, **kwargs):
635
        if warn_timeout == fail_timeout:
636
            warn_timeout = fail_timeout + 1
637
        warn_tmout = time.time() + warn_timeout
638
        fail_tmout = time.time() + fail_timeout
639
        while True:
640
            self.assertLess(time.time(), fail_tmout,
641
                            "operation `%s' timed out" % opmsg)
642
            if time.time() > warn_tmout:
643
                log.warning("Server %d: `%s' operation `%s' not done yet",
644
                            self.serverid, self.servername, opmsg)
645
            try:
646
                log.info("%s... " % opmsg)
647
                return callable(*args, **kwargs)
648
            except AssertionError:
649
                pass
650
            time.sleep(self.query_interval)
651

    
652
    def _insist_on_tcp_connection(self, family, host, port):
653
        familystr = {socket.AF_INET: "IPv4", socket.AF_INET6: "IPv6",
654
                     socket.AF_UNSPEC: "Unspecified-IPv4/6"}
655
        msg = "connect over %s to %s:%s" % \
656
              (familystr.get(family, "Unknown"), host, port)
657
        sock = self._try_until_timeout_expires(
658
            self.action_timeout, self.action_timeout,
659
            msg, self._get_connected_tcp_socket,
660
            family, host, port)
661
        return sock
662

    
663
    def _insist_on_status_transition(self, current_status, new_status,
664
                                     fail_timeout, warn_timeout=None):
665
        msg = "Server %d: `%s', waiting for %s -> %s" % \
666
              (self.serverid, self.servername, current_status, new_status)
667
        if warn_timeout is None:
668
            warn_timeout = fail_timeout
669
        self._try_until_timeout_expires(warn_timeout, fail_timeout,
670
                                        msg, self._verify_server_status,
671
                                        current_status, new_status)
672
        # Ensure the status is actually the expected one
673
        server = self.cyclades_client.get_server_details(self.serverid)
674
        self.assertEquals(server["status"], new_status)
675

    
676
    def _insist_on_ssh_hostname(self, hostip, username, password):
677
        msg = "SSH to %s, as %s/%s" % (hostip, username, password)
678
        hostname = self._try_until_timeout_expires(
679
            self.action_timeout, self.action_timeout,
680
            msg, self._get_hostname_over_ssh,
681
            hostip, username, password)
682

    
683
        # The hostname must be of the form 'prefix-id'
684
        self.assertTrue(hostname.endswith("-%d\n" % self.serverid))
685

    
686
    def _check_file_through_ssh(self, hostip, username, password,
687
                                remotepath, content):
688
        msg = "Trying file injection through SSH to %s, as %s/%s" % \
689
            (hostip, username, password)
690
        log.info(msg)
691
        try:
692
            ssh = paramiko.SSHClient()
693
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
694
            ssh.connect(hostip, username=username, password=password)
695
            ssh.close()
696
        except socket.error, err:
697
            raise AssertionError(err)
698

    
699
        transport = paramiko.Transport((hostip, 22))
700
        transport.connect(username=username, password=password)
701

    
702
        localpath = '/tmp/' + SNF_TEST_PREFIX + 'injection'
703
        sftp = paramiko.SFTPClient.from_transport(transport)
704
        sftp.get(remotepath, localpath)
705
        sftp.close()
706
        transport.close()
707

    
708
        f = open(localpath)
709
        remote_content = b64encode(f.read())
710

    
711
        # Check if files are the same
712
        return (remote_content == content)
713

    
714
    def _skipIf(self, condition, msg):
715
        if condition:
716
            self.skipTest(msg)
717

    
718
    def test_001_submit_create_server(self):
719
        """Test submit create server request"""
720

    
721
        log.info("Submit new server request")
722
        server = self.cyclades_client.create_server(
723
            self.servername, self.flavorid, self.imageid, self.personality)
724

    
725
        log.info("Server id: " + str(server["id"]))
726
        log.info("Server password: " + server["adminPass"])
727
        self.assertEqual(server["name"], self.servername)
728
        self.assertEqual(server["flavor"]["id"], self.flavorid)
729
        self.assertEqual(server["image"]["id"], self.imageid)
730
        self.assertEqual(server["status"], "BUILD")
731

    
732
        # Update class attributes to reflect data on building server
733
        cls = type(self)
734
        cls.serverid = server["id"]
735
        cls.username = None
736
        cls.passwd = server["adminPass"]
737

    
738
        self.result_dict["Server ID"] = str(server["id"])
739
        self.result_dict["Password"] = str(server["adminPass"])
740

    
741
    def test_002a_server_is_building_in_list(self):
742
        """Test server is in BUILD state, in server list"""
743
        log.info("Server in BUILD state in server list")
744

    
745
        self.result_dict.clear()
746

    
747
        servers = self.cyclades_client.list_servers(detail=True)
748
        servers = filter(lambda x: x["name"] == self.servername, servers)
749

    
750
        server = servers[0]
751
        self.assertEqual(server["name"], self.servername)
752
        self.assertEqual(server["flavor"]["id"], self.flavorid)
753
        self.assertEqual(server["image"]["id"], self.imageid)
754
        self.assertEqual(server["status"], "BUILD")
755

    
756
    def test_002b_server_is_building_in_details(self):
757
        """Test server is in BUILD state, in details"""
758

    
759
        log.info("Server in BUILD state in details")
760

    
761
        server = self.cyclades_client.get_server_details(self.serverid)
762
        self.assertEqual(server["name"], self.servername)
763
        self.assertEqual(server["flavor"]["id"], self.flavorid)
764
        self.assertEqual(server["image"]["id"], self.imageid)
765
        self.assertEqual(server["status"], "BUILD")
766

    
767
    def test_002c_set_server_metadata(self):
768

    
769
        log.info("Creating server metadata")
770

    
771
        image = self.cyclades_client.get_image_details(self.imageid)
772
        os_value = image["metadata"]["os"]
773
        users = image["metadata"].get("users", None)
774
        self.cyclades_client.update_server_metadata(self.serverid, OS=os_value)
775

    
776
        userlist = users.split()
777

    
778
        # Determine the username to use for future connections
779
        # to this host
780
        cls = type(self)
781

    
782
        if "root" in userlist:
783
            cls.username = "root"
784
        elif users is None:
785
            cls.username = self._connect_loginname(os_value)
786
        else:
787
            cls.username = choice(userlist)
788

    
789
        self.assertIsNotNone(cls.username)
790

    
791
    def test_002d_verify_server_metadata(self):
792
        """Test server metadata keys are set based on image metadata"""
793

    
794
        log.info("Verifying image metadata")
795

    
796
        servermeta = self.cyclades_client.get_server_metadata(self.serverid)
797
        imagemeta = self.cyclades_client.get_image_metadata(self.imageid)
798

    
799
        self.assertEqual(servermeta["OS"], imagemeta["os"])
800

    
801
    def test_003_server_becomes_active(self):
802
        """Test server becomes ACTIVE"""
803

    
804
        log.info("Waiting for server to become ACTIVE")
805

    
806
        self._insist_on_status_transition(
807
            "BUILD", "ACTIVE", self.build_fail, self.build_warning)
808

    
809
    def test_003a_get_server_oob_console(self):
810
        """Test getting OOB server console over VNC
811

812
        Implementation of RFB protocol follows
813
        http://www.realvnc.com/docs/rfbproto.pdf.
814

815
        """
816
        console = self.cyclades_client.get_server_console(self.serverid)
817
        self.assertEquals(console['type'], "vnc")
818
        sock = self._insist_on_tcp_connection(
819
            socket.AF_INET, console["host"], console["port"])
820

    
821
        # Step 1. ProtocolVersion message (par. 6.1.1)
822
        version = sock.recv(1024)
823
        self.assertEquals(version, 'RFB 003.008\n')
824
        sock.send(version)
825

    
826
        # Step 2. Security (par 6.1.2): Only VNC Authentication supported
827
        sec = sock.recv(1024)
828
        self.assertEquals(list(sec), ['\x01', '\x02'])
829

    
830
        # Step 3. Request VNC Authentication (par 6.1.2)
831
        sock.send('\x02')
832

    
833
        # Step 4. Receive Challenge (par 6.2.2)
834
        challenge = sock.recv(1024)
835
        self.assertEquals(len(challenge), 16)
836

    
837
        # Step 5. DES-Encrypt challenge, use password as key (par 6.2.2)
838
        response = d3des_generate_response(
839
            (console["password"] + '\0' * 8)[:8], challenge)
840
        sock.send(response)
841

    
842
        # Step 6. SecurityResult (par 6.1.3)
843
        result = sock.recv(4)
844
        self.assertEquals(list(result), ['\x00', '\x00', '\x00', '\x00'])
845
        sock.close()
846

    
847
    def test_004_server_has_ipv4(self):
848
        """Test active server has a valid IPv4 address"""
849

    
850
        log.info("Validate server's IPv4")
851

    
852
        server = self.cyclades_client.get_server_details(self.serverid)
853
        ipv4 = self._get_ipv4(server)
854

    
855
        self.result_dict.clear()
856
        self.result_dict["IPv4"] = str(ipv4)
857

    
858
        self.assertEquals(IP(ipv4).version(), 4)
859

    
860
    def test_005_server_has_ipv6(self):
861
        """Test active server has a valid IPv6 address"""
862
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
863

    
864
        log.info("Validate server's IPv6")
865

    
866
        server = self.cyclades_client.get_server_details(self.serverid)
867
        ipv6 = self._get_ipv6(server)
868

    
869
        self.result_dict.clear()
870
        self.result_dict["IPv6"] = str(ipv6)
871

    
872
        self.assertEquals(IP(ipv6).version(), 6)
873

    
874
    def test_006_server_responds_to_ping_IPv4(self):
875
        """Test server responds to ping on IPv4 address"""
876

    
877
        log.info("Testing if server responds to pings in IPv4")
878
        self.result_dict.clear()
879

    
880
        server = self.cyclades_client.get_server_details(self.serverid)
881
        ip = self._get_ipv4(server)
882
        self._try_until_timeout_expires(self.action_timeout,
883
                                        self.action_timeout,
884
                                        "PING IPv4 to %s" % ip,
885
                                        self._ping_once,
886
                                        False, ip)
887

    
888
    def test_007_server_responds_to_ping_IPv6(self):
889
        """Test server responds to ping on IPv6 address"""
890
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
891
        log.info("Testing if server responds to pings in IPv6")
892

    
893
        server = self.cyclades_client.get_server_details(self.serverid)
894
        ip = self._get_ipv6(server)
895
        self._try_until_timeout_expires(self.action_timeout,
896
                                        self.action_timeout,
897
                                        "PING IPv6 to %s" % ip,
898
                                        self._ping_once,
899
                                        True, ip)
900

    
901
    def test_008_submit_shutdown_request(self):
902
        """Test submit request to shutdown server"""
903

    
904
        log.info("Shutting down server")
905

    
906
        self.cyclades_client.shutdown_server(self.serverid)
907

    
908
    def test_009_server_becomes_stopped(self):
909
        """Test server becomes STOPPED"""
910

    
911
        log.info("Waiting until server becomes STOPPED")
912
        self._insist_on_status_transition(
913
            "ACTIVE", "STOPPED", self.action_timeout, self.action_timeout)
914

    
915
    def test_010_submit_start_request(self):
916
        """Test submit start server request"""
917

    
918
        log.info("Starting server")
919

    
920
        self.cyclades_client.start_server(self.serverid)
921

    
922
    def test_011_server_becomes_active(self):
923
        """Test server becomes ACTIVE again"""
924

    
925
        log.info("Waiting until server becomes ACTIVE")
926
        self._insist_on_status_transition(
927
            "STOPPED", "ACTIVE", self.action_timeout, self.action_timeout)
928

    
929
    def test_011a_server_responds_to_ping_IPv4(self):
930
        """Test server OS is actually up and running again"""
931

    
932
        log.info("Testing if server is actually up and running")
933

    
934
        self.test_006_server_responds_to_ping_IPv4()
935

    
936
    def test_012_ssh_to_server_IPv4(self):
937
        """Test SSH to server public IPv4 works, verify hostname"""
938

    
939
        self._skipIf(self.is_windows, "only valid for Linux servers")
940
        server = self.cyclades_client.get_server_details(self.serverid)
941
        self._insist_on_ssh_hostname(self._get_ipv4(server),
942
                                     self.username, self.passwd)
943

    
944
    def test_013_ssh_to_server_IPv6(self):
945
        """Test SSH to server public IPv6 works, verify hostname"""
946
        self._skipIf(self.is_windows, "only valid for Linux servers")
947
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
948

    
949
        server = self.cyclades_client.get_server_details(self.serverid)
950
        self._insist_on_ssh_hostname(self._get_ipv6(server),
951
                                     self.username, self.passwd)
952

    
953
    def test_014_rdp_to_server_IPv4(self):
954
        "Test RDP connection to server public IPv4 works"""
955
        self._skipIf(not self.is_windows, "only valid for Windows servers")
956
        server = self.cyclades_client.get_server_details(self.serverid)
957
        ipv4 = self._get_ipv4(server)
958
        sock = self._insist_on_tcp_connection(socket.AF_INET, ipv4, 3389)
959

    
960
        # No actual RDP processing done. We assume the RDP server is there
961
        # if the connection to the RDP port is successful.
962
        # FIXME: Use rdesktop, analyze exit code? see manpage [costasd]
963
        sock.close()
964

    
965
    def test_015_rdp_to_server_IPv6(self):
966
        "Test RDP connection to server public IPv6 works"""
967
        self._skipIf(not self.is_windows, "only valid for Windows servers")
968
        self._skipIf(NO_IPV6, "--no-ipv6 flag enabled")
969

    
970
        server = self.cyclades_client.get_server_details(self.serverid)
971
        ipv6 = self._get_ipv6(server)
972
        sock = self._get_tcp_connection(socket.AF_INET6, ipv6, 3389)
973

    
974
        # No actual RDP processing done. We assume the RDP server is there
975
        # if the connection to the RDP port is successful.
976
        sock.close()
977

    
978
    def test_016_personality_is_enforced(self):
979
        """Test file injection for personality enforcement"""
980
        self._skipIf(self.is_windows, "only implemented for Linux servers")
981
        self._skipIf(self.personality is None, "No personality file selected")
982

    
983
        log.info("Trying to inject file for personality enforcement")
984

    
985
        server = self.cyclades_client.get_server_details(self.serverid)
986

    
987
        for inj_file in self.personality:
988
            equal_files = self._check_file_through_ssh(self._get_ipv4(server),
989
                                                       inj_file['owner'],
990
                                                       self.passwd,
991
                                                       inj_file['path'],
992
                                                       inj_file['contents'])
993
            self.assertTrue(equal_files)
994

    
995
    def test_017_submit_delete_request(self):
996
        """Test submit request to delete server"""
997

    
998
        log.info("Deleting server")
999

    
1000
        self.cyclades_client.delete_server(self.serverid)
1001

    
1002
    def test_018_server_becomes_deleted(self):
1003
        """Test server becomes DELETED"""
1004

    
1005
        log.info("Testing if server becomes DELETED")
1006

    
1007
        self._insist_on_status_transition(
1008
            "ACTIVE", "DELETED", self.action_timeout, self.action_timeout)
1009

    
1010
    def test_019_server_no_longer_in_server_list(self):
1011
        """Test server is no longer in server list"""
1012

    
1013
        log.info("Test if server is no longer listed")
1014

    
1015
        servers = self.cyclades_client.list_servers()
1016
        self.assertNotIn(self.serverid, [s["id"] for s in servers])
1017

    
1018

    
1019
class NetworkTestCase(unittest.TestCase):
1020
    """ Testing networking in cyclades """
1021

    
1022
    @classmethod
1023
    def setUpClass(cls):
1024
        "Initialize kamaki, get list of current networks"
1025

    
1026
        cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
1027
        cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
1028
        # Cyclades Client
1029
        compute_url = \
1030
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
1031
        cls.cyclades_client = CycladesClient(compute_url, TOKEN)
1032
        cls.cyclades_client.CONNECTION_RETRY_LIMIT = 2
1033

    
1034
        cls.servername = "%s%s for %s" % (SNF_TEST_PREFIX,
1035
                                          TEST_RUN_ID,
1036
                                          cls.imagename)
1037

    
1038
        #Dictionary initialization for the vms credentials
1039
        cls.serverid = dict()
1040
        cls.username = dict()
1041
        cls.password = dict()
1042
        cls.is_windows = cls.imagename.lower().find("windows") >= 0
1043

    
1044
        cls.result_dict = dict()
1045

    
1046
    def _skipIf(self, condition, msg):
1047
        if condition:
1048
            self.skipTest(msg)
1049

    
1050
    def _get_ipv4(self, server):
1051
        """Get the public IPv4 of a server from the detailed server info"""
1052

    
1053
        nics = server["attachments"]
1054

    
1055
        for nic in nics:
1056
            net_id = nic["network_id"]
1057
            if self.cyclades_client.get_network_details(net_id)["public"]:
1058
                public_addrs = nic["ipv4"]
1059

    
1060
        self.assertTrue(public_addrs is not None)
1061

    
1062
        return public_addrs
1063

    
1064
    def _connect_loginname(self, os_value):
1065
        """Return the login name for connections based on the server OS"""
1066
        if os_value in ("Ubuntu", "Kubuntu", "Fedora"):
1067
            return "user"
1068
        elif os_value in ("windows", "windows_alpha1"):
1069
            return "Administrator"
1070
        else:
1071
            return "root"
1072

    
1073
    def _ping_once(self, ip):
1074

    
1075
        """Test server responds to a single IPv4 or IPv6 ping"""
1076
        cmd = "ping -c 2 -w 3 %s" % (ip)
1077
        ping = subprocess.Popen(cmd, shell=True,
1078
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1079
        (stdout, stderr) = ping.communicate()
1080
        ret = ping.wait()
1081

    
1082
        return (ret == 0)
1083

    
1084
    def test_00001a_submit_create_server_A(self):
1085
        """Test submit create server request"""
1086

    
1087
        log.info("Creating test server A")
1088

    
1089
        serverA = self.cyclades_client.create_server(
1090
            self.servername, self.flavorid, self.imageid, personality=None)
1091

    
1092
        self.assertEqual(serverA["name"], self.servername)
1093
        self.assertEqual(serverA["flavor"]["id"], self.flavorid)
1094
        self.assertEqual(serverA["image"]["id"], self.imageid)
1095
        self.assertEqual(serverA["status"], "BUILD")
1096

    
1097
        # Update class attributes to reflect data on building server
1098
        self.serverid['A'] = serverA["id"]
1099
        self.username['A'] = None
1100
        self.password['A'] = serverA["adminPass"]
1101

    
1102
        log.info("Server A id:" + str(serverA["id"]))
1103
        log.info("Server password " + (self.password['A']))
1104

    
1105
        self.result_dict["Server A ID"] = str(serverA["id"])
1106
        self.result_dict["Server A password"] = serverA["adminPass"]
1107

    
1108
    def test_00001b_serverA_becomes_active(self):
1109
        """Test server becomes ACTIVE"""
1110

    
1111
        log.info("Waiting until test server A becomes ACTIVE")
1112
        self.result_dict.clear()
1113

    
1114
        fail_tmout = time.time() + self.action_timeout
1115
        while True:
1116
            d = self.cyclades_client.get_server_details(self.serverid['A'])
1117
            status = d['status']
1118
            if status == 'ACTIVE':
1119
                active = True
1120
                break
1121
            elif time.time() > fail_tmout:
1122
                self.assertLess(time.time(), fail_tmout)
1123
            else:
1124
                time.sleep(self.query_interval)
1125

    
1126
        self.assertTrue(active)
1127

    
1128
    def test_00002a_submit_create_server_B(self):
1129
        """Test submit create server request"""
1130

    
1131
        log.info("Creating test server B")
1132

    
1133
        serverB = self.cyclades_client.create_server(
1134
            self.servername, self.flavorid, self.imageid, personality=None)
1135

    
1136
        self.assertEqual(serverB["name"], self.servername)
1137
        self.assertEqual(serverB["flavor"]["id"], self.flavorid)
1138
        self.assertEqual(serverB["image"]["id"], self.imageid)
1139
        self.assertEqual(serverB["status"], "BUILD")
1140

    
1141
        # Update class attributes to reflect data on building server
1142
        self.serverid['B'] = serverB["id"]
1143
        self.username['B'] = None
1144
        self.password['B'] = serverB["adminPass"]
1145

    
1146
        log.info("Server B id: " + str(serverB["id"]))
1147
        log.info("Password " + (self.password['B']))
1148

    
1149
        self.result_dict.clear()
1150
        self.result_dict["Server B ID"] = str(serverB["id"])
1151
        self.result_dict["Server B password"] = serverB["adminPass"]
1152

    
1153
    def test_00002b_serverB_becomes_active(self):
1154
        """Test server becomes ACTIVE"""
1155

    
1156
        log.info("Waiting until test server B becomes ACTIVE")
1157
        self.result_dict.clear()
1158

    
1159
        fail_tmout = time.time() + self.action_timeout
1160
        while True:
1161
            d = self.cyclades_client.get_server_details(self.serverid['B'])
1162
            status = d['status']
1163
            if status == 'ACTIVE':
1164
                active = True
1165
                break
1166
            elif time.time() > fail_tmout:
1167
                self.assertLess(time.time(), fail_tmout)
1168
            else:
1169
                time.sleep(self.query_interval)
1170

    
1171
        self.assertTrue(active)
1172

    
1173
    def test_001_create_network(self):
1174
        """Test submit create network request"""
1175

    
1176
        log.info("Submit new network request")
1177
        self.result_dict.clear()
1178

    
1179
        name = SNF_TEST_PREFIX + TEST_RUN_ID
1180
        #previous_num = len(self.client.list_networks())
1181
        network = self.cyclades_client.create_network(
1182
            name, cidr='10.0.1.0/28', dhcp=True)
1183

    
1184
        #Test if right name is assigned
1185
        self.assertEqual(network['name'], name)
1186

    
1187
        # Update class attributes
1188
        cls = type(self)
1189
        cls.networkid = network['id']
1190
        #networks = self.client.list_networks()
1191

    
1192
        fail_tmout = time.time() + self.action_timeout
1193

    
1194
        #Test if new network is created
1195
        while True:
1196
            d = self.cyclades_client.get_network_details(network['id'])
1197
            if d['status'] == 'ACTIVE':
1198
                connected = True
1199
                break
1200
            elif time.time() > fail_tmout:
1201
                self.assertLess(time.time(), fail_tmout)
1202
            else:
1203
                log.info("Waiting for network to become ACTIVE")
1204
                time.sleep(self.query_interval)
1205

    
1206
        self.assertTrue(connected)
1207

    
1208
        self.result_dict["Private network ID"] = str(network['id'])
1209

    
1210
    def test_002_connect_to_network(self):
1211
        """Test connect VMs to network"""
1212

    
1213
        log.info("Connect VMs to private network")
1214
        self.result_dict.clear()
1215

    
1216
        self.cyclades_client.connect_server(self.serverid['A'], self.networkid)
1217
        self.cyclades_client.connect_server(self.serverid['B'], self.networkid)
1218

    
1219
        #Insist on connecting until action timeout
1220
        fail_tmout = time.time() + self.action_timeout
1221

    
1222
        while True:
1223

    
1224
            netsA = [x['network_id']
1225
                     for x in self.cyclades_client.get_server_details(
1226
                         self.serverid['A'])['attachments']]
1227
            netsB = [x['network_id']
1228
                     for x in self.cyclades_client.get_server_details(
1229
                         self.serverid['B'])['attachments']]
1230

    
1231
            if (self.networkid in netsA) and (self.networkid in netsB):
1232
                conn_exists = True
1233
                break
1234
            elif time.time() > fail_tmout:
1235
                self.assertLess(time.time(), fail_tmout)
1236
            else:
1237
                time.sleep(self.query_interval)
1238

    
1239
        #Adding private IPs to class attributes
1240
        cls = type(self)
1241
        cls.priv_ip = dict()
1242

    
1243
        nicsA = self.cyclades_client.get_server_details(
1244
            self.serverid['A'])['attachments']
1245
        nicsB = self.cyclades_client.get_server_details(
1246
            self.serverid['B'])['attachments']
1247

    
1248
        if conn_exists:
1249
            for nic in nicsA:
1250
                if nic["network_id"] == self.networkid:
1251
                    cls.priv_ip["A"] = nic["ipv4"]
1252
            self.result_dict["Server A private IP"] = str(cls.priv_ip["A"])
1253

    
1254
            for nic in nicsB:
1255
                if nic["network_id"] == self.networkid:
1256
                    cls.priv_ip["B"] = nic["ipv4"]
1257
            self.result_dict["Server B private IP"] = str(cls.priv_ip["B"])
1258

    
1259
        self.assertTrue(conn_exists)
1260
        self.assertIsNot(cls.priv_ip["A"], None)
1261
        self.assertIsNot(cls.priv_ip["B"], None)
1262

    
1263
    def test_002a_reboot(self):
1264
        """Rebooting server A"""
1265

    
1266
        log.info("Rebooting server A")
1267

    
1268
        self.cyclades_client.shutdown_server(self.serverid['A'])
1269

    
1270
        fail_tmout = time.time() + self.action_timeout
1271
        while True:
1272
            d = self.cyclades_client.get_server_details(self.serverid['A'])
1273
            status = d['status']
1274
            if status == 'STOPPED':
1275
                break
1276
            elif time.time() > fail_tmout:
1277
                self.assertLess(time.time(), fail_tmout)
1278
            else:
1279
                time.sleep(self.query_interval)
1280

    
1281
        self.cyclades_client.start_server(self.serverid['A'])
1282

    
1283
        while True:
1284
            d = self.cyclades_client.get_server_details(self.serverid['A'])
1285
            status = d['status']
1286
            if status == 'ACTIVE':
1287
                active = True
1288
                break
1289
            elif time.time() > fail_tmout:
1290
                self.assertLess(time.time(), fail_tmout)
1291
            else:
1292
                time.sleep(self.query_interval)
1293

    
1294
        self.assertTrue(active)
1295

    
1296
    def test_002b_ping_server_A(self):
1297
        "Test if server A responds to IPv4 pings"
1298

    
1299
        log.info("Testing if server A responds to IPv4 pings ")
1300
        self.result_dict.clear()
1301

    
1302
        server = self.cyclades_client.get_server_details(self.serverid['A'])
1303
        ip = self._get_ipv4(server)
1304

    
1305
        fail_tmout = time.time() + self.action_timeout
1306

    
1307
        s = False
1308

    
1309
        self.result_dict["Server A public IP"] = str(ip)
1310

    
1311
        while True:
1312

    
1313
            if self._ping_once(ip):
1314
                s = True
1315
                break
1316

    
1317
            elif time.time() > fail_tmout:
1318
                self.assertLess(time.time(), fail_tmout)
1319

    
1320
            else:
1321
                time.sleep(self.query_interval)
1322

    
1323
        self.assertTrue(s)
1324

    
1325
    def test_002c_reboot(self):
1326
        """Reboot server B"""
1327

    
1328
        log.info("Rebooting server B")
1329
        self.result_dict.clear()
1330

    
1331
        self.cyclades_client.shutdown_server(self.serverid['B'])
1332

    
1333
        fail_tmout = time.time() + self.action_timeout
1334
        while True:
1335
            d = self.cyclades_client.get_server_details(self.serverid['B'])
1336
            status = d['status']
1337
            if status == 'STOPPED':
1338
                break
1339
            elif time.time() > fail_tmout:
1340
                self.assertLess(time.time(), fail_tmout)
1341
            else:
1342
                time.sleep(self.query_interval)
1343

    
1344
        self.cyclades_client.start_server(self.serverid['B'])
1345

    
1346
        while True:
1347
            d = self.cyclades_client.get_server_details(self.serverid['B'])
1348
            status = d['status']
1349
            if status == 'ACTIVE':
1350
                active = True
1351
                break
1352
            elif time.time() > fail_tmout:
1353
                self.assertLess(time.time(), fail_tmout)
1354
            else:
1355
                time.sleep(self.query_interval)
1356

    
1357
        self.assertTrue(active)
1358

    
1359
    def test_002d_ping_server_B(self):
1360
        """Test if server B responds to IPv4 pings"""
1361

    
1362
        log.info("Testing if server B responds to IPv4 pings")
1363
        self.result_dict.clear()
1364

    
1365
        server = self.cyclades_client.get_server_details(self.serverid['B'])
1366
        ip = self._get_ipv4(server)
1367

    
1368
        fail_tmout = time.time() + self.action_timeout
1369

    
1370
        s = False
1371

    
1372
        self.result_dict["Server B public IP"] = str(ip)
1373

    
1374
        while True:
1375
            if self._ping_once(ip):
1376
                s = True
1377
                break
1378

    
1379
            elif time.time() > fail_tmout:
1380
                self.assertLess(time.time(), fail_tmout)
1381

    
1382
            else:
1383
                time.sleep(self.query_interval)
1384

    
1385
        self.assertTrue(s)
1386

    
1387
    def test_003a_setup_interface_A(self):
1388
        """Setup eth1 for server A"""
1389

    
1390
        self._skipIf(self.is_windows, "only valid for Linux servers")
1391

    
1392
        log.info("Setting up interface eth1 for server A")
1393
        self.result_dict.clear()
1394

    
1395
        server = self.cyclades_client.get_server_details(self.serverid['A'])
1396
        image = self.cyclades_client.get_image_details(self.imageid)
1397
        os_value = image['metadata']['os']
1398

    
1399
        users = image["metadata"].get("users", None)
1400
        userlist = users.split()
1401

    
1402
        if "root" in userlist:
1403
            loginname = "root"
1404
        elif users is None:
1405
            loginname = self._connect_loginname(os_value)
1406
        else:
1407
            loginname = choice(userlist)
1408

    
1409
        hostip = self._get_ipv4(server)
1410
        myPass = self.password['A']
1411

    
1412
        log.info("SSH in server A as %s/%s" % (loginname, myPass))
1413
        command = "ifconfig eth1 %s && ifconfig eth1 | " \
1414
                  "grep 'inet addr:' | cut -d: -f2 | awk '{ print $1}'" \
1415
                  % self.priv_ip["A"]
1416
        output, status = _ssh_execute(
1417
            hostip, loginname, myPass, command)
1418

    
1419
        self.assertEquals(status, 0)
1420
        self.assertEquals(output[0].strip(), self.priv_ip["A"])
1421

    
1422
    def test_003b_setup_interface_B(self):
1423
        """Setup eth1 for server B"""
1424

    
1425
        self._skipIf(self.is_windows, "only valid for Linux servers")
1426

    
1427
        log.info("Setting up interface eth1 for server B")
1428

    
1429
        server = self.cyclades_client.get_server_details(self.serverid['B'])
1430
        image = self.cyclades_client.get_image_details(self.imageid)
1431
        os_value = image['metadata']['os']
1432

    
1433
        users = image["metadata"].get("users", None)
1434
        userlist = users.split()
1435

    
1436
        if "root" in userlist:
1437
            loginname = "root"
1438
        elif users is None:
1439
            loginname = self._connect_loginname(os_value)
1440
        else:
1441
            loginname = choice(userlist)
1442

    
1443
        hostip = self._get_ipv4(server)
1444
        myPass = self.password['B']
1445

    
1446
        log.info("SSH in server B as %s/%s" % (loginname, myPass))
1447
        command = "ifconfig eth1 %s && ifconfig eth1 | " \
1448
                  "grep 'inet addr:' | cut -d: -f2 | awk '{ print $1}'" \
1449
                  % self.priv_ip["B"]
1450
        output, status = _ssh_execute(
1451
            hostip, loginname, myPass, command)
1452

    
1453
        self.assertEquals(status, 0)
1454
        self.assertEquals(output[0].strip(), self.priv_ip["B"])
1455

    
1456
    def test_003c_test_connection_exists(self):
1457
        """Ping server B from server A to test if connection exists"""
1458

    
1459
        self._skipIf(self.is_windows, "only valid for Linux servers")
1460

    
1461
        log.info("Testing if server A is actually connected to server B")
1462

    
1463
        server = self.cyclades_client.get_server_details(self.serverid['A'])
1464
        image = self.cyclades_client.get_image_details(self.imageid)
1465
        os_value = image['metadata']['os']
1466
        hostip = self._get_ipv4(server)
1467

    
1468
        users = image["metadata"].get("users", None)
1469
        userlist = users.split()
1470

    
1471
        if "root" in userlist:
1472
            loginname = "root"
1473
        elif users is None:
1474
            loginname = self._connect_loginname(os_value)
1475
        else:
1476
            loginname = choice(userlist)
1477

    
1478
        myPass = self.password['A']
1479

    
1480
        cmd = "if ping -c 7 -w 20 %s >/dev/null; \
1481
               then echo \'True\'; fi;" % self.priv_ip["B"]
1482
        lines, status = _ssh_execute(
1483
            hostip, loginname, myPass, cmd)
1484

    
1485
        exists = False
1486

    
1487
        if 'True\n' in lines:
1488
            exists = True
1489

    
1490
        self.assertTrue(exists)
1491

    
1492
    def test_004_disconnect_from_network(self):
1493
        "Disconnecting server A and B from network"
1494

    
1495
        log.info("Disconnecting servers from private network")
1496

    
1497
        prev_state = self.cyclades_client.get_network_details(self.networkid)
1498
        prev_nics = prev_state['attachments']
1499
        #prev_conn = len(prev_nics)
1500

    
1501
        nicsA = [x['id']
1502
                 for x in self.cyclades_client.get_server_details(
1503
                     self.serverid['A'])['attachments']]
1504
        nicsB = [x['id']
1505
                 for x in self.cyclades_client.get_server_details(
1506
                     self.serverid['B'])['attachments']]
1507

    
1508
        for nic in prev_nics:
1509
            if nic in nicsA:
1510
                self.cyclades_client.disconnect_server(self.serverid['A'], nic)
1511
            if nic in nicsB:
1512
                self.cyclades_client.disconnect_server(self.serverid['B'], nic)
1513

    
1514
        #Insist on deleting until action timeout
1515
        fail_tmout = time.time() + self.action_timeout
1516

    
1517
        while True:
1518
            netsA = [x['network_id']
1519
                     for x in self.cyclades_client.get_server_details(
1520
                         self.serverid['A'])['attachments']]
1521
            netsB = [x['network_id']
1522
                     for x in self.cyclades_client.get_server_details(
1523
                         self.serverid['B'])['attachments']]
1524

    
1525
            #connected = (self.client.get_network_details(self.networkid))
1526
            #connections = connected['attachments']
1527
            if (self.networkid not in netsA) and (self.networkid not in netsB):
1528
                conn_exists = False
1529
                break
1530
            elif time.time() > fail_tmout:
1531
                self.assertLess(time.time(), fail_tmout)
1532
            else:
1533
                time.sleep(self.query_interval)
1534

    
1535
        self.assertFalse(conn_exists)
1536

    
1537
    def test_005_destroy_network(self):
1538
        """Test submit delete network request"""
1539

    
1540
        log.info("Submitting delete network request")
1541

    
1542
        self.cyclades_client.delete_network(self.networkid)
1543

    
1544
        fail_tmout = time.time() + self.action_timeout
1545

    
1546
        while True:
1547

    
1548
            curr_net = []
1549
            networks = self.cyclades_client.list_networks()
1550

    
1551
            for net in networks:
1552
                curr_net.append(net['id'])
1553

    
1554
            if self.networkid not in curr_net:
1555
                self.assertTrue(self.networkid not in curr_net)
1556
                break
1557

    
1558
            elif time.time() > fail_tmout:
1559
                self.assertLess(time.time(), fail_tmout)
1560

    
1561
            else:
1562
                time.sleep(self.query_interval)
1563

    
1564
    def test_006_cleanup_servers(self):
1565
        """Cleanup servers created for this test"""
1566

    
1567
        log.info("Delete servers created for this test")
1568

    
1569
        self.cyclades_client.delete_server(self.serverid['A'])
1570
        self.cyclades_client.delete_server(self.serverid['B'])
1571

    
1572
        fail_tmout = time.time() + self.action_timeout
1573

    
1574
        #Ensure server gets deleted
1575
        status = dict()
1576

    
1577
        while True:
1578
            details = \
1579
                self.cyclades_client.get_server_details(self.serverid['A'])
1580
            status['A'] = details['status']
1581
            details = \
1582
                self.cyclades_client.get_server_details(self.serverid['B'])
1583
            status['B'] = details['status']
1584
            if (status['A'] == 'DELETED') and (status['B'] == 'DELETED'):
1585
                deleted = True
1586
                break
1587
            elif time.time() > fail_tmout:
1588
                self.assertLess(time.time(), fail_tmout)
1589
            else:
1590
                time.sleep(self.query_interval)
1591

    
1592
        self.assertTrue(deleted)
1593

    
1594

    
1595
class TestRunnerProcess(Process):
1596
    """A distinct process used to execute part of the tests in parallel"""
1597
    def __init__(self, **kw):
1598
        Process.__init__(self, **kw)
1599
        kwargs = kw["kwargs"]
1600
        self.testq = kwargs["testq"]
1601
        self.worker_folder = kwargs["worker_folder"]
1602

    
1603
    def run(self):
1604
        # Make sure this test runner process dies with the parent
1605
        # and is not left behind.
1606
        #
1607
        # WARNING: This uses the prctl(2) call and is
1608
        # Linux-specific.
1609

    
1610
        prctl.set_pdeathsig(signal.SIGHUP)
1611

    
1612
        multi = logging.getLogger("multiprocess")
1613

    
1614
        while True:
1615
            multi.debug("I am process %d, GETting from queue is %s" %
1616
                        (os.getpid(), self.testq))
1617
            msg = self.testq.get()
1618

    
1619
            multi.debug("Dequeued msg: %s" % msg)
1620

    
1621
            if msg == "TEST_RUNNER_TERMINATE":
1622
                raise SystemExit
1623

    
1624
            elif issubclass(msg, unittest.TestCase):
1625
                # Assemble a TestSuite, and run it
1626

    
1627
                log_file = os.path.join(self.worker_folder, 'details_' +
1628
                                        (msg.__name__) + "_" +
1629
                                        TEST_RUN_ID + '.log')
1630

    
1631
                fail_file = os.path.join(self.worker_folder, 'failed_' +
1632
                                         (msg.__name__) + "_" +
1633
                                         TEST_RUN_ID + '.log')
1634
                error_file = os.path.join(self.worker_folder, 'error_' +
1635
                                          (msg.__name__) + "_" +
1636
                                          TEST_RUN_ID + '.log')
1637

    
1638
                f = open(log_file, 'w')
1639
                fail = open(fail_file, 'w')
1640
                error = open(error_file, 'w')
1641

    
1642
                log.info(yellow + '* Starting testcase: %s' % msg + normal)
1643

    
1644
                runner = unittest.TextTestRunner(
1645
                    f, verbosity=2, failfast=True,
1646
                    resultclass=BurninTestResult)
1647
                suite = unittest.TestLoader().loadTestsFromTestCase(msg)
1648
                result = runner.run(suite)
1649

    
1650
                for res in result.errors:
1651
                    log.error("snf-burnin encountered an error in "
1652
                              "testcase: %s" % msg)
1653
                    log.error("See log for details")
1654
                    error.write(str(res[0]) + '\n')
1655
                    error.write(str(res[0].shortDescription()) + '\n')
1656
                    error.write('\n')
1657

    
1658
                for res in result.failures:
1659
                    log.error("snf-burnin failed in testcase: %s" % msg)
1660
                    log.error("See log for details")
1661
                    fail.write(str(res[0]) + '\n')
1662
                    fail.write(str(res[0].shortDescription()) + '\n')
1663
                    fail.write('\n')
1664
                    if not NOFAILFAST:
1665
                        sys.exit()
1666

    
1667
                if (len(result.failures) == 0) and (len(result.errors) == 0):
1668
                    log.debug("Passed testcase: %s" % msg)
1669

    
1670
                f.close()
1671
                fail.close()
1672
                error.close()
1673

    
1674
            else:
1675
                raise Exception("Cannot handle msg: %s" % msg)
1676

    
1677

    
1678
def _run_cases_in_series(cases, image_folder):
1679
    """Run instances of TestCase in series"""
1680

    
1681
    for case in cases:
1682

    
1683
        test = case.__name__
1684

    
1685
        log.info(yellow + '* Starting testcase: %s' % test + normal)
1686
        log_file = os.path.join(image_folder, 'details_' +
1687
                                (case.__name__) + "_" +
1688
                                TEST_RUN_ID + '.log')
1689
        fail_file = os.path.join(image_folder, 'failed_' +
1690
                                 (case.__name__) + "_" +
1691
                                 TEST_RUN_ID + '.log')
1692
        error_file = os.path.join(image_folder, 'error_' +
1693
                                  (case.__name__) + "_" +
1694
                                  TEST_RUN_ID + '.log')
1695

    
1696
        f = open(log_file, "w")
1697
        fail = open(fail_file, "w")
1698
        error = open(error_file, "w")
1699

    
1700
        suite = unittest.TestLoader().loadTestsFromTestCase(case)
1701
        runner = unittest.TextTestRunner(
1702
            f, verbosity=2, failfast=True,
1703
            resultclass=BurninTestResult)
1704
        result = runner.run(suite)
1705

    
1706
        for res in result.errors:
1707
            log.error("snf-burnin encountered an error in "
1708
                      "testcase: %s" % test)
1709
            log.error("See log for details")
1710
            error.write(str(res[0]) + '\n')
1711
            error.write(str(res[0].shortDescription()) + '\n')
1712
            error.write('\n')
1713

    
1714
        for res in result.failures:
1715
            log.error("snf-burnin failed in testcase: %s" % test)
1716
            log.error("See log for details")
1717
            fail.write(str(res[0]) + '\n')
1718
            fail.write(str(res[0].shortDescription()) + '\n')
1719
            fail.write('\n')
1720
            if not NOFAILFAST:
1721
                sys.exit()
1722

    
1723
        if (len(result.failures) == 0) and (len(result.errors) == 0):
1724
            log.debug("Passed testcase: %s" % test)
1725

    
1726

    
1727
def _run_cases_in_parallel(cases, fanout, image_folder):
1728
    """Run instances of TestCase in parallel, in a number of distinct processes
1729

1730
    The cases iterable specifies the TestCases to be executed in parallel,
1731
    by test runners running in distinct processes.
1732
    The fanout parameter specifies the number of processes to spawn,
1733
    and defaults to 1.
1734
    The runner argument specifies the test runner class to use inside each
1735
    runner process.
1736

1737
    """
1738

    
1739
    multi = logging.getLogger("multiprocess")
1740
    handler = logging.StreamHandler()
1741
    multi.addHandler(handler)
1742

    
1743
    if VERBOSE:
1744
        multi.setLevel(logging.DEBUG)
1745
    else:
1746
        multi.setLevel(logging.INFO)
1747

    
1748
    testq = []
1749
    worker_folder = []
1750
    runners = []
1751

    
1752
    for i in xrange(0, fanout):
1753
        testq.append(Queue())
1754
        worker_folder.append(os.path.join(image_folder, 'process'+str(i)))
1755
        os.mkdir(worker_folder[i])
1756

    
1757
    for i in xrange(0, fanout):
1758
        kwargs = dict(testq=testq[i], worker_folder=worker_folder[i])
1759
        runners.append(TestRunnerProcess(kwargs=kwargs))
1760

    
1761
    multi.debug("Spawning %d test runner processes" % len(runners))
1762

    
1763
    for p in runners:
1764
        p.start()
1765

    
1766
    # Enqueue test cases
1767
    for i in xrange(0, fanout):
1768
        map(testq[i].put, cases)
1769
        testq[i].put("TEST_RUNNER_TERMINATE")
1770

    
1771
    multi.debug("Spawned %d test runners, PIDs are %s" %
1772
                (len(runners), [p.pid for p in runners]))
1773

    
1774
    multi.debug("Joining %d processes" % len(runners))
1775

    
1776
    for p in runners:
1777
        p.join()
1778

    
1779
    multi.debug("Done joining %d processes" % len(runners))
1780

    
1781

    
1782
def _images_test_case(**kwargs):
1783
    """Construct a new unit test case class from ImagesTestCase"""
1784
    name = "ImagesTestCase_%s" % kwargs["imageid"]
1785
    cls = type(name, (ImagesTestCase,), kwargs)
1786

    
1787
    #Patch extra parameters into test names by manipulating method docstrings
1788
    for (mname, m) in \
1789
            inspect.getmembers(cls, lambda x: inspect.ismethod(x)):
1790
        if hasattr(m, __doc__):
1791
            m.__func__.__doc__ = "[%s] %s" % (cls.imagename, m.__doc__)
1792

    
1793
    # Make sure the class can be pickled, by listing it among
1794
    # the attributes of __main__. A PicklingError is raised otherwise.
1795
    thismodule = sys.modules[__name__]
1796
    setattr(thismodule, name, cls)
1797
    return cls
1798

    
1799

    
1800
def _spawn_server_test_case(**kwargs):
1801
    """Construct a new unit test case class from SpawnServerTestCase"""
1802

    
1803
    name = "SpawnServerTestCase_%s" % kwargs["imageid"]
1804
    cls = type(name, (SpawnServerTestCase,), kwargs)
1805

    
1806
    # Patch extra parameters into test names by manipulating method docstrings
1807
    for (mname, m) in \
1808
            inspect.getmembers(cls, lambda x: inspect.ismethod(x)):
1809
        if hasattr(m, __doc__):
1810
            m.__func__.__doc__ = "[%s] %s" % (cls.imagename, m.__doc__)
1811

    
1812
    # Make sure the class can be pickled, by listing it among
1813
    # the attributes of __main__. A PicklingError is raised otherwise.
1814

    
1815
    thismodule = sys.modules[__name__]
1816
    setattr(thismodule, name, cls)
1817
    return cls
1818

    
1819

    
1820
def _spawn_network_test_case(**kwargs):
1821
    """Construct a new unit test case class from NetworkTestCase"""
1822

    
1823
    name = "NetworkTestCase" + TEST_RUN_ID
1824
    cls = type(name, (NetworkTestCase,), kwargs)
1825

    
1826
    # Make sure the class can be pickled, by listing it among
1827
    # the attributes of __main__. A PicklingError is raised otherwise.
1828

    
1829
    thismodule = sys.modules[__name__]
1830
    setattr(thismodule, name, cls)
1831
    return cls
1832

    
1833

    
1834
# --------------------------------------------------------------------
1835
# Clean up servers/networks functions
1836
def cleanup_servers(timeout, query_interval, delete_stale=False):
1837

    
1838
    astakos_client = AstakosClient(AUTH_URL, TOKEN)
1839
    astakos_client.CONNECTION_RETRY_LIMIT = 2
1840
    # Compute Client
1841
    compute_url = astakos_client.get_service_endpoints('compute')['publicURL']
1842
    compute_client = ComputeClient(compute_url, TOKEN)
1843
    compute_client.CONNECTION_RETRY_LIMIT = 2
1844

    
1845
    servers = compute_client.list_servers()
1846
    stale = [s for s in servers if s["name"].startswith(SNF_TEST_PREFIX)]
1847

    
1848
    if len(stale) == 0:
1849
        return
1850

    
1851
    # Show staled servers
1852
    print >>sys.stderr, yellow + \
1853
        "Found these stale servers from previous runs:" + \
1854
        normal
1855
    print >>sys.stderr, "    " + \
1856
        "\n    ".join(["%d: %s" % (s["id"], s["name"]) for s in stale])
1857

    
1858
    # Delete staled servers
1859
    if delete_stale:
1860
        print >> sys.stderr, "Deleting %d stale servers:" % len(stale)
1861
        fail_tmout = time.time() + timeout
1862
        for s in stale:
1863
            compute_client.delete_server(s["id"])
1864
        # Wait for all servers to be deleted
1865
        while True:
1866
            servers = compute_client.list_servers()
1867
            stale = [s for s in servers
1868
                     if s["name"].startswith(SNF_TEST_PREFIX)]
1869
            if len(stale) == 0:
1870
                print >> sys.stderr, green + "    ...done" + normal
1871
                break
1872
            elif time.time() > fail_tmout:
1873
                print >> sys.stderr, red + \
1874
                    "Not all stale servers deleted. Action timed out." + \
1875
                    normal
1876
                sys.exit(1)
1877
            else:
1878
                time.sleep(query_interval)
1879
    else:
1880
        print >> sys.stderr, "Use --delete-stale to delete them."
1881

    
1882

    
1883
def cleanup_networks(action_timeout, query_interval, delete_stale=False):
1884

    
1885
    astakos_client = AstakosClient(AUTH_URL, TOKEN)
1886
    astakos_client.CONNECTION_RETRY_LIMIT = 2
1887
    # Cyclades Client
1888
    compute_url = astakos_client.get_service_endpoints('compute')['publicURL']
1889
    cyclades_client = CycladesClient(compute_url, TOKEN)
1890
    cyclades_client.CONNECTION_RETRY_LIMIT = 2
1891

    
1892
    networks = cyclades_client.list_networks()
1893
    stale = [n for n in networks if n["name"].startswith(SNF_TEST_PREFIX)]
1894

    
1895
    if len(stale) == 0:
1896
        return
1897

    
1898
    # Show staled networks
1899
    print >> sys.stderr, yellow + \
1900
        "Found these stale networks from previous runs:" + \
1901
        normal
1902
    print "    " + \
1903
        "\n    ".join(["%s: %s" % (str(n["id"]), n["name"]) for n in stale])
1904

    
1905
    # Delete staled networks
1906
    if delete_stale:
1907
        print >> sys.stderr, "Deleting %d stale networks:" % len(stale)
1908
        fail_tmout = time.time() + action_timeout
1909
        for n in stale:
1910
            cyclades_client.delete_network(n["id"])
1911
        # Wait for all networks to be deleted
1912
        while True:
1913
            networks = cyclades_client.list_networks()
1914
            stale = [n for n in networks
1915
                     if n["name"].startswith(SNF_TEST_PREFIX)]
1916
            if len(stale) == 0:
1917
                print >> sys.stderr, green + "    ...done" + normal
1918
                break
1919
            elif time.time() > fail_tmout:
1920
                print >> sys.stderr, red + \
1921
                    "Not all stale networks deleted. Action timed out." + \
1922
                    normal
1923
                sys.exit(1)
1924
            else:
1925
                time.sleep(query_interval)
1926
    else:
1927
        print >> sys.stderr, "Use --delete-stale to delete them."
1928

    
1929

    
1930
# --------------------------------------------------------------------
1931
# Parse arguments functions
1932
def parse_comma(option, opt, value, parser):
1933
    tests = set(['all', 'auth', 'images', 'flavors',
1934
                 'pithos', 'servers', 'server_spawn',
1935
                 'network_spawn'])
1936
    parse_input = value.split(',')
1937

    
1938
    if not (set(parse_input)).issubset(tests):
1939
        raise OptionValueError("The selected set of tests is invalid")
1940

    
1941
    setattr(parser.values, option.dest, value.split(','))
1942

    
1943

    
1944
def parse_arguments(args):
1945

    
1946
    kw = {}
1947
    kw["usage"] = "%prog [options]"
1948
    kw["description"] = \
1949
        "%prog runs a number of test scenarios on a " \
1950
        "Synnefo deployment."
1951

    
1952
    parser = OptionParser(**kw)
1953
    parser.disable_interspersed_args()
1954

    
1955
    parser.add_option("--auth-url",
1956
                      action="store", type="string", dest="auth_url",
1957
                      help="The AUTH URI to use to reach the Synnefo API",
1958
                      default=None)
1959
    parser.add_option("--plankton-user",
1960
                      action="store", type="string", dest="plankton_user",
1961
                      help="Owner of system images",
1962
                      default=DEFAULT_PLANKTON_USER)
1963
    parser.add_option("--token",
1964
                      action="store", type="string", dest="token",
1965
                      help="The token to use for authentication to the API")
1966
    parser.add_option("--nofailfast",
1967
                      action="store_true", dest="nofailfast",
1968
                      help="Do not fail immediately if one of the tests "
1969
                           "fails (EXPERIMENTAL)",
1970
                      default=False)
1971
    parser.add_option("--no-ipv6",
1972
                      action="store_true", dest="no_ipv6",
1973
                      help="Disables ipv6 related tests",
1974
                      default=False)
1975
    parser.add_option("--action-timeout",
1976
                      action="store", type="int", dest="action_timeout",
1977
                      metavar="TIMEOUT",
1978
                      help="Wait SECONDS seconds for a server action to "
1979
                           "complete, then the test is considered failed",
1980
                      default=100)
1981
    parser.add_option("--build-warning",
1982
                      action="store", type="int", dest="build_warning",
1983
                      metavar="TIMEOUT",
1984
                      help="Warn if TIMEOUT seconds have passed and a "
1985
                           "build operation is still pending",
1986
                      default=600)
1987
    parser.add_option("--build-fail",
1988
                      action="store", type="int", dest="build_fail",
1989
                      metavar="BUILD_TIMEOUT",
1990
                      help="Fail the test if TIMEOUT seconds have passed "
1991
                           "and a build operation is still incomplete",
1992
                      default=900)
1993
    parser.add_option("--query-interval",
1994
                      action="store", type="int", dest="query_interval",
1995
                      metavar="INTERVAL",
1996
                      help="Query server status when requests are pending "
1997
                           "every INTERVAL seconds",
1998
                      default=3)
1999
    parser.add_option("--fanout",
2000
                      action="store", type="int", dest="fanout",
2001
                      metavar="COUNT",
2002
                      help="Spawn up to COUNT child processes to execute "
2003
                           "in parallel, essentially have up to COUNT "
2004
                           "server build requests outstanding (EXPERIMENTAL)",
2005
                      default=1)
2006
    parser.add_option("--force-flavor",
2007
                      action="store", type="int", dest="force_flavorid",
2008
                      metavar="FLAVOR ID",
2009
                      help="Force all server creations to use the specified "
2010
                           "FLAVOR ID instead of a randomly chosen one, "
2011
                           "useful if disk space is scarce",
2012
                      default=None)
2013
    parser.add_option("--image-id",
2014
                      action="store", type="string", dest="force_imageid",
2015
                      metavar="IMAGE ID",
2016
                      help="Test the specified image id, use 'all' to test "
2017
                           "all available images (mandatory argument)",
2018
                      default=None)
2019
    parser.add_option("--show-stale",
2020
                      action="store_true", dest="show_stale",
2021
                      help="Show stale servers from previous runs, whose "
2022
                           "name starts with `%s'" % SNF_TEST_PREFIX,
2023
                      default=False)
2024
    parser.add_option("--delete-stale",
2025
                      action="store_true", dest="delete_stale",
2026
                      help="Delete stale servers from previous runs, whose "
2027
                           "name starts with `%s'" % SNF_TEST_PREFIX,
2028
                      default=False)
2029
    parser.add_option("--force-personality",
2030
                      action="store", type="string", dest="personality_path",
2031
                      help="Force a personality file injection.\
2032
                            File path required. ",
2033
                      default=None)
2034
    parser.add_option("--log-folder",
2035
                      action="store", type="string", dest="log_folder",
2036
                      help="Define the absolute path where the output \
2037
                            log is stored. ",
2038
                      default="/var/log/burnin/")
2039
    parser.add_option("--verbose", "-V",
2040
                      action="store_true", dest="verbose",
2041
                      help="Print detailed output about multiple "
2042
                           "processes spawning",
2043
                      default=False)
2044
    parser.add_option("--set-tests",
2045
                      action="callback",
2046
                      dest="tests",
2047
                      type="string",
2048
                      help='Set comma seperated tests for this run. \
2049
                            Available tests: auth, images, flavors, \
2050
                                             servers, server_spawn, \
2051
                                             network_spawn, pithos. \
2052
                            Default = all',
2053
                      default='all',
2054
                      callback=parse_comma)
2055

    
2056
    (opts, args) = parser.parse_args(args)
2057

    
2058
    # -----------------------
2059
    # Verify arguments
2060

    
2061
    # `delete_stale' implies `show_stale'
2062
    if opts.delete_stale:
2063
        opts.show_stale = True
2064

    
2065
    # `token' is mandatory
2066
    _mandatory_argument(opts.token, "--token")
2067
    # `auth_url' is mandatory
2068
    _mandatory_argument(opts.auth_url, "--auth-url")
2069

    
2070
    if not opts.show_stale:
2071
        # `image-id' is mandatory
2072
        _mandatory_argument(opts.force_imageid, "--image-id")
2073
        if opts.force_imageid != 'all':
2074
            try:
2075
                opts.force_imageid = str(opts.force_imageid)
2076
            except ValueError:
2077
                print >>sys.stderr, red + \
2078
                    "Invalid value specified for" + \
2079
                    "--image-id. Use a valid id, or `all'." + \
2080
                    normal
2081
                sys.exit(1)
2082

    
2083
    return (opts, args)
2084

    
2085

    
2086
def _mandatory_argument(Arg, Str):
2087
    if (Arg is None) or (Arg == ""):
2088
        print >>sys.stderr, red + \
2089
            "The " + Str + " argument is mandatory.\n" + \
2090
            normal
2091
        sys.exit(1)
2092

    
2093

    
2094
# --------------------------------------------------------------------
2095
# Burnin main function
2096
def main():
2097
    """Assemble test cases into a test suite, and run it
2098

2099
    IMPORTANT: Tests have dependencies and have to be run in the specified
2100
    order inside a single test case. They communicate through attributes of the
2101
    corresponding TestCase class (shared fixtures). Distinct subclasses of
2102
    TestCase MAY SHARE NO DATA, since they are run in parallel, in distinct
2103
    test runner processes.
2104

2105
    """
2106

    
2107
    # Parse arguments using `optparse'
2108
    (opts, args) = parse_arguments(sys.argv[1:])
2109

    
2110
    # Some global variables
2111
    global AUTH_URL, TOKEN, PLANKTON_USER
2112
    global NO_IPV6, VERBOSE, NOFAILFAST
2113
    AUTH_URL = opts.auth_url
2114
    TOKEN = opts.token
2115
    PLANKTON_USER = opts.plankton_user
2116
    NO_IPV6 = opts.no_ipv6
2117
    VERBOSE = opts.verbose
2118
    NOFAILFAST = opts.nofailfast
2119

    
2120
    # If `show_stale', cleanup stale servers
2121
    # from previous runs and exit
2122
    if opts.show_stale:
2123
        # We must clean the servers first
2124
        cleanup_servers(opts.action_timeout, opts.query_interval,
2125
                        delete_stale=opts.delete_stale)
2126
        cleanup_networks(opts.action_timeout, opts.query_interval,
2127
                         delete_stale=opts.delete_stale)
2128
        return 0
2129

    
2130
    # Initialize a kamaki instance, get flavors, images
2131
    astakos_client = AstakosClient(AUTH_URL, TOKEN)
2132
    astakos_client.CONNECTION_RETRY_LIMIT = 2
2133
    # Compute Client
2134
    compute_url = astakos_client.get_service_endpoints('compute')['publicURL']
2135
    compute_client = ComputeClient(compute_url, TOKEN)
2136
    compute_client.CONNECTION_RETRY_LIMIT = 2
2137
    DIMAGES = compute_client.list_images(detail=True)
2138
    DFLAVORS = compute_client.list_flavors(detail=True)
2139

    
2140
    # FIXME: logging, log, LOG PID, TEST_RUN_ID, arguments
2141
    # Run them: FIXME: In parallel, FAILEARLY, catchbreak?
2142
    #unittest.main(verbosity=2, catchbreak=True)
2143

    
2144
    # Get a list of images we are going to test
2145
    if opts.force_imageid == 'all':
2146
        test_images = DIMAGES
2147
    else:
2148
        test_images = filter(lambda x: x["id"] == opts.force_imageid, DIMAGES)
2149

    
2150
    # Create output (logging) folder
2151
    if not os.path.exists(opts.log_folder):
2152
        os.mkdir(opts.log_folder)
2153
    test_folder = os.path.join(opts.log_folder, TEST_RUN_ID)
2154
    os.mkdir(test_folder)
2155

    
2156
    for image in test_images:
2157
        imageid = str(image["id"])
2158
        imagename = image["name"]
2159
        # Choose a flavor (given from user or random)
2160
        if opts.force_flavorid:
2161
            flavorid = opts.force_flavorid
2162
        else:
2163
            flavorid = choice([f["id"] for f in DFLAVORS if f["disk"] >= 20])
2164
        # Personality dictionary for file injection test
2165
        if opts.personality_path is not None:
2166
            f = open(opts.personality_path)
2167
            content = b64encode(f.read())
2168
            personality = []
2169
            st = os.stat(opts.personality_path)
2170
            personality.append({
2171
                'path': '/root/test_inj_file',
2172
                'owner': 'root',
2173
                'group': 'root',
2174
                'mode': 0x7777 & st.st_mode,
2175
                'contents': content})
2176
        else:
2177
            personality = None
2178
        # Give a name to our test servers
2179
        servername = "%s%s for %s" % (SNF_TEST_PREFIX, TEST_RUN_ID, imagename)
2180
        is_windows = imagename.lower().find("windows") >= 0
2181

    
2182
        # Create Server TestCases
2183
        ServerTestCase = _spawn_server_test_case(
2184
            imageid=imageid,
2185
            flavorid=flavorid,
2186
            imagename=imagename,
2187
            personality=personality,
2188
            servername=servername,
2189
            is_windows=is_windows,
2190
            action_timeout=opts.action_timeout,
2191
            build_warning=opts.build_warning,
2192
            build_fail=opts.build_fail,
2193
            query_interval=opts.query_interval)
2194
        # Create Network TestCases
2195
        NetworkTestCase = _spawn_network_test_case(
2196
            action_timeout=opts.action_timeout,
2197
            imageid=imageid,
2198
            flavorid=flavorid,
2199
            imagename=imagename,
2200
            query_interval=opts.query_interval)
2201
        # Create Images TestCase
2202
        CImagesTestCase = _images_test_case(
2203
            action_timeout=opts.action_timeout,
2204
            imageid=imageid,
2205
            flavorid=flavorid,
2206
            imagename=imagename,
2207
            query_interval=opts.query_interval)
2208

    
2209
        # Choose the tests we are going to run
2210
        test_dict = {'auth': UnauthorizedTestCase,
2211
                     'images': CImagesTestCase,
2212
                     'flavors': FlavorsTestCase,
2213
                     'servers': ServersTestCase,
2214
                     'pithos': PithosTestCase,
2215
                     'server_spawn': ServerTestCase,
2216
                     'network_spawn': NetworkTestCase}
2217
        seq_cases = []
2218
        if 'all' in opts.tests:
2219
            seq_cases = [UnauthorizedTestCase, CImagesTestCase,
2220
                         FlavorsTestCase, ServersTestCase,
2221
                         PithosTestCase, ServerTestCase,
2222
                         NetworkTestCase]
2223
        else:
2224
            for test in opts.tests:
2225
                seq_cases.append(test_dict[test])
2226

    
2227
        # Folder for each image
2228
        image_folder = os.path.join(test_folder, imageid)
2229
        os.mkdir(image_folder)
2230

    
2231
        # Run each test
2232
        if opts.fanout > 1:
2233
            _run_cases_in_parallel(seq_cases, opts.fanout, image_folder)
2234
        else:
2235
            _run_cases_in_series(seq_cases, image_folder)
2236

    
2237

    
2238
# --------------------------------------------------------------------
2239
# Call main
2240
if __name__ == "__main__":
2241
    sys.exit(main())