Merge remote-tracking branch 'origin/stable-2.8'
[ganeti-local] / lib / workerpool.py
index d77825f..6b558ce 100644 (file)
@@ -133,6 +133,22 @@ class BaseWorker(threading.Thread, object):
     """
     return (self._current_task is not None)
 
+  def _GetCurrentOrderAndTaskId(self):
+    """Returns the order and task ID of the current task.
+
+    Should only be called from within L{RunTask}.
+
+    """
+    self.pool._lock.acquire()
+    try:
+      assert self._HasRunningTaskUnlocked()
+
+      (_, order_id, task_id, _) = self._current_task
+
+      return (order_id, task_id)
+    finally:
+      self.pool._lock.release()
+
   def run(self):
     """Main thread function.
 
@@ -202,8 +218,8 @@ class BaseWorker(threading.Thread, object):
           if defer:
             assert self._current_task
             # Schedule again for later run
-            (_, _, _, args) = self._current_task
-            pool._AddTaskUnlocked(args, defer.priority, None)
+            (_, _, task_id, args) = self._current_task
+            pool._AddTaskUnlocked(args, defer.priority, task_id)
 
           if self._current_task:
             self._current_task = None
@@ -299,6 +315,8 @@ class WorkerPool(object):
     """
     assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
     assert isinstance(priority, (int, long)), "Priority must be numeric"
+    assert task_id is None or isinstance(task_id, (int, long)), \
+      "Task ID must be numeric or None"
 
     task = [priority, self._counter.next(), task_id, args]