self.priority = priority
+class NoSuchTask(Exception):
+ """Exception raised when a task can't be found.
+
+ """
+
+
class BaseWorker(threading.Thread, object):
"""Base worker class for worker pools.
finally:
self._lock.release()
+ def ChangeTaskPriority(self, task_id, priority):
+ """Changes a task's priority.
+
+ @param task_id: Task ID
+ @type priority: number
+ @param priority: New task priority
+ @raise NoSuchTask: When the task referred by C{task_id} can not be found
+ (it may never have existed, may have already been processed, or is
+ currently running)
+
+ """
+ assert isinstance(priority, (int, long)), "Priority must be numeric"
+
+ self._lock.acquire()
+ try:
+ logging.debug("About to change priority of task %s to %s",
+ task_id, priority)
+
+ # Find old task
+ oldtask = self._taskdata.get(task_id, None)
+ if oldtask is None:
+ msg = "Task '%s' was not found" % task_id
+ logging.debug(msg)
+ raise NoSuchTask(msg)
+
+ # Prepare new task
+ newtask = [priority] + oldtask[1:]
+
+ # Mark old entry as abandoned (this doesn't change the sort order and
+ # therefore doesn't invalidate the heap property of L{self._tasks}).
+ # See also <http://docs.python.org/library/heapq.html#priority-queue-
+ # implementation-notes>.
+ oldtask[-1] = None
+
+ # Change reference to new task entry and forget the old one
+ assert task_id is not None
+ self._taskdata[task_id] = newtask
+
+ # Add a new task with the old number and arguments
+ heapq.heappush(self._tasks, newtask)
+
+ # Notify a waiting worker
+ self._pool_to_worker.notify()
+ finally:
+ self._lock.release()
+
def SetActive(self, active):
"""Enable/disable processing of tasks.
finally:
self._worker_to_pool.notifyAll()
+ (_, _, task_id, args) = task
+
+ # If the priority was changed, "args" is None
+ if args is None:
+ # Try again
+ logging.debug("Found abandoned task (%r)", task)
+ continue
+
# Delete reference
- (_, _, task_id, _) = task
if task_id is not None:
del self._taskdata[task_id]
ctx.lock.release()
+class PriorityContext:
+ def __init__(self):
+ self.lock = threading.Lock()
+ self.result = []
+
+
+class PriorityWorker(workerpool.BaseWorker):
+ def RunTask(self, ctx, data):
+ ctx.lock.acquire()
+ try:
+ ctx.result.append((self.GetCurrentPriority(), data))
+ finally:
+ ctx.lock.release()
+
+
+class NotImplementedWorker(workerpool.BaseWorker):
+ def RunTask(self):
+ raise NotImplementedError
+
+
class TestWorkerpool(unittest.TestCase):
"""Workerpool tests"""
["Hello world %s" % i for i in range(10)])
self.assertRaises(AssertionError, wp.AddManyTasks,
[i for i in range(10)])
+ self.assertRaises(AssertionError, wp.AddManyTasks, [], task_id=0)
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
wp.AddTask((ctx, "A separate hello"))
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
+ def testChangeTaskPriority(self):
+ wp = workerpool.WorkerPool("Test", 1, PriorityWorker)
+ try:
+ self._CheckWorkerCount(wp, 1)
+
+ ctx = PriorityContext()
+
+ # Use static seed for this test
+ rnd = random.Random(4727)
+
+ # Disable processing of tasks
+ wp.SetActive(False)
+
+ # No task ID
+ self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
+ None, 0)
+
+ # Pre-generate task IDs and priorities
+ count = 100
+ task_ids = range(0, count)
+ priorities = range(200, 200 + count) * 2
+
+ rnd.shuffle(task_ids)
+ rnd.shuffle(priorities)
+
+ # Make sure there are some duplicate priorities, but not all
+ priorities[count * 2 - 10:count * 2 - 1] = \
+ priorities[count - 10: count - 1]
+
+ assert len(priorities) == 2 * count
+ assert priorities[0:(count - 1)] != priorities[count:(2 * count - 1)]
+
+ # Add some tasks; this loop consumes the first half of all previously
+ # generated priorities
+ for (idx, task_id) in enumerate(task_ids):
+ wp.AddTask((ctx, idx),
+ priority=priorities.pop(),
+ task_id=task_id)
+
+ self.assertEqual(len(wp._tasks), len(task_ids))
+ self.assertEqual(len(wp._taskdata), len(task_ids))
+
+ # Tasks have been added, so half of the priorities should have been
+ # consumed
+ assert len(priorities) == len(task_ids)
+
+ # Change task priority
+ expected = []
+ for ((idx, task_id), prio) in zip(enumerate(task_ids), priorities):
+ wp.ChangeTaskPriority(task_id, prio)
+ expected.append((prio, idx))
+
+ self.assertEqual(len(wp._taskdata), len(task_ids))
+
+ # Half the entries are now abandoned tasks
+ self.assertEqual(len(wp._tasks), len(task_ids) * 2)
+
+ assert len(priorities) == count
+ assert len(task_ids) == count
+
+ # Start processing
+ wp.SetActive(True)
+
+ # Wait for tasks to finish
+ wp.Quiesce()
+
+ self._CheckNoTasks(wp)
+
+ for task_id in task_ids:
+ # All tasks are done
+ self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
+ task_id, 0)
+
+ # Check result
+ ctx.lock.acquire()
+ try:
+ self.assertEqual(ctx.result, sorted(expected))
+ finally:
+ ctx.lock.release()
+
+ self._CheckWorkerCount(wp, 1)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+
+ def testChangeTaskPriorityInteralStructures(self):
+ wp = workerpool.WorkerPool("Test", 1, NotImplementedWorker)
+ try:
+ self._CheckWorkerCount(wp, 1)
+
+ # Use static seed for this test
+ rnd = random.Random(643)
+
+ (num1, num2) = rnd.sample(range(1000), 2)
+
+ # Disable processing of tasks
+ wp.SetActive(False)
+
+ self.assertFalse(wp._tasks)
+ self.assertFalse(wp._taskdata)
+
+ # No priority or task ID
+ wp.AddTask(())
+ self.assertEqual(wp._tasks, [
+ [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+ ])
+ self.assertFalse(wp._taskdata)
+
+ # No task ID
+ wp.AddTask((), priority=7413)
+ self.assertEqual(wp._tasks, [
+ [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+ [7413, 1, None, ()],
+ ])
+ self.assertFalse(wp._taskdata)
+
+ # Start adding real tasks
+ wp.AddTask((), priority=10267659, task_id=num1)
+ self.assertEqual(wp._tasks, [
+ [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+ [7413, 1, None, ()],
+ [10267659, 2, num1, ()],
+ ])
+ self.assertEqual(wp._taskdata, {
+ num1: [10267659, 2, num1, ()],
+ })
+
+ wp.AddTask((), priority=123, task_id=num2)
+ self.assertEqual(sorted(wp._tasks), [
+ [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+ [123, 3, num2, ()],
+ [7413, 1, None, ()],
+ [10267659, 2, num1, ()],
+ ])
+ self.assertEqual(wp._taskdata, {
+ num1: [10267659, 2, num1, ()],
+ num2: [123, 3, num2, ()],
+ })
+
+ wp.ChangeTaskPriority(num1, 100)
+ self.assertEqual(sorted(wp._tasks), [
+ [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+ [100, 2, num1, ()],
+ [123, 3, num2, ()],
+ [7413, 1, None, ()],
+ [10267659, 2, num1, None],
+ ])
+ self.assertEqual(wp._taskdata, {
+ num1: [100, 2, num1, ()],
+ num2: [123, 3, num2, ()],
+ })
+
+ wp.ChangeTaskPriority(num2, 91337)
+ self.assertEqual(sorted(wp._tasks), [
+ [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+ [100, 2, num1, ()],
+ [123, 3, num2, None],
+ [7413, 1, None, ()],
+ [91337, 3, num2, ()],
+ [10267659, 2, num1, None],
+ ])
+ self.assertEqual(wp._taskdata, {
+ num1: [100, 2, num1, ()],
+ num2: [91337, 3, num2, ()],
+ })
+
+ wp.ChangeTaskPriority(num1, 10139)
+ self.assertEqual(sorted(wp._tasks), [
+ [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+ [100, 2, num1, None],
+ [123, 3, num2, None],
+ [7413, 1, None, ()],
+ [10139, 2, num1, ()],
+ [91337, 3, num2, ()],
+ [10267659, 2, num1, None],
+ ])
+ self.assertEqual(wp._taskdata, {
+ num1: [10139, 2, num1, ()],
+ num2: [91337, 3, num2, ()],
+ })
+
+ # Change to the same priority once again
+ wp.ChangeTaskPriority(num1, 10139)
+ self.assertEqual(sorted(wp._tasks), [
+ [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+ [100, 2, num1, None],
+ [123, 3, num2, None],
+ [7413, 1, None, ()],
+ [10139, 2, num1, None],
+ [10139, 2, num1, ()],
+ [91337, 3, num2, ()],
+ [10267659, 2, num1, None],
+ ])
+ self.assertEqual(wp._taskdata, {
+ num1: [10139, 2, num1, ()],
+ num2: [91337, 3, num2, ()],
+ })
+
+ self._CheckWorkerCount(wp, 1)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+
if __name__ == "__main__":
testutils.GanetiTestProgram()