Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / pool / tests.py @ 4ab1af1a

History | View | Annotate | Download (13 kB)

1
#!/usr/bin/env python
2
#
3
# -*- coding: utf-8 -*-
4
#
5
# Copyright 2011 GRNET S.A. All rights reserved.
6
#
7
# Redistribution and use in source and binary forms, with or
8
# without modification, are permitted provided that the following
9
# conditions are met:
10
#
11
#   1. Redistributions of source code must retain the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer.
14
#
15
#   2. Redistributions in binary form must reproduce the above
16
#      copyright notice, this list of conditions and the following
17
#      disclaimer in the documentation and/or other materials
18
#      provided with the distribution.
19
#
20
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
21
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
22
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
24
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
27
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
28
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31
# POSSIBILITY OF SUCH DAMAGE.
32
#
33
# The views and conclusions contained in the software and
34
# documentation are those of the authors and should not be
35
# interpreted as representing official policies, either expressed
36
# or implied, of GRNET S.A.
37
#
38
#
39

    
40
"""Unit Tests for the pool classes in synnefo.lib.pool
41

42
Provides unit tests for the code implementing pool
43
classes in the synnefo.lib.pool module.
44

45
"""
46

    
47
# Support running under a gevent-monkey-patched environment
48
# if the "monkey" argument is specified in the command line.
49
import sys
50
if "monkey" in sys.argv:
51
    from gevent import monkey
52
    monkey.patch_all()
53
    sys.argv.pop(sys.argv.index("monkey"))
54

    
55
import sys
56
import time
57
import threading
58
from collections import defaultdict
59

    
60
from socket import socket, AF_INET, SOCK_STREAM, IPPROTO_TCP, SHUT_RDWR
61

    
62
from synnefo.lib.pool import ObjectPool, PoolLimitError, PoolVerificationError
63
from synnefo.lib.pool.http import PooledHTTPConnection, HTTPConnectionPool
64
from synnefo.lib.pool.http import _pools as _http_pools
65

    
66
# Use backported unittest functionality if Python < 2.7
67
try:
68
    import unittest2 as unittest
69
except ImportError:
70
    if sys.version_info < (2, 7):
71
        raise Exception("The unittest2 package is required for Python < 2.7")
72
    import unittest
73

    
74

    
75
from threading import Lock
76

    
77
mutex = Lock()
78

    
79

    
80
class NumbersPool(ObjectPool):
81
    max = 0
82

    
83
    def _pool_create_safe(self):
84
        with mutex:
85
            n = self.max
86
            self.max += 1
87
        return n
88

    
89
    def _pool_create_unsafe(self):
90
        n = self.max
91
        self.max += 1
92
        return n
93

    
94
    # set this to _pool_create_unsafe to check
95
    # the thread-safety test
96
    #_pool_create = _pool_create_unsafe
97
    _pool_create = _pool_create_safe
98

    
99
    def _pool_verify(self, obj):
100
        return True
101

    
102
    def _pool_cleanup(self, obj):
103
        n = int(obj)
104
        if n < 0:
105
            return True
106
        return False
107

    
108

    
109
class ObjectPoolTestCase(unittest.TestCase):
110
    def test_create_pool_requires_size(self):
111
        """Test __init__() requires valid size argument"""
112
        self.assertRaises(ValueError, ObjectPool)
113
        self.assertRaises(ValueError, ObjectPool, size="size10")
114
        self.assertRaises(ValueError, ObjectPool, size=0)
115
        self.assertRaises(ValueError, ObjectPool, size=-1)
116

    
117
    def test_create_pool(self):
118
        """Test pool creation works"""
119
        pool = ObjectPool(100)
120
        self.assertEqual(pool.size, 100)
121

    
122
    def test_get_not_implemented(self):
123
        """Test pool_get() method not implemented in abstract class"""
124
        pool = ObjectPool(100)
125
        self.assertRaises(NotImplementedError, pool._pool_create)
126
        self.assertRaises(NotImplementedError, pool._pool_verify, None)
127

    
128
    def test_put_not_implemented(self):
129
        """Test pool_put() method not implemented in abstract class"""
130
        pool = ObjectPool(100)
131
        self.assertRaises(NotImplementedError, pool._pool_cleanup, None)
132

    
133

    
134
class NumbersPoolTestCase(unittest.TestCase):
135
    N = 1500
136
    SEC = 0.5
137
    maxDiff = None
138

    
139
    def setUp(self):
140
        self.numbers = NumbersPool(self.N)
141

    
142
    def test_initially_empty(self):
143
        """Test pool is empty upon creation"""
144
        self.assertEqual(self.numbers._set, set([]))
145

    
146
    def test_seq_allocate_all(self):
147
        """Test allocation and deallocation of all pool objects"""
