class ChecksumBaseWorker(workerpool.BaseWorker):
def RunTask(self, ctx, number):
+ name = "number%s" % number
+ self.SetTaskName(name)
+
+ # This assertion needs to be checked before updating the checksum. A
+ # failing assertion will then cause the result to be wrong.
+ assert self.getName() == ("%s/%s" % (self._worker_id, name))
+
ctx.lock.acquire()
try:
ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testActive(self):
    """Check that deactivating the pool pauses task processing.

    While the pool is inactive, newly added tasks must stay queued (the
    context's done-task counter does not move). Once the pool is activated
    again and quiesced, all queued tasks must have been processed.
    """
    num_workers = 5
    initial_tasks = 10
    bulk_tasks = 10
    delayed_tasks = 5
    tasks_per_round = bulk_tasks + delayed_tasks

    ctx = CountingContext()
    wp = workerpool.WorkerPool("TestActive", num_workers, CountingBaseWorker)
    try:
        self._CheckWorkerCount(wp, num_workers)
        self.assertTrue(wp._active)

        # Process some tasks
        for _ in range(initial_tasks):
            wp.AddTask((ctx, None))

        wp.Quiesce()
        self._CheckNoTasks(wp)
        self.assertEqual(ctx.GetDoneTasks(), initial_tasks)

        # Repeat a few times
        for count in range(10):
            # Number of tasks completed before this round started
            done_before = initial_tasks + (count * tasks_per_round)

            # Deactivate pool
            wp.SetActive(False)
            self._CheckNoTasks(wp)

            # Queue some more tasks
            for _ in range(bulk_tasks):
                wp.AddTask((ctx, None))

            for _ in range(delayed_tasks):
                # Short delays to give other threads a chance to cause
                # breakage
                time.sleep(.01)
                wp.AddTask((ctx, "Hello world %s" % 999))
                self.assertFalse(wp._active)

            # Nothing may have been processed while the pool was inactive
            self.assertEqual(ctx.GetDoneTasks(), done_before)

            # Start processing again
            wp.SetActive(True)
            self.assertTrue(wp._active)

            # Wait for tasks to finish
            wp.Quiesce()
            self._CheckNoTasks(wp)
            self.assertEqual(ctx.GetDoneTasks(), done_before + tasks_per_round)

            self._CheckWorkerCount(wp, num_workers)
    finally:
        # Always shut the workers down, even if an assertion failed
        wp.TerminateWorkers()
        self._CheckWorkerCount(wp, 0)
+
def testChecksum(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order.