Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / db / pools / tests.py @ 698306b8

History | View | Annotate | Download (4.4 kB)

1 2dbe7563 Christos Stavrakakis
import sys
2 2dbe7563 Christos Stavrakakis
from synnefo.db.pools import (PoolManager, EmptyPool, BridgePool,
3 2dbe7563 Christos Stavrakakis
                              MacPrefixPool, IPPool)
4 2dbe7563 Christos Stavrakakis
5 2dbe7563 Christos Stavrakakis
# Use backported unittest functionality if Python < 2.7
6 2dbe7563 Christos Stavrakakis
try:
7 2dbe7563 Christos Stavrakakis
    import unittest2 as unittest
8 2dbe7563 Christos Stavrakakis
except ImportError:
9 2dbe7563 Christos Stavrakakis
    if sys.version_info < (2, 7):
10 2dbe7563 Christos Stavrakakis
        raise Exception("The unittest2 package is required for Python < 2.7")
11 2dbe7563 Christos Stavrakakis
    import unittest
12 2dbe7563 Christos Stavrakakis
13 2dbe7563 Christos Stavrakakis
14 2dbe7563 Christos Stavrakakis
class DummyObject():
15 2dbe7563 Christos Stavrakakis
    def __init__(self, size):
16 2dbe7563 Christos Stavrakakis
        self.size = size
17 2dbe7563 Christos Stavrakakis
18 2dbe7563 Christos Stavrakakis
        self.available_map = ''
19 2dbe7563 Christos Stavrakakis
        self.reserved_map = ''
20 2dbe7563 Christos Stavrakakis
21 2dbe7563 Christos Stavrakakis
    def save(self):
22 2dbe7563 Christos Stavrakakis
        pass
23 2dbe7563 Christos Stavrakakis
24 2dbe7563 Christos Stavrakakis
25 2dbe7563 Christos Stavrakakis
class DummyPool(PoolManager):
26 2dbe7563 Christos Stavrakakis
    def value_to_index(self, index):
27 2dbe7563 Christos Stavrakakis
        return index
28 2dbe7563 Christos Stavrakakis
29 2dbe7563 Christos Stavrakakis
    def index_to_value(self, value):
30 2dbe7563 Christos Stavrakakis
        return value
31 2dbe7563 Christos Stavrakakis
32 2dbe7563 Christos Stavrakakis
33 2dbe7563 Christos Stavrakakis
class PoolManagerTestCase(unittest.TestCase):
34 2dbe7563 Christos Stavrakakis
    def test_created_pool(self):
35 2dbe7563 Christos Stavrakakis
        obj = DummyObject(42)
36 2dbe7563 Christos Stavrakakis
        pool = DummyPool(obj)
37 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.to_01(), '1' * 42 + '0' * 6)
38 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.to_map(), '.' * 42 + 'X' * 6)
39 2dbe7563 Christos Stavrakakis
40 2dbe7563 Christos Stavrakakis
    def test_save_pool(self):
41 2dbe7563 Christos Stavrakakis
        obj = DummyObject(42)
42 2dbe7563 Christos Stavrakakis
        pool = DummyPool(obj)
43 2dbe7563 Christos Stavrakakis
        pool.save()
44 2dbe7563 Christos Stavrakakis
        self.assertNotEqual(obj.available_map, '')
45 2dbe7563 Christos Stavrakakis
        available_map = obj.available_map
46 2dbe7563 Christos Stavrakakis
        b = DummyPool(obj)
47 2dbe7563 Christos Stavrakakis
        b.save()
48 2dbe7563 Christos Stavrakakis
        self.assertEqual(obj.available_map, available_map)
49 2dbe7563 Christos Stavrakakis
50 2dbe7563 Christos Stavrakakis
    def test_empty_pool(self):
51 2dbe7563 Christos Stavrakakis
        obj = DummyObject(42)
52 2dbe7563 Christos Stavrakakis
        pool = DummyPool(obj)
53 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.empty(), False)
54 2dbe7563 Christos Stavrakakis
        for i in range(0, 42):
55 2dbe7563 Christos Stavrakakis
            self.assertEqual(pool.get(), i)
56 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.empty(), True)
57 2dbe7563 Christos Stavrakakis
        self.assertRaises(EmptyPool, pool.get)
58 2dbe7563 Christos Stavrakakis
59 2dbe7563 Christos Stavrakakis
    def test_reserved_value(self):
60 2dbe7563 Christos Stavrakakis
        obj = DummyObject(42)
61 2dbe7563 Christos Stavrakakis
        pool = DummyPool(obj)
62 2dbe7563 Christos Stavrakakis
        available = pool.count_available()
63 2dbe7563 Christos Stavrakakis
        value = pool.get()
64 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.is_available(value), False)
65 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.count_available(), available - 1)
66 2dbe7563 Christos Stavrakakis
        pool.put(value)
67 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.is_available(value), True)
68 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.count_available(), available)
69 2dbe7563 Christos Stavrakakis
70 2dbe7563 Christos Stavrakakis
    def test_external_reserved(self):
71 2dbe7563 Christos Stavrakakis
        obj = DummyObject(42)
72 2dbe7563 Christos Stavrakakis
        pool = DummyPool(obj)
