Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / tools / burnin.py @ 567ffb85

History | View | Annotate | Download (43 kB)

1
#!/usr/bin/env python
2

    
3
# Copyright 2011 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35

    
36
"""Perform integration testing on a running Synnefo deployment"""
37

    
38
import __main__
39
import datetime
40
import inspect
41
import logging
42
import os
43
import paramiko
44
import prctl
45
import subprocess
46
import signal
47
import socket
48
import struct
49
import sys
50
import time
51
import hashlib
52
from base64 import b64encode
53
from pwd import getpwuid
54
from grp import getgrgid
55
from IPy import IP
56
from multiprocessing import Process, Queue
57
from random import choice
58

    
59
from kamaki.clients import ClientError, ComputeClient, CycladesClient
60
from kamaki.config import Config
61

    
62
from vncauthproxy.d3des import generate_response as d3des_generate_response
63

    
64
# Use backported unittest functionality if Python < 2.7
65
try:
66
    import unittest2 as unittest
67
except ImportError:
68
    if sys.version_info < (2, 7):
69
        raise Exception("The unittest2 package is required for Python < 2.7")
70
    import unittest
71

    
72

    
73
API = None
74
TOKEN = None
75
DEFAULT_API = "https://cyclades.okeanos.grnet.gr/api/v1.1"
76

    
77
# A unique id identifying this test run
78
TEST_RUN_ID = datetime.datetime.strftime(datetime.datetime.now(),
79
                                         "%Y%m%d%H%M%S")
80
SNF_TEST_PREFIX = "snf-test-"
81

    
82
# Setup logging (FIXME - verigak)
83
logging.basicConfig(format="%(message)s")
84
log = logging.getLogger("burnin")
85
log.setLevel(logging.INFO)
86

    
87
class UnauthorizedTestCase(unittest.TestCase):
88
    def test_unauthorized_access(self):
89
        """Test access without a valid token fails"""
90
        falseToken = '12345'
91
        c=ComputeClient(API, falseToken)
92

    
93
        with self.assertRaises(ClientError) as cm:
94
            c.list_servers()
95
        self.assertEqual(cm.exception.status, 401)
96

    
97

    
98
class ImagesTestCase(unittest.TestCase):
99
    """Test image lists for consistency"""
100
    @classmethod
101
    def setUpClass(cls):
102
        """Initialize kamaki, get (detailed) list of images"""
103
        log.info("Getting simple and detailed list of images")
104

    
105
        cls.client = ComputeClient(API, TOKEN)
106
        cls.images = cls.client.list_images()
107
        cls.dimages = cls.client.list_images(detail=True)
108

    
109
    def test_001_list_images(self):
110
        """Test image list actually returns images"""
111
        self.assertGreater(len(self.images), 0)
112

    
113
    def test_002_list_images_detailed(self):
114
        """Test detailed image list is the same length as list"""
115
        self.assertEqual(len(self.dimages), len(self.images))
116

    
117
    def test_003_same_image_names(self):
118
        """Test detailed and simple image list contain same names"""
119
        names = sorted(map(lambda x: x["name"], self.images))
120
        dnames = sorted(map(lambda x: x["name"], self.dimages))
121
        self.assertEqual(names, dnames)
122

    
123
    def test_004_unique_image_names(self):
124
        """Test images have unique names"""
125
        names = sorted(map(lambda x: x["name"], self.images))
126
        self.assertEqual(sorted(list(set(names))), names)
127

    
128
    def test_005_image_metadata(self):
129
        """Test every image has specific metadata defined"""
130
        keys = frozenset(["os", "description", "size"])
131
        for i in self.dimages:
132
            self.assertTrue(keys.issubset(i["metadata"]["values"].keys()))
133

    
134

    
135
class FlavorsTestCase(unittest.TestCase):
136
    """Test flavor lists for consistency"""
137
    @classmethod
138
    def setUpClass(cls):
139
        """Initialize kamaki, get (detailed) list of flavors"""
140
        log.info("Getting simple and detailed list of flavors")
141

    
142
        cls.client = ComputeClient(API, TOKEN)
143
        cls.flavors = cls.client.list_flavors()
144
        cls.dflavors = cls.client.list_flavors(detail=True)
145

    
146
    def test_001_list_flavors(self):
147
        """Test flavor list actually returns flavors"""
148
        self.assertGreater(len(self.flavors), 0)
149

    
150
    def test_002_list_flavors_detailed(self):
151
        """Test detailed flavor list is the same length as list"""
152
        self.assertEquals(len(self.dflavors), len(self.flavors))
153

    
154
    def test_003_same_flavor_names(self):
155
        """Test detailed and simple flavor list contain same names"""
156
        names = sorted(map(lambda x: x["name"], self.flavors))
157
        dnames = sorted(map(lambda x: x["name"], self.dflavors))
158
        self.assertEqual(names, dnames)
159

    
160
    def test_004_unique_flavor_names(self):
161
        """Test flavors have unique names"""
162
        names = sorted(map(lambda x: x["name"], self.flavors))
163
        self.assertEqual(sorted(list(set(names))), names)
164

    
165
    def test_005_well_formed_flavor_names(self):
166
        """Test flavors have names of the form CxxRyyDzz
167

168
        Where xx is vCPU count, yy is RAM in MiB, zz is Disk in GiB
169

170
        """
171
        for f in self.dflavors:
172
            self.assertEqual("C%dR%dD%d" % (f["cpu"], f["ram"], f["disk"]),
173
                             f["name"],
174
                             "Flavor %s does not match its specs." % f["name"])