148
        n = []
149
        for _ in xrange(0, self.N):
150
            n.append(self.numbers.pool_get())
151
        self.assertEqual(n, range(0, self.N))
152
        for i in n:
153
            self.numbers.pool_put(i)
154
        self.assertEqual(self.numbers._set, set(n))
155

    
156
    def test_parallel_allocate_all(self):
157
        """Allocate all pool objects in parallel"""
158
        def allocate_one(pool, results, index):
159
            n = pool.pool_get()
160
            results[index] = n
161

    
162
        results = [None] * self.N
163
        threads = [threading.Thread(target=allocate_one,
164
                                    args=(self.numbers, results, i))
165
                   for i in xrange(0, self.N)]
166

    
167
        for t in threads:
168
            t.start()
169
        for t in threads:
170
            t.join()
171

    
172
        # This nonblocking pool_get() should fail
173
        self.assertRaises(PoolLimitError, self.numbers.pool_get,
174
                          blocking=False)
175
        self.assertEqual(sorted(results), range(0, self.N))
176

    
177
    def test_allocate_no_create(self):
178
        """Allocate objects from the pool without creating them"""
179
        for i in xrange(0, self.N):
180
            self.assertIsNone(self.numbers.pool_get(create=False))
181

    
182
        # This nonblocking pool_get() should fail
183
        self.assertRaises(PoolLimitError, self.numbers.pool_get,
184
                          blocking=False)
185

    
186
    def test_pool_cleanup_returns_failure(self):
187
        """Put a broken object, test a new one is retrieved eventually"""
188
        n = []
189
        for _ in xrange(0, self.N):
190
            n.append(self.numbers.pool_get())
191
        self.assertEqual(n, range(0, self.N))
192

    
193
        del n[-1:]
194
        self.numbers.pool_put(-1)  # This is a broken object
195
        self.assertFalse(self.numbers._set)
196
        self.assertEqual(self.numbers.pool_get(), self.N)
197

    
198
    def test_parallel_get_blocks(self):
199
        """Test threads block if no object left in the pool"""
200
        def allocate_one_and_sleep(pool, sec, result, index):
201
            n = pool.pool_get()
202
            time.sleep(sec)
203
            result[index] = n
204
            pool.pool_put(n)
205

    
206
        nr_threads = 2 * self.N + 1
207
        results = [None] * nr_threads
208
        threads = [threading.Thread(target=allocate_one_and_sleep,
209
                                    args=(self.numbers, self.SEC, results, i))
210
                   for i in xrange(nr_threads)]
211

    
212
        # This should take 3 * SEC seconds
213
        start = time.time()
214
        for t in threads:
215
            t.start()
216
        for t in threads:
217
            t.join()
218
        diff = time.time() - start
219
        self.assertTrue(diff > 3 * self.SEC)
220
        self.assertLess((diff - 3 * self.SEC) / 3 * self.SEC, .5)
221

    
222
        freq = defaultdict(int)
223
        for r in results:
224
            freq[r] += 1
225

    
226
        # The maximum number used must be exactly the pool size.
227
        self.assertEqual(max(results), self.N - 1)
228
        # At least one number must have been used three times
229
        triples = [r for r in freq if freq[r] == 3]
230
        self.assertGreater(len(triples), 0)
231
        # The sum of all frequencies must equal to the number of threads.
232
        self.assertEqual(sum(freq.values()), nr_threads)
233

    
234
    def test_verify_create(self):
235
        numbers = self.numbers
236
        nums = [numbers.pool_get() for _ in xrange(self.N)]
237
        for num in nums:
238
            numbers.pool_put(num)
239

    
240
        def verify(num):
241
            if num in nums:
242
                return False
243
            return True
244

    
245
        self.numbers._pool_verify = verify
246
        self.assertEqual(numbers.pool_get(), self.N)
247

    
248
    def test_verify_error(self):
249
        numbers = self.numbers
250
        nums = [numbers.pool_get() for _ in xrange(self.N)]
251
        for num in nums:
252
            numbers.pool_put(num)
253

    
254
        def false(*args):
255
            return False
256

    
257
        self.numbers._pool_verify = false
258
        self.assertRaises(PoolVerificationError, numbers.pool_get)
259

    
260

    
261
class ThreadSafetyTestCase(unittest.TestCase):
262

    
263
    pool_class = NumbersPool
264

    
265
    def setUp(self):
266
        size = 3000
267
        self.size = size
268
        self.pool = self.pool_class(size)
269

    
270
    def test_parallel_sleeping_create(self):
271
        def create(pool, results, i):
272
            time.sleep(1)
273
            results[i] = pool._pool_create()
274

    
275
        pool = self.pool
276
        N = self.size
277
        results = [None] * N
