+ for (args, prio, tid) in zip(tasks, priority, task_id):
+ self._AddTaskUnlocked(args, prio, tid)
+ finally:
+ self._lock.release()
+
+ def ChangeTaskPriority(self, task_id, priority):
+ """Changes a task's priority.
+
+ @param task_id: Task ID
+ @type priority: number
+ @param priority: New task priority
+ @raise NoSuchTask: When the task referred by C{task_id} can not be found
+ (it may never have existed, may have already been processed, or is
+ currently running)
+
+ """
+ assert isinstance(priority, (int, long)), "Priority must be numeric"
+
+ self._lock.acquire()
+ try:
+ logging.debug("About to change priority of task %s to %s",
+ task_id, priority)
+
+ # Find old task
+ oldtask = self._taskdata.get(task_id, None)
+ if oldtask is None:
+ msg = "Task '%s' was not found" % task_id
+ logging.debug(msg)
+ raise NoSuchTask(msg)
+
+ # Prepare new task
+ newtask = [priority] + oldtask[1:]
+
+ # Mark old entry as abandoned (this doesn't change the sort order and
+ # therefore doesn't invalidate the heap property of L{self._tasks}).
+ # See also <http://docs.python.org/library/heapq.html#priority-queue-
+ # implementation-notes>.
+ oldtask[-1] = None
+
+ # Change reference to new task entry and forget the old one
+ assert task_id is not None
+ self._taskdata[task_id] = newtask
+
+ # Add a new task with the old number and arguments
+ heapq.heappush(self._tasks, newtask)
+
+ # Notify a waiting worker
+ self._pool_to_worker.notify()