175

    
176

    
177
class ServersTestCase(unittest.TestCase):
178
    """Test server lists for consistency"""
179
    @classmethod
180
    def setUpClass(cls):
181
        """Initialize kamaki, get (detailed) list of servers"""
182
        log.info("Getting simple and detailed list of servers")
183

    
184
        cls.client = ComputeClient(API, TOKEN)
185
        cls.servers = cls.client.list_servers()
186
        cls.dservers = cls.client.list_servers(detail=True)
187

    
188
    def test_001_list_servers(self):
189
        """Test server list actually returns servers"""
190
        self.assertGreater(len(self.servers), 0)
191

    
192
    def test_002_list_servers_detailed(self):
193
        """Test detailed server list is the same length as list"""
194
        self.assertEqual(len(self.dservers), len(self.servers))
195

    
196
    def test_003_same_server_names(self):
197
        """Test detailed and simple flavor list contain same names"""
198
        names = sorted(map(lambda x: x["name"], self.servers))
199
        dnames = sorted(map(lambda x: x["name"], self.dservers))
200
        self.assertEqual(names, dnames)
201

    
202

    
203
# This class gets replicated into actual TestCases dynamically
204
class SpawnServerTestCase(unittest.TestCase):
205
    """Test scenario for server of the specified image"""
206

    
207
    @classmethod
208
    def setUpClass(cls):
209
        """Initialize a kamaki instance"""
210
        log.info("Spawning server for image `%s'", cls.imagename)
211

    
212
        cls.client = ComputeClient(API, TOKEN)
213
        cls.cyclades = CycladesClient(API, TOKEN)
214

    
215
    def _get_ipv4(self, server):
216
        """Get the public IPv4 of a server from the detailed server info"""
217

    
218
        public_addrs = filter(lambda x: x["id"] == "public",
219
                              server["addresses"]["values"])
220
        self.assertEqual(len(public_addrs), 1)
221
        ipv4_addrs = filter(lambda x: x["version"] == 4,
222
                            public_addrs[0]["values"])
223
        self.assertEqual(len(ipv4_addrs), 1)
224
        return ipv4_addrs[0]["addr"]
225

    
226
    def _get_ipv6(self, server):
227
        """Get the public IPv6 of a server from the detailed server info"""
228
        public_addrs = filter(lambda x: x["id"] == "public",
229
                              server["addresses"]["values"])
230
        self.assertEqual(len(public_addrs), 1)
231
        ipv6_addrs = filter(lambda x: x["version"] == 6,
232
                            public_addrs[0]["values"])
233
        self.assertEqual(len(ipv6_addrs), 1)
234
        return ipv6_addrs[0]["addr"]
235

    
236
    def _connect_loginname(self, os):
237
        """Return the login name for connections based on the server OS"""
238
        if os in ("Ubuntu", "Kubuntu", "Fedora"):
239
            return "user"
240
        elif os in ("windows", "windows_alpha1"):
241
            return "Administrator"
242
        else:
243
            return "root"
244

    
245
    def _verify_server_status(self, current_status, new_status):
246
        """Verify a server has switched to a specified status"""
247
        server = self.client.get_server_details(self.serverid)
248
        if server["status"] not in (current_status, new_status):
249
            return None  # Do not raise exception, return so the test fails
250
        self.assertEquals(server["status"], new_status)
251

    
252
    def _get_connected_tcp_socket(self, family, host, port):
253
        """Get a connected socket from the specified family to host:port"""
254
        sock = None
255
        for res in \
256
            socket.getaddrinfo(host, port, family, socket.SOCK_STREAM, 0,
257
                               socket.AI_PASSIVE):
258
            af, socktype, proto, canonname, sa = res
259
            try:
260
                sock = socket.socket(af, socktype, proto)
261
            except socket.error as msg:
262
                sock = None
263
                continue
264
            try:
265
                sock.connect(sa)
266
            except socket.error as msg:
267
                sock.close()
268
                sock = None
269
                continue
270
        self.assertIsNotNone(sock)
271
        return sock
272

    
273
    def _ping_once(self, ipv6, ip):
274
        """Test server responds to a single IPv4 or IPv6 ping"""
275
        cmd = "ping%s -c 2 -w 3 %s" % ("6" if ipv6 else "", ip)
