Enable auto-unit formatting in script output
[ganeti-local] / test / ganeti.workerpool_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the workerpool module"""
23
24 import unittest
25 import threading
26 import time
27 import sys
28 import zlib
29
30 from ganeti import workerpool
31
32
33 class DummyBaseWorker(workerpool.BaseWorker):
34   def RunTask(self, text):
35     pass
36
37
38 class ChecksumContext:
39   CHECKSUM_START = zlib.adler32("")
40
41   def __init__(self):
42     self.lock = threading.Condition(threading.Lock())
43     self.checksum = self.CHECKSUM_START
44
45   @staticmethod
46   def UpdateChecksum(current, value):
47     return zlib.adler32(str(value), current)
48
49
50 class ChecksumBaseWorker(workerpool.BaseWorker):
51   def RunTask(self, ctx, number):
52     ctx.lock.acquire()
53     try:
54       ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
55     finally:
56       ctx.lock.release()
57
58
59 class TestWorkerpool(unittest.TestCase):
60   """Workerpool tests"""
61
62   def testDummy(self):
63     wp = workerpool.WorkerPool(3, DummyBaseWorker)
64     try:
65       self._CheckWorkerCount(wp, 3)
66
67       for i in xrange(10):
68         wp.AddTask("Hello world %s" % i)
69
70       wp.Quiesce()
71     finally:
72       wp.TerminateWorkers()
73       self._CheckWorkerCount(wp, 0)
74
75   def testNoTasks(self):
76     wp = workerpool.WorkerPool(3, DummyBaseWorker)
77     try:
78       self._CheckWorkerCount(wp, 3)
79       self._CheckNoTasks(wp)
80     finally:
81       wp.TerminateWorkers()
82       self._CheckWorkerCount(wp, 0)
83
84   def testNoTasksQuiesce(self):
85     wp = workerpool.WorkerPool(3, DummyBaseWorker)
86     try:
87       self._CheckWorkerCount(wp, 3)
88       self._CheckNoTasks(wp)
89       wp.Quiesce()
90       self._CheckNoTasks(wp)
91     finally:
92       wp.TerminateWorkers()
93       self._CheckWorkerCount(wp, 0)
94
95   def testChecksum(self):
96     # Tests whether all tasks are run and, since we're only using a single
97     # thread, whether everything is started in order.
98     wp = workerpool.WorkerPool(1, ChecksumBaseWorker)
99     try:
100       self._CheckWorkerCount(wp, 1)
101
102       ctx = ChecksumContext()
103       checksum = ChecksumContext.CHECKSUM_START
104       for i in xrange(1, 100):
105         checksum = ChecksumContext.UpdateChecksum(checksum, i)
106         wp.AddTask(ctx, i)
107
108       wp.Quiesce()
109
110       self._CheckNoTasks(wp)
111
112       # Check sum
113       ctx.lock.acquire()
114       try:
115         self.assertEqual(checksum, ctx.checksum)
116       finally:
117         ctx.lock.release()
118     finally:
119       wp.TerminateWorkers()
120       self._CheckWorkerCount(wp, 0)
121
122   def _CheckNoTasks(self, wp):
123     wp._lock.acquire()
124     try:
125       # The task queue must be empty now
126       self.failUnless(not wp._tasks)
127     finally:
128       wp._lock.release()
129
130   def _CheckWorkerCount(self, wp, num_workers):
131     wp._lock.acquire()
132     try:
133       self.assertEqual(len(wp._workers), num_workers)
134     finally:
135       wp._lock.release()
136
137
138 if __name__ == '__main__':
139   unittest.main()