4 # Copyright (C) 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the workerpool module"""
30 from ganeti import workerpool
34 class CountingContext(object):
37 self._lock = threading.Condition(threading.Lock())
47 def GetDoneTasks(self):
55 def UpdateChecksum(current, value):
56 return zlib.adler32(str(value), current)
59 class CountingBaseWorker(workerpool.BaseWorker):
61 def RunTask(self, ctx, text):
65 class ChecksumContext:
66 CHECKSUM_START = zlib.adler32("")
69 self.lock = threading.Condition(threading.Lock())
70 self.checksum = self.CHECKSUM_START
73 def UpdateChecksum(current, value):
74 return zlib.adler32(str(value), current)
77 class ChecksumBaseWorker(workerpool.BaseWorker):
78 def RunTask(self, ctx, number):
81 ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
86 class TestWorkerpool(unittest.TestCase):
87 """Workerpool tests"""
89 def testCounting(self):
90 ctx = CountingContext()
91 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
93 self._CheckWorkerCount(wp, 3)
96 wp.AddTask((ctx, "Hello world %s" % i))
100 wp.TerminateWorkers()
101 self._CheckWorkerCount(wp, 0)
103 self.assertEquals(ctx.GetDoneTasks(), 10)
105 def testNoTasks(self):
106 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
108 self._CheckWorkerCount(wp, 3)
109 self._CheckNoTasks(wp)
111 wp.TerminateWorkers()
112 self._CheckWorkerCount(wp, 0)
114 def testNoTasksQuiesce(self):
115 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
117 self._CheckWorkerCount(wp, 3)
118 self._CheckNoTasks(wp)
120 self._CheckNoTasks(wp)
122 wp.TerminateWorkers()
123 self._CheckWorkerCount(wp, 0)
125 def testChecksum(self):
126 # Tests whether all tasks are run and, since we're only using a single
127 # thread, whether everything is started in order.
128 wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
130 self._CheckWorkerCount(wp, 1)
132 ctx = ChecksumContext()
133 checksum = ChecksumContext.CHECKSUM_START
134 for i in range(1, 100):
135 checksum = ChecksumContext.UpdateChecksum(checksum, i)
140 self._CheckNoTasks(wp)
145 self.assertEqual(checksum, ctx.checksum)
149 wp.TerminateWorkers()
150 self._CheckWorkerCount(wp, 0)
152 def testAddManyTasks(self):
153 ctx = CountingContext()
154 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
156 self._CheckWorkerCount(wp, 3)
158 wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
159 wp.AddTask((ctx, "A separate hello"))
160 wp.AddTask((ctx, "Once more, hi!"))
161 wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
165 self._CheckNoTasks(wp)
167 wp.TerminateWorkers()
168 self._CheckWorkerCount(wp, 0)
170 self.assertEquals(ctx.GetDoneTasks(), 22)
172 def testManyTasksSequence(self):
173 ctx = CountingContext()
174 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
176 self._CheckWorkerCount(wp, 3)
177 self.assertRaises(AssertionError, wp.AddManyTasks,
178 ["Hello world %s" % i for i in range(10)])
179 self.assertRaises(AssertionError, wp.AddManyTasks,
180 [i for i in range(10)])
182 wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
183 wp.AddTask((ctx, "A separate hello"))
187 self._CheckNoTasks(wp)
189 wp.TerminateWorkers()
190 self._CheckWorkerCount(wp, 0)
192 self.assertEquals(ctx.GetDoneTasks(), 11)
194 def _CheckNoTasks(self, wp):
197 # The task queue must be empty now
198 self.failUnless(not wp._tasks)
202 def _CheckWorkerCount(self, wp, num_workers):
205 self.assertEqual(len(wp._workers), num_workers)
210 if __name__ == '__main__':
211 testutils.GanetiTestProgram()