workerpool: Add method to change task's priority
author Michael Hanselmann <hansmi@google.com>
Tue, 30 Oct 2012 15:44:02 +0000 (16:44 +0100)
committer Michael Hanselmann <hansmi@google.com>
Tue, 13 Nov 2012 19:20:15 +0000 (20:20 +0100)
Using the task ID, a pending task's priority can be changed. This will be
used to change the priority of jobs in the workerpool.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Bernardo Dal Seno <bdalseno@google.com>

lib/workerpool.py
test/ganeti.workerpool_unittest.py

index 58cce36..d77825f 100644 (file)
@@ -54,6 +54,12 @@ class DeferTask(Exception):
     self.priority = priority
 
 
+class NoSuchTask(Exception):
+  """Exception raised when no pending task has the requested task ID.
+
+  """
+
+
 class BaseWorker(threading.Thread, object):
   """Base worker class for worker pools.
 
@@ -377,6 +383,52 @@ class WorkerPool(object):
     finally:
       self._lock.release()
 
+  def ChangeTaskPriority(self, task_id, priority):
+    """Changes the priority of a pending task.
+
+    @param task_id: ID the task was added with (C{None} never matches)
+    @type priority: int or long
+    @param priority: New task priority (lower values are processed first)
+    @raise NoSuchTask: When the task referred to by C{task_id} cannot be found
+      (it may never have existed, may have already been processed, or is
+      currently running)
+
+    """
+    assert isinstance(priority, (int, long)), "Priority must be numeric"
+
+    self._lock.acquire()
+    try:
+      logging.debug("About to change priority of task %s to %s",
+                    task_id, priority)
+
+      # Look up the current (non-abandoned) heap entry for this task
+      oldtask = self._taskdata.get(task_id, None)
+      if oldtask is None:
+        msg = "Task '%s' was not found" % task_id
+        logging.debug(msg)
+        raise NoSuchTask(msg)
+
+      # New entry: new priority; same order number, task ID and arguments
+      newtask = [priority] + oldtask[1:]
+
+      # Mark old entry as abandoned (this doesn't change the sort order and
+      # therefore doesn't invalidate the heap property of L{self._tasks}).
+      # See also <http://docs.python.org/library/heapq.html#priority-queue-
+      # implementation-notes>.
+      oldtask[-1] = None
+
+      # Change reference to new task entry and forget the old one
+      assert task_id is not None
+      self._taskdata[task_id] = newtask
+
+      # Reusing the old order number keeps FIFO order among equal priorities
+      heapq.heappush(self._tasks, newtask)
+
+      # Wake up one waiting worker, if any
+      self._pool_to_worker.notify()
+    finally:
+      self._lock.release()
+
   def SetActive(self, active):
     """Enable/disable processing of tasks.
 
@@ -418,8 +470,15 @@ class WorkerPool(object):
         finally:
           self._worker_to_pool.notifyAll()
 
+        (_, _, task_id, args) = task
+
+        # If the priority was changed, "args" is None
+        if args is None:
+          # Try again
+          logging.debug("Found abandoned task (%r)", task)
+          continue
+
         # Delete reference
-        (_, _, task_id, _) = task
         if task_id is not None:
           del self._taskdata[task_id]
 
index 2449cef..f890db0 100755 (executable)
@@ -132,6 +132,26 @@ class DeferringWorker(workerpool.BaseWorker):
       ctx.lock.release()
 
 
+class PriorityContext:
+  def __init__(self):
+    self.lock = threading.Lock()
+    self.result = []
+
+
+class PriorityWorker(workerpool.BaseWorker):
+  def RunTask(self, ctx, data):
+    ctx.lock.acquire()  # ctx.result is also read by the test thread
+    try:
+      ctx.result.append((self.GetCurrentPriority(), data))
+    finally:
+      ctx.lock.release()
+
+
+class NotImplementedWorker(workerpool.BaseWorker):
+  def RunTask(self):
+    raise NotImplementedError
+
+
 class TestWorkerpool(unittest.TestCase):
   """Workerpool tests"""
 
@@ -274,6 +294,7 @@ class TestWorkerpool(unittest.TestCase):
                         ["Hello world %s" % i for i in range(10)])
       self.assertRaises(AssertionError, wp.AddManyTasks,
                         [i for i in range(10)])
+      self.assertRaises(AssertionError, wp.AddManyTasks, [], task_id=0)
 
       wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
       wp.AddTask((ctx, "A separate hello"))
@@ -479,6 +500,209 @@ class TestWorkerpool(unittest.TestCase):
       wp.TerminateWorkers()
       self._CheckWorkerCount(wp, 0)
 
