Introduce a micro type system for opcodes
[ganeti-local] / lib / workerpool.py
index 2d3937a..0ca9155 100644 (file)
@@ -27,6 +27,8 @@ import collections
 import logging
 import threading
 
+from ganeti import compat
+
 
 class BaseWorker(threading.Thread, object):
   """Base worker class for worker pools.
@@ -42,9 +44,8 @@ class BaseWorker(threading.Thread, object):
     @param worker_id: identifier for this worker
 
     """
-    super(BaseWorker, self).__init__()
+    super(BaseWorker, self).__init__(name=worker_id)
     self.pool = pool
-    self.worker_id = worker_id
     self._current_task = None
 
   def ShouldTerminate(self):
@@ -89,12 +90,12 @@ class BaseWorker(threading.Thread, object):
 
           # We only wait if there's no task for us.
           if not pool._tasks:
-            logging.debug("Worker %s: waiting for tasks", self.worker_id)
+            logging.debug("Waiting for tasks")
 
             # wait() releases the lock and sleeps until notified
             pool._pool_to_worker.wait()
 
-            logging.debug("Worker %s: notified while waiting", self.worker_id)
+            logging.debug("Notified while waiting")
 
             # Were we woken up in order to terminate?
             if pool._ShouldWorkerTerminateUnlocked(self):
@@ -114,14 +115,11 @@ class BaseWorker(threading.Thread, object):
 
         # Run the actual task
         try:
-          logging.debug("Worker %s: starting task %r",
-                        self.worker_id, self._current_task)
+          logging.debug("Starting task %r", self._current_task)
           self.RunTask(*self._current_task)
-          logging.debug("Worker %s: done with task %r",
-                        self.worker_id, self._current_task)
+          logging.debug("Done with task %r", self._current_task)
         except: # pylint: disable-msg=W0702
-          logging.error("Worker %s: Caught unhandled exception",
-                        self.worker_id, exc_info=True)
+          logging.exception("Caught unhandled exception")
       finally:
         # Notify pool
         pool._lock.acquire()
@@ -132,7 +130,7 @@ class BaseWorker(threading.Thread, object):
         finally:
           pool._lock.release()
 
-    logging.debug("Worker %s: terminates", self.worker_id)
+    logging.debug("Terminates")
 
   def RunTask(self, *args):
     """Function called to start a task.
@@ -153,7 +151,7 @@ class WorkerPool(object):
   guaranteed to finish in the same order.
 
   """
-  def __init__(self, num_workers, worker_class):
+  def __init__(self, name, num_workers, worker_class):
     """Constructor for worker pool.
 
     @param num_workers: number of workers to be started
@@ -168,6 +166,7 @@ class WorkerPool(object):
     self._pool_to_worker = threading.Condition(self._lock)
     self._worker_to_pool = threading.Condition(self._lock)
     self._worker_class = worker_class
+    self._name = name
     self._last_worker_id = 0
     self._workers = []
     self._quiescing = False
@@ -183,6 +182,13 @@ class WorkerPool(object):
 
   # TODO: Implement dynamic resizing?
 
+  def _WaitWhileQuiescingUnlocked(self):
+    """Wait until the worker pool has finished quiescing.
+
+    """
+    while self._quiescing:  # flag is re-tested after every wake-up
+      self._pool_to_pool.wait()  # callers acquire self._lock first
+
   def AddTask(self, *args):
     """Adds a task to the queue.
 
@@ -191,11 +197,8 @@ class WorkerPool(object):
     """
     self._lock.acquire()
     try:
-      # Don't add new tasks while we're quiescing
-      while self._quiescing:
-        self._pool_to_pool.wait()
+      self._WaitWhileQuiescingUnlocked()
 
-      # Add task to internal queue
       self._tasks.append(args)
 
       # Wake one idling worker up
@@ -203,6 +206,27 @@ class WorkerPool(object):
     finally:
       self._lock.release()
 
+  def AddManyTasks(self, tasks):
+    """Add a list of tasks to the queue.
+
+    @type tasks: list of tuples or lists
+    @param tasks: list of args passed to L{BaseWorker.RunTask}
+
+    """
+    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
+      "Each task must be a sequence"
+
+    self._lock.acquire()
+    try:
+      self._WaitWhileQuiescingUnlocked()  # returns once not quiescing
+
+      self._tasks.extend(tasks)
+
+      for _ in tasks:
+        self._pool_to_worker.notify()  # one notify() call per added task
+    finally:
+      self._lock.release()
+
   def _ShouldWorkerTerminateUnlocked(self, worker):
     """Returns whether a worker should terminate.
 
@@ -253,7 +277,8 @@ class WorkerPool(object):
 
     """
     self._last_worker_id += 1
-    return self._last_worker_id
+
+    return "%s%d" % (self._name, self._last_worker_id)
 
   def _ResizeUnlocked(self, num_workers):
     """Changes the number of workers.