4 # Copyright (C) 2008, 2009, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the workerpool module"""
31 from ganeti import workerpool
32 from ganeti import errors
33 from ganeti import utils
34 from ganeti import compat
39 class CountingContext(object):
41 self._lock = threading.Condition(threading.Lock())
51 def GetDoneTasks(self):
59 def UpdateChecksum(current, value):
60 return zlib.adler32(str(value), current)
63 class CountingBaseWorker(workerpool.BaseWorker):
64 def RunTask(self, ctx, text):
68 class ChecksumContext:
69 CHECKSUM_START = zlib.adler32("")
72 self.lock = threading.Condition(threading.Lock())
73 self.checksum = self.CHECKSUM_START
76 def UpdateChecksum(current, value):
77 return zlib.adler32(str(value), current)
80 class ChecksumBaseWorker(workerpool.BaseWorker):
81 def RunTask(self, ctx, number):
82 name = "number%s" % number
83 self.SetTaskName(name)
85 # This assertion needs to be checked before updating the checksum. A
86 # failing assertion will then cause the result to be wrong.
87 assert self.getName() == ("%s/%s" % (self._worker_id, name))
91 ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
96 class ListBuilderContext:
98 self.lock = threading.Lock()
103 class ListBuilderWorker(workerpool.BaseWorker):
104 def RunTask(self, ctx, data):
107 ctx.result.append((self.GetCurrentPriority(), data))
108 ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
113 class DeferringTaskContext:
115 self.lock = threading.Lock()
117 self.samepriodefer = {}
118 self.num2ordertaskid = {}
121 class DeferringWorker(workerpool.BaseWorker):
122 def RunTask(self, ctx, num, targetprio):
125 otilst = ctx.num2ordertaskid.setdefault(num, [])
126 otilst.append(self._GetCurrentOrderAndTaskId())
128 if num in ctx.samepriodefer:
129 del ctx.samepriodefer[num]
130 raise workerpool.DeferTask()
132 if self.GetCurrentPriority() > targetprio:
133 raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
135 ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
140 class PriorityContext:
142 self.lock = threading.Lock()
146 class PriorityWorker(workerpool.BaseWorker):
147 def RunTask(self, ctx, data):
150 ctx.result.append((self.GetCurrentPriority(), data))
155 class NotImplementedWorker(workerpool.BaseWorker):
157 raise NotImplementedError
160 class TestWorkerpool(unittest.TestCase):
161 """Workerpool tests"""
163 def testCounting(self):
164 ctx = CountingContext()
165 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
167 self._CheckWorkerCount(wp, 3)
170 wp.AddTask((ctx, "Hello world %s" % i))
174 wp.TerminateWorkers()
175 self._CheckWorkerCount(wp, 0)
177 self.assertEquals(ctx.GetDoneTasks(), 10)
179 def testNoTasks(self):
180 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
182 self._CheckWorkerCount(wp, 3)
183 self._CheckNoTasks(wp)
185 wp.TerminateWorkers()
186 self._CheckWorkerCount(wp, 0)
188 def testNoTasksQuiesce(self):
189 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
191 self._CheckWorkerCount(wp, 3)
192 self._CheckNoTasks(wp)
194 self._CheckNoTasks(wp)
196 wp.TerminateWorkers()
197 self._CheckWorkerCount(wp, 0)
199 def testActive(self):
200 ctx = CountingContext()
201 wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker)
203 self._CheckWorkerCount(wp, 5)
204 self.assertTrue(wp._active)
208 wp.AddTask((ctx, None))
211 self._CheckNoTasks(wp)
212 self.assertEquals(ctx.GetDoneTasks(), 10)
215 for count in range(10):
218 self._CheckNoTasks(wp)
220 # Queue some more tasks
222 wp.AddTask((ctx, None))
225 # Short delays to give other threads a chance to cause breakage
227 wp.AddTask((ctx, "Hello world %s" % 999))
228 self.assertFalse(wp._active)
230 self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15))
232 # Start processing again
234 self.assertTrue(wp._active)
236 # Wait for tasks to finish
238 self._CheckNoTasks(wp)
239 self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15)
241 self._CheckWorkerCount(wp, 5)
243 wp.TerminateWorkers()
244 self._CheckWorkerCount(wp, 0)
246 def testChecksum(self):
247 # Tests whether all tasks are run and, since we're only using a single
248 # thread, whether everything is started in order.
249 wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
251 self._CheckWorkerCount(wp, 1)
253 ctx = ChecksumContext()
254 checksum = ChecksumContext.CHECKSUM_START
255 for i in range(1, 100):
256 checksum = ChecksumContext.UpdateChecksum(checksum, i)
261 self._CheckNoTasks(wp)
266 self.assertEqual(checksum, ctx.checksum)
270 wp.TerminateWorkers()
271 self._CheckWorkerCount(wp, 0)
273 def testAddManyTasks(self):
274 ctx = CountingContext()
275 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
277 self._CheckWorkerCount(wp, 3)
279 wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
280 wp.AddTask((ctx, "A separate hello"))
281 wp.AddTask((ctx, "Once more, hi!"))
282 wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
286 self._CheckNoTasks(wp)
288 wp.TerminateWorkers()
289 self._CheckWorkerCount(wp, 0)
291 self.assertEquals(ctx.GetDoneTasks(), 22)
293 def testManyTasksSequence(self):
294 ctx = CountingContext()
295 wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
297 self._CheckWorkerCount(wp, 3)
298 self.assertRaises(AssertionError, wp.AddManyTasks,
299 ["Hello world %s" % i for i in range(10)])
300 self.assertRaises(AssertionError, wp.AddManyTasks,
301 [i for i in range(10)])
302 self.assertRaises(AssertionError, wp.AddManyTasks, [], task_id=0)
304 wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
305 wp.AddTask((ctx, "A separate hello"))
309 self._CheckNoTasks(wp)
311 wp.TerminateWorkers()
312 self._CheckWorkerCount(wp, 0)
314 self.assertEquals(ctx.GetDoneTasks(), 11)
316 def _CheckNoTasks(self, wp):
319 # The task queue must be empty now
320 self.assertFalse(wp._tasks)
321 self.assertFalse(wp._taskdata)
325 def _CheckWorkerCount(self, wp, num_workers):
328 self.assertEqual(len(wp._workers), num_workers)
332 def testPriorityChecksum(self):
333 # Tests whether all tasks are run and, since we're only using a single
334 # thread, whether everything is started in order and respects the priority
335 wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
337 self._CheckWorkerCount(wp, 1)
339 ctx = ChecksumContext()
344 for i in range(1, 333):
346 tasks.append((ctx, i))
347 priorities.append(prio)
348 data.setdefault(prio, []).append(i)
350 wp.AddManyTasks(tasks, priority=priorities)
354 self._CheckNoTasks(wp)
359 checksum = ChecksumContext.CHECKSUM_START
360 for priority in sorted(data.keys()):
361 for i in data[priority]:
362 checksum = ChecksumContext.UpdateChecksum(checksum, i)
364 self.assertEqual(checksum, ctx.checksum)
368 self._CheckWorkerCount(wp, 1)
370 wp.TerminateWorkers()
371 self._CheckWorkerCount(wp, 0)
373 def testPriorityListManyTasks(self):
374 # Tests whether all tasks are run and, since we're only using a single
375 # thread, whether everything is started in order and respects the priority
376 wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
378 self._CheckWorkerCount(wp, 1)
380 ctx = ListBuilderContext()
382 # Use static seed for this test
383 rnd = random.Random(0)
388 for i in range(1, 333):
389 prio = int(rnd.random() * 10)
390 tasks.append((ctx, i))
391 priorities.append(prio)
392 data.setdefault(prio, []).append((prio, i))
394 wp.AddManyTasks(tasks, priority=priorities)
396 self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
397 [("x", ), ("y", )], priority=[1] * 5)
398 self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
399 [("x", ), ("y", )], task_id=[1] * 5)
403 self._CheckNoTasks(wp)
409 for priority in sorted(data.keys()):
410 expresult.extend(data[priority])
412 self.assertEqual(expresult, ctx.result)
416 self._CheckWorkerCount(wp, 1)
418 wp.TerminateWorkers()
419 self._CheckWorkerCount(wp, 0)
421 def testPriorityListSingleTasks(self):
422 # Tests whether all tasks are run and, since we're only using a single
423 # thread, whether everything is started in order and respects the priority
424 wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
426 self._CheckWorkerCount(wp, 1)
428 ctx = ListBuilderContext()
430 # Use static seed for this test
431 rnd = random.Random(26279)
434 for i in range(1, 333):
435 prio = int(rnd.random() * 30)
436 wp.AddTask((ctx, i), priority=prio)
437 data.setdefault(prio, []).append(i)
439 # Cause some distortion
447 self._CheckNoTasks(wp)
452 self.assertEqual(data, ctx.prioresult)
456 self._CheckWorkerCount(wp, 1)
458 wp.TerminateWorkers()
459 self._CheckWorkerCount(wp, 0)
461 def testDeferTask(self):
462 # Tests whether all tasks are run and, since we're only using a single
463 # thread, whether everything is started in order and respects the priority
464 wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
466 self._CheckWorkerCount(wp, 1)
468 ctx = DeferringTaskContext()
470 # Use static seed for this test
471 rnd = random.Random(14921)
475 for i in range(1, 333):
479 ctx.samepriodefer[i] = True
483 prio = int(rnd.random() * 30)
484 num2taskid[i] = 1000 * i
485 wp.AddTask((ctx, i, prio), priority=50,
486 task_id=num2taskid[i])
487 data.setdefault(prio, set()).add(i)
489 # Cause some distortion
497 self._CheckNoTasks(wp)
502 self.assertEqual(data, ctx.prioresult)
506 for (num, numordertaskid) in ctx.num2ordertaskid.items():
507 order_ids = map(compat.fst, numordertaskid)
508 self.assertFalse(utils.FindDuplicates(order_ids),
509 msg="Order ID has been reused")
510 all_order_ids.extend(order_ids)
512 for task_id in map(compat.snd, numordertaskid):
513 self.assertEqual(task_id, num2taskid[num],
514 msg=("Task %s used different task IDs" % num))
516 self.assertFalse(utils.FindDuplicates(all_order_ids),
517 msg="Order ID has been reused")
521 self._CheckWorkerCount(wp, 1)
523 wp.TerminateWorkers()
524 self._CheckWorkerCount(wp, 0)
526 def testChangeTaskPriority(self):
527 wp = workerpool.WorkerPool("Test", 1, PriorityWorker)
529 self._CheckWorkerCount(wp, 1)
531 ctx = PriorityContext()
533 # Use static seed for this test
534 rnd = random.Random(4727)
536 # Disable processing of tasks
540 self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
543 # Pre-generate task IDs and priorities
545 task_ids = range(0, count)
546 priorities = range(200, 200 + count) * 2
548 rnd.shuffle(task_ids)
549 rnd.shuffle(priorities)
551 # Make sure there are some duplicate priorities, but not all
552 priorities[count * 2 - 10:count * 2 - 1] = \
553 priorities[count - 10: count - 1]
555 assert len(priorities) == 2 * count
556 assert priorities[0:(count - 1)] != priorities[count:(2 * count - 1)]
558 # Add some tasks; this loop consumes the first half of all previously
559 # generated priorities
560 for (idx, task_id) in enumerate(task_ids):
561 wp.AddTask((ctx, idx),
562 priority=priorities.pop(),
565 self.assertEqual(len(wp._tasks), len(task_ids))
566 self.assertEqual(len(wp._taskdata), len(task_ids))
568 # Tasks have been added, so half of the priorities should have been
570 assert len(priorities) == len(task_ids)
572 # Change task priority
574 for ((idx, task_id), prio) in zip(enumerate(task_ids), priorities):
575 wp.ChangeTaskPriority(task_id, prio)
576 expected.append((prio, idx))
578 self.assertEqual(len(wp._taskdata), len(task_ids))
580 # Half the entries are now abandoned tasks
581 self.assertEqual(len(wp._tasks), len(task_ids) * 2)
583 assert len(priorities) == count
584 assert len(task_ids) == count
589 # Wait for tasks to finish
592 self._CheckNoTasks(wp)
594 for task_id in task_ids:
596 self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
602 self.assertEqual(ctx.result, sorted(expected))
606 self._CheckWorkerCount(wp, 1)
608 wp.TerminateWorkers()
609 self._CheckWorkerCount(wp, 0)
611 def testChangeTaskPriorityInteralStructures(self):
612 wp = workerpool.WorkerPool("Test", 1, NotImplementedWorker)
614 self._CheckWorkerCount(wp, 1)
616 # Use static seed for this test
617 rnd = random.Random(643)
619 (num1, num2) = rnd.sample(range(1000), 2)
621 # Disable processing of tasks
624 self.assertFalse(wp._tasks)
625 self.assertFalse(wp._taskdata)
627 # No priority or task ID
629 self.assertEqual(wp._tasks, [
630 [workerpool._DEFAULT_PRIORITY, 0, None, ()],
632 self.assertFalse(wp._taskdata)
635 wp.AddTask((), priority=7413)
636 self.assertEqual(wp._tasks, [
637 [workerpool._DEFAULT_PRIORITY, 0, None, ()],
640 self.assertFalse(wp._taskdata)
642 # Start adding real tasks
643 wp.AddTask((), priority=10267659, task_id=num1)
644 self.assertEqual(wp._tasks, [
645 [workerpool._DEFAULT_PRIORITY, 0, None, ()],
647 [10267659, 2, num1, ()],
649 self.assertEqual(wp._taskdata, {
650 num1: [10267659, 2, num1, ()],
653 wp.AddTask((), priority=123, task_id=num2)
654 self.assertEqual(sorted(wp._tasks), [
655 [workerpool._DEFAULT_PRIORITY, 0, None, ()],
658 [10267659, 2, num1, ()],
660 self.assertEqual(wp._taskdata, {
661 num1: [10267659, 2, num1, ()],
662 num2: [123, 3, num2, ()],
665 wp.ChangeTaskPriority(num1, 100)
666 self.assertEqual(sorted(wp._tasks), [
667 [workerpool._DEFAULT_PRIORITY, 0, None, ()],
671 [10267659, 2, num1, None],
673 self.assertEqual(wp._taskdata, {
674 num1: [100, 2, num1, ()],
675 num2: [123, 3, num2, ()],
678 wp.ChangeTaskPriority(num2, 91337)
679 self.assertEqual(sorted(wp._tasks), [
680 [workerpool._DEFAULT_PRIORITY, 0, None, ()],
682 [123, 3, num2, None],
684 [91337, 3, num2, ()],
685 [10267659, 2, num1, None],
687 self.assertEqual(wp._taskdata, {
688 num1: [100, 2, num1, ()],
689 num2: [91337, 3, num2, ()],
692 wp.ChangeTaskPriority(num1, 10139)
693 self.assertEqual(sorted(wp._tasks), [
694 [workerpool._DEFAULT_PRIORITY, 0, None, ()],
695 [100, 2, num1, None],
696 [123, 3, num2, None],
698 [10139, 2, num1, ()],
699 [91337, 3, num2, ()],
700 [10267659, 2, num1, None],
702 self.assertEqual(wp._taskdata, {
703 num1: [10139, 2, num1, ()],
704 num2: [91337, 3, num2, ()],
707 # Change to the same priority once again
708 wp.ChangeTaskPriority(num1, 10139)
709 self.assertEqual(sorted(wp._tasks), [
710 [workerpool._DEFAULT_PRIORITY, 0, None, ()],
711 [100, 2, num1, None],
712 [123, 3, num2, None],
714 [10139, 2, num1, None],
715 [10139, 2, num1, ()],
716 [91337, 3, num2, ()],
717 [10267659, 2, num1, None],
719 self.assertEqual(wp._taskdata, {
720 num1: [10139, 2, num1, ()],
721 num2: [91337, 3, num2, ()],
724 self._CheckWorkerCount(wp, 1)
726 wp.TerminateWorkers()
727 self._CheckWorkerCount(wp, 0)
730 if __name__ == "__main__":
731 testutils.GanetiTestProgram()