+  def testChangeTaskPriority(self):
+    wp = workerpool.WorkerPool("Test", 1, PriorityWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = PriorityContext()
+
+      # Use static seed for this test
+      rnd = random.Random(4727)
+
+      # Disable processing of tasks
+      wp.SetActive(False)
+
+      # None never identifies a task
+      self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
+                        None, 0)
+
+      # Pre-generate task IDs and priorities (each value appears twice)
+      count = 100
+      task_ids = range(0, count)
+      priorities = range(200, 200 + count) * 2
+
+      rnd.shuffle(task_ids)
+      rnd.shuffle(priorities)
+
+      # Make some initial priorities (2nd half) equal new ones (1st half)
+      priorities[count * 2 - 10:count * 2 - 1] = \
+        priorities[count - 10: count - 1]
+
+      assert len(priorities) == 2 * count
+      assert priorities[0:(count - 1)] != priorities[count:(2 * count - 1)]
+
+      # Add the tasks; pop() consumes the second half of the previously
+      # generated priorities (from the end), leaving the first half
+      for (idx, task_id) in enumerate(task_ids):
+        wp.AddTask((ctx, idx),
+                   priority=priorities.pop(),
+                   task_id=task_id)
+
+      self.assertEqual(len(wp._tasks), len(task_ids))
+      self.assertEqual(len(wp._taskdata), len(task_ids))
+
+      # Tasks have been added, so half of the priorities should have been
+      # consumed
+      assert len(priorities) == len(task_ids)
+
+      # Change every task's priority to one from the remaining first half
+      expected = []
+      for ((idx, task_id), prio) in zip(enumerate(task_ids), priorities):
+        wp.ChangeTaskPriority(task_id, prio)
+        expected.append((prio, idx))
+
+      self.assertEqual(len(wp._taskdata), len(task_ids))
+
+      # Half the entries are now abandoned tasks
+      self.assertEqual(len(wp._tasks), len(task_ids) * 2)
+
+      assert len(priorities) == count
+      assert len(task_ids) == count
+
+      # Start processing
+      wp.SetActive(True)
+
+      # Wait for tasks to finish
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      for task_id in task_ids:
+        # All tasks are done
+        self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
+                          task_id, 0)
+
+      # Tasks must have run ordered by new priority, then insertion order
+      ctx.lock.acquire()
+      try:
+        self.assertEqual(ctx.result, sorted(expected))
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
+  def testChangeTaskPriorityInteralStructures(self):
+    wp = workerpool.WorkerPool("Test", 1, NotImplementedWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      # Use static seed for this test
+      rnd = random.Random(643)
+
+      (num1, num2) = rnd.sample(range(1000), 2)
+
+      # Disable processing of tasks
+      wp.SetActive(False)
+
+      self.assertFalse(wp._tasks)
+      self.assertFalse(wp._taskdata)
+
+      # No priority or task ID: default priority, not tracked in _taskdata
+      wp.AddTask(())
+      self.assertEqual(wp._tasks, [
+        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+        ])
+      self.assertFalse(wp._taskdata)
+
+      # No task ID: only a heap entry is created
+      wp.AddTask((), priority=7413)
+      self.assertEqual(wp._tasks, [
+        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+        [7413, 1, None, ()],
+        ])
+      self.assertFalse(wp._taskdata)
+
+      # Tasks with an ID are additionally tracked in _taskdata
+      wp.AddTask((), priority=10267659, task_id=num1)
+      self.assertEqual(wp._tasks, [
+        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+        [7413, 1, None, ()],
+        [10267659, 2, num1, ()],
+        ])
+      self.assertEqual(wp._taskdata, {
+        num1: [10267659, 2, num1, ()],
+        })
+
+      wp.AddTask((), priority=123, task_id=num2)
+      self.assertEqual(sorted(wp._tasks), [
+        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+        [123, 3, num2, ()],
+        [7413, 1, None, ()],
+        [10267659, 2, num1, ()],
+        ])
+      self.assertEqual(wp._taskdata, {
+        num1: [10267659, 2, num1, ()],
+        num2: [123, 3, num2, ()],
+        })
+
+      wp.ChangeTaskPriority(num1, 100)
+      self.assertEqual(sorted(wp._tasks), [
+        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+        [100, 2, num1, ()],
+        [123, 3, num2, ()],
+        [7413, 1, None, ()],
+        [10267659, 2, num1, None],
+        ])
+      self.assertEqual(wp._taskdata, {
+        num1: [100, 2, num1, ()],
+        num2: [123, 3, num2, ()],
+        })
+
+      wp.ChangeTaskPriority(num2, 91337)
+      self.assertEqual(sorted(wp._tasks), [
+        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+        [100, 2, num1, ()],
+        [123, 3, num2, None],
+        [7413, 1, None, ()],
+        [91337, 3, num2, ()],
+        [10267659, 2, num1, None],
+        ])
+      self.assertEqual(wp._taskdata, {
+        num1: [100, 2, num1, ()],
+        num2: [91337, 3, num2, ()],
+        })
+
+      wp.ChangeTaskPriority(num1, 10139)
+      self.assertEqual(sorted(wp._tasks), [
+        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+        [100, 2, num1, None],
+        [123, 3, num2, None],
+        [7413, 1, None, ()],
+        [10139, 2, num1, ()],
+        [91337, 3, num2, ()],
+        [10267659, 2, num1, None],
+        ])
+      self.assertEqual(wp._taskdata, {
+        num1: [10139, 2, num1, ()],
+        num2: [91337, 3, num2, ()],
+        })
+
+      # Same priority again: old entry is still abandoned, a new one pushed
+      wp.ChangeTaskPriority(num1, 10139)
+      self.assertEqual(sorted(wp._tasks), [
+        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+        [100, 2, num1, None],
+        [123, 3, num2, None],
+        [7413, 1, None, ()],
+        [10139, 2, num1, None],
+        [10139, 2, num1, ()],
+        [91337, 3, num2, ()],
+        [10267659, 2, num1, None],
+        ])
+      self.assertEqual(wp._taskdata, {
+        num1: [10139, 2, num1, ()],
+        num2: [91337, 3, num2, ()],
+        })
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
 
 if __name__ == "__main__":
   testutils.GanetiTestProgram()