Merge remote-tracking branch 'origin/stable-2.8'
[ganeti-local] / lib / workerpool.py
index 58cce36..6b558ce 100644 (file)
@@ -54,6 +54,12 @@ class DeferTask(Exception):
     self.priority = priority
 
 
+class NoSuchTask(Exception):
+  """Exception raised when a task can't be found.
+
+  Raised by L{WorkerPool.ChangeTaskPriority} when the given task ID has no
+  pending task associated with it.
+
+  """
+
+
 class BaseWorker(threading.Thread, object):
   """Base worker class for worker pools.
 
@@ -127,6 +133,22 @@ class BaseWorker(threading.Thread, object):
     """
     return (self._current_task is not None)
 
+  def _GetCurrentOrderAndTaskId(self):
+    """Returns the order and task ID of the current task.
+
+    Should only be called from within L{RunTask}.
+
+    The current task entry is read while holding the pool's lock.
+
+    @rtype: tuple
+    @return: Tuple of C{(order_id, task_id)}, i.e. the second and third
+      element of the current task entry
+
+    """
+    self.pool._lock.acquire()
+    try:
+      # Without a running task there is no entry to unpack
+      assert self._HasRunningTaskUnlocked()
+
+      # Task entries are four-element lists of the form
+      # [priority, order ID, task ID, arguments]; only the middle two are
+      # of interest here
+      (_, order_id, task_id, _) = self._current_task
+
+      return (order_id, task_id)
+    finally:
+      self.pool._lock.release()
+
   def run(self):
     """Main thread function.
 
@@ -196,8 +218,8 @@ class BaseWorker(threading.Thread, object):
           if defer:
             assert self._current_task
             # Schedule again for later run
-            (_, _, _, args) = self._current_task
-            pool._AddTaskUnlocked(args, defer.priority, None)
+            (_, _, task_id, args) = self._current_task
+            pool._AddTaskUnlocked(args, defer.priority, task_id)
 
           if self._current_task:
             self._current_task = None
@@ -293,6 +315,8 @@ class WorkerPool(object):
     """
     assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
     assert isinstance(priority, (int, long)), "Priority must be numeric"
+    assert task_id is None or isinstance(task_id, (int, long)), \
+      "Task ID must be numeric or None"
 
     task = [priority, self._counter.next(), task_id, args]
 
@@ -377,6 +401,52 @@ class WorkerPool(object):
     finally:
       self._lock.release()
 
+  def ChangeTaskPriority(self, task_id, priority):
+    """Changes a task's priority.
+
+    The existing heap entry is not modified in place or removed. Instead it
+    is marked as abandoned (its arguments are replaced with C{None}) and a
+    new entry carrying the new priority, but the same order number, task ID
+    and arguments, is pushed onto the heap. Abandoned entries are skipped
+    when a worker later fetches them from the queue.
+
+    @type task_id: number
+    @param task_id: Task ID
+    @type priority: number
+    @param priority: New task priority
+    @raise NoSuchTask: When the task referred by C{task_id} can not be found
+      (it may never have existed, may have already been processed, or is
+      currently running)
+
+    """
+    assert isinstance(priority, (int, long)), "Priority must be numeric"
+
+    # All queue bookkeeping below happens while holding the pool lock
+    self._lock.acquire()
+    try:
+      logging.debug("About to change priority of task %s to %s",
+                    task_id, priority)
+
+      # Find old task
+      oldtask = self._taskdata.get(task_id, None)
+      if oldtask is None:
+        msg = "Task '%s' was not found" % task_id
+        logging.debug(msg)
+        raise NoSuchTask(msg)
+
+      # Prepare new task (same order number, task ID and arguments as the old
+      # entry, only the priority differs)
+      newtask = [priority] + oldtask[1:]
+
+      # Mark old entry as abandoned (this doesn't change the sort order and
+      # therefore doesn't invalidate the heap property of L{self._tasks}).
+      # See also <http://docs.python.org/library/heapq.html#priority-queue-
+      # implementation-notes>.
+      oldtask[-1] = None
+
+      # Change reference to new task entry and forget the old one
+      assert task_id is not None
+      self._taskdata[task_id] = newtask
+
+      # Add a new task with the old number and arguments
+      heapq.heappush(self._tasks, newtask)
+
+      # Notify a waiting worker
+      self._pool_to_worker.notify()
+    finally:
+      self._lock.release()
+
   def SetActive(self, active):
     """Enable/disable processing of tasks.
 
@@ -418,8 +488,15 @@ class WorkerPool(object):
         finally:
           self._worker_to_pool.notifyAll()
 
+        (_, _, task_id, args) = task
+
+        # If the priority was changed, "args" is None
+        if args is None:
+          # Try again
+          logging.debug("Found abandoned task (%r)", task)
+          continue
+
         # Delete reference
-        (_, _, task_id, _) = task
         if task_id is not None:
           del self._taskdata[task_id]