root / snf-common / synnefo / lib / pool / tests.py @ 57607cbd
History | View | Annotate | Download (9.1 kB)
1 |
#!/usr/bin/env python
|
---|---|
2 |
#
|
3 |
# -*- coding: utf-8 -*-
|
4 |
#
|
5 |
# Copyright 2011 GRNET S.A. All rights reserved.
|
6 |
#
|
7 |
# Redistribution and use in source and binary forms, with or
|
8 |
# without modification, are permitted provided that the following
|
9 |
# conditions are met:
|
10 |
#
|
11 |
# 1. Redistributions of source code must retain the above
|
12 |
# copyright notice, this list of conditions and the following
|
13 |
# disclaimer.
|
14 |
#
|
15 |
# 2. Redistributions in binary form must reproduce the above
|
16 |
# copyright notice, this list of conditions and the following
|
17 |
# disclaimer in the documentation and/or other materials
|
18 |
# provided with the distribution.
|
19 |
#
|
20 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
21 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
22 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
23 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
24 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
25 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
26 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
27 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
28 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
29 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
30 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
31 |
# POSSIBILITY OF SUCH DAMAGE.
|
32 |
#
|
33 |
# The views and conclusions contained in the software and
|
34 |
# documentation are those of the authors and should not be
|
35 |
# interpreted as representing official policies, either expressed
|
36 |
# or implied, of GRNET S.A.
|
37 |
#
|
38 |
#
|
39 |
|
40 |
"""Unit Tests for the pool classes in synnefo.lib.pool
|
41 |
|
42 |
Provides unit tests for the code implementing pool
|
43 |
classes in the synnefo.lib.pool module.
|
44 |
|
45 |
"""
|
46 |
|
47 |
# Support running under a gevent-monkey-patched environment
|
48 |
# if the "monkey" argument is specified in the command line.
|
49 |
import sys |
50 |
if "monkey" in sys.argv: |
51 |
from gevent import monkey |
52 |
monkey.patch_all() |
53 |
sys.argv.pop(sys.argv.index("monkey"))
|
54 |
|
55 |
import sys |
56 |
import time |
57 |
import threading |
58 |
from collections import defaultdict |
59 |
|
60 |
from synnefo.lib.pool import ObjectPool, PoolLimitError, PoolVerificationError |
61 |
|
62 |
# Use backported unittest functionality if Python < 2.7
|
63 |
try:
|
64 |
import unittest2 as unittest |
65 |
except ImportError: |
66 |
if sys.version_info < (2, 7): |
67 |
raise Exception("The unittest2 package is required for Python < 2.7") |
68 |
import unittest |
69 |
|
70 |
|
71 |
from threading import Lock |
72 |
|
73 |
mutex = Lock() |
74 |
|
75 |
class NumbersPool(ObjectPool): |
76 |
max = 0
|
77 |
|
78 |
def _pool_create_safe(self): |
79 |
with mutex:
|
80 |
n = self.max
|
81 |
self.max += 1 |
82 |
return n
|
83 |
|
84 |
def _pool_create_unsafe(self): |
85 |
n = self.max
|
86 |
self.max += 1 |
87 |
return n
|
88 |
|
89 |
# set this to _pool_create_unsafe to check
|
90 |
# the thread-safety test
|
91 |
#_pool_create = _pool_create_unsafe
|
92 |
_pool_create = _pool_create_safe |
93 |
|
94 |
def _pool_verify(self, obj): |
95 |
return True |
96 |
|
97 |
def _pool_cleanup(self, obj): |
98 |
n = int(obj)
|
99 |
if n < 0: |
100 |
return True |
101 |
return False |
102 |
|
103 |
|
104 |
class ObjectPoolTestCase(unittest.TestCase): |
105 |
def test_create_pool_requires_size(self): |
106 |
"""Test __init__() requires valid size argument"""
|
107 |
self.assertRaises(ValueError, ObjectPool) |
108 |
self.assertRaises(ValueError, ObjectPool, size="size10") |
109 |
self.assertRaises(ValueError, ObjectPool, size=0) |
110 |
self.assertRaises(ValueError, ObjectPool, size=-1) |
111 |
|
112 |
def test_create_pool(self): |
113 |
"""Test pool creation works"""
|
114 |
pool = ObjectPool(100)
|
115 |
self.assertEqual(pool.size, 100) |
116 |
|
117 |
def test_get_not_implemented(self): |
118 |
"""Test pool_get() method not implemented in abstract class"""
|
119 |
pool = ObjectPool(100)
|
120 |
self.assertRaises(NotImplementedError, pool._pool_create) |
121 |
self.assertRaises(NotImplementedError, pool._pool_verify, None) |
122 |
|
123 |
def test_put_not_implemented(self): |
124 |
"""Test pool_put() method not implemented in abstract class"""
|
125 |
pool = ObjectPool(100)
|
126 |
self.assertRaises(NotImplementedError, pool._pool_cleanup, None) |
127 |
|
128 |
|
129 |
class NumbersPoolTestCase(unittest.TestCase): |
130 |
N = 1500
|
131 |
SEC = 0.5
|
132 |
maxDiff = None
|
133 |
|
134 |
def setUp(self): |
135 |
self.numbers = NumbersPool(self.N) |
136 |
|
137 |
def test_initially_empty(self): |
138 |
"""Test pool is empty upon creation"""
|
139 |
self.assertEqual(self.numbers._set, set([])) |
140 |
|
141 |
def test_seq_allocate_all(self): |
142 |
"""Test allocation and deallocation of all pool objects"""
|
143 |
n = [] |
144 |
for _ in xrange(0, self.N): |
145 |
n.append(self.numbers.pool_get())
|
146 |
self.assertEqual(n, range(0, self.N)) |
147 |
for i in n: |
148 |
self.numbers.pool_put(i)
|
149 |
self.assertEqual(self.numbers._set, set(n)) |
150 |
|
151 |
def test_parallel_allocate_all(self): |
152 |
"""Allocate all pool objects in parallel"""
|
153 |
def allocate_one(pool, results, index): |
154 |
n = pool.pool_get() |
155 |
results[index] = n |
156 |
|
157 |
results = [None] * self.N |
158 |
threads = [threading.Thread(target=allocate_one, |
159 |
args=(self.numbers, results, i))
|
160 |
for i in xrange(0, self.N)] |
161 |
|
162 |
for t in threads: |
163 |
t.start() |
164 |
for t in threads: |
165 |
t.join() |
166 |
|
167 |
# This nonblocking pool_get() should fail
|
168 |
self.assertRaises(PoolLimitError, self.numbers.pool_get, |
169 |
blocking=False)
|
170 |
self.assertEqual(sorted(results), range(0, self.N)) |
171 |
|
172 |
def test_allocate_no_create(self): |
173 |
"""Allocate objects from the pool without creating them"""
|
174 |
for i in xrange(0, self.N): |
175 |
self.assertIsNone(self.numbers.pool_get(create=False)) |
176 |
|
177 |
# This nonblocking pool_get() should fail
|
178 |
self.assertRaises(PoolLimitError, self.numbers.pool_get, |
179 |
blocking=False)
|
180 |
|
181 |
def test_pool_cleanup_returns_failure(self): |
182 |
"""Put a broken object, test a new one is retrieved eventually"""
|
183 |
n = [] |
184 |
for _ in xrange(0, self.N): |
185 |
n.append(self.numbers.pool_get())
|
186 |
self.assertEqual(n, range(0, self.N)) |
187 |
|
188 |
del n[-1:] |
189 |
self.numbers.pool_put(-1) # This is a broken object |
190 |
self.assertFalse(self.numbers._set) |
191 |
self.assertEqual(self.numbers.pool_get(), self.N) |
192 |
|
193 |
def test_parallel_get_blocks(self): |
194 |
"""Test threads block if no object left in the pool"""
|
195 |
def allocate_one_and_sleep(pool, sec, result, index): |
196 |
n = pool.pool_get() |
197 |
time.sleep(sec) |
198 |
result[index] = n |
199 |
pool.pool_put(n) |
200 |
|
201 |
nr_threads = 2 * self.N + 1 |
202 |
results = [None] * nr_threads
|
203 |
threads = [threading.Thread(target=allocate_one_and_sleep, |
204 |
args=(self.numbers, self.SEC, results, i)) |
205 |
for i in xrange(nr_threads)] |
206 |
|
207 |
# This should take 3 * SEC seconds
|
208 |
start = time.time() |
209 |
for t in threads: |
210 |
t.start() |
211 |
for t in threads: |
212 |
t.join() |
213 |
diff = time.time() - start |
214 |
self.assertTrue(diff > 3 * self.SEC) |
215 |
self.assertLess((diff - 3 * self.SEC) / 3 * self.SEC, .5) |
216 |
|
217 |
freq = defaultdict(int)
|
218 |
for r in results: |
219 |
freq[r] += 1
|
220 |
|
221 |
# The maximum number used must be exactly the pool size.
|
222 |
self.assertEqual(max(results), self.N - 1) |
223 |
# At least one number must have been used three times
|
224 |
triples = [r for r in freq if freq[r] == 3] |
225 |
self.assertGreater(len(triples), 0) |
226 |
# The sum of all frequencies must equal to the number of threads.
|
227 |
self.assertEqual(sum(freq.values()), nr_threads) |
228 |
|
229 |
def test_verify_create(self): |
230 |
numbers = self.numbers
|
231 |
nums = [numbers.pool_get() for _ in xrange(self.N)] |
232 |
for num in nums: |
233 |
numbers.pool_put(num) |
234 |
|
235 |
def verify(num): |
236 |
if num in nums: |
237 |
return False |
238 |
return True |
239 |
|
240 |
self.numbers._pool_verify = verify
|
241 |
self.assertEqual(numbers.pool_get(), self.N) |
242 |
|
243 |
def test_verify_error(self): |
244 |
numbers = self.numbers
|
245 |
nums = [numbers.pool_get() for _ in xrange(self.N)] |
246 |
for num in nums: |
247 |
numbers.pool_put(num) |
248 |
|
249 |
def false(*args): |
250 |
return False |
251 |
|
252 |
self.numbers._pool_verify = false
|
253 |
self.assertRaises(PoolVerificationError, numbers.pool_get)
|
254 |
|
255 |
class ThreadSafetyTest(unittest.TestCase): |
256 |
|
257 |
pool_class = NumbersPool |
258 |
|
259 |
def setUp(self): |
260 |
size = 3000
|
261 |
self.size = size
|
262 |
self.pool = self.pool_class(size) |
263 |
|
264 |
def test_parallel_sleeping_create(self): |
265 |
def create(pool, results, i): |
266 |
time.sleep(1)
|
267 |
results[i] = pool._pool_create() |
268 |
|
269 |
pool = self.pool
|
270 |
N = self.size
|
271 |
results = [None] * N
|
272 |
threads = [threading.Thread(target=create, args=(pool, results, i)) |
273 |
for i in xrange(N)] |
274 |
for t in threads: |
275 |
t.start() |
276 |
for t in threads: |
277 |
t.join() |
278 |
|
279 |
freq = defaultdict(int)
|
280 |
for r in results: |
281 |
freq[r] += 1
|
282 |
|
283 |
mults = [(n, c) for n, c in freq.items() if c > 1] |
284 |
if mults:
|
285 |
#print mults
|
286 |
raise AssertionError("_pool_create() is not thread safe") |
287 |
|
288 |
|
289 |
if __name__ == '__main__': |
290 |
unittest.main() |