self.priority = priority
+class NoSuchTask(Exception):
+ """Exception raised when a task can't be found.
+
+ """
+
+
class BaseWorker(threading.Thread, object):
"""Base worker class for worker pools.
"""
return (self._current_task is not None)
+ def _GetCurrentOrderAndTaskId(self):
+ """Returns the order and task ID of the current task.
+
+ Should only be called from within L{RunTask}.
+
+ """
+ self.pool._lock.acquire()
+ try:
+ assert self._HasRunningTaskUnlocked()
+
+ (_, order_id, task_id, _) = self._current_task
+
+ return (order_id, task_id)
+ finally:
+ self.pool._lock.release()
+
def run(self):
"""Main thread function.
if defer:
assert self._current_task
# Schedule again for later run
- (_, _, _, args) = self._current_task
- pool._AddTaskUnlocked(args, defer.priority, None)
+ (_, _, task_id, args) = self._current_task
+ pool._AddTaskUnlocked(args, defer.priority, task_id)
if self._current_task:
self._current_task = None
"""
assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
assert isinstance(priority, (int, long)), "Priority must be numeric"
+ assert task_id is None or isinstance(task_id, (int, long)), \
+ "Task ID must be numeric or None"
task = [priority, self._counter.next(), task_id, args]
finally:
self._lock.release()
+ def ChangeTaskPriority(self, task_id, priority):
+ """Changes a task's priority.
+
+ @param task_id: Task ID
+ @type priority: number
+ @param priority: New task priority
+ @raise NoSuchTask: When the task referred by C{task_id} can not be found
+ (it may never have existed, may have already been processed, or is
+ currently running)
+
+ """
+ assert isinstance(priority, (int, long)), "Priority must be numeric"
+
+ self._lock.acquire()
+ try:
+ logging.debug("About to change priority of task %s to %s",
+ task_id, priority)
+
+ # Find old task
+ oldtask = self._taskdata.get(task_id, None)
+ if oldtask is None:
+ msg = "Task '%s' was not found" % task_id
+ logging.debug(msg)
+ raise NoSuchTask(msg)
+
+ # Prepare new task
+ newtask = [priority] + oldtask[1:]
+
+ # Mark old entry as abandoned (this doesn't change the sort order and
+ # therefore doesn't invalidate the heap property of L{self._tasks}).
+ # See also <http://docs.python.org/library/heapq.html#priority-queue-
+ # implementation-notes>.
+ oldtask[-1] = None
+
+ # Change reference to new task entry and forget the old one
+ assert task_id is not None
+ self._taskdata[task_id] = newtask
+
+ # Add a new task with the old number and arguments
+ heapq.heappush(self._tasks, newtask)
+
+ # Notify a waiting worker
+ self._pool_to_worker.notify()
+ finally:
+ self._lock.release()
+
def SetActive(self, active):
"""Enable/disable processing of tasks.
finally:
self._worker_to_pool.notifyAll()
+ (_, _, task_id, args) = task
+
+ # If the priority was changed, "args" is None
+ if args is None:
+ # Try again
+ logging.debug("Found abandoned task (%r)", task)
+ continue
+
# Delete reference
- (_, _, task_id, _) = task
if task_id is not None:
del self._taskdata[task_id]