workerpool: Add support for task priority
authorMichael Hanselmann <hansmi@google.com>
Sun, 18 Jul 2010 20:01:01 +0000 (22:01 +0200)
committerMichael Hanselmann <hansmi@google.com>
Tue, 24 Aug 2010 15:32:03 +0000 (17:32 +0200)
To add job priorities, the worker pool underlying the job queue must
support priorities per task. This patch adds them to the worker pool.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>

doc/design-2.3.rst
lib/workerpool.py
test/ganeti.workerpool_unittest.py

index efd8bf2..71a9d4d 100644 (file)
@@ -20,6 +20,54 @@ As for 2.1 and 2.2 we divide the 2.3 design into three areas:
 Core changes
 ------------
 
+Job priorities
+~~~~~~~~~~~~~~
+
+Current state and shortcomings
+++++++++++++++++++++++++++++++
+
+.. TODO: Describe current situation
+
+Proposed changes
+++++++++++++++++
+
+.. TODO: Describe changes to job queue and potentially client programs
+
+Worker pool
+^^^^^^^^^^^
+
+To support job priorities in the job queue, the worker pool underlying
+the job queue must be enhanced to support task priorities. Currently
+tasks are processed in the order they are added to the queue (but, due
+to their nature, they don't necessarily finish in that order). All tasks
+are equal. To support tasks with higher or lower priority, a few changes
+have to be made to the queue inside a worker pool.
+
+Each task is assigned a priority when added to the queue. This priority
+cannot be changed until the task is executed (this is fine because, in
+all current use cases, tasks are added to a pool and then forgotten
+about until they're done).
+
+A task's priority can be compared to Unix' process priorities. The lower
+the priority number, the closer to the queue's front it is. A task with
+priority 0 is going to be run before one with priority 10. Tasks with
+the same priority are executed in the order in which they were added.
+
+While a task is running, it can query its own priority. If it is not yet
+ready to finish, it can raise an exception to defer itself, optionally
+changing its own priority. This is useful in the following cases:
+
+- A task is trying to acquire locks, but those locks are still held by
+  other tasks. By deferring itself, the task gives others a chance to
+  run. This is especially useful when all workers are busy.
+- If a task decides it hasn't gotten its locks in a long time, it can
+  start to increase its own priority (that is, lower its priority number).
+- Tasks waiting for long-running asynchronous operations can defer
+  themselves while they wait.
+
+With these changes, the job queue will be able to implement per-job
+priorities.
+
 
 Feature changes
 ---------------
index 8127329..1838d97 100644 (file)
 
 """
 
-import collections
 import logging
 import threading
+import heapq
 
 from ganeti import compat
+from ganeti import errors
 
 
 _TERMINATE = object()
+_DEFAULT_PRIORITY = 0
+
+
+class DeferTask(Exception):
+  """Special exception class to defer a task.
+
+  This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
+  task. The worker catches it and adds the task to the pool's queue again.
+  Optionally, the priority of the task can be changed.
+
+  """
+  def __init__(self, priority=None):
+    """Initializes this class.
+
+    @type priority: number
+    @param priority: New task priority (None means no change)
+
+    """
+    Exception.__init__(self)
+    self.priority = priority
 
 
 class BaseWorker(threading.Thread, object):
@@ -64,6 +84,22 @@ class BaseWorker(threading.Thread, object):
     finally:
       self.pool._lock.release()
 
+  def GetCurrentPriority(self):
+    """Returns the priority of the task this worker is currently running.
+
+    Should only be called from within L{RunTask}.
+
+    """
+    pool_lock = self.pool._lock
+    pool_lock.acquire()
+    try:
+      assert self._HasRunningTaskUnlocked()
+
+      # Queue entries are (priority, counter, args) tuples
+      (current_priority, _, _) = self._current_task
+    finally:
+      pool_lock.release()
+
+    return current_priority
+
   def _HasRunningTaskUnlocked(self):
     """Returns whether this worker is currently running a task.
 
@@ -80,6 +116,8 @@ class BaseWorker(threading.Thread, object):
 
     while True:
       assert self._current_task is None
+
+      defer = None
       try:
         # Wait on lock to be told either to terminate or to do a task
         pool._lock.acquire()
@@ -104,11 +142,24 @@ class BaseWorker(threading.Thread, object):
         finally:
           pool._lock.release()
 
-        # Run the actual task
+        (priority, _, args) = self._current_task
         try:
-          logging.debug("Starting task %r", self._current_task)
-          self.RunTask(*self._current_task)
-          logging.debug("Done with task %r", self._current_task)
+          # Run the actual task
+          assert defer is None
+          logging.debug("Starting task %r, priority %s", args, priority)
+          self.RunTask(*args) # pylint: disable-msg=W0142
+          logging.debug("Done with task %r, priority %s", args, priority)
+        except DeferTask, err:
+          # The task asked to be run again later; remember the request so it
+          # can be queued again once the pool lock is held
+          defer = err
+
+          if defer.priority is None:
+            # Use same priority
+            defer.priority = priority
+
+          # Both placeholders need an argument, otherwise formatting the
+          # message fails and it is never logged
+          logging.debug("Deferring task %r, new priority %s",
+                        args, defer.priority)
+
+          assert self._HasRunningTaskUnlocked()
+
         except: # pylint: disable-msg=W0702
           logging.exception("Caught unhandled exception")
 
@@ -117,6 +168,12 @@ class BaseWorker(threading.Thread, object):
         # Notify pool
         pool._lock.acquire()
         try:
+          if defer:
+            assert self._current_task
+            # Schedule again for later run
+            (_, _, args) = self._current_task
+            pool._AddTaskUnlocked(args, defer.priority)
+
           if self._current_task:
             self._current_task = None
             pool._worker_to_pool.notifyAll()
@@ -170,7 +227,8 @@ class WorkerPool(object):
     self._termworkers = []
 
     # Queued tasks
-    self._tasks = collections.deque()
+    self._counter = 0
+    self._tasks = []
 
     # Start workers
     self.Resize(num_workers)
@@ -184,44 +242,77 @@ class WorkerPool(object):
     while self._quiescing:
       self._pool_to_pool.wait()
 
-  def _AddTaskUnlocked(self, args):
+  def _AddTaskUnlocked(self, args, priority):
+    """Puts a task into the pool's priority queue and wakes up a worker.
+
+    @type args: sequence
+    @param args: Arguments passed to L{BaseWorker.RunTask}
+    @type priority: number
+    @param priority: Task priority
+
+    """
     assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
+    assert isinstance(priority, (int, long)), "Priority must be numeric"
 
-    self._tasks.append(args)
+    # Entries are ordered by priority first. The ever-increasing counter
+    # comes second so that tasks with the same priority are processed in
+    # their incoming order.
+    self._counter += 1
+    entry = (priority, self._counter, args)
+
+    heapq.heappush(self._tasks, entry)
 
     # Notify a waiting worker
     self._pool_to_worker.notify()
 
-  def AddTask(self, args):
+  def AddTask(self, args, priority=_DEFAULT_PRIORITY):
     """Adds a task to the queue.
 
     @type args: sequence
     @param args: arguments passed to L{BaseWorker.RunTask}
+    @type priority: number
+    @param priority: Task priority; tasks with a lower number are taken from
+                     the queue first
 
     """
     self._lock.acquire()
     try:
       self._WaitWhileQuiescingUnlocked()
-      self._AddTaskUnlocked(args)
+      self._AddTaskUnlocked(args, priority)
     finally:
       self._lock.release()
 
-  def AddManyTasks(self, tasks):
+  def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
     """Add a list of tasks to the queue.
 
     @type tasks: list of tuples
     @param tasks: list of args passed to L{BaseWorker.RunTask}
+    @type priority: number or list of numbers
+    @param priority: Priority for all added tasks or a list with the priority
+                     for each task
+    @raise errors.ProgrammerError: if a list of priorities is given and its
+                                   length differs from the number of tasks
 
     """
     assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
       "Each task must be a sequence"
 
+    assert (isinstance(priority, (int, long)) or
+            compat.all(isinstance(prio, (int, long)) for prio in priority)), \
+           "Priority must be numeric or be a list of numeric values"
+
+    if isinstance(priority, (int, long)):
+      # A single priority applies to every task
+      priority = [priority] * len(tasks)
+    elif len(priority) != len(tasks):
+      raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
+                                   " number of tasks (%s)" %
+                                   (len(priority), len(tasks)))
+
     self._lock.acquire()
     try:
       self._WaitWhileQuiescingUnlocked()
 
-      for args in tasks:
-        self._AddTaskUnlocked(args)
+      assert compat.all(isinstance(prio, (int, long)) for prio in priority)
+      assert len(tasks) == len(priority)
+
+      # Use a separate loop variable instead of rebinding "priority", which
+      # holds the list of priorities being iterated over
+      for (args, prio) in zip(tasks, priority):
+        self._AddTaskUnlocked(args, prio)
     finally:
       self._lock.release()
 
@@ -254,7 +345,7 @@ class WorkerPool(object):
 
     # Get task from queue and tell pool about it
     try:
-      return self._tasks.popleft()
+      return heapq.heappop(self._tasks)
     finally:
       self._worker_to_pool.notifyAll()
 
index 586cc5e..6386862 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/python
 #
 
-# Copyright (C) 2008 Google Inc.
+# Copyright (C) 2008, 2009, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -26,13 +26,15 @@ import threading
 import time
 import sys
 import zlib
+import random
 
 from ganeti import workerpool
+from ganeti import errors
 
 import testutils
 
-class CountingContext(object):
 
+class CountingContext(object):
   def __init__(self):
     self._lock = threading.Condition(threading.Lock())
     self.done = 0
@@ -57,7 +59,6 @@ class CountingContext(object):
 
 
 class CountingBaseWorker(workerpool.BaseWorker):
-
   def RunTask(self, ctx, text):
     ctx.DoneTask()
 
@@ -83,6 +84,46 @@ class ChecksumBaseWorker(workerpool.BaseWorker):
       ctx.lock.release()
 
 
+class ListBuilderContext:
+  """Shared state filled in by L{ListBuilderWorker} tasks"""
+  def __init__(self):
+    # Held by the worker while it updates the containers below
+    self.lock = threading.Lock()
+    # (priority, data) tuples in the order in which the tasks were run
+    self.result = []
+    # Maps a priority to the list of data values run with that priority
+    self.prioresult = {}
+
+
+class ListBuilderWorker(workerpool.BaseWorker):
+  """Worker recording each task's data together with its priority"""
+  def RunTask(self, ctx, data):
+    ctx.lock.acquire()
+    try:
+      # Overall execution order
+      entry = (self.GetCurrentPriority(), data)
+      ctx.result.append(entry)
+
+      # Execution order per priority
+      bucket = ctx.prioresult.setdefault(self.GetCurrentPriority(), [])
+      bucket.append(data)
+    finally:
+      ctx.lock.release()
+
+
+class DeferringTaskContext:
+  """Shared state used by L{DeferringWorker} tasks"""
+  def __init__(self):
+    # Held while the dictionaries below are read or modified
+    self.lock = threading.Lock()
+    # Maps the priority at which a task finished to the set of task numbers
+    self.prioresult = {}
+    # Numbers of tasks which defer themselves once without changing priority
+    self.samepriodefer = {}
+
+
+class DeferringWorker(workerpool.BaseWorker):
+  """Worker whose tasks defer themselves until reaching a target priority"""
+  def RunTask(self, ctx, num, targetprio):
+    ctx.lock.acquire()
+    try:
+      # One-off deferral keeping the current priority
+      if num in ctx.samepriodefer:
+        ctx.samepriodefer.pop(num)
+        raise workerpool.DeferTask()
+
+      # Move one step towards the target priority and try again later
+      if targetprio < self.GetCurrentPriority():
+        lowered = self.GetCurrentPriority() - 1
+        raise workerpool.DeferTask(priority=lowered)
+
+      finished = ctx.prioresult.setdefault(self.GetCurrentPriority(), set())
+      finished.add(num)
+    finally:
+      ctx.lock.release()
+
+
 class TestWorkerpool(unittest.TestCase):
   """Workerpool tests"""
 
@@ -206,6 +247,220 @@ class TestWorkerpool(unittest.TestCase):
     finally:
       wp._lock.release()
 
+  def testPriorityChecksum(self):
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether everything is started in order and respects the priority
+    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = ChecksumContext()
+
+      # "data" maps each priority to its task numbers in the order added
+      data = {}
+      tasks = []
+      priorities = []
+      for i in range(1, 333):
+        prio = i % 7
+        tasks.append((ctx, i))
+        priorities.append(prio)
+        data.setdefault(prio, []).append(i)
+
+      wp.AddManyTasks(tasks, priority=priorities)
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      # Check sum; the expected value is computed by walking the priorities
+      # in ascending order and, within a priority, in the order added
+      ctx.lock.acquire()
+      try:
+        checksum = ChecksumContext.CHECKSUM_START
+        for priority in sorted(data.keys()):
+          for i in data[priority]:
+            checksum = ChecksumContext.UpdateChecksum(checksum, i)
+
+        self.assertEqual(checksum, ctx.checksum)
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
+  def testPriorityListManyTasks(self):
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether everything is started in order and respects the priority
+    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = ListBuilderContext()
+
+      # Use static seed for this test
+      rnd = random.Random(0)
+
+      # "data" maps each priority to its (priority, number) tuples in the
+      # order added
+      data = {}
+      tasks = []
+      priorities = []
+      for i in range(1, 333):
+        prio = int(rnd.random() * 10)
+        tasks.append((ctx, i))
+        priorities.append(prio)
+        data.setdefault(prio, []).append((prio, i))
+
+      wp.AddManyTasks(tasks, priority=priorities)
+
+      # The number of priorities must match the number of tasks
+      self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
+                        [("x", ), ("y", )], priority=[1] * 5)
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      # Check result
+      ctx.lock.acquire()
+      try:
+        expresult = []
+        for priority in sorted(data.keys()):
+          expresult.extend(data[priority])
+
+        self.assertEqual(expresult, ctx.result)
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
+  def testPriorityListSingleTasks(self):
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether tasks with the same priority are run in the order in
+    # which they were added
+    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = ListBuilderContext()
+
+      # Use static seed for this test
+      rnd = random.Random(26279)
+
+      data = {}
+      for i in range(1, 333):
+        prio = int(rnd.random() * 30)
+        wp.AddTask((ctx, i), priority=prio)
+        data.setdefault(prio, []).append(i)
+
+        # Cause some distortion
+        if i % 11 == 0:
+          time.sleep(.001)
+        if i % 41 == 0:
+          wp.Quiesce()
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      # Check result
+      ctx.lock.acquire()
+      try:
+        self.assertEqual(data, ctx.prioresult)
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
+  def testPriorityListSingleTasks(self):
+    # NOTE(review): this method has the same name and body as the test
+    # defined directly before it in this class, so this definition replaces
+    # the earlier one; one of the two should be removed or renamed.
+    #
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether everything is started in order and respects the priority
+    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = ListBuilderContext()
+
+      # Use static seed for this test
+      rnd = random.Random(26279)
+
+      data = {}
+      for i in range(1, 333):
+        prio = int(rnd.random() * 30)
+        wp.AddTask((ctx, i), priority=prio)
+        data.setdefault(prio, []).append(i)
+
+        # Cause some distortion
+        if i % 11 == 0:
+          time.sleep(.001)
+        if i % 41 == 0:
+          wp.Quiesce()
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      # Check result
+      ctx.lock.acquire()
+      try:
+        self.assertEqual(data, ctx.prioresult)
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
+  def testDeferTask(self):
+    # Tests whether tasks deferring themselves are run again until they
+    # finish: every task is added with priority 50 and is expected to end up
+    # in the result under its randomly chosen target priority
+    wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = DeferringTaskContext()
+
+      # Use static seed for this test
+      rnd = random.Random(14921)
+
+      data = {}
+      for i in range(1, 333):
+        ctx.lock.acquire()
+        try:
+          # Every fifth task additionally defers itself once without
+          # changing its priority
+          if i % 5 == 0:
+            ctx.samepriodefer[i] = True
+        finally:
+          ctx.lock.release()
+
+        prio = int(rnd.random() * 30)
+        wp.AddTask((ctx, i, prio), priority=50)
+        data.setdefault(prio, set()).add(i)
+
+        # Cause some distortion
+        if i % 24 == 0:
+          time.sleep(.001)
+        if i % 31 == 0:
+          wp.Quiesce()
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      # Check result
+      ctx.lock.acquire()
+      try:
+        self.assertEqual(data, ctx.prioresult)
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
 
 if __name__ == '__main__':
   testutils.GanetiTestProgram()