+ def testPriorityChecksum(self):
+ # Tests whether all tasks are run and, since we're only using a single
+ # thread, whether everything is started in order and respects the priority
+ wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
+ try:
+ self._CheckWorkerCount(wp, 1)
+
+ ctx = ChecksumContext()
+
+ data = {}
+ tasks = []
+ priorities = []
+ for i in range(1, 333):
+ prio = i % 7
+ tasks.append((ctx, i))
+ priorities.append(prio)
+ data.setdefault(prio, []).append(i)
+
+ wp.AddManyTasks(tasks, priority=priorities)
+
+ wp.Quiesce()
+
+ self._CheckNoTasks(wp)
+
+ # Check sum
+ ctx.lock.acquire()
+ try:
+ checksum = ChecksumContext.CHECKSUM_START
+ for priority in sorted(data.keys()):
+ for i in data[priority]:
+ checksum = ChecksumContext.UpdateChecksum(checksum, i)
+
+ self.assertEqual(checksum, ctx.checksum)
+ finally:
+ ctx.lock.release()
+
+ self._CheckWorkerCount(wp, 1)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+
+ def testPriorityListManyTasks(self):
+ # Tests whether all tasks are run and, since we're only using a single
+ # thread, whether everything is started in order and respects the priority
+ wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+ try:
+ self._CheckWorkerCount(wp, 1)
+
+ ctx = ListBuilderContext()
+
+ # Use static seed for this test
+ rnd = random.Random(0)
+
+ data = {}
+ tasks = []
+ priorities = []
+ for i in range(1, 333):
+ prio = int(rnd.random() * 10)
+ tasks.append((ctx, i))
+ priorities.append(prio)
+ data.setdefault(prio, []).append((prio, i))
+
+ wp.AddManyTasks(tasks, priority=priorities)
+
+ self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
+ [("x", ), ("y", )], priority=[1] * 5)
+ self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
+ [("x", ), ("y", )], task_id=[1] * 5)
+
+ wp.Quiesce()
+
+ self._CheckNoTasks(wp)
+
+ # Check result
+ ctx.lock.acquire()
+ try:
+ expresult = []
+ for priority in sorted(data.keys()):
+ expresult.extend(data[priority])
+
+ self.assertEqual(expresult, ctx.result)
+ finally:
+ ctx.lock.release()
+
+ self._CheckWorkerCount(wp, 1)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+
+ def testPriorityListSingleTasks(self):
+ # Tests whether all tasks are run and, since we're only using a single
+ # thread, whether everything is started in order and respects the priority
+ wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+ try:
+ self._CheckWorkerCount(wp, 1)
+
+ ctx = ListBuilderContext()
+
+ # Use static seed for this test
+ rnd = random.Random(26279)
+
+ data = {}
+ for i in range(1, 333):
+ prio = int(rnd.random() * 30)
+ wp.AddTask((ctx, i), priority=prio)
+ data.setdefault(prio, []).append(i)
+
+ # Cause some distortion
+ if i % 11 == 0:
+ time.sleep(.001)
+ if i % 41 == 0:
+ wp.Quiesce()
+
+ wp.Quiesce()
+
+ self._CheckNoTasks(wp)
+
+ # Check result
+ ctx.lock.acquire()
+ try:
+ self.assertEqual(data, ctx.prioresult)
+ finally:
+ ctx.lock.release()
+
+ self._CheckWorkerCount(wp, 1)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+
+ def testDeferTask(self):
+ # Tests whether all tasks are run and, since we're only using a single
+ # thread, whether everything is started in order and respects the priority
+ wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
+ try:
+ self._CheckWorkerCount(wp, 1)
+
+ ctx = DeferringTaskContext()
+
+ # Use static seed for this test
+ rnd = random.Random(14921)
+
+ data = {}
+ num2taskid = {}
+ for i in range(1, 333):
+ ctx.lock.acquire()
+ try:
+ if i % 5 == 0:
+ ctx.samepriodefer[i] = True
+ finally:
+ ctx.lock.release()
+
+ prio = int(rnd.random() * 30)
+ num2taskid[i] = 1000 * i
+ wp.AddTask((ctx, i, prio), priority=50,
+ task_id=num2taskid[i])
+ data.setdefault(prio, set()).add(i)
+
+ # Cause some distortion
+ if i % 24 == 0:
+ time.sleep(.001)
+ if i % 31 == 0:
+ wp.Quiesce()
+
+ wp.Quiesce()
+
+ self._CheckNoTasks(wp)
+
+ # Check result
+ ctx.lock.acquire()
+ try:
+ self.assertEqual(data, ctx.prioresult)
+
+ all_order_ids = []
+
+ for (num, numordertaskid) in ctx.num2ordertaskid.items():
+ order_ids = map(compat.fst, numordertaskid)
+ self.assertFalse(utils.FindDuplicates(order_ids),
+ msg="Order ID has been reused")
+ all_order_ids.extend(order_ids)
+
+ for task_id in map(compat.snd, numordertaskid):
+ self.assertEqual(task_id, num2taskid[num],
+ msg=("Task %s used different task IDs" % num))
+
+ self.assertFalse(utils.FindDuplicates(all_order_ids),
+ msg="Order ID has been reused")
+ finally:
+ ctx.lock.release()
+
+ self._CheckWorkerCount(wp, 1)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+
+ def testChangeTaskPriority(self):
+ wp = workerpool.WorkerPool("Test", 1, PriorityWorker)
+ try:
+ self._CheckWorkerCount(wp, 1)
+
+ ctx = PriorityContext()
+
+ # Use static seed for this test
+ rnd = random.Random(4727)
+
+ # Disable processing of tasks
+ wp.SetActive(False)
+
+ # No task ID
+ self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
+ None, 0)
+
+ # Pre-generate task IDs and priorities
+ count = 100
+ task_ids = range(0, count)
+ priorities = range(200, 200 + count) * 2
+
+ rnd.shuffle(task_ids)
+ rnd.shuffle(priorities)
+
+ # Make sure there are some duplicate priorities, but not all
+ priorities[count * 2 - 10:count * 2 - 1] = \
+ priorities[count - 10: count - 1]
+
+ assert len(priorities) == 2 * count
+ assert priorities[0:(count - 1)] != priorities[count:(2 * count - 1)]
+
+ # Add some tasks; this loop consumes the first half of all previously
+ # generated priorities
+ for (idx, task_id) in enumerate(task_ids):
+ wp.AddTask((ctx, idx),
+ priority=priorities.pop(),
+ task_id=task_id)
+
+ self.assertEqual(len(wp._tasks), len(task_ids))
+ self.assertEqual(len(wp._taskdata), len(task_ids))
+
+ # Tasks have been added, so half of the priorities should have been
+ # consumed
+ assert len(priorities) == len(task_ids)
+
+ # Change task priority
+ expected = []
+ for ((idx, task_id), prio) in zip(enumerate(task_ids), priorities):
+ wp.ChangeTaskPriority(task_id, prio)
+ expected.append((prio, idx))
+
+ self.assertEqual(len(wp._taskdata), len(task_ids))
+
+ # Half the entries are now abandoned tasks
+ self.assertEqual(len(wp._tasks), len(task_ids) * 2)
+
+ assert len(priorities) == count
+ assert len(task_ids) == count
+
+ # Start processing
+ wp.SetActive(True)
+
+ # Wait for tasks to finish
+ wp.Quiesce()
+
+ self._CheckNoTasks(wp)
+
+ for task_id in task_ids:
+ # All tasks are done
+ self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
+ task_id, 0)
+
+ # Check result
+ ctx.lock.acquire()
+ try:
+ self.assertEqual(ctx.result, sorted(expected))
+ finally:
+ ctx.lock.release()
+
+ self._CheckWorkerCount(wp, 1)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+
+ def testChangeTaskPriorityInteralStructures(self):
+ wp = workerpool.WorkerPool("Test", 1, NotImplementedWorker)
+ try:
+ self._CheckWorkerCount(wp, 1)
+
+ # Use static seed for this test
+ rnd = random.Random(643)
+
+ (num1, num2) = rnd.sample(range(1000), 2)
+
+ # Disable processing of tasks
+ wp.SetActive(False)
+
+ self.assertFalse(wp._tasks)
+ self.assertFalse(wp._taskdata)
+
+ # No priority or task ID
+ wp.AddTask(())
+ self.assertEqual(wp._tasks, [
+ [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+ ])
+ self.assertFalse(wp._taskdata)
+
+ # No task ID
+ wp.AddTask((), priority=7413)
+ self.assertEqual(wp._tasks, [
+ [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+ [7413, 1, None, ()],
+ ])
+ self.assertFalse(wp._taskdata)
+
+ # Start adding real tasks
+ wp.AddTask((), priority=10267659, task_id=num1)
+ self.assertEqual(wp._tasks, [
+ [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+ [7413, 1, None, ()],
+ [10267659, 2, num1, ()],
+ ])
+ self.assertEqual(wp._taskdata, {
+ num1: [10267659, 2, num1, ()],
+ })
+
+ wp.AddTask((), priority=123, task_id=num2)
+ self.assertEqual(sorted(wp._tasks), [
+ [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+ [123, 3, num2, ()],
+ [7413, 1, None, ()],
+ [10267659, 2, num1, ()],
+ ])
+ self.assertEqual(wp._taskdata, {
+ num1: [10267659, 2, num1, ()],
+ num2: [123, 3, num2, ()],
+ })
+
+ wp.ChangeTaskPriority(num1, 100)
+ self.assertEqual(sorted(wp._tasks), [
+ [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+ [100, 2, num1, ()],
+ [123, 3, num2, ()],
+ [7413, 1, None, ()],
+ [10267659, 2, num1, None],
+ ])
+ self.assertEqual(wp._taskdata, {
+ num1: [100, 2, num1, ()],
+ num2: [123, 3, num2, ()],
+ })
+
+ wp.ChangeTaskPriority(num2, 91337)
+ self.assertEqual(sorted(wp._tasks), [
+ [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+ [100, 2, num1, ()],
+ [123, 3, num2, None],
+ [7413, 1, None, ()],
+ [91337, 3, num2, ()],
+ [10267659, 2, num1, None],
+ ])
+ self.assertEqual(wp._taskdata, {
+ num1: [100, 2, num1, ()],
+ num2: [91337, 3, num2, ()],
+ })
+
+ wp.ChangeTaskPriority(num1, 10139)
+ self.assertEqual(sorted(wp._tasks), [
+ [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+ [100, 2, num1, None],
+ [123, 3, num2, None],
+ [7413, 1, None, ()],
+ [10139, 2, num1, ()],
+ [91337, 3, num2, ()],
+ [10267659, 2, num1, None],
+ ])
+ self.assertEqual(wp._taskdata, {
+ num1: [10139, 2, num1, ()],
+ num2: [91337, 3, num2, ()],
+ })
+
+ # Change to the same priority once again
+ wp.ChangeTaskPriority(num1, 10139)
+ self.assertEqual(sorted(wp._tasks), [
+ [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+ [100, 2, num1, None],
+ [123, 3, num2, None],
+ [7413, 1, None, ()],
+ [10139, 2, num1, None],
+ [10139, 2, num1, ()],
+ [91337, 3, num2, ()],
+ [10267659, 2, num1, None],
+ ])
+ self.assertEqual(wp._taskdata, {
+ num1: [10139, 2, num1, ()],
+ num2: [91337, 3, num2, ()],
+ })
+
+ self._CheckWorkerCount(wp, 1)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+