From 9a2564e72958362b364f5129f470757946f0a6b6 Mon Sep 17 00:00:00 2001 From: Michael Hanselmann Date: Tue, 30 Oct 2012 16:44:02 +0100 Subject: [PATCH] workerpool: Add method to change task's priority Using the task ID a pending task's priority can be changed. This will be used to change the priority of jobs in the workerpool. Signed-off-by: Michael Hanselmann Reviewed-by: Bernardo Dal Seno --- lib/workerpool.py | 61 +++++++++- test/ganeti.workerpool_unittest.py | 224 ++++++++++++++++++++++++++++++++++++ 2 files changed, 284 insertions(+), 1 deletion(-) diff --git a/lib/workerpool.py b/lib/workerpool.py index 58cce36..d77825f 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -54,6 +54,12 @@ class DeferTask(Exception): self.priority = priority +class NoSuchTask(Exception): + """Exception raised when a task can't be found. + + """ + + class BaseWorker(threading.Thread, object): """Base worker class for worker pools. @@ -377,6 +383,52 @@ class WorkerPool(object): finally: self._lock.release() + def ChangeTaskPriority(self, task_id, priority): + """Changes a task's priority. + + @param task_id: Task ID + @type priority: number + @param priority: New task priority + @raise NoSuchTask: When the task referred by C{task_id} can not be found + (it may never have existed, may have already been processed, or is + currently running) + + """ + assert isinstance(priority, (int, long)), "Priority must be numeric" + + self._lock.acquire() + try: + logging.debug("About to change priority of task %s to %s", + task_id, priority) + + # Find old task + oldtask = self._taskdata.get(task_id, None) + if oldtask is None: + msg = "Task '%s' was not found" % task_id + logging.debug(msg) + raise NoSuchTask(msg) + + # Prepare new task + newtask = [priority] + oldtask[1:] + + # Mark old entry as abandoned (this doesn't change the sort order and + # therefore doesn't invalidate the heap property of L{self._tasks}). + # See also . + oldtask[-1] = None + + # Change reference to new task entry and forget the old one + assert task_id is not None + self._taskdata[task_id] = newtask + + # Add a new task with the old number and arguments + heapq.heappush(self._tasks, newtask) + + # Notify a waiting worker + self._pool_to_worker.notify() + finally: + self._lock.release() + def SetActive(self, active): """Enable/disable processing of tasks. @@ -418,8 +470,15 @@ class WorkerPool(object): finally: self._worker_to_pool.notifyAll() + (_, _, task_id, args) = task + + # If the priority was changed, "args" is None + if args is None: + # Try again + logging.debug("Found abandoned task (%r)", task) + continue + # Delete reference - (_, _, task_id, _) = task if task_id is not None: del self._taskdata[task_id] diff --git a/test/ganeti.workerpool_unittest.py b/test/ganeti.workerpool_unittest.py index 2449cef..f890db0 100755 --- a/test/ganeti.workerpool_unittest.py +++ b/test/ganeti.workerpool_unittest.py @@ -132,6 +132,26 @@ class DeferringWorker(workerpool.BaseWorker): ctx.lock.release() +class PriorityContext: + def __init__(self): + self.lock = threading.Lock() + self.result = [] + + +class PriorityWorker(workerpool.BaseWorker): + def RunTask(self, ctx, data): + ctx.lock.acquire() + try: + ctx.result.append((self.GetCurrentPriority(), data)) + finally: + ctx.lock.release() + + +class NotImplementedWorker(workerpool.BaseWorker): + def RunTask(self): + raise NotImplementedError + + class TestWorkerpool(unittest.TestCase): """Workerpool tests""" @@ -274,6 +294,7 @@ class TestWorkerpool(unittest.TestCase): ["Hello world %s" % i for i in range(10)]) self.assertRaises(AssertionError, wp.AddManyTasks, [i for i in range(10)]) + self.assertRaises(AssertionError, wp.AddManyTasks, [], task_id=0) wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)]) wp.AddTask((ctx, "A separate hello")) @@ -479,6 +500,209 @@ class TestWorkerpool(unittest.TestCase): wp.TerminateWorkers() self._CheckWorkerCount(wp, 0) + def testChangeTaskPriority(self): + wp = workerpool.WorkerPool("Test", 1, PriorityWorker) + try: + self._CheckWorkerCount(wp, 1) + + ctx = PriorityContext() + + # Use static seed for this test + rnd = random.Random(4727) + + # Disable processing of tasks + wp.SetActive(False) + + # No task ID + self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority, + None, 0) + + # Pre-generate task IDs and priorities + count = 100 + task_ids = range(0, count) + priorities = range(200, 200 + count) * 2 + + rnd.shuffle(task_ids) + rnd.shuffle(priorities) + + # Make sure there are some duplicate priorities, but not all + priorities[count * 2 - 10:count * 2 - 1] = \ + priorities[count - 10: count - 1] + + assert len(priorities) == 2 * count + assert priorities[0:(count - 1)] != priorities[count:(2 * count - 1)] + + # Add some tasks; this loop consumes the first half of all previously + # generated priorities + for (idx, task_id) in enumerate(task_ids): + wp.AddTask((ctx, idx), + priority=priorities.pop(), + task_id=task_id) + + self.assertEqual(len(wp._tasks), len(task_ids)) + self.assertEqual(len(wp._taskdata), len(task_ids)) + + # Tasks have been added, so half of the priorities should have been + # consumed + assert len(priorities) == len(task_ids) + + # Change task priority + expected = [] + for ((idx, task_id), prio) in zip(enumerate(task_ids), priorities): + wp.ChangeTaskPriority(task_id, prio) + expected.append((prio, idx)) + + self.assertEqual(len(wp._taskdata), len(task_ids)) + + # Half the entries are now abandoned tasks + self.assertEqual(len(wp._tasks), len(task_ids) * 2) + + assert len(priorities) == count + assert len(task_ids) == count + + # Start processing + wp.SetActive(True) + + # Wait for tasks to finish + wp.Quiesce() + + self._CheckNoTasks(wp) + + for task_id in task_ids: + # All tasks are done + self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority, + task_id, 0) + + # Check result + ctx.lock.acquire() + try: + self.assertEqual(ctx.result, sorted(expected)) + finally: + ctx.lock.release() + + self._CheckWorkerCount(wp, 1) + finally: + wp.TerminateWorkers() + self._CheckWorkerCount(wp, 0) + + def testChangeTaskPriorityInteralStructures(self): + wp = workerpool.WorkerPool("Test", 1, NotImplementedWorker) + try: + self._CheckWorkerCount(wp, 1) + + # Use static seed for this test + rnd = random.Random(643) + + (num1, num2) = rnd.sample(range(1000), 2) + + # Disable processing of tasks + wp.SetActive(False) + + self.assertFalse(wp._tasks) + self.assertFalse(wp._taskdata) + + # No priority or task ID + wp.AddTask(()) + self.assertEqual(wp._tasks, [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + ]) + self.assertFalse(wp._taskdata) + + # No task ID + wp.AddTask((), priority=7413) + self.assertEqual(wp._tasks, [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [7413, 1, None, ()], + ]) + self.assertFalse(wp._taskdata) + + # Start adding real tasks + wp.AddTask((), priority=10267659, task_id=num1) + self.assertEqual(wp._tasks, [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [7413, 1, None, ()], + [10267659, 2, num1, ()], + ]) + self.assertEqual(wp._taskdata, { + num1: [10267659, 2, num1, ()], + }) + + wp.AddTask((), priority=123, task_id=num2) + self.assertEqual(sorted(wp._tasks), [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [123, 3, num2, ()], + [7413, 1, None, ()], + [10267659, 2, num1, ()], + ]) + self.assertEqual(wp._taskdata, { + num1: [10267659, 2, num1, ()], + num2: [123, 3, num2, ()], + }) + + wp.ChangeTaskPriority(num1, 100) + self.assertEqual(sorted(wp._tasks), [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [100, 2, num1, ()], + [123, 3, num2, ()], + [7413, 1, None, ()], + [10267659, 2, num1, None], + ]) + self.assertEqual(wp._taskdata, { + num1: [100, 2, num1, ()], + num2: [123, 3, num2, ()], + }) + + wp.ChangeTaskPriority(num2, 91337) + self.assertEqual(sorted(wp._tasks), [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [100, 2, num1, ()], + [123, 3, num2, None], + [7413, 1, None, ()], + [91337, 3, num2, ()], + [10267659, 2, num1, None], + ]) + self.assertEqual(wp._taskdata, { + num1: [100, 2, num1, ()], + num2: [91337, 3, num2, ()], + }) + + wp.ChangeTaskPriority(num1, 10139) + self.assertEqual(sorted(wp._tasks), [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [100, 2, num1, None], + [123, 3, num2, None], + [7413, 1, None, ()], + [10139, 2, num1, ()], + [91337, 3, num2, ()], + [10267659, 2, num1, None], + ]) + self.assertEqual(wp._taskdata, { + num1: [10139, 2, num1, ()], + num2: [91337, 3, num2, ()], + }) + + # Change to the same priority once again + wp.ChangeTaskPriority(num1, 10139) + self.assertEqual(sorted(wp._tasks), [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [100, 2, num1, None], + [123, 3, num2, None], + [7413, 1, None, ()], + [10139, 2, num1, None], + [10139, 2, num1, ()], + [91337, 3, num2, ()], + [10267659, 2, num1, None], + ]) + self.assertEqual(wp._taskdata, { + num1: [10139, 2, num1, ()], + num2: [91337, 3, num2, ()], + }) + + self._CheckWorkerCount(wp, 1) + finally: + wp.TerminateWorkers() + self._CheckWorkerCount(wp, 0) + if __name__ == "__main__": testutils.GanetiTestProgram() -- 1.7.10.4