root / snf-cyclades-app / synnefo / db / pools / tests.py @ 3e7c63f8
History | View | Annotate | Download (5.3 kB)
1 | 2dbe7563 | Christos Stavrakakis | import sys |
---|---|---|---|
2 | 2dbe7563 | Christos Stavrakakis | from synnefo.db.pools import (PoolManager, EmptyPool, BridgePool, |
3 | 2dbe7563 | Christos Stavrakakis | MacPrefixPool, IPPool) |
4 | 3e7c63f8 | Christos Stavrakakis | from bitarray import bitarray |
5 | 2dbe7563 | Christos Stavrakakis | |
6 | 2dbe7563 | Christos Stavrakakis | # Use backported unittest functionality if Python < 2.7
|
7 | 2dbe7563 | Christos Stavrakakis | try:
|
8 | 2dbe7563 | Christos Stavrakakis | import unittest2 as unittest |
9 | 2dbe7563 | Christos Stavrakakis | except ImportError: |
10 | 2dbe7563 | Christos Stavrakakis | if sys.version_info < (2, 7): |
11 | 2dbe7563 | Christos Stavrakakis | raise Exception("The unittest2 package is required for Python < 2.7") |
12 | 2dbe7563 | Christos Stavrakakis | import unittest |
13 | 2dbe7563 | Christos Stavrakakis | |
14 | 2dbe7563 | Christos Stavrakakis | |
15 | 2dbe7563 | Christos Stavrakakis | class DummyObject(): |
16 | 2dbe7563 | Christos Stavrakakis | def __init__(self, size): |
17 | 2dbe7563 | Christos Stavrakakis | self.size = size
|
18 | 2dbe7563 | Christos Stavrakakis | |
19 | 2dbe7563 | Christos Stavrakakis | self.available_map = '' |
20 | 2dbe7563 | Christos Stavrakakis | self.reserved_map = '' |
21 | 2dbe7563 | Christos Stavrakakis | |
22 | 2dbe7563 | Christos Stavrakakis | def save(self): |
23 | 2dbe7563 | Christos Stavrakakis | pass
|
24 | 2dbe7563 | Christos Stavrakakis | |
25 | 2dbe7563 | Christos Stavrakakis | |
26 | 2dbe7563 | Christos Stavrakakis | class DummyPool(PoolManager): |
27 | 2dbe7563 | Christos Stavrakakis | def value_to_index(self, index): |
28 | 2dbe7563 | Christos Stavrakakis | return index
|
29 | 2dbe7563 | Christos Stavrakakis | |
30 | 2dbe7563 | Christos Stavrakakis | def index_to_value(self, value): |
31 | 2dbe7563 | Christos Stavrakakis | return value
|
32 | 2dbe7563 | Christos Stavrakakis | |
33 | 2dbe7563 | Christos Stavrakakis | |
34 | 2dbe7563 | Christos Stavrakakis | class PoolManagerTestCase(unittest.TestCase): |
35 | 2dbe7563 | Christos Stavrakakis | def test_created_pool(self): |
36 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(42)
|
37 | 2dbe7563 | Christos Stavrakakis | pool = DummyPool(obj) |
38 | 3e7c63f8 | Christos Stavrakakis | self.assertEqual(pool.to_01(), '1' * 42) |
39 | 3e7c63f8 | Christos Stavrakakis | self.assertEqual(pool.to_map(), '.' * 42) |
40 | 3e7c63f8 | Christos Stavrakakis | self.assertEqual(pool.available, bitarray('1' * 42 + '0' * 6)) |
41 | 3e7c63f8 | Christos Stavrakakis | self.assertEqual(pool.reserved, bitarray('1' * 42 + '0' * 6)) |
42 | 2dbe7563 | Christos Stavrakakis | |
43 | 2dbe7563 | Christos Stavrakakis | def test_save_pool(self): |
44 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(42)
|
45 | 2dbe7563 | Christos Stavrakakis | pool = DummyPool(obj) |
46 | 2dbe7563 | Christos Stavrakakis | pool.save() |
47 | 2dbe7563 | Christos Stavrakakis | self.assertNotEqual(obj.available_map, '') |
48 | 2dbe7563 | Christos Stavrakakis | available_map = obj.available_map |
49 | 2dbe7563 | Christos Stavrakakis | b = DummyPool(obj) |
50 | 2dbe7563 | Christos Stavrakakis | b.save() |
51 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(obj.available_map, available_map)
|
52 | 2dbe7563 | Christos Stavrakakis | |
53 | 2dbe7563 | Christos Stavrakakis | def test_empty_pool(self): |
54 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(42)
|
55 | 2dbe7563 | Christos Stavrakakis | pool = DummyPool(obj) |
56 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.empty(), False) |
57 | 2dbe7563 | Christos Stavrakakis | for i in range(0, 42): |
58 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.get(), i)
|
59 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.empty(), True) |
60 | 2dbe7563 | Christos Stavrakakis | self.assertRaises(EmptyPool, pool.get)
|
61 | 2dbe7563 | Christos Stavrakakis | |
62 | 2dbe7563 | Christos Stavrakakis | def test_reserved_value(self): |
63 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(42)
|
64 | 2dbe7563 | Christos Stavrakakis | pool = DummyPool(obj) |
65 | 2dbe7563 | Christos Stavrakakis | available = pool.count_available() |
66 | 2dbe7563 | Christos Stavrakakis | value = pool.get() |
67 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.is_available(value), False) |
68 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.count_available(), available - 1) |
69 | 2dbe7563 | Christos Stavrakakis | pool.put(value) |
70 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.is_available(value), True) |
71 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.count_available(), available)
|
72 | 2dbe7563 | Christos Stavrakakis | |
73 | 2dbe7563 | Christos Stavrakakis | def test_external_reserved(self): |
74 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(42)
|
75 | 2dbe7563 | Christos Stavrakakis | pool = DummyPool(obj) |
76 | 2dbe7563 | Christos Stavrakakis | for i in range(42, 48): |
77 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.is_available(i), False) |
78 | 2dbe7563 | Christos Stavrakakis | pool.reserve(32, external=True) |
79 | 2dbe7563 | Christos Stavrakakis | values = [] |
80 | 2dbe7563 | Christos Stavrakakis | while True: |
81 | 2dbe7563 | Christos Stavrakakis | try:
|
82 | 2dbe7563 | Christos Stavrakakis | values.append(pool.get()) |
83 | 2dbe7563 | Christos Stavrakakis | except EmptyPool:
|
84 | 2dbe7563 | Christos Stavrakakis | break
|
85 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(32 not in values, True) |
86 | 2dbe7563 | Christos Stavrakakis | |
87 | 2dbe7563 | Christos Stavrakakis | def test_external_reserved_2(self): |
88 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(42)
|
89 | 2dbe7563 | Christos Stavrakakis | pool = DummyPool(obj) |
90 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.get(), 0) |
91 | 2dbe7563 | Christos Stavrakakis | pool.reserve(0, external=True) |
92 | 2dbe7563 | Christos Stavrakakis | pool.put(0)
|
93 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.get(), 1) |
94 | 2dbe7563 | Christos Stavrakakis | |
95 | 3e7c63f8 | Christos Stavrakakis | def test_extend_pool(self): |
96 | 3e7c63f8 | Christos Stavrakakis | obj = DummyObject(42)
|
97 | 3e7c63f8 | Christos Stavrakakis | pool = DummyPool(obj) |
98 | 3e7c63f8 | Christos Stavrakakis | pool.extend(7)
|
99 | 3e7c63f8 | Christos Stavrakakis | self.assertEqual(pool.to_01(), '1' * 49) |
100 | 3e7c63f8 | Christos Stavrakakis | self.assertEqual(pool.to_map(), '.' * 49) |
101 | 3e7c63f8 | Christos Stavrakakis | self.assertEqual(pool.available, bitarray('1' * 49 + '0' * 7)) |
102 | 3e7c63f8 | Christos Stavrakakis | self.assertEqual(pool.reserved, bitarray('1' * 49 + '0' * 7)) |
103 | 3e7c63f8 | Christos Stavrakakis | |
104 | 3e7c63f8 | Christos Stavrakakis | def test_shrink_pool(self): |
105 | 3e7c63f8 | Christos Stavrakakis | obj = DummyObject(42)
|
106 | 3e7c63f8 | Christos Stavrakakis | pool = DummyPool(obj) |
107 | 3e7c63f8 | Christos Stavrakakis | pool.shrink(3)
|
108 | 3e7c63f8 | Christos Stavrakakis | self.assertEqual(pool.to_01(), '1' * 39) |
109 | 3e7c63f8 | Christos Stavrakakis | self.assertEqual(pool.to_map(), '.' * 39) |
110 | 3e7c63f8 | Christos Stavrakakis | self.assertEqual(pool.available, bitarray('1' * 39 + '0' * 1)) |
111 | 3e7c63f8 | Christos Stavrakakis | self.assertEqual(pool.reserved, bitarray('1' * 39 + '0' * 1)) |
112 | 3e7c63f8 | Christos Stavrakakis | |
113 | 2dbe7563 | Christos Stavrakakis | |
114 | 2dbe7563 | Christos Stavrakakis | class BridgePoolTestCase(unittest.TestCase): |
115 | 2dbe7563 | Christos Stavrakakis | def test_bridge_conversion(self): |
116 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(13)
|
117 | 2dbe7563 | Christos Stavrakakis | obj.base = "bridge"
|
118 | 2dbe7563 | Christos Stavrakakis | pool = BridgePool(obj) |
119 | 2dbe7563 | Christos Stavrakakis | for i in range(0, 13): |
120 | 3e7c63f8 | Christos Stavrakakis | self.assertEqual("bridge" + str(i + 1), pool.get()) |
121 | 2dbe7563 | Christos Stavrakakis | pool.put("bridge2")
|
122 | 2dbe7563 | Christos Stavrakakis | pool.put("bridge6")
|
123 | 2dbe7563 | Christos Stavrakakis | self.assertEqual("bridge2", pool.get()) |
124 | 2dbe7563 | Christos Stavrakakis | self.assertEqual("bridge6", pool.get()) |
125 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.empty(), True) |
126 | 2dbe7563 | Christos Stavrakakis | |
127 | 2dbe7563 | Christos Stavrakakis | |
128 | 2dbe7563 | Christos Stavrakakis | class MacPrefixPoolTestCase(unittest.TestCase): |
129 | 2dbe7563 | Christos Stavrakakis | def test_invalid_mac_reservation(self): |
130 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(65636)
|
131 | 2dbe7563 | Christos Stavrakakis | obj.base = 'ab:ff:ff'
|
132 | 2dbe7563 | Christos Stavrakakis | pool = MacPrefixPool(obj) |
133 | 2dbe7563 | Christos Stavrakakis | for i in range(0, 65536): |
134 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.is_available(i, index=True), False) |
135 | 2dbe7563 | Christos Stavrakakis | |
136 | 2dbe7563 | Christos Stavrakakis | class IPPoolTestCase(unittest.TestCase): |
137 | 2dbe7563 | Christos Stavrakakis | def test_auto_reservations(self): |
138 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(0)
|
139 | 2dbe7563 | Christos Stavrakakis | network = DummyObject(0)
|
140 | 2dbe7563 | Christos Stavrakakis | obj.network = network |
141 | 2dbe7563 | Christos Stavrakakis | network.subnet = '192.168.2.0/24'
|
142 | 2dbe7563 | Christos Stavrakakis | network.gateway = '192.168.2.1'
|
143 | 2dbe7563 | Christos Stavrakakis | pool = IPPool(obj) |
144 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.is_available('192.168.2.0'), False) |
145 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.is_available('192.168.2.1'), False) |
146 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.is_available('192.168.2.255'), False) |
147 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.count_available(), 253) |
148 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.get(), '192.168.2.2') |
149 | 2dbe7563 | Christos Stavrakakis | |
150 | 2dbe7563 | Christos Stavrakakis | def test_auto_reservations_2(self): |
151 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(0)
|
152 | 2dbe7563 | Christos Stavrakakis | network = DummyObject(0)
|
153 | 2dbe7563 | Christos Stavrakakis | obj.network = network |
154 | 2dbe7563 | Christos Stavrakakis | network.subnet = '192.168.2.0/31'
|
155 | 2dbe7563 | Christos Stavrakakis | network.gateway = '192.168.2.1'
|
156 | 2dbe7563 | Christos Stavrakakis | pool = IPPool(obj) |
157 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.is_available('192.168.2.0'), False) |
158 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.is_available('192.168.2.1'), False) |
159 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.size(), 8) |
160 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.empty(), True) |
161 | 2dbe7563 | Christos Stavrakakis | |
162 | 2dbe7563 | Christos Stavrakakis | |
163 | 2dbe7563 | Christos Stavrakakis | if __name__ == '__main__': |
164 | 2dbe7563 | Christos Stavrakakis | unittest.main() |