workerpool: Preserve task number when deferring
author Michael Hanselmann <hansmi@google.com>
Thu, 1 Nov 2012 18:06:02 +0000 (19:06 +0100)
committer Michael Hanselmann <hansmi@google.com>
Tue, 13 Nov 2012 19:20:15 +0000 (20:20 +0100)
When a task is deferred it should receive the same task ID upon being
returned to the pool.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Bernardo Dal Seno <bdalseno@google.com>

lib/workerpool.py
test/ganeti.workerpool_unittest.py

index d77825f..6b558ce 100644 (file)
@@ -133,6 +133,22 @@ class BaseWorker(threading.Thread, object):
     """
     return (self._current_task is not None)
 
+  def _GetCurrentOrderAndTaskId(self):
+    """Returns the order and task ID of the current task.
+
+    Should only be called from within L{RunTask}.
+
+    """
+    self.pool._lock.acquire()
+    try:
+      assert self._HasRunningTaskUnlocked()
+
+      (_, order_id, task_id, _) = self._current_task
+
+      return (order_id, task_id)
+    finally:
+      self.pool._lock.release()
+
   def run(self):
     """Main thread function.
 
@@ -202,8 +218,8 @@ class BaseWorker(threading.Thread, object):
           if defer:
             assert self._current_task
             # Schedule again for later run
-            (_, _, _, args) = self._current_task
-            pool._AddTaskUnlocked(args, defer.priority, None)
+            (_, _, task_id, args) = self._current_task
+            pool._AddTaskUnlocked(args, defer.priority, task_id)
 
           if self._current_task:
             self._current_task = None
@@ -299,6 +315,8 @@ class WorkerPool(object):
     """
     assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
     assert isinstance(priority, (int, long)), "Priority must be numeric"
+    assert task_id is None or isinstance(task_id, (int, long)), \
+      "Task ID must be numeric or None"
 
     task = [priority, self._counter.next(), task_id, args]
 
index f890db0..8f35a69 100755 (executable)
@@ -31,6 +31,7 @@ import random
 from ganeti import workerpool
 from ganeti import errors
 from ganeti import utils
+from ganeti import compat
 
 import testutils
 
@@ -114,12 +115,16 @@ class DeferringTaskContext:
     self.lock = threading.Lock()
     self.prioresult = {}
     self.samepriodefer = {}
+    self.num2ordertaskid = {}
 
 
 class DeferringWorker(workerpool.BaseWorker):
   def RunTask(self, ctx, num, targetprio):
     ctx.lock.acquire()
     try:
+      otilst = ctx.num2ordertaskid.setdefault(num, [])
+      otilst.append(self._GetCurrentOrderAndTaskId())
+
       if num in ctx.samepriodefer:
         del ctx.samepriodefer[num]
         raise workerpool.DeferTask()
@@ -466,6 +471,7 @@ class TestWorkerpool(unittest.TestCase):
       rnd = random.Random(14921)
 
       data = {}
+      num2taskid = {}
       for i in range(1, 333):
         ctx.lock.acquire()
         try:
@@ -475,7 +481,9 @@ class TestWorkerpool(unittest.TestCase):
           ctx.lock.release()
 
         prio = int(rnd.random() * 30)
-        wp.AddTask((ctx, i, prio), priority=50)
+        num2taskid[i] = 1000 * i
+        wp.AddTask((ctx, i, prio), priority=50,
+                   task_id=num2taskid[i])
         data.setdefault(prio, set()).add(i)
 
         # Cause some distortion
@@ -492,6 +500,21 @@ class TestWorkerpool(unittest.TestCase):
       ctx.lock.acquire()
       try:
         self.assertEqual(data, ctx.prioresult)
+
+        all_order_ids = []
+
+        for (num, numordertaskid) in ctx.num2ordertaskid.items():
+          order_ids = map(compat.fst, numordertaskid)
+          self.assertFalse(utils.FindDuplicates(order_ids),
+                           msg="Order ID has been reused")
+          all_order_ids.extend(order_ids)
+
+          for task_id in map(compat.snd, numordertaskid):
+            self.assertEqual(task_id, num2taskid[num],
+                             msg=("Task %s used different task IDs" % num))
+
+        self.assertFalse(utils.FindDuplicates(all_order_ids),
+                         msg="Order ID has been reused")
       finally:
         ctx.lock.release()