73 2dbe7563 Christos Stavrakakis
        for i in range(42, 48):
74 2dbe7563 Christos Stavrakakis
            self.assertEqual(pool.is_available(i), False)
75 2dbe7563 Christos Stavrakakis
        pool.reserve(32, external=True)
76 2dbe7563 Christos Stavrakakis
        values = []
77 2dbe7563 Christos Stavrakakis
        while True:
78 2dbe7563 Christos Stavrakakis
            try:
79 2dbe7563 Christos Stavrakakis
                values.append(pool.get())
80 2dbe7563 Christos Stavrakakis
            except EmptyPool:
81 2dbe7563 Christos Stavrakakis
                break
82 2dbe7563 Christos Stavrakakis
        self.assertEqual(32 not in values, True)
83 2dbe7563 Christos Stavrakakis
84 2dbe7563 Christos Stavrakakis
    def test_external_reserved_2(self):
85 2dbe7563 Christos Stavrakakis
        obj = DummyObject(42)
86 2dbe7563 Christos Stavrakakis
        pool = DummyPool(obj)
87 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.get(), 0)
88 2dbe7563 Christos Stavrakakis
        pool.reserve(0, external=True)
89 2dbe7563 Christos Stavrakakis
        pool.put(0)
90 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.get(), 1)
91 2dbe7563 Christos Stavrakakis
92 2dbe7563 Christos Stavrakakis
93 2dbe7563 Christos Stavrakakis
class BridgePoolTestCase(unittest.TestCase):
94 2dbe7563 Christos Stavrakakis
    def test_bridge_conversion(self):
95 2dbe7563 Christos Stavrakakis
        obj = DummyObject(13)
96 2dbe7563 Christos Stavrakakis
        obj.base = "bridge"
97 2dbe7563 Christos Stavrakakis
        pool = BridgePool(obj)
98 2dbe7563 Christos Stavrakakis
        for i in range(0, 13):
99 2dbe7563 Christos Stavrakakis
            self.assertEqual("bridge" + str(i), pool.get())
100 2dbe7563 Christos Stavrakakis
        pool.put("bridge2")
101 2dbe7563 Christos Stavrakakis
        pool.put("bridge6")
102 2dbe7563 Christos Stavrakakis
        self.assertEqual("bridge2", pool.get())
103 2dbe7563 Christos Stavrakakis
        self.assertEqual("bridge6", pool.get())
104 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.empty(), True)
105 2dbe7563 Christos Stavrakakis
106 2dbe7563 Christos Stavrakakis
107 2dbe7563 Christos Stavrakakis
class MacPrefixPoolTestCase(unittest.TestCase):
108 2dbe7563 Christos Stavrakakis
    def test_invalid_mac_reservation(self):
109 2dbe7563 Christos Stavrakakis
        obj = DummyObject(65636)
110 2dbe7563 Christos Stavrakakis
        obj.base = 'ab:ff:ff'
111 2dbe7563 Christos Stavrakakis
        pool = MacPrefixPool(obj)
112 2dbe7563 Christos Stavrakakis
        for i in range(0, 65536):
113 2dbe7563 Christos Stavrakakis
            self.assertEqual(pool.is_available(i, index=True), False)
114 2dbe7563 Christos Stavrakakis
115 2dbe7563 Christos Stavrakakis
class IPPoolTestCase(unittest.TestCase):
116 2dbe7563 Christos Stavrakakis
    def test_auto_reservations(self):
117 2dbe7563 Christos Stavrakakis
        obj = DummyObject(0)
118 2dbe7563 Christos Stavrakakis
        network = DummyObject(0)
119 2dbe7563 Christos Stavrakakis
        obj.network = network
120 2dbe7563 Christos Stavrakakis
        network.subnet = '192.168.2.0/24'
121 2dbe7563 Christos Stavrakakis
        network.gateway = '192.168.2.1'
122 2dbe7563 Christos Stavrakakis
        pool = IPPool(obj)
123 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.is_available('192.168.2.0'), False)
124 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.is_available('192.168.2.1'), False)
125 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.is_available('192.168.2.255'), False)
126 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.count_available(), 253)
127 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.get(), '192.168.2.2')
128 2dbe7563 Christos Stavrakakis
129 2dbe7563 Christos Stavrakakis
    def test_auto_reservations_2(self):
130 2dbe7563 Christos Stavrakakis
        obj = DummyObject(0)
131 2dbe7563 Christos Stavrakakis
        network = DummyObject(0)
132 2dbe7563 Christos Stavrakakis
        obj.network = network
133 2dbe7563 Christos Stavrakakis
        network.subnet = '192.168.2.0/31'
134 2dbe7563 Christos Stavrakakis
        network.gateway = '192.168.2.1'
135 2dbe7563 Christos Stavrakakis
        pool = IPPool(obj)
136 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.is_available('192.168.2.0'), False)
137 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.is_available('192.168.2.1'), False)
138 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.size(), 8)
139 2dbe7563 Christos Stavrakakis
        self.assertEqual(pool.empty(), True)
140 2dbe7563 Christos Stavrakakis
141 2dbe7563 Christos Stavrakakis
142 2dbe7563 Christos Stavrakakis
if __name__ == '__main__':
143 2dbe7563 Christos Stavrakakis
    unittest.main()