4 # Copyright (C) 2008, 2009, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the workerpool module"""
31 from ganeti import workerpool
32 from ganeti import errors
37 class CountingContext(object):
39 self._lock = threading.Condition(threading.Lock())
49 def GetDoneTasks(self):
57 def UpdateChecksum(current, value):
58 return zlib.adler32(str(value), current)
61 class CountingBaseWorker(workerpool.BaseWorker):
62 def RunTask(self, ctx, text):
66 class ChecksumContext:
67 CHECKSUM_START = zlib.adler32("")
70 self.lock = threading.Condition(threading.Lock())
71 self.checksum = self.CHECKSUM_START
74 def UpdateChecksum(current, value):
75 return zlib.adler32(str(value), current)
78 class ChecksumBaseWorker(workerpool.BaseWorker):
79 def RunTask(self, ctx, number):
80 name = "number%s" % number
81 self.SetTaskName(name)
83 # This assertion needs to be checked before updating the checksum. A
84 # failing assertion will then cause the result to be wrong.
85 assert self.getName() == ("%s/%s" % (self._worker_id, name))
89 ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
94 class ListBuilderContext:
96 self.lock = threading.Lock()
101 class ListBuilderWorker(workerpool.BaseWorker):
102 def RunTask(self, ctx, data):
105 ctx.result.append((self.GetCurrentPriority(), data))
106 ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
111 class DeferringTaskContext:
113 self.lock = threading.Lock()
115 self.samepriodefer = {}
118 class DeferringWorker(workerpool.BaseWorker):
119 def RunTask(self, ctx, num, targetprio):
122 if num in ctx.samepriodefer:
123 del ctx.samepriodefer[num]
124 raise workerpool.DeferTask()
126 if self.GetCurrentPriority() > targetprio:
127 raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
129 ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
134 class TestWorkerpool(unittest.TestCase):
135 """Workerpool tests"""
137 def testCounting(self):
138 ctx = CountingContext()
139 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
141 self._CheckWorkerCount(wp, 3)
144 wp.AddTask((ctx, "Hello world %s" % i))
148 wp.TerminateWorkers()
149 self._CheckWorkerCount(wp, 0)
151 self.assertEquals(ctx.GetDoneTasks(), 10)
153 def testNoTasks(self):
154 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
156 self._CheckWorkerCount(wp, 3)
157 self._CheckNoTasks(wp)
159 wp.TerminateWorkers()
160 self._CheckWorkerCount(wp, 0)
162 def testNoTasksQuiesce(self):
163 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
165 self._CheckWorkerCount(wp, 3)
166 self._CheckNoTasks(wp)
168 self._CheckNoTasks(wp)
170 wp.TerminateWorkers()
171 self._CheckWorkerCount(wp, 0)
173 def testChecksum(self):
174 # Tests whether all tasks are run and, since we're only using a single
175 # thread, whether everything is started in order.
176 wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
178 self._CheckWorkerCount(wp, 1)
180 ctx = ChecksumContext()
181 checksum = ChecksumContext.CHECKSUM_START
182 for i in range(1, 100):
183 checksum = ChecksumContext.UpdateChecksum(checksum, i)
188 self._CheckNoTasks(wp)
193 self.assertEqual(checksum, ctx.checksum)
197 wp.TerminateWorkers()
198 self._CheckWorkerCount(wp, 0)
200 def testAddManyTasks(self):
201 ctx = CountingContext()
202 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
204 self._CheckWorkerCount(wp, 3)
206 wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
207 wp.AddTask((ctx, "A separate hello"))
208 wp.AddTask((ctx, "Once more, hi!"))
209 wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
213 self._CheckNoTasks(wp)
215 wp.TerminateWorkers()
216 self._CheckWorkerCount(wp, 0)
218 self.assertEquals(ctx.GetDoneTasks(), 22)
220 def testManyTasksSequence(self):
221 ctx = CountingContext()
222 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
224 self._CheckWorkerCount(wp, 3)
225 self.assertRaises(AssertionError, wp.AddManyTasks,
226 ["Hello world %s" % i for i in range(10)])
227 self.assertRaises(AssertionError, wp.AddManyTasks,
228 [i for i in range(10)])
230 wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
231 wp.AddTask((ctx, "A separate hello"))
235 self._CheckNoTasks(wp)
237 wp.TerminateWorkers()
238 self._CheckWorkerCount(wp, 0)
240 self.assertEquals(ctx.GetDoneTasks(), 11)
242 def _CheckNoTasks(self, wp):
245 # The task queue must be empty now
246 self.failUnless(not wp._tasks)
250 def _CheckWorkerCount(self, wp, num_workers):
253 self.assertEqual(len(wp._workers), num_workers)
257 def testPriorityChecksum(self):
258 # Tests whether all tasks are run and, since we're only using a single
259 # thread, whether everything is started in order and respects the priority
260 wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
262 self._CheckWorkerCount(wp, 1)
264 ctx = ChecksumContext()
269 for i in range(1, 333):
271 tasks.append((ctx, i))
272 priorities.append(prio)
273 data.setdefault(prio, []).append(i)
275 wp.AddManyTasks(tasks, priority=priorities)
279 self._CheckNoTasks(wp)
284 checksum = ChecksumContext.CHECKSUM_START
285 for priority in sorted(data.keys()):
286 for i in data[priority]:
287 checksum = ChecksumContext.UpdateChecksum(checksum, i)
289 self.assertEqual(checksum, ctx.checksum)
293 self._CheckWorkerCount(wp, 1)
295 wp.TerminateWorkers()
296 self._CheckWorkerCount(wp, 0)
298 def testPriorityListManyTasks(self):
299 # Tests whether all tasks are run and, since we're only using a single
300 # thread, whether everything is started in order and respects the priority
301 wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
303 self._CheckWorkerCount(wp, 1)
305 ctx = ListBuilderContext()
307 # Use static seed for this test
308 rnd = random.Random(0)
313 for i in range(1, 333):
314 prio = int(rnd.random() * 10)
315 tasks.append((ctx, i))
316 priorities.append(prio)
317 data.setdefault(prio, []).append((prio, i))
319 wp.AddManyTasks(tasks, priority=priorities)
321 self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
322 [("x", ), ("y", )], priority=[1] * 5)
326 self._CheckNoTasks(wp)
332 for priority in sorted(data.keys()):
333 expresult.extend(data[priority])
335 self.assertEqual(expresult, ctx.result)
339 self._CheckWorkerCount(wp, 1)
341 wp.TerminateWorkers()
342 self._CheckWorkerCount(wp, 0)
344 def testPriorityListSingleTasks(self):
345 # Tests whether all tasks are run and, since we're only using a single
346 # thread, whether everything is started in order and respects the priority
347 wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
349 self._CheckWorkerCount(wp, 1)
351 ctx = ListBuilderContext()
353 # Use static seed for this test
354 rnd = random.Random(26279)
357 for i in range(1, 333):
358 prio = int(rnd.random() * 30)
359 wp.AddTask((ctx, i), priority=prio)
360 data.setdefault(prio, []).append(i)
362 # Cause some distortion
370 self._CheckNoTasks(wp)
375 self.assertEqual(data, ctx.prioresult)
379 self._CheckWorkerCount(wp, 1)
381 wp.TerminateWorkers()
382 self._CheckWorkerCount(wp, 0)
384 def testPriorityListSingleTasks(self):
385 # Tests whether all tasks are run and, since we're only using a single
386 # thread, whether everything is started in order and respects the priority
387 wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
389 self._CheckWorkerCount(wp, 1)
391 ctx = ListBuilderContext()
393 # Use static seed for this test
394 rnd = random.Random(26279)
397 for i in range(1, 333):
398 prio = int(rnd.random() * 30)
399 wp.AddTask((ctx, i), priority=prio)
400 data.setdefault(prio, []).append(i)
402 # Cause some distortion
410 self._CheckNoTasks(wp)
415 self.assertEqual(data, ctx.prioresult)
419 self._CheckWorkerCount(wp, 1)
421 wp.TerminateWorkers()
422 self._CheckWorkerCount(wp, 0)
424 def testDeferTask(self):
425 # Tests whether all tasks are run and, since we're only using a single
426 # thread, whether everything is started in order and respects the priority
427 wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
429 self._CheckWorkerCount(wp, 1)
431 ctx = DeferringTaskContext()
433 # Use static seed for this test
434 rnd = random.Random(14921)
437 for i in range(1, 333):
441 ctx.samepriodefer[i] = True
445 prio = int(rnd.random() * 30)
446 wp.AddTask((ctx, i, prio), priority=50)
447 data.setdefault(prio, set()).add(i)
449 # Cause some distortion
457 self._CheckNoTasks(wp)
462 self.assertEqual(data, ctx.prioresult)
466 self._CheckWorkerCount(wp, 1)
468 wp.TerminateWorkers()
469 self._CheckWorkerCount(wp, 0)
472 if __name__ == '__main__':
473 testutils.GanetiTestProgram()