278
        threads = [threading.Thread(target=create, args=(pool, results, i))
279
                   for i in xrange(N)]
280
        for t in threads:
281
            t.start()
282
        for t in threads:
283
            t.join()
284

    
285
        freq = defaultdict(int)
286
        for r in results:
287
            freq[r] += 1
288

    
289
        mults = [(n, c) for n, c in freq.items() if c > 1]
290
        if mults:
291
            #print mults
292
            raise AssertionError("_pool_create() is not thread safe")
293

    
294

    
295
class TestHTTPConnectionTestCase(unittest.TestCase):
296
    def setUp(self):
297
        #netloc = "127.0.0.1:9999"
298
        #scheme='http'
299
        #self.pool = HTTPConnectionPool(
300
        #                netloc=netloc,
301
        #                scheme=scheme,
302
        #                pool_size=1)
303
        #key = (scheme, netloc)
304
        #_http_pools[key] = pool
305

    
306
        _http_pools.clear()
307

    
308
        self.host = "127.0.0.1"
309
        self.port = 9999
310
        self.netloc = "%s:%s" % (self.host, self.port)
311
        self.scheme = "http"
312
        self.key = (self.scheme, self.netloc)
313

    
314
        sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
315
        sock.bind((self.host, self.port))
316
        sock.listen(1)
317
        self.sock = sock
318

    
319
    def tearDown(self):
320
        sock = self.sock
321
        sock.shutdown(SHUT_RDWR)
322
        sock.close()
323

    
324
    def test_double_release(self):
325
        pooled = PooledHTTPConnection(self.netloc, self.scheme)
326
        pooled.acquire()
327
        pool = pooled._pool
328
        self.assertTrue(pooled._pool is _http_pools[(self.scheme, self.netloc)])
329
        pooled.release()
330

    
331
        poolsize = len(pool._set)
332

    
333
        if PooledHTTPConnection._pool_disable_after_release:
334
            self.assertTrue(pooled._pool is False)
335

    
336
        if not PooledHTTPConnection._pool_ignore_double_release:
337
            with self.assertRaises(AssertionError):
338
                pooled.release()
339
        else:
340
            pooled.release()
341

    
342
        self.assertEqual(poolsize, len(pool._set))
343

    
344
    def test_distinct_pools_per_scheme(self):
345
        with PooledHTTPConnection("127.0.0.1", "http",
346
                                  attach_context=True) as conn:
347
            pool = conn._pool_context._pool
348
            self.assertTrue(pool is _http_pools[("http", "127.0.0.1")])
349

    
350
        with PooledHTTPConnection("127.0.0.1", "https",
351
                                  attach_context=True) as conn2:
352
            pool2 = conn2._pool_context._pool
353
            self.assertTrue(conn is not conn2)
354
            self.assertNotEqual(pool, pool2)
355
            self.assertTrue(pool2 is _http_pools[("https", "127.0.0.1")])
356

    
357
    def test_clean_connection(self):
358
        pool = None
359
        pooled = PooledHTTPConnection(self.netloc, self.scheme)
360
        conn = pooled.acquire()
361
        pool = pooled._pool
362
        self.assertTrue(pool is not None)
363
        pooled.release()
364
        self.assertTrue(pooled._pool is False)
365
        poolset = pool._set
366
        self.assertEqual(len(poolset), 1)
367
        pooled_conn = list(poolset)[0]
368
        self.assertTrue(pooled_conn is conn)
369

    
370
    def test_dirty_connection(self):
371
        pooled = PooledHTTPConnection(self.netloc, self.scheme)
372
        conn = pooled.acquire()
373
        pool = pooled._pool
374
        conn.request("GET", "/")
375
        serversock, addr = self.sock.accept()
376
        serversock.send("HTTP/1.1 200 OK\n"
377
                        "Content-Length: 6\n"
378
                        "\n"
379
                        "HELLO\n")
380
        time.sleep(0.3)
381
        # We would read this message like this
382
        #resp = conn.getresponse()
383
        # but we won't so the connection is dirty
384
        pooled.release()
385

    
386
        poolset = pool._set
387
        self.assertEqual(len(poolset), 0)
388

    
389
    def test_context_manager_exception_safety(self):
390
        class TestError(Exception):
391
            pass
392

    
393
        for i in xrange(10):
394
            pool = None
395
            try:
396
                with PooledHTTPConnection(
397
                        self.netloc, self.scheme,
398
                        size=1, attach_context=True) as conn:
399
                    pool = conn._pool_context._pool
400
                    raise TestError()
401
            except TestError:
402
                self.assertTrue(pool is not None)
403
                self.assertEqual(pool._semaphore._Semaphore__value, 1)
404

    
405

    
406
if __name__ == '__main__':
407
    unittest.main()