Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / pool / tests.py @ 57607cbd

History | View | Annotate | Download (9.1 kB)

1
#!/usr/bin/env python
2
#
3
# -*- coding: utf-8 -*-
4
#
5
# Copyright 2011 GRNET S.A. All rights reserved.
6
#
7
# Redistribution and use in source and binary forms, with or
8
# without modification, are permitted provided that the following
9
# conditions are met:
10
#
11
#   1. Redistributions of source code must retain the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer.
14
#
15
#   2. Redistributions in binary form must reproduce the above
16
#      copyright notice, this list of conditions and the following
17
#      disclaimer in the documentation and/or other materials
18
#      provided with the distribution.
19
#
20
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
21
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
22
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
24
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
27
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
28
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31
# POSSIBILITY OF SUCH DAMAGE.
32
#
33
# The views and conclusions contained in the software and
34
# documentation are those of the authors and should not be
35
# interpreted as representing official policies, either expressed
36
# or implied, of GRNET S.A.
37
#
38
#
39

    
40
"""Unit Tests for the pool classes in synnefo.lib.pool
41

42
Provides unit tests for the code implementing pool
43
classes in the synnefo.lib.pool module.
44

45
"""
46

    
47
# Support running under a gevent-monkey-patched environment
48
# if the "monkey" argument is specified in the command line.
49
import sys
50
if "monkey" in sys.argv:
51
    from gevent import monkey
52
    monkey.patch_all()
53
    sys.argv.pop(sys.argv.index("monkey"))
54

    
55
import sys
56
import time
57
import threading
58
from collections import defaultdict
59

    
60
from synnefo.lib.pool import ObjectPool, PoolLimitError, PoolVerificationError
61

    
62
# Use backported unittest functionality if Python < 2.7
63
try:
64
    import unittest2 as unittest
65
except ImportError:
66
    if sys.version_info < (2, 7):
67
        raise Exception("The unittest2 package is required for Python < 2.7")
68
    import unittest
69

    
70

    
71
from threading import Lock
72

    
73
mutex = Lock()
74

    
75
class NumbersPool(ObjectPool):
76
    max = 0
77

    
78
    def _pool_create_safe(self):
79
        with mutex:
80
            n = self.max
81
            self.max += 1
82
        return n
83

    
84
    def _pool_create_unsafe(self):
85
        n = self.max
86
        self.max += 1
87
        return n
88

    
89
    # set this to _pool_create_unsafe to check
90
    # the thread-safety test
91
    #_pool_create = _pool_create_unsafe
92
    _pool_create = _pool_create_safe
93

    
94
    def _pool_verify(self, obj):
95
        return True
96

    
97
    def _pool_cleanup(self, obj):
98
        n = int(obj)
99
        if n < 0:
100
            return True
101
        return False
102

    
103

    
104
class ObjectPoolTestCase(unittest.TestCase):
105
    def test_create_pool_requires_size(self):
106
        """Test __init__() requires valid size argument"""
107
        self.assertRaises(ValueError, ObjectPool)
108
        self.assertRaises(ValueError, ObjectPool, size="size10")
109
        self.assertRaises(ValueError, ObjectPool, size=0)
110
        self.assertRaises(ValueError, ObjectPool, size=-1)
111

    
112
    def test_create_pool(self):
113
        """Test pool creation works"""
114
        pool = ObjectPool(100)
115
        self.assertEqual(pool.size, 100)
116

    
117
    def test_get_not_implemented(self):
118
        """Test pool_get() method not implemented in abstract class"""
119
        pool = ObjectPool(100)
120
        self.assertRaises(NotImplementedError, pool._pool_create)
121
        self.assertRaises(NotImplementedError, pool._pool_verify, None)
122

    
123
    def test_put_not_implemented(self):
124
        """Test pool_put() method not implemented in abstract class"""
125
        pool = ObjectPool(100)
126
        self.assertRaises(NotImplementedError, pool._pool_cleanup, None)
127

    
128

    
129
class NumbersPoolTestCase(unittest.TestCase):
130
    N = 1500
131
    SEC = 0.5
132
    maxDiff = None
133

    
134
    def setUp(self):
135
        self.numbers = NumbersPool(self.N)
136

    
137
    def test_initially_empty(self):
138
        """Test pool is empty upon creation"""
139
        self.assertEqual(self.numbers._set, set([]))
140

    
141
    def test_seq_allocate_all(self):
142
        """Test allocation and deallocation of all pool objects"""
143
        n = []
144
        for _ in xrange(0, self.N):
145
            n.append(self.numbers.pool_get())
146
        self.assertEqual(n, range(0, self.N))
147
        for i in n:
148
            self.numbers.pool_put(i)
149
        self.assertEqual(self.numbers._set, set(n))
150

    
151
    def test_parallel_allocate_all(self):
152
        """Allocate all pool objects in parallel"""
