root / snf-cyclades-app / synnefo / db / pools / tests.py @ 698306b8
History | View | Annotate | Download (4.4 kB)
1 | 2dbe7563 | Christos Stavrakakis | import sys |
---|---|---|---|
2 | 2dbe7563 | Christos Stavrakakis | from synnefo.db.pools import (PoolManager, EmptyPool, BridgePool, |
3 | 2dbe7563 | Christos Stavrakakis | MacPrefixPool, IPPool) |
4 | 2dbe7563 | Christos Stavrakakis | |
5 | 2dbe7563 | Christos Stavrakakis | # Use backported unittest functionality if Python < 2.7
|
6 | 2dbe7563 | Christos Stavrakakis | try:
|
7 | 2dbe7563 | Christos Stavrakakis | import unittest2 as unittest |
8 | 2dbe7563 | Christos Stavrakakis | except ImportError: |
9 | 2dbe7563 | Christos Stavrakakis | if sys.version_info < (2, 7): |
10 | 2dbe7563 | Christos Stavrakakis | raise Exception("The unittest2 package is required for Python < 2.7") |
11 | 2dbe7563 | Christos Stavrakakis | import unittest |
12 | 2dbe7563 | Christos Stavrakakis | |
13 | 2dbe7563 | Christos Stavrakakis | |
14 | 2dbe7563 | Christos Stavrakakis | class DummyObject(): |
15 | 2dbe7563 | Christos Stavrakakis | def __init__(self, size): |
16 | 2dbe7563 | Christos Stavrakakis | self.size = size
|
17 | 2dbe7563 | Christos Stavrakakis | |
18 | 2dbe7563 | Christos Stavrakakis | self.available_map = '' |
19 | 2dbe7563 | Christos Stavrakakis | self.reserved_map = '' |
20 | 2dbe7563 | Christos Stavrakakis | |
21 | 2dbe7563 | Christos Stavrakakis | def save(self): |
22 | 2dbe7563 | Christos Stavrakakis | pass
|
23 | 2dbe7563 | Christos Stavrakakis | |
24 | 2dbe7563 | Christos Stavrakakis | |
25 | 2dbe7563 | Christos Stavrakakis | class DummyPool(PoolManager): |
26 | 2dbe7563 | Christos Stavrakakis | def value_to_index(self, index): |
27 | 2dbe7563 | Christos Stavrakakis | return index
|
28 | 2dbe7563 | Christos Stavrakakis | |
29 | 2dbe7563 | Christos Stavrakakis | def index_to_value(self, value): |
30 | 2dbe7563 | Christos Stavrakakis | return value
|
31 | 2dbe7563 | Christos Stavrakakis | |
32 | 2dbe7563 | Christos Stavrakakis | |
33 | 2dbe7563 | Christos Stavrakakis | class PoolManagerTestCase(unittest.TestCase): |
34 | 2dbe7563 | Christos Stavrakakis | def test_created_pool(self): |
35 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(42)
|
36 | 2dbe7563 | Christos Stavrakakis | pool = DummyPool(obj) |
37 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.to_01(), '1' * 42 + '0' * 6) |
38 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.to_map(), '.' * 42 + 'X' * 6) |
39 | 2dbe7563 | Christos Stavrakakis | |
40 | 2dbe7563 | Christos Stavrakakis | def test_save_pool(self): |
41 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(42)
|
42 | 2dbe7563 | Christos Stavrakakis | pool = DummyPool(obj) |
43 | 2dbe7563 | Christos Stavrakakis | pool.save() |
44 | 2dbe7563 | Christos Stavrakakis | self.assertNotEqual(obj.available_map, '') |
45 | 2dbe7563 | Christos Stavrakakis | available_map = obj.available_map |
46 | 2dbe7563 | Christos Stavrakakis | b = DummyPool(obj) |
47 | 2dbe7563 | Christos Stavrakakis | b.save() |
48 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(obj.available_map, available_map)
|
49 | 2dbe7563 | Christos Stavrakakis | |
50 | 2dbe7563 | Christos Stavrakakis | def test_empty_pool(self): |
51 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(42)
|
52 | 2dbe7563 | Christos Stavrakakis | pool = DummyPool(obj) |
53 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.empty(), False) |
54 | 2dbe7563 | Christos Stavrakakis | for i in range(0, 42): |
55 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.get(), i)
|
56 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.empty(), True) |
57 | 2dbe7563 | Christos Stavrakakis | self.assertRaises(EmptyPool, pool.get)
|
58 | 2dbe7563 | Christos Stavrakakis | |
59 | 2dbe7563 | Christos Stavrakakis | def test_reserved_value(self): |
60 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(42)
|
61 | 2dbe7563 | Christos Stavrakakis | pool = DummyPool(obj) |
62 | 2dbe7563 | Christos Stavrakakis | available = pool.count_available() |
63 | 2dbe7563 | Christos Stavrakakis | value = pool.get() |
64 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.is_available(value), False) |
65 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.count_available(), available - 1) |
66 | 2dbe7563 | Christos Stavrakakis | pool.put(value) |
67 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.is_available(value), True) |
68 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.count_available(), available)
|
69 | 2dbe7563 | Christos Stavrakakis | |
70 | 2dbe7563 | Christos Stavrakakis | def test_external_reserved(self): |
71 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(42)
|
72 | 2dbe7563 | Christos Stavrakakis | pool = DummyPool(obj) |
73 | 2dbe7563 | Christos Stavrakakis | for i in range(42, 48): |
74 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.is_available(i), False) |
75 | 2dbe7563 | Christos Stavrakakis | pool.reserve(32, external=True) |
76 | 2dbe7563 | Christos Stavrakakis | values = [] |
77 | 2dbe7563 | Christos Stavrakakis | while True: |
78 | 2dbe7563 | Christos Stavrakakis | try:
|
79 | 2dbe7563 | Christos Stavrakakis | values.append(pool.get()) |
80 | 2dbe7563 | Christos Stavrakakis | except EmptyPool:
|
81 | 2dbe7563 | Christos Stavrakakis | break
|
82 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(32 not in values, True) |
83 | 2dbe7563 | Christos Stavrakakis | |
84 | 2dbe7563 | Christos Stavrakakis | def test_external_reserved_2(self): |
85 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(42)
|
86 | 2dbe7563 | Christos Stavrakakis | pool = DummyPool(obj) |
87 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.get(), 0) |
88 | 2dbe7563 | Christos Stavrakakis | pool.reserve(0, external=True) |
89 | 2dbe7563 | Christos Stavrakakis | pool.put(0)
|
90 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.get(), 1) |
91 | 2dbe7563 | Christos Stavrakakis | |
92 | 2dbe7563 | Christos Stavrakakis | |
93 | 2dbe7563 | Christos Stavrakakis | class BridgePoolTestCase(unittest.TestCase): |
94 | 2dbe7563 | Christos Stavrakakis | def test_bridge_conversion(self): |
95 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(13)
|
96 | 2dbe7563 | Christos Stavrakakis | obj.base = "bridge"
|
97 | 2dbe7563 | Christos Stavrakakis | pool = BridgePool(obj) |
98 | 2dbe7563 | Christos Stavrakakis | for i in range(0, 13): |
99 | 2dbe7563 | Christos Stavrakakis | self.assertEqual("bridge" + str(i), pool.get()) |
100 | 2dbe7563 | Christos Stavrakakis | pool.put("bridge2")
|
101 | 2dbe7563 | Christos Stavrakakis | pool.put("bridge6")
|
102 | 2dbe7563 | Christos Stavrakakis | self.assertEqual("bridge2", pool.get()) |
103 | 2dbe7563 | Christos Stavrakakis | self.assertEqual("bridge6", pool.get()) |
104 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.empty(), True) |
105 | 2dbe7563 | Christos Stavrakakis | |
106 | 2dbe7563 | Christos Stavrakakis | |
107 | 2dbe7563 | Christos Stavrakakis | class MacPrefixPoolTestCase(unittest.TestCase): |
108 | 2dbe7563 | Christos Stavrakakis | def test_invalid_mac_reservation(self): |
109 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(65636)
|
110 | 2dbe7563 | Christos Stavrakakis | obj.base = 'ab:ff:ff'
|
111 | 2dbe7563 | Christos Stavrakakis | pool = MacPrefixPool(obj) |
112 | 2dbe7563 | Christos Stavrakakis | for i in range(0, 65536): |
113 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.is_available(i, index=True), False) |
114 | 2dbe7563 | Christos Stavrakakis | |
115 | 2dbe7563 | Christos Stavrakakis | class IPPoolTestCase(unittest.TestCase): |
116 | 2dbe7563 | Christos Stavrakakis | def test_auto_reservations(self): |
117 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(0)
|
118 | 2dbe7563 | Christos Stavrakakis | network = DummyObject(0)
|
119 | 2dbe7563 | Christos Stavrakakis | obj.network = network |
120 | 2dbe7563 | Christos Stavrakakis | network.subnet = '192.168.2.0/24'
|
121 | 2dbe7563 | Christos Stavrakakis | network.gateway = '192.168.2.1'
|
122 | 2dbe7563 | Christos Stavrakakis | pool = IPPool(obj) |
123 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.is_available('192.168.2.0'), False) |
124 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.is_available('192.168.2.1'), False) |
125 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.is_available('192.168.2.255'), False) |
126 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.count_available(), 253) |
127 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.get(), '192.168.2.2') |
128 | 2dbe7563 | Christos Stavrakakis | |
129 | 2dbe7563 | Christos Stavrakakis | def test_auto_reservations_2(self): |
130 | 2dbe7563 | Christos Stavrakakis | obj = DummyObject(0)
|
131 | 2dbe7563 | Christos Stavrakakis | network = DummyObject(0)
|
132 | 2dbe7563 | Christos Stavrakakis | obj.network = network |
133 | 2dbe7563 | Christos Stavrakakis | network.subnet = '192.168.2.0/31'
|
134 | 2dbe7563 | Christos Stavrakakis | network.gateway = '192.168.2.1'
|
135 | 2dbe7563 | Christos Stavrakakis | pool = IPPool(obj) |
136 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.is_available('192.168.2.0'), False) |
137 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.is_available('192.168.2.1'), False) |
138 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.size(), 8) |
139 | 2dbe7563 | Christos Stavrakakis | self.assertEqual(pool.empty(), True) |
140 | 2dbe7563 | Christos Stavrakakis | |
141 | 2dbe7563 | Christos Stavrakakis | |
142 | 2dbe7563 | Christos Stavrakakis | if __name__ == '__main__': |
143 | 2dbe7563 | Christos Stavrakakis | unittest.main() |