+ def testPriorityChecksum(self):
+ # Tests whether all tasks are run and, since we're only using a single
+ # thread, whether everything is started in order and respects the priority
+ wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
+ try:
+ self._CheckWorkerCount(wp, 1)
+
+ ctx = ChecksumContext()
+
+ data = {}
+ tasks = []
+ priorities = []
+ for i in range(1, 333):
+ prio = i % 7
+ tasks.append((ctx, i))
+ priorities.append(prio)
+ data.setdefault(prio, []).append(i)
+
+ wp.AddManyTasks(tasks, priority=priorities)
+
+ wp.Quiesce()
+
+ self._CheckNoTasks(wp)
+
+ # Check sum
+ ctx.lock.acquire()
+ try:
+ checksum = ChecksumContext.CHECKSUM_START
+ for priority in sorted(data.keys()):
+ for i in data[priority]:
+ checksum = ChecksumContext.UpdateChecksum(checksum, i)
+
+ self.assertEqual(checksum, ctx.checksum)
+ finally:
+ ctx.lock.release()
+
+ self._CheckWorkerCount(wp, 1)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+
+ def testPriorityListManyTasks(self):
+ # Tests whether all tasks are run and, since we're only using a single
+ # thread, whether everything is started in order and respects the priority
+ wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+ try:
+ self._CheckWorkerCount(wp, 1)
+
+ ctx = ListBuilderContext()
+
+ # Use static seed for this test
+ rnd = random.Random(0)
+
+ data = {}
+ tasks = []
+ priorities = []
+ for i in range(1, 333):
+ prio = int(rnd.random() * 10)
+ tasks.append((ctx, i))
+ priorities.append(prio)
+ data.setdefault(prio, []).append((prio, i))
+
+ wp.AddManyTasks(tasks, priority=priorities)
+
+ self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
+ [("x", ), ("y", )], priority=[1] * 5)
+
+ wp.Quiesce()
+
+ self._CheckNoTasks(wp)
+
+ # Check result
+ ctx.lock.acquire()
+ try:
+ expresult = []
+ for priority in sorted(data.keys()):
+ expresult.extend(data[priority])
+
+ self.assertEqual(expresult, ctx.result)
+ finally:
+ ctx.lock.release()
+
+ self._CheckWorkerCount(wp, 1)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+
+ def testPriorityListSingleTasks(self):
+ # Tests whether all tasks are run and, since we're only using a single
+ # thread, whether everything is started in order and respects the priority
+ wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+ try:
+ self._CheckWorkerCount(wp, 1)
+
+ ctx = ListBuilderContext()
+
+ # Use static seed for this test
+ rnd = random.Random(26279)
+
+ data = {}
+ for i in range(1, 333):
+ prio = int(rnd.random() * 30)
+ wp.AddTask((ctx, i), priority=prio)
+ data.setdefault(prio, []).append(i)
+
+ # Cause some distortion
+ if i % 11 == 0:
+ time.sleep(.001)
+ if i % 41 == 0:
+ wp.Quiesce()
+
+ wp.Quiesce()
+
+ self._CheckNoTasks(wp)
+
+ # Check result
+ ctx.lock.acquire()
+ try:
+ self.assertEqual(data, ctx.prioresult)
+ finally:
+ ctx.lock.release()
+
+ self._CheckWorkerCount(wp, 1)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+
+ def testPriorityListSingleTasks(self):
+ # Tests whether all tasks are run and, since we're only using a single
+ # thread, whether everything is started in order and respects the priority
+ wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+ try:
+ self._CheckWorkerCount(wp, 1)
+
+ ctx = ListBuilderContext()
+
+ # Use static seed for this test
+ rnd = random.Random(26279)
+
+ data = {}
+ for i in range(1, 333):
+ prio = int(rnd.random() * 30)
+ wp.AddTask((ctx, i), priority=prio)
+ data.setdefault(prio, []).append(i)
+
+ # Cause some distortion
+ if i % 11 == 0:
+ time.sleep(.001)
+ if i % 41 == 0:
+ wp.Quiesce()
+
+ wp.Quiesce()
+
+ self._CheckNoTasks(wp)
+
+ # Check result
+ ctx.lock.acquire()
+ try:
+ self.assertEqual(data, ctx.prioresult)
+ finally:
+ ctx.lock.release()
+
+ self._CheckWorkerCount(wp, 1)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+
+ def testDeferTask(self):
+ # Tests whether all tasks are run and, since we're only using a single
+ # thread, whether everything is started in order and respects the priority
+ wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
+ try:
+ self._CheckWorkerCount(wp, 1)
+
+ ctx = DeferringTaskContext()
+
+ # Use static seed for this test
+ rnd = random.Random(14921)
+
+ data = {}
+ for i in range(1, 333):
+ ctx.lock.acquire()
+ try:
+ if i % 5 == 0:
+ ctx.samepriodefer[i] = True
+ finally:
+ ctx.lock.release()
+
+ prio = int(rnd.random() * 30)
+ wp.AddTask((ctx, i, prio), priority=50)
+ data.setdefault(prio, set()).add(i)
+
+ # Cause some distortion
+ if i % 24 == 0:
+ time.sleep(.001)
+ if i % 31 == 0:
+ wp.Quiesce()
+
+ wp.Quiesce()
+
+ self._CheckNoTasks(wp)
+
+ # Check result
+ ctx.lock.acquire()
+ try:
+ self.assertEqual(data, ctx.prioresult)
+ finally:
+ ctx.lock.release()
+
+ self._CheckWorkerCount(wp, 1)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+