153
        def allocate_one(pool, results, index):
154
            n = pool.pool_get()
155
            results[index] = n
156

    
157
        results = [None] * self.N
158
        threads = [threading.Thread(target=allocate_one,
159
                                    args=(self.numbers, results, i))
160
                   for i in xrange(0, self.N)]
161

    
162
        for t in threads:
163
            t.start()
164
        for t in threads:
165
            t.join()
166

    
167
        # This nonblocking pool_get() should fail
168
        self.assertRaises(PoolLimitError, self.numbers.pool_get,
169
                          blocking=False)
170
        self.assertEqual(sorted(results), range(0, self.N))
171

    
172
    def test_allocate_no_create(self):
173
        """Allocate objects from the pool without creating them"""
174
        for i in xrange(0, self.N):
175
            self.assertIsNone(self.numbers.pool_get(create=False))
176

    
177
        # This nonblocking pool_get() should fail
178
        self.assertRaises(PoolLimitError, self.numbers.pool_get,
179
                          blocking=False)
180

    
181
    def test_pool_cleanup_returns_failure(self):
182
        """Put a broken object, test a new one is retrieved eventually"""
183
        n = []
184
        for _ in xrange(0, self.N):
185
            n.append(self.numbers.pool_get())
186
        self.assertEqual(n, range(0, self.N))
187

    
188
        del n[-1:]
189
        self.numbers.pool_put(-1)  # This is a broken object
190
        self.assertFalse(self.numbers._set)
191
        self.assertEqual(self.numbers.pool_get(), self.N)
192

    
193
    def test_parallel_get_blocks(self):
194
        """Test threads block if no object left in the pool"""
195
        def allocate_one_and_sleep(pool, sec, result, index):
196
            n = pool.pool_get()
197
            time.sleep(sec)
198
            result[index] = n
199
            pool.pool_put(n)
200

    
201
        nr_threads = 2 * self.N + 1
202
        results = [None] * nr_threads
203
        threads = [threading.Thread(target=allocate_one_and_sleep,
204
                                    args=(self.numbers, self.SEC, results, i))
205
                   for i in xrange(nr_threads)]
206

    
207
        # This should take 3 * SEC seconds
208
        start = time.time()
209
        for t in threads:
210
            t.start()
211
        for t in threads:
212
            t.join()
213
        diff = time.time() - start
214
        self.assertTrue(diff > 3 * self.SEC)
215
        self.assertLess((diff - 3 * self.SEC) / 3 * self.SEC, .5)
216

    
217
        freq = defaultdict(int)
218
        for r in results:
219
            freq[r] += 1
220

    
221
        # The maximum number used must be exactly the pool size.
222
        self.assertEqual(max(results), self.N - 1)
223
        # At least one number must have been used three times
224
        triples = [r for r in freq if freq[r] == 3]
225
        self.assertGreater(len(triples), 0)
226
        # The sum of all frequencies must equal to the number of threads.
227
        self.assertEqual(sum(freq.values()), nr_threads)
228

    
229
    def test_verify_create(self):
230
        numbers = self.numbers
231
        nums = [numbers.pool_get() for _ in xrange(self.N)]
232
        for num in nums:
233
            numbers.pool_put(num)
234

    
235
        def verify(num):
236
            if num in nums:
237
                return False
238
            return True
239

    
240
        self.numbers._pool_verify = verify
241
        self.assertEqual(numbers.pool_get(), self.N)
242

    
243
    def test_verify_error(self):
244
        numbers = self.numbers
245
        nums = [numbers.pool_get() for _ in xrange(self.N)]
246
        for num in nums:
247
            numbers.pool_put(num)
248

    
249
        def false(*args):
250
            return False
251

    
252
        self.numbers._pool_verify = false
253
        self.assertRaises(PoolVerificationError, numbers.pool_get)
254

    
255
class ThreadSafetyTest(unittest.TestCase):
256

    
257
    pool_class = NumbersPool
258

    
259
    def setUp(self):
260
        size = 3000
261
        self.size = size
262
        self.pool = self.pool_class(size)
263

    
264
    def test_parallel_sleeping_create(self):
265
        def create(pool, results, i):
266
            time.sleep(1)
267
            results[i] = pool._pool_create()
268

    
269
        pool = self.pool
270
        N = self.size
271
        results = [None] * N
272
        threads = [threading.Thread(target=create, args=(pool, results, i))
273
                   for i in xrange(N)]
274
        for t in threads:
275
            t.start()
276
        for t in threads:
277
            t.join()
278

    
279
        freq = defaultdict(int)
280
        for r in results:
281
            freq[r] += 1
282

    
283
        mults = [(n, c) for n, c in freq.items() if c > 1]
284
        if mults:
285
            #print mults
286
            raise AssertionError("_pool_create() is not thread safe")
287

    
288

    
289
if __name__ == '__main__':
290
    unittest.main()