X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/9a2564e72958362b364f5129f470757946f0a6b6..f2d87a5e5:/lib/workerpool.py?ds=inline diff --git a/lib/workerpool.py b/lib/workerpool.py index d77825f..6b558ce 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -133,6 +133,22 @@ class BaseWorker(threading.Thread, object): """ return (self._current_task is not None) + def _GetCurrentOrderAndTaskId(self): + """Returns the order and task ID of the current task. + + Should only be called from within L{RunTask}. + + """ + self.pool._lock.acquire() + try: + assert self._HasRunningTaskUnlocked() + + (_, order_id, task_id, _) = self._current_task + + return (order_id, task_id) + finally: + self.pool._lock.release() + def run(self): """Main thread function. @@ -202,8 +218,8 @@ class BaseWorker(threading.Thread, object): if defer: assert self._current_task # Schedule again for later run - (_, _, _, args) = self._current_task - pool._AddTaskUnlocked(args, defer.priority, None) + (_, _, task_id, args) = self._current_task + pool._AddTaskUnlocked(args, defer.priority, task_id) if self._current_task: self._current_task = None @@ -299,6 +315,8 @@ class WorkerPool(object): """ assert isinstance(args, (tuple, list)), "Arguments must be a sequence" assert isinstance(priority, (int, long)), "Priority must be numeric" + assert task_id is None or isinstance(task_id, (int, long)), \ + "Task ID must be numeric or None" task = [priority, self._counter.next(), task_id, args]