276
        ping = subprocess.Popen(cmd, shell=True,
277
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
278
        (stdout, stderr) = ping.communicate()
279
        ret = ping.wait()
280
        self.assertEquals(ret, 0)
281

    
282
    def _get_hostname_over_ssh(self, hostip, username, password):
283
        ssh = paramiko.SSHClient()
284
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
285
        try:
286
            ssh.connect(hostip, username=username, password=password)
287
        except socket.error:
288
            raise AssertionError
289
        stdin, stdout, stderr = ssh.exec_command("hostname")
290
        lines = stdout.readlines()
291
        self.assertEqual(len(lines), 1)
292
        return lines[0]
293

    
294
    def _try_until_timeout_expires(self, warn_timeout, fail_timeout,
295
                                   opmsg, callable, *args, **kwargs):
296
        if warn_timeout == fail_timeout:
297
            warn_timeout = fail_timeout + 1
298
        warn_tmout = time.time() + warn_timeout
299
        fail_tmout = time.time() + fail_timeout
300
        while True:
301
            self.assertLess(time.time(), fail_tmout,
302
                            "operation `%s' timed out" % opmsg)
303
            if time.time() > warn_tmout:
304
                log.warning("Server %d: `%s' operation `%s' not done yet",
305
                            self.serverid, self.servername, opmsg)
306
            try:
307
                log.info("%s... " % opmsg)
308
                return callable(*args, **kwargs)
309
            except AssertionError:
310
                pass
311
            time.sleep(self.query_interval)
312

    
313
    def _insist_on_tcp_connection(self, family, host, port):
314
        familystr = {socket.AF_INET: "IPv4", socket.AF_INET6: "IPv6",
315
                     socket.AF_UNSPEC: "Unspecified-IPv4/6"}
316
        msg = "connect over %s to %s:%s" % \
317
              (familystr.get(family, "Unknown"), host, port)
318
        sock = self._try_until_timeout_expires(
319
                self.action_timeout, self.action_timeout,
320
                msg, self._get_connected_tcp_socket,
321
                family, host, port)
322
        return sock
323

    
324
    def _insist_on_status_transition(self, current_status, new_status,
325
                                    fail_timeout, warn_timeout=None):
326
        msg = "Server %d: `%s', waiting for %s -> %s" % \
327
              (self.serverid, self.servername, current_status, new_status)
328
        if warn_timeout is None:
329
            warn_timeout = fail_timeout
330
        self._try_until_timeout_expires(warn_timeout, fail_timeout,
331
                                        msg, self._verify_server_status,
332
                                        current_status, new_status)
333
        # Ensure the status is actually the expected one
334
        server = self.client.get_server_details(self.serverid)
335
        self.assertEquals(server["status"], new_status)
336

    
337
    def _insist_on_ssh_hostname(self, hostip, username, password):
338
        msg = "SSH to %s, as %s/%s" % (hostip, username, password)
339
        hostname = self._try_until_timeout_expires(
340
                self.action_timeout, self.action_timeout,
341
                msg, self._get_hostname_over_ssh,
342
                hostip, username, password)
343

    
344
        # The hostname must be of the form 'prefix-id'
345
        self.assertTrue(hostname.endswith("-%d\n" % self.serverid))
346

    
347
    def _check_file_through_ssh(self, hostip, username, password, remotepath, content):
348
        msg = "Trying file injection through SSH to %s, as %s/%s" % (hostip, username, password)
349
        log.info(msg)
350
        try:
351
            ssh = paramiko.SSHClient()
352
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
353
            ssh.connect(hostip, username=username, password=password)
354
        except socket.error:
355
            raise AssertionError
356
        
357
        transport = paramiko.Transport((hostip,22))
358
        transport.connect(username = username, password = password)
359

    
360
        localpath = '/tmp/'+SNF_TEST_PREFIX+'injection'
361
        sftp = paramiko.SFTPClient.from_transport(transport)
362
        sftp.get(remotepath, localpath)
363
        
364
        sftp.close()
365
        transport.close()
366

    
367
        f = open(localpath)
368
        remote_content = b64encode(f.read())
369

    
370
        # Check if files are the same
371
        return (remote_content == content)
372

    
373
    def _skipIf(self, condition, msg):
374
        if condition:
375
            self.skipTest(msg)
376

    
377
    def test_001_submit_create_server(self):
378
        """Test submit create server request"""
379
        server = self.client.create_server(self.servername, self.flavorid,
380
                                           self.imageid, self.personality)
381

    
382
        self.assertEqual(server["name"], self.servername)
383
        self.assertEqual(server["flavorRef"], self.flavorid)
384
        self.assertEqual(server["imageRef"], self.imageid)
385
        self.assertEqual(server["status"], "BUILD")
386

    
387
        # Update class attributes to reflect data on building server
388
        cls = type(self)
389
        cls.serverid = server["id"]
390
        cls.username = None
391
        cls.passwd = server["adminPass"]
392

    
393
    def test_002a_server_is_building_in_list(self):
394
        """Test server is in BUILD state, in server list"""
395
        servers = self.client.list_servers(detail=True)
396
        servers = filter(lambda x: x["name"] == self.servername, servers)
397
        self.assertEqual(len(servers), 1)
398
        server = servers[0]
399
        self.assertEqual(server["name"], self.servername)
400
        self.assertEqual(server["flavorRef"], self.flavorid)
401
        self.assertEqual(server["imageRef"], self.imageid)
402
        self.assertEqual(server["status"], "BUILD")
403

    
404
    def test_002b_server_is_building_in_details(self):
405
        """Test server is in BUILD state, in details"""
406
        server = self.client.get_server_details(self.serverid)
407
        self.assertEqual(server["name"], self.servername)
408
        self.assertEqual(server["flavorRef"], self.flavorid)
409
        self.assertEqual(server["imageRef"], self.imageid)
410
        self.assertEqual(server["status"], "BUILD")
411

    
412
    def test_002c_set_server_metadata(self):
413
        image = self.client.get_image_details(self.imageid)
414
        os = image["metadata"]["values"]["os"]
415
        loginname = image["metadata"]["values"].get("users", None)
416
        self.client.update_server_metadata(self.serverid, OS=os)
417

    
418
        # Determine the username to use for future connections
419
        # to this host
420
        cls = type(self)
421
        cls.username = loginname
422
        if not cls.username:
423
            cls.username = self._connect_loginname(os)
424
        self.assertIsNotNone(cls.username)
425

    
426
    def test_002d_verify_server_metadata(self):
427
        """Test server metadata keys are set based on image metadata"""
428
        servermeta = self.client.get_server_metadata(self.serverid)
429
        imagemeta = self.client.get_image_metadata(self.imageid)
430
        self.assertEqual(servermeta["OS"], imagemeta["os"])
431

    
432
    def test_003_server_becomes_active(self):
433
        """Test server becomes ACTIVE"""
434
        self._insist_on_status_transition("BUILD", "ACTIVE",
435
                                         self.build_fail, self.build_warning)
436

    
437
    def test_003a_get_server_oob_console(self):
438
        """Test getting OOB server console over VNC
439

440
        Implementation of RFB protocol follows
441
        http://www.realvnc.com/docs/rfbproto.pdf.
442

443
        """
444
        
445
        console = self.cyclades.get_server_console(self.serverid)
446
        self.assertEquals(console['type'], "vnc")
447
        sock = self._insist_on_tcp_connection(socket.AF_UNSPEC,
448
                                        console["host"], console["port"])
449

    
450
        # Step 1. ProtocolVersion message (par. 6.1.1)
451
        version = sock.recv(1024)
452
        self.assertEquals(version, 'RFB 003.008\n')
453
        sock.send(version)
454

    
455
        # Step 2. Security (par 6.1.2): Only VNC Authentication supported
456
        sec = sock.recv(1024)
457
        self.assertEquals(list(sec), ['\x01', '\x02'])
458

    
459
        # Step 3. Request VNC Authentication (par 6.1.2)
460
        sock.send('\x02')
461

    
462
        # Step 4. Receive Challenge (par 6.2.2)
463
        challenge = sock.recv(1024)
464
        self.assertEquals(len(challenge), 16)
465

    
466
        # Step 5. DES-Encrypt challenge, use password as key (par 6.2.2)
467
        response = d3des_generate_response(
468
            (console["password"] + '\0' * 8)[:8], challenge)
469
        sock.send(response)
470

    
471
        # Step 6. SecurityResult (par 6.1.3)
472
        result = sock.recv(4)
473
        self.assertEquals(list(result), ['\x00', '\x00', '\x00', '\x00'])
474
        sock.close()
475
        
476
    def test_004_server_has_ipv4(self):
477
        """Test active server has a valid IPv4 address"""
478
        server = self.client.get_server_details(self.serverid)
479
        ipv4 = self._get_ipv4(server)
480
        self.assertEquals(IP(ipv4).version(), 4)
481

    
482
    def test_005_server_has_ipv6(self):
483
        """Test active server has a valid IPv6 address"""
484
        server = self.client.get_server_details(self.serverid)
485
        ipv6 = self._get_ipv6(server)
486
        self.assertEquals(IP(ipv6).version(), 6)
487

    
488
    def test_006_server_responds_to_ping_IPv4(self):
489
        """Test server responds to ping on IPv4 address"""
490
        server = self.client.get_server_details(self.serverid)
491
        ip = self._get_ipv4(server)
492
        self._try_until_timeout_expires(self.action_timeout,
493
                                        self.action_timeout,
494
                                        "PING IPv4 to %s" % ip,
495
                                        self._ping_once,
496
                                        False, ip)
497

    
498
    def test_007_server_responds_to_ping_IPv6(self):
499
        """Test server responds to ping on IPv6 address"""
500
        server = self.client.get_server_details(self.serverid)
501
        ip = self._get_ipv6(server)
502
        self._try_until_timeout_expires(self.action_timeout,
503
                                        self.action_timeout,
504
                                        "PING IPv6 to %s" % ip,
505
                                        self._ping_once,
506
                                        True, ip)
507

    
508
    def test_008_submit_shutdown_request(self):
509
        """Test submit request to shutdown server"""
510
        self.cyclades.shutdown_server(self.serverid)
511

    
512
    def test_009_server_becomes_stopped(self):
513
        """Test server becomes STOPPED"""
514
        self._insist_on_status_transition("ACTIVE", "STOPPED",
515
                                         self.action_timeout,
516
                                         self.action_timeout)
517

    
518
    def test_010_submit_start_request(self):
519
        """Test submit start server request"""
520
        self.cyclades.start_server(self.serverid)
521

    
522
    def test_011_server_becomes_active(self):
523
        """Test server becomes ACTIVE again"""
524
        self._insist_on_status_transition("STOPPED", "ACTIVE",
525
                                         self.action_timeout,
526
                                         self.action_timeout)
527

    
528
    def test_011a_server_responds_to_ping_IPv4(self):
529
        """Test server OS is actually up and running again"""
530
        self.test_006_server_responds_to_ping_IPv4()
531

    
532
    def test_012_ssh_to_server_IPv4(self):
533
        """Test SSH to server public IPv4 works, verify hostname"""
534
        self._skipIf(self.is_windows, "only valid for Linux servers")
535
        server = self.client.get_server_details(self.serverid)
536
        self._insist_on_ssh_hostname(self._get_ipv4(server),
537
                                     self.username, self.passwd)
538

    
539
    def test_013_ssh_to_server_IPv6(self):
540
        """Test SSH to server public IPv6 works, verify hostname"""
541
        self._skipIf(self.is_windows, "only valid for Linux servers")
542
        server = self.client.get_server_details(self.serverid)
543
        self._insist_on_ssh_hostname(self._get_ipv6(server),
544
                                     self.username, self.passwd)
545

    
546
    def test_014_rdp_to_server_IPv4(self):
547
        "Test RDP connection to server public IPv4 works"""
548
        self._skipIf(not self.is_windows, "only valid for Windows servers")
549
        server = self.client.get_server_details(self.serverid)
550
        ipv4 = self._get_ipv4(server)
551
        sock = _insist_on_tcp_connection(socket.AF_INET, ipv4, 3389)
552

    
553
        # No actual RDP processing done. We assume the RDP server is there
554
        # if the connection to the RDP port is successful.
555
        # FIXME: Use rdesktop, analyze exit code? see manpage [costasd]
556
        sock.close()
557

    
558
    def test_015_rdp_to_server_IPv6(self):
559
        "Test RDP connection to server public IPv6 works"""
560
        self._skipIf(not self.is_windows, "only valid for Windows servers")
561
        server = self.client.get_server_details(self.serverid)
562
        ipv6 = self._get_ipv6(server)
563
        sock = _get_tcp_connection(socket.AF_INET6, ipv6, 3389)
564

    
565
        # No actual RDP processing done. We assume the RDP server is there
566
        # if the connection to the RDP port is successful.
567
        sock.close()
568

    
569
    def test_016_personality_is_enforced(self):
570
        """Test file injection for personality enforcement"""
571
        self._skipIf(self.is_windows, "only implemented for Linux servers")
572
        self._skipIf(self.personality == None, "No personality file selected")
573

    
574
        server = self.client.get_server_details(self.serverid)
575

    
576
        for inj_file in self.personality:
577
            equal_files = self._check_file_through_ssh(self._get_ipv4(server), inj_file['owner'], 
578
                                                       self.passwd, inj_file['path'], inj_file['contents'])
579
            self.assertTrue(equal_files)
580
        
581

    
582
    def test_017_submit_delete_request(self):
583
        """Test submit request to delete server"""
584
        self.client.delete_server(self.serverid)
585

    
586
    def test_018_server_becomes_deleted(self):
587
        """Test server becomes DELETED"""
588
        self._insist_on_status_transition("ACTIVE", "DELETED",
589
                                         self.action_timeout,
590
                                         self.action_timeout)
591

    
592
    def test_019_server_no_longer_in_server_list(self):
593
        """Test server is no longer in server list"""
594
        servers = self.client.list_servers()
595
        self.assertNotIn(self.serverid, [s["id"] for s in servers])
596

    
597

    
598
class NetworkTestCase(unittest.TestCase):
599
    """ Testing networking in cyclades """
600
  
601
    @classmethod
602
    def setUpClass(cls):
603
        "Initialize kamaki, get list of current networks"
604

    
605
        cls.client = CycladesClient(API, TOKEN)
606
        cls.compute = ComputeClient(API, TOKEN)
607

    
608
        images = cls.compute.list_images(detail = True)
609
        flavors = cls.compute.list_flavors(detail = True)
610
        imageid = choice([im['id'] for im in images])
611
        flavorid = choice([f["id"] for f in flavors if f["disk"] >= 20])
612

    
613
        for image in images:
614
            if image['id'] == imageid:
615
                imagename = image['name']
616

    
617
        servername = "%s%s for %s" % (SNF_TEST_PREFIX, TEST_RUN_ID, imagename)
618
        is_windows = imagename.lower().find("windows") >= 0
619

    
620
        #Run testcases for server spawning in order to ensure it is done right
621
        setupCase =  _spawn_server_test_case(imageid=str(imageid), flavorid=flavorid,
622
                                             imagename=imagename,
623
                                             personality=None,
624
                                             servername=servername,
625
                                             is_windows=is_windows,
626
                                             action_timeout=200,
627
                                             build_warning=1200,
628
                                             build_fail=500,
629
                                             query_interval=3)
630

    
631
        #Using already implemented tests for server list population
632
        suite = unittest.TestSuite()
633
        suite.addTest(setupCase('test_001_submit_create_server'))
634
        suite.addTest(setupCase('test_002a_server_is_building_in_list'))
635
        suite.addTest(setupCase('test_002b_server_is_building_in_details'))        
636
        suite.addTest(setupCase('test_003_server_becomes_active'))
637
        unittest.TextTestRunner(verbosity=2).run(suite)
638
        unittest.TextTestRunner(verbosity=2).run(suite)
639

    
640

    
641
    def test_001_create_network(self):
642
        """Test submit create network request"""
643
        name = SNF_TEST_PREFIX+TEST_RUN_ID
644
        previous_num = len(self.client.list_networks())
645
        network =  self.client.create_network(name)        
646
       
647
        #Test if right name is assigned
648
        self.assertEqual(network['name'], name)
649
        
650
        # Update class attributes
651
        cls = type(self)
652
        cls.networkid = network['id']
653
        networks = self.client.list_networks()
654

    
655
        #Test if new network is created
656
        self.assertTrue(len(networks) > previous_num)
657
        
658
    
659
    def test_002_connect_to_network(self):
660
        """Test connect VM to network"""
661
        servers = self.compute.list_servers()
662

    
663
        #Pick a server created only for the test
664
        server = choice([s for s in servers if s['name'].startswith(SNF_TEST_PREFIX)])
665
        self.client.connect_server(server['id'], self.networkid)
666
        
667
        #Update class attributes
668
        cls = type(self)
669
        cls.serverid = server['id']
670

    
671
        #Insist on connecting until action timeout
672
        fail_tmout = time.time()+self.action_timeout
673

    
674
        while True:
675
            connected = (self.client.get_network_details(self.networkid))
676
            connections = connected['servers']['values']
677
            if (self.serverid in connections):
678
                conn_exists = True
679
                break
680
            elif time.time() > fail_tmout:
681
                self.assertLess(time.time(), fail_tmout)
682
            else:
683
                time.sleep(self.query_interval)
684

    
685
        self.assertTrue(conn_exists)
686
            
687

    
688
    def test_003_disconnect_from_network(self):
689
        prev_state = self.client.get_network_details(self.networkid)
690
        prev_conn = len(prev_state['servers']['values'])
691

    
692
        self.client.disconnect_server(self.serverid, self.networkid)
693
        time.sleep(15)
694

    
695
        #Insist on deleting until action timeout
696
        fail_tmout = time.time()+self.action_timeout
697

    
698
        while True:
699
            connected = (self.client.get_network_details(self.networkid))
700
            connections = connected['servers']['values']
701
            if (self.serverid not in connections):
702
                conn_exists = False
703
                break
704
            elif time.time() > fail_tmout:
705
                self.assertLess(time.time(), fail_tmout)
706
            else:
707
                time.sleep(self.query_interval)
708

    
709
        self.assertFalse(conn_exists)
710

    
711
    def test_004_destroy_network(self):
712
        """Test submit delete network request"""
713
        self.client.delete_network(self.networkid)        
714
        networks = self.client.list_networks()
715

    
716
        curr_net = []
717
        for net in networks:
718
            curr_net.append(net['id'])
719

    
720
        self.assertTrue(self.networkid not in curr_net)
721
        
722
    def test_005_cleanup_servers(self):
723
        """Cleanup servers created for this test"""
724
        self.compute.delete_server(self.serverid)
725
        fail_tmout = time.time()+self.action_timeout
726

    
727
        #Ensure server gets deleted
728
        while True:
729
            details = self.compute.get_server_details(self.serverid)
730
            status = details['status']
731
            if status == 'DELETED':
732
                deleted = True
733
                break
734
            elif time.time() > fail_tmout: 
735
                self.assertLess(time.time(), fail_tmout)
736
            else:
737
                time.sleep(self.query_interval)
738

    
739
        self.assertTrue(deleted)
740

    
741
class TestRunnerProcess(Process):
742
    """A distinct process used to execute part of the tests in parallel"""
743
    def __init__(self, **kw):
744
        Process.__init__(self, **kw)
745
        kwargs = kw["kwargs"]
746
        self.testq = kwargs["testq"]
747
        self.runner = kwargs["runner"]
748

    
749
    def run(self):
750
        # Make sure this test runner process dies with the parent
751
        # and is not left behind.
752
        #
753
        # WARNING: This uses the prctl(2) call and is
754
        # Linux-specific.
755
        prctl.set_pdeathsig(signal.SIGHUP)
756

    
757
        while True:
758
            log.debug("I am process %d, GETting from queue is %s",
759
                     os.getpid(), self.testq)
760
            msg = self.testq.get()
761
            log.debug("Dequeued msg: %s", msg)
762

    
763
            if msg == "TEST_RUNNER_TERMINATE":
764
                raise SystemExit
765
            elif issubclass(msg, unittest.TestCase):
766
                # Assemble a TestSuite, and run it
767
                suite = unittest.TestLoader().loadTestsFromTestCase(msg)
768
                self.runner.run(suite)
769
            else:
770
                raise Exception("Cannot handle msg: %s" % msg)
771

    
772

    
773

    
774
def _run_cases_in_parallel(cases, fanout=1, runner=None):
775
    """Run instances of TestCase in parallel, in a number of distinct processes
776

777
    The cases iterable specifies the TestCases to be executed in parallel,
778
    by test runners running in distinct processes.
779
    The fanout parameter specifies the number of processes to spawn,
780
    and defaults to 1.
781
    The runner argument specifies the test runner class to use inside each
782
    runner process.
783

784
    """
785
    if runner is None:
786
        runner = unittest.TextTestRunner(verbosity=2, failfast=True)
787

    
788
    # testq: The master process enqueues TestCase objects into this queue,
789
    #        test runner processes pick them up for execution, in parallel.
790
    testq = Queue()
791
    runners = []
792
    for i in xrange(0, fanout):
793
        kwargs = dict(testq=testq, runner=runner)
794
        runners.append(TestRunnerProcess(kwargs=kwargs))
795

    
796
    log.info("Spawning %d test runner processes", len(runners))
797
    for p in runners:
798
        p.start()
799
    log.debug("Spawned %d test runners, PIDs are %s",
800
              len(runners), [p.pid for p in runners])
801

    
802
    # Enqueue test cases
803
    map(testq.put, cases)
804
    map(testq.put, ["TEST_RUNNER_TERMINATE"] * len(runners))
805

    
806
    log.debug("Joining %d processes", len(runners))
807
    for p in runners:
808
        p.join()
809
    log.debug("Done joining %d processes", len(runners))
810

    
811

    
812
def _spawn_server_test_case(**kwargs):
813
    """Construct a new unit test case class from SpawnServerTestCase"""
814

    
815
    name = "SpawnServerTestCase_%s" % kwargs["imageid"]
816
    cls = type(name, (SpawnServerTestCase,), kwargs)
817

    
818
    # Patch extra parameters into test names by manipulating method docstrings
819
    for (mname, m) in \
820
        inspect.getmembers(cls, lambda x: inspect.ismethod(x)):
821
            if hasattr(m, __doc__):
822
                m.__func__.__doc__ = "[%s] %s" % (imagename, m.__doc__)
823

    
824
    # Make sure the class can be pickled, by listing it among
825
    # the attributes of __main__. A PicklingError is raised otherwise.
826
    setattr(__main__, name, cls)
827
    return cls 
828

    
829
def _spawn_network_test_case(**kwargs):
830
    """Construct a new unit test case class from NetworkTestCase"""
831

    
832
    name = "NetworkTestCase"+TEST_RUN_ID
833
    cls = type(name, (NetworkTestCase,), kwargs)
834

    
835
    # Make sure the class can be pickled, by listing it among
836
    # the attributes of __main__. A PicklingError is raised otherwise.
837
    setattr(__main__, name, cls)
838
    return cls 
839

    
840

    
841
def cleanup_servers(delete_stale=False):
842

    
843
    c = ComputeClient(API, TOKEN)
844

    
845
    servers = c.list_servers()
846
    stale = [s for s in servers if s["name"].startswith(SNF_TEST_PREFIX)]
847

    
848
    if len(stale) == 0:
849
        return
850

    
851
    print >> sys.stderr, "Found these stale servers from previous runs:"
852
    print "    " + \
853
          "\n    ".join(["%d: %s" % (s["id"], s["name"]) for s in stale])
854

    
855
    if delete_stale:
856
        print >> sys.stderr, "Deleting %d stale servers:" % len(stale)
857
        for server in stale:
858
            c.delete_server(server["id"])
859
        print >> sys.stderr, "    ...done"
860
    else:
861
        print >> sys.stderr, "Use --delete-stale to delete them."
862

    
863

    
864
def parse_arguments(args):
865
    from optparse import OptionParser
866

    
867
    kw = {}
868
    kw["usage"] = "%prog [options]"
869
    kw["description"] = \
870
        "%prog runs a number of test scenarios on a " \
871
        "Synnefo deployment."
872

    
873
    parser = OptionParser(**kw)
874
    parser.disable_interspersed_args()
875
    parser.add_option("--api",
876
                      action="store", type="string", dest="api",
877
                      help="The API URI to use to reach the Synnefo API",
878
                      default=DEFAULT_API)
879
    parser.add_option("--token",
880
                      action="store", type="string", dest="token",
881
                      help="The token to use for authentication to the API")
882
    parser.add_option("--nofailfast",
883
                      action="store_true", dest="nofailfast",
884
                      help="Do not fail immediately if one of the tests " \
885
                           "fails (EXPERIMENTAL)",
886
                      default=False)
887
    parser.add_option("--action-timeout",
888
                      action="store", type="int", dest="action_timeout",
889
                      metavar="TIMEOUT",
890
                      help="Wait SECONDS seconds for a server action to " \
891
                           "complete, then the test is considered failed",
892
                      default=100)
893
    parser.add_option("--build-warning",
894
                      action="store", type="int", dest="build_warning",
895
                      metavar="TIMEOUT",
896
                      help="Warn if TIMEOUT seconds have passed and a " \
897
                           "build operation is still pending",
898
                      default=600)
899
    parser.add_option("--build-fail",
900
                      action="store", type="int", dest="build_fail",
901
                      metavar="BUILD_TIMEOUT",
902
                      help="Fail the test if TIMEOUT seconds have passed " \
903
                           "and a build operation is still incomplete",
904
                      default=900)
905
    parser.add_option("--query-interval",
906
                      action="store", type="int", dest="query_interval",
907
                      metavar="INTERVAL",
908
                      help="Query server status when requests are pending " \
909
                           "every INTERVAL seconds",
910
                      default=3)
911
    parser.add_option("--fanout",
912
                      action="store", type="int", dest="fanout",
913
                      metavar="COUNT",
914
                      help="Spawn up to COUNT child processes to execute " \
915
                           "in parallel, essentially have up to COUNT " \
916
                           "server build requests outstanding (EXPERIMENTAL)",
917
                      default=1)
918
    parser.add_option("--force-flavor",
919
                      action="store", type="int", dest="force_flavorid",
920
                      metavar="FLAVOR ID",
921
                      help="Force all server creations to use the specified "\
922
                           "FLAVOR ID instead of a randomly chosen one, " \
923
                           "useful if disk space is scarce",
924
                      default=None)
925
    parser.add_option("--image-id",
926
                      action="store", type="string", dest="force_imageid",
927
                      metavar="IMAGE ID",
928
                      help="Test the specified image id, use 'all' to test " \
929
                           "all available images (mandatory argument)",
930
                      default=None)
931
    parser.add_option("--show-stale",
932
                      action="store_true", dest="show_stale",
933
                      help="Show stale servers from previous runs, whose "\
934
                           "name starts with `%s'" % SNF_TEST_PREFIX,
935
                      default=False)
936
    parser.add_option("--delete-stale",
937
                      action="store_true", dest="delete_stale",
938
                      help="Delete stale servers from previous runs, whose "\
939
                           "name starts with `%s'" % SNF_TEST_PREFIX,
940
                      default=False)
941
    parser.add_option("--force-personality",
942
                      action="store", type="string", dest="personality_path",
943
                      help="Force a personality file injection. File path required. ",
944
                      default=None)
945
    
946

    
947
    # FIXME: Change the default for build-fanout to 10
948
    # FIXME: Allow the user to specify a specific set of Images to test
949

    
950
    (opts, args) = parser.parse_args(args)
951

    
952
    # Verify arguments
953
    if opts.delete_stale:
954
        opts.show_stale = True
955

    
956
    if not opts.show_stale:
957
        if not opts.force_imageid:
958
            print >>sys.stderr, "The --image-id argument is mandatory."
959
            parser.print_help()
960
            sys.exit(1)
961

    
962
        if opts.force_imageid != 'all':
963
            try:
964
                opts.force_imageid = str(opts.force_imageid)
965
            except ValueError:
966
                print >>sys.stderr, "Invalid value specified for --image-id." \
967
                                    "Use a valid id, or `all'."
968
                sys.exit(1)
969

    
970
    return (opts, args)
971

    
972

    
973
def main():
974
    """Assemble test cases into a test suite, and run it
975

976
    IMPORTANT: Tests have dependencies and have to be run in the specified
977
    order inside a single test case. They communicate through attributes of the
978
    corresponding TestCase class (shared fixtures). Distinct subclasses of
979
    TestCase MAY SHARE NO DATA, since they are run in parallel, in distinct
980
    test runner processes.
981

982
    """
983
    (opts, args) = parse_arguments(sys.argv[1:])
984

    
985
    global API, TOKEN
986
    API = opts.api
987
    TOKEN = opts.token
988

    
989
    # Cleanup stale servers from previous runs
990
    if opts.show_stale:
991
        cleanup_servers(delete_stale=opts.delete_stale)
992
        return 0
993

    
994
    # Initialize a kamaki instance, get flavors, images
995

    
996
    c = ComputeClient(API, TOKEn)
997

    
998
    DIMAGES = c.list_images(detail=True)
999
    DFLAVORS = c.list_flavors(detail=True)
1000

    
1001
    # FIXME: logging, log, LOG PID, TEST_RUN_ID, arguments
1002
    # Run them: FIXME: In parallel, FAILEARLY, catchbreak?
1003
    #unittest.main(verbosity=2, catchbreak=True)
1004

    
1005
    if opts.force_imageid == 'all':
1006
        test_images = DIMAGES
1007
    else:
1008
        test_images = filter(lambda x: x["id"] == opts.force_imageid, DIMAGES)
1009

    
1010
    for image in test_images:
1011
        imageid = str(image["id"])
1012
        flavorid = choice([f["id"] for f in DFLAVORS if f["disk"] >= 20])
1013
        imagename = image["name"]
1014
        
1015
        #Personality dictionary for file injection test
1016
        if opts.personality_path != None:
1017
            f = open(opts.personality_path)
1018
            content = b64encode(f.read())
1019
            personality = []
1020
            st = os.stat(opts.personality_path)
1021
            personality.append({
1022
                    'path': '/root/test_inj_file',
1023
                    'owner': 'root',
1024
                    'group': 'root',
1025
                    'mode': 0x7777 & st.st_mode,
1026
                    'contents': content
1027
                    })
1028
        else:
1029
            personality = None
1030

    
1031
        servername = "%s%s for %s" % (SNF_TEST_PREFIX, TEST_RUN_ID, imagename)
1032
        is_windows = imagename.lower().find("windows") >= 0
1033
        
1034
    ServerTestCase = _spawn_server_test_case(imageid=imageid, flavorid=flavorid,
1035
                                             imagename=imagename,
1036
                                             personality=personality,
1037
                                             servername=servername,
1038
                                             is_windows=is_windows,
1039
                                             action_timeout=opts.action_timeout,
1040
                                             build_warning=opts.build_warning,
1041
                                             build_fail=opts.build_fail,
1042
                                             query_interval=opts.query_interval,
1043
                                             )
1044

    
1045

    
1046
    #Running all the testcases sequentially
1047
    
1048
    #To run all cases
1049
    #seq_cases = [UnauthorizedTestCase, FlavorsTestCase, ImagesTestCase, ServerTestCase, NetworkTestCase]
1050
    
1051
    newNetworkTestCase = _spawn_network_test_case(action_timeout = opts.action_timeout,query_interval = opts.query_interval)    
1052
    seq_cases = [newNetworkTestCase]
1053

    
1054
    for case in seq_cases:
1055
        suite = unittest.TestLoader().loadTestsFromTestCase(case)
1056
        unittest.TextTestRunner(verbosity=2).run(suite)
1057
        
1058
    
1059

    
1060
    # # The Following cases run sequentially
1061
    # seq_cases = [UnauthorizedTestCase, FlavorsTestCase, ImagesTestCase]
1062
    # _run_cases_in_parallel(seq_cases, fanout=3, runner=runner)
1063

    
1064
    # # The following cases run in parallel
1065
    # par_cases = []
1066

    
1067
    # if opts.force_imageid == 'all':
1068
    #     test_images = DIMAGES
1069
    # else:
1070
    #     test_images = filter(lambda x: x["id"] == opts.force_imageid, DIMAGES)
1071

    
1072
    # for image in test_images:
1073
    #     imageid = image["id"]
1074
    #     imagename = image["name"]
1075
    #     if opts.force_flavorid:
1076
    #         flavorid = opts.force_flavorid
1077
    #     else:
1078
    #         flavorid = choice([f["id"] for f in DFLAVORS if f["disk"] >= 20])
1079
    #     personality = None   # FIXME
1080
    #     servername = "%s%s for %s" % (SNF_TEST_PREFIX, TEST_RUN_ID, imagename)
1081
    #     is_windows = imagename.lower().find("windows") >= 0
1082
    #     case = _spawn_server_test_case(imageid=str(imageid), flavorid=flavorid,
1083
    #                                    imagename=imagename,
1084
    #                                    personality=personality,
1085
    #                                    servername=servername,
1086
    #                                    is_windows=is_windows,
1087
    #                                    action_timeout=opts.action_timeout,
1088
    #                                    build_warning=opts.build_warning,
1089
    #                                    build_fail=opts.build_fail,
1090
    #                                    query_interval=opts.query_interval)
1091
    #     par_cases.append(case)
1092

    
1093
    # _run_cases_in_parallel(par_cases, fanout=opts.fanout, runner=runner)
1094

    
1095
if __name__ == "__main__":
1096
    sys.exit(main())