Revert "jqueue: Resume jobs from “waitlock” status"
[ganeti-local] / lib / workerpool.py
index 1ec89af..9f00b91 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2008 Google Inc.
+# Copyright (C) 2008, 2009, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -27,6 +27,11 @@ import collections
 import logging
 import threading
 
+from ganeti import compat
+
+
+_TERMINATE = object()
+
 
 class BaseWorker(threading.Thread, object):
   """Base worker class for worker pools.
@@ -44,29 +49,46 @@ class BaseWorker(threading.Thread, object):
     """
     super(BaseWorker, self).__init__(name=worker_id)
     self.pool = pool
+    self._worker_id = worker_id
     self._current_task = None
 
+    assert self.getName() == worker_id
+
   def ShouldTerminate(self):
-    """Returns whether a worker should terminate.
+    """Returns whether this worker should terminate.
+
+    Should only be called from within L{RunTask}.
 
     """
-    return self.pool.ShouldWorkerTerminate(self)
+    self.pool._lock.acquire()
+    try:
+      assert self._HasRunningTaskUnlocked()
+      return self.pool._ShouldWorkerTerminateUnlocked(self)
+    finally:
+      self.pool._lock.release()
 
-  def _HasRunningTaskUnlocked(self):
-    """Returns whether this worker is currently running a task.
+  def SetTaskName(self, taskname):
+    """Sets the name of the current task.
+
+    Should only be called from within L{RunTask}.
+
+    @type taskname: string
+    @param taskname: Task's name
 
     """
-    return (self._current_task is not None)
+    if taskname:
+      name = "%s/%s" % (self._worker_id, taskname)
+    else:
+      name = self._worker_id
+
+    # Set thread name
+    self.setName(name)
 
-  def HasRunningTask(self):
+  def _HasRunningTaskUnlocked(self):
     """Returns whether this worker is currently running a task.
 
     """
-    self.pool._lock.acquire()
-    try:
-      return self._HasRunningTaskUnlocked()
-    finally:
-      self.pool._lock.release()
+    return (self._current_task is not None)
 
   def run(self):
     """Main thread function.
@@ -76,48 +98,45 @@ class BaseWorker(threading.Thread, object):
     """
     pool = self.pool
 
-    assert not self.HasRunningTask()
-
     while True:
+      assert self._current_task is None
       try:
-        # We wait on lock to be told either terminate or do a task.
+        # Wait on lock to be told either to terminate or to do a task
         pool._lock.acquire()
         try:
-          if pool._ShouldWorkerTerminateUnlocked(self):
-            break
+          task = pool._WaitForTaskUnlocked(self)
 
-          # We only wait if there's no task for us.
-          if not pool._tasks:
-            logging.debug("Waiting for tasks")
+          if task is _TERMINATE:
+            # Told to terminate
+            break
 
-            # wait() releases the lock and sleeps until notified
-            pool._pool_to_worker.wait()
+          if task is None:
+            # Spurious notification, ignore
+            continue
 
-            logging.debug("Notified while waiting")
+          self._current_task = task
 
-            # Were we woken up in order to terminate?
-            if pool._ShouldWorkerTerminateUnlocked(self):
-              break
+          # No longer needed, dispose of reference
+          del task
 
-            if not pool._tasks:
-              # Spurious notification, ignore
-              continue
+          assert self._HasRunningTaskUnlocked()
 
-          # Get task from queue and tell pool about it
-          try:
-            self._current_task = pool._tasks.popleft()
-          finally:
-            pool._worker_to_pool.notifyAll()
         finally:
           pool._lock.release()
 
         # Run the actual task
         try:
           logging.debug("Starting task %r", self._current_task)
-          self.RunTask(*self._current_task)
+          assert self.getName() == self._worker_id
+          try:
+            self.RunTask(*self._current_task)
+          finally:
+            self.SetTaskName(None)
           logging.debug("Done with task %r", self._current_task)
         except: # pylint: disable-msg=W0702
           logging.exception("Caught unhandled exception")
+
+        assert self._HasRunningTaskUnlocked()
       finally:
         # Notify pool
         pool._lock.acquire()
@@ -128,6 +147,8 @@ class BaseWorker(threading.Thread, object):
         finally:
           pool._lock.release()
 
+      assert not self._HasRunningTaskUnlocked()
+
     logging.debug("Terminates")
 
   def RunTask(self, *args):
@@ -180,41 +201,92 @@ class WorkerPool(object):
 
   # TODO: Implement dynamic resizing?
 
-  def AddTask(self, *args):
+  def _WaitWhileQuiescingUnlocked(self):
+    """Wait until the worker pool has finished quiescing.
+
+    """
+    while self._quiescing:
+      self._pool_to_pool.wait()
+
+  def _AddTaskUnlocked(self, args):
+    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
+
+    self._tasks.append(args)
+
+    # Notify a waiting worker
+    self._pool_to_worker.notify()
+
+  def AddTask(self, args):
     """Adds a task to the queue.
 
+    @type args: sequence
     @param args: arguments passed to L{BaseWorker.RunTask}
 
     """
     self._lock.acquire()
     try:
-      # Don't add new tasks while we're quiescing
-      while self._quiescing:
-        self._pool_to_pool.wait()
+      self._WaitWhileQuiescingUnlocked()
+      self._AddTaskUnlocked(args)
+    finally:
+      self._lock.release()
 
-      # Add task to internal queue
-      self._tasks.append(args)
+  def AddManyTasks(self, tasks):
+    """Add a list of tasks to the queue.
 
-      # Wake one idling worker up
-      self._pool_to_worker.notify()
+    @type tasks: list of tuples
+    @param tasks: list of args passed to L{BaseWorker.RunTask}
+
+    """
+    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
+      "Each task must be a sequence"
+
+    self._lock.acquire()
+    try:
+      self._WaitWhileQuiescingUnlocked()
+
+      for args in tasks:
+        self._AddTaskUnlocked(args)
     finally:
       self._lock.release()
 
-  def _ShouldWorkerTerminateUnlocked(self, worker):
-    """Returns whether a worker should terminate.
+  def _WaitForTaskUnlocked(self, worker):
+    """Waits for a task for a worker.
+
+    @type worker: L{BaseWorker}
+    @param worker: Worker thread
 
     """
-    return (worker in self._termworkers)
+    if self._ShouldWorkerTerminateUnlocked(worker):
+      return _TERMINATE
 
-  def ShouldWorkerTerminate(self, worker):
-    """Returns whether a worker should terminate.
+    # We only wait if there's no task for us.
+    if not self._tasks:
+      logging.debug("Waiting for tasks")
 
-    """
-    self._lock.acquire()
+      # wait() releases the lock and sleeps until notified
+      self._pool_to_worker.wait()
+
+      logging.debug("Notified while waiting")
+
+      # Were we woken up in order to terminate?
+      if self._ShouldWorkerTerminateUnlocked(worker):
+        return _TERMINATE
+
+      if not self._tasks:
+        # Spurious notification, ignore
+        return None
+
+    # Get task from queue and tell pool about it
     try:
-      return self._ShouldWorkerTerminateUnlocked(worker)
+      return self._tasks.popleft()
     finally:
-      self._lock.release()
+      self._worker_to_pool.notifyAll()
+
+  def _ShouldWorkerTerminateUnlocked(self, worker):
+    """Returns whether a worker should terminate.
+
+    """
+    return (worker in self._termworkers)
 
   def _HasRunningTasksUnlocked(self):
     """Checks whether there's a task running in a worker.