4 # Copyright (C) 2008, 2009, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the workerpool module"""
31 from ganeti import workerpool
32 from ganeti import errors
# Shared context object passed as the first element of every task in the
# counting tests. Several lines of this class (including the constructor
# header) are not visible in this extract; only what is shown is described.
37 class CountingContext(object):
# Condition variable built on top of a plain threading.Lock.
39 self._lock = threading.Condition(threading.Lock())
# Accessor the tests compare against the expected number of processed tasks
# (body not visible here).
49 def GetDoneTasks(self):
# Folds str(value) into the running Adler-32 checksum "current" and returns
# the updated checksum.
# NOTE(review): indentation is lost in this extract; presumably this is a
# static method of the class above -- confirm against the full file.
57 def UpdateChecksum(current, value):
58 return zlib.adler32(str(value), current)
# Worker class handed to WorkerPool by the counting tests. Tasks for it are
# queued as (ctx, text) tuples, which arrive as the RunTask arguments.
61 class CountingBaseWorker(workerpool.BaseWorker):
# Task entry point; the body is not visible in this extract.
62 def RunTask(self, ctx, text):
# Context for the checksum tests: carries a lock and a running Adler-32
# checksum that the worker updates once per task.
66 class ChecksumContext:
# Adler-32 of the empty string; starting value for every checksum chain.
# NOTE(review): zlib.adler32 is given a str here and below; Python 3 only
# accepts bytes-like data, so this code targets Python 2.
67 CHECKSUM_START = zlib.adler32("")
# Condition variable built on top of a plain threading.Lock.
70 self.lock = threading.Condition(threading.Lock())
# Running checksum, seeded with the class-level start value.
71 self.checksum = self.CHECKSUM_START
# Folds str(value) into the checksum "current" and returns the result. It is
# called both through the class and through an instance with exactly these
# two arguments, so it takes no "self".
74 def UpdateChecksum(current, value):
75 return zlib.adler32(str(value), current)
# Worker that names each task after its number and folds that number into
# the shared checksum held by the context.
78 class ChecksumBaseWorker(workerpool.BaseWorker):
# ctx is a ChecksumContext; number is the task payload.
79 def RunTask(self, ctx, number):
# Task name derived from the payload, e.g. "number17".
80 name = "number%s" % number
81 self.SetTaskName(name)
83 # This assertion needs to be checked before updating the checksum. A
84 # failing assertion will then cause the result to be wrong.
# After SetTaskName the thread name must be "<worker id>/<task name>".
85 assert self.getName() == ("%s/%s" % (self._worker_id, name))
# Chain the payload into the context's checksum (lines around this statement
# are not visible in this extract).
89 ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
# Context for the priority-list tests. The worker appends to its "result"
# list and "prioresult" mapping; their initialisation is not visible in this
# extract.
94 class ListBuilderContext:
# Plain (non-reentrant) lock stored on the context.
96 self.lock = threading.Lock()
# Worker that records, for every task, the priority it actually ran at
# together with its payload.
101 class ListBuilderWorker(workerpool.BaseWorker):
# ctx is a ListBuilderContext; data is the task payload.
102 def RunTask(self, ctx, data):
# Flat execution log of (priority, payload) pairs in processing order.
105 ctx.result.append((self.GetCurrentPriority(), data))
# Same information grouped by priority: priority -> list of payloads.
106 ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
# Context for the task-deferral test. The worker also writes to a
# "prioresult" mapping whose initialisation is not visible in this extract.
111 class DeferringTaskContext:
# Plain (non-reentrant) lock stored on the context.
113 self.lock = threading.Lock()
# Task numbers that must be deferred once without a priority change; the
# worker deletes an entry when it performs that deferral.
115 self.samepriodefer = {}
# Worker that keeps deferring a task until its current priority has dropped
# to the task's target priority, then records it.
118 class DeferringWorker(workerpool.BaseWorker):
# ctx is a DeferringTaskContext, num identifies the task, targetprio is the
# priority value at which the task should finally be recorded.
119 def RunTask(self, ctx, num, targetprio):
# One-off deferral without a priority argument; the marker is removed first
# so the task is not deferred this way again.
122 if num in ctx.samepriodefer:
123 del ctx.samepriodefer[num]
124 raise workerpool.DeferTask()
# Current priority value still above the target: defer again, asking for a
# priority one lower than the current one.
126 if self.GetCurrentPriority() > targetprio:
127 raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
# Otherwise record the task under the priority it ended up running at.
129 ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
134 class TestWorkerpool(unittest.TestCase):
135 """Workerpool tests"""
# Queues counting tasks on a three-worker pool and checks that ten of them
# were processed. Some lines (e.g. the loop header supplying "i") are not
# visible in this extract.
137 def testCounting(self):
138 ctx = CountingContext()
139 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
# The pool is expected to hold all three workers before any task is queued.
141 self._CheckWorkerCount(wp, 3)
144 wp.AddTask((ctx, "Hello world %s" % i))
# After termination no worker may remain registered in the pool.
148 wp.TerminateWorkers()
149 self._CheckWorkerCount(wp, 0)
# NOTE(review): assertEquals is a deprecated alias of assertEqual, which
# other tests in this class already use.
151 self.assertEquals(ctx.GetDoneTasks(), 10)
# Starts a three-worker pool and terminates it without queueing any task.
153 def testNoTasks(self):
154 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
# Three workers present and nothing in the task queue.
156 self._CheckWorkerCount(wp, 3)
157 self._CheckNoTasks(wp)
# Termination must leave the pool without workers.
159 wp.TerminateWorkers()
160 self._CheckWorkerCount(wp, 0)
# Same as the no-task test, but verifies the empty queue twice.
# NOTE(review): the statement between the two checks is not visible in this
# extract; judging by the test name it is presumably a Quiesce call.
162 def testNoTasksQuiesce(self):
163 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
165 self._CheckWorkerCount(wp, 3)
166 self._CheckNoTasks(wp)
168 self._CheckNoTasks(wp)
# Termination must leave the pool without workers.
170 wp.TerminateWorkers()
171 self._CheckWorkerCount(wp, 0)
# Exercises the pool's private "_active" flag on a five-worker pool: tasks
# queued while the flag is false must not be counted as done until the flag
# is true again. The calls that toggle the flag and several loop headers are
# not visible in this extract.
173 def testActive(self):
174 ctx = CountingContext()
175 wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker)
# A freshly created pool has all workers and is active.
177 self._CheckWorkerCount(wp, 5)
178 self.assertTrue(wp._active)
182 wp.AddTask((ctx, None))
# Baseline: queue drained and ten tasks counted before the rounds start.
185 self._CheckNoTasks(wp)
186 self.assertEquals(ctx.GetDoneTasks(), 10)
# Ten rounds; per the expected totals below, each round adds 15 tasks.
189 for count in range(10):
192 self._CheckNoTasks(wp)
194 # Queue some more tasks
196 wp.AddTask((ctx, None))
199 # Short delays to give other threads a chance to cause breakage
201 wp.AddTask((ctx, "Hello world %s" % 999))
202 self.assertFalse(wp._active)
# While inactive, the done counter still shows only the earlier rounds.
204 self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15))
206 # Start processing again
208 self.assertTrue(wp._active)
210 # Wait for tasks to finish
212 self._CheckNoTasks(wp)
# This round's 15 tasks are now included in the total.
213 self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15)
215 self._CheckWorkerCount(wp, 5)
# Termination must leave the pool without workers.
217 wp.TerminateWorkers()
218 self._CheckWorkerCount(wp, 0)
220 def testChecksum(self):
221 # Tests whether all tasks are run and, since we're only using a single
222 # thread, whether everything is started in order.
223 wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
225 self._CheckWorkerCount(wp, 1)
227 ctx = ChecksumContext()
# Reference checksum computed locally over 1..99 in ascending order; the
# chained checksum only matches if the worker saw the same order.
228 checksum = ChecksumContext.CHECKSUM_START
229 for i in range(1, 100):
230 checksum = ChecksumContext.UpdateChecksum(checksum, i)
235 self._CheckNoTasks(wp)
# Compare the reference value with what the worker accumulated.
240 self.assertEqual(checksum, ctx.checksum)
# Termination must leave the pool without workers.
244 wp.TerminateWorkers()
245 self._CheckWorkerCount(wp, 0)
# Mixes bulk submission (AddManyTasks) with single submissions (AddTask) and
# checks that every queued task was counted.
247 def testAddManyTasks(self):
248 ctx = CountingContext()
249 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
251 self._CheckWorkerCount(wp, 3)
# 10 bulk + 1 + 1 + 10 bulk = 22 tasks in total.
253 wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
254 wp.AddTask((ctx, "A separate hello"))
255 wp.AddTask((ctx, "Once more, hi!"))
256 wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
260 self._CheckNoTasks(wp)
# Termination must leave the pool without workers.
262 wp.TerminateWorkers()
263 self._CheckWorkerCount(wp, 0)
# NOTE(review): assertEquals is a deprecated alias of assertEqual.
265 self.assertEquals(ctx.GetDoneTasks(), 22)
# Checks that AddManyTasks rejects task entries that are bare strings or
# integers instead of argument tuples, and still accepts well-formed input
# afterwards.
267 def testManyTasksSequence(self):
268 ctx = CountingContext()
269 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
271 self._CheckWorkerCount(wp, 3)
# Bare strings as tasks must trigger an AssertionError.
272 self.assertRaises(AssertionError, wp.AddManyTasks,
273 ["Hello world %s" % i for i in range(10)])
# Bare integers as tasks must trigger an AssertionError as well.
274 self.assertRaises(AssertionError, wp.AddManyTasks,
275 [i for i in range(10)])
# Valid submissions: 10 bulk tasks plus 1 single task = 11; the final count
# shows none of the rejected entries above were processed.
277 wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
278 wp.AddTask((ctx, "A separate hello"))
282 self._CheckNoTasks(wp)
# Termination must leave the pool without workers.
284 wp.TerminateWorkers()
285 self._CheckWorkerCount(wp, 0)
# NOTE(review): assertEquals is a deprecated alias of assertEqual.
287 self.assertEquals(ctx.GetDoneTasks(), 11)
# Helper asserting that the pool's private task container "_tasks" is empty.
# Lines surrounding the assertion are not visible in this extract.
289 def _CheckNoTasks(self, wp):
292 # The task queue must be empty now
# NOTE(review): failUnless is a deprecated unittest alias (removed in Python
# 3.12); assertTrue, already used in this class, is the current spelling.
293 self.failUnless(not wp._tasks)
# Helper asserting that the pool's private "_workers" collection holds
# exactly num_workers entries. Lines surrounding the assertion are not
# visible in this extract.
297 def _CheckWorkerCount(self, wp, num_workers):
300 self.assertEqual(len(wp._workers), num_workers)
304 def testPriorityChecksum(self):
305 # Tests whether all tasks are run and, since we're only using a single
306 # thread, whether everything is started in order and respects the priority
307 wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
309 self._CheckWorkerCount(wp, 1)
311 ctx = ChecksumContext()
# Build the task list, a parallel list of priorities and a priority ->
# payloads mapping. The initialisation of these containers and the way
# "prio" is chosen are not visible in this extract.
316 for i in range(1, 333):
318 tasks.append((ctx, i))
319 priorities.append(prio)
320 data.setdefault(prio, []).append(i)
# One priority per task, submitted in a single call.
322 wp.AddManyTasks(tasks, priority=priorities)
326 self._CheckNoTasks(wp)
# Expected checksum: priorities in ascending numeric order, payloads within
# one priority in submission order.
331 checksum = ChecksumContext.CHECKSUM_START
332 for priority in sorted(data.keys()):
333 for i in data[priority]:
334 checksum = ChecksumContext.UpdateChecksum(checksum, i)
336 self.assertEqual(checksum, ctx.checksum)
340 self._CheckWorkerCount(wp, 1)
# Termination must leave the pool without workers.
342 wp.TerminateWorkers()
343 self._CheckWorkerCount(wp, 0)
345 def testPriorityListManyTasks(self):
346 # Tests whether all tasks are run and, since we're only using a single
347 # thread, whether everything is started in order and respects the priority
348 wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
350 self._CheckWorkerCount(wp, 1)
352 ctx = ListBuilderContext()
354 # Use static seed for this test
355 rnd = random.Random(0)
# 332 tasks with pseudo-random priorities in 0..9; "data" keeps the expected
# (priority, payload) pairs per priority in submission order. The
# initialisation of the containers is not visible in this extract.
360 for i in range(1, 333):
361 prio = int(rnd.random() * 10)
362 tasks.append((ctx, i))
363 priorities.append(prio)
364 data.setdefault(prio, []).append((prio, i))
366 wp.AddManyTasks(tasks, priority=priorities)
# Two tasks but five priorities: the length mismatch must be rejected with
# errors.ProgrammerError.
368 self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
369 [("x", ), ("y", )], priority=[1] * 5)
373 self._CheckNoTasks(wp)
# Expected execution log: ascending numeric priority, submission order
# within each priority ("expresult" is initialised on a line not visible
# here).
379 for priority in sorted(data.keys()):
380 expresult.extend(data[priority])
382 self.assertEqual(expresult, ctx.result)
386 self._CheckWorkerCount(wp, 1)
# Termination must leave the pool without workers.
388 wp.TerminateWorkers()
389 self._CheckWorkerCount(wp, 0)
# NOTE(review): a second method with this exact name is defined later in the
# same class; in Python the later definition replaces this one, so this copy
# would never be run by the test loader.
391 def testPriorityListSingleTasks(self):
392 # Tests whether all tasks are run and, since we're only using a single
393 # thread, whether everything is started in order and respects the priority
394 wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
396 self._CheckWorkerCount(wp, 1)
398 ctx = ListBuilderContext()
400 # Use static seed for this test
401 rnd = random.Random(26279)
# 332 tasks queued one by one with pseudo-random priorities in 0..29; "data"
# maps each priority to its payloads in submission order (its initialisation
# is not visible in this extract).
404 for i in range(1, 333):
405 prio = int(rnd.random() * 30)
406 wp.AddTask((ctx, i), priority=prio)
407 data.setdefault(prio, []).append(i)
409 # Cause some distortion
417 self._CheckNoTasks(wp)
# Per-priority payload lists recorded by the worker must equal the expected
# mapping, i.e. submission order is kept within each priority.
422 self.assertEqual(data, ctx.prioresult)
426 self._CheckWorkerCount(wp, 1)
# Termination must leave the pool without workers.
428 wp.TerminateWorkers()
429 self._CheckWorkerCount(wp, 0)
# NOTE(review): this method has the same name as an earlier method of this
# class and its visible lines are identical to it; being the later
# definition, this is the one that takes effect. One of the two copies looks
# redundant -- confirm and remove or rename.
431 def testPriorityListSingleTasks(self):
432 # Tests whether all tasks are run and, since we're only using a single
433 # thread, whether everything is started in order and respects the priority
434 wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
436 self._CheckWorkerCount(wp, 1)
438 ctx = ListBuilderContext()
440 # Use static seed for this test
441 rnd = random.Random(26279)
# 332 tasks queued one by one with pseudo-random priorities in 0..29; "data"
# maps each priority to its payloads in submission order (its initialisation
# is not visible in this extract).
444 for i in range(1, 333):
445 prio = int(rnd.random() * 30)
446 wp.AddTask((ctx, i), priority=prio)
447 data.setdefault(prio, []).append(i)
449 # Cause some distortion
457 self._CheckNoTasks(wp)
# Per-priority payload lists recorded by the worker must equal the expected
# mapping, i.e. submission order is kept within each priority.
462 self.assertEqual(data, ctx.prioresult)
466 self._CheckWorkerCount(wp, 1)
# Termination must leave the pool without workers.
468 wp.TerminateWorkers()
469 self._CheckWorkerCount(wp, 0)
# Checks task deferral: every task is queued with priority 50 together with
# a target priority, and the worker is expected to record it under exactly
# that target priority.
# NOTE(review): the comment below appears to be copied from the priority
# tests; this test compares sets per priority, so it does not check ordering.
471 def testDeferTask(self):
472 # Tests whether all tasks are run and, since we're only using a single
473 # thread, whether everything is started in order and respects the priority
474 wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
476 self._CheckWorkerCount(wp, 1)
478 ctx = DeferringTaskContext()
480 # Use static seed for this test
481 rnd = random.Random(14921)
484 for i in range(1, 333):
# Marks task i for one extra deferral at unchanged priority; the condition
# selecting which tasks get marked is not visible in this extract.
488 ctx.samepriodefer[i] = True
# Target priority in 0..29; the task itself always starts at priority 50.
492 prio = int(rnd.random() * 30)
493 wp.AddTask((ctx, i, prio), priority=50)
494 data.setdefault(prio, set()).add(i)
496 # Cause some distortion
504 self._CheckNoTasks(wp)
# Each task must have been recorded under its target priority.
509 self.assertEqual(data, ctx.prioresult)
513 self._CheckWorkerCount(wp, 1)
# Termination must leave the pool without workers.
515 wp.TerminateWorkers()
516 self._CheckWorkerCount(wp, 0)
# Script entry point: hand control to testutils.GanetiTestProgram when the
# module is executed directly (the import of testutils is not visible in
# this extract).
519 if __name__ == '__main__':
520 testutils.GanetiTestProgram()