Add --force-join option to gnt-node add
[ganeti-local] / lib / workerpool.py
index b895b2c..d276b18 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2008 Google Inc.
+# Copyright (C) 2008, 2009, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 
 """
 
-import collections
 import logging
 import threading
+import heapq
+
+from ganeti import compat
+from ganeti import errors
+
+
+_TERMINATE = object()
+_DEFAULT_PRIORITY = 0
+
+
+class DeferTask(Exception):
+  """Special exception class to defer a task.
+
+  This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
+  task. Optionally, the priority of the task can be changed.
+
+  """
+  def __init__(self, priority=None):
+    """Initializes this class.
+
+    @type priority: number
+    @param priority: New task priority (None means no change)
+
+    """
+    Exception.__init__(self)
+    self.priority = priority
 
 
 class BaseWorker(threading.Thread, object):
@@ -34,41 +59,73 @@ class BaseWorker(threading.Thread, object):
   Users of a worker pool must override RunTask in a subclass.
 
   """
+  # pylint: disable-msg=W0212
   def __init__(self, pool, worker_id):
     """Constructor for BaseWorker thread.
 
-    Args:
-    - pool: Parent worker pool
-    - worker_id: Identifier for this worker
+    @param pool: the parent worker pool
+    @param worker_id: identifier for this worker
 
     """
-    super(BaseWorker, self).__init__()
+    super(BaseWorker, self).__init__(name=worker_id)
     self.pool = pool
-    self.worker_id = worker_id
+    self._worker_id = worker_id
     self._current_task = None
 
-  def ShouldTerminate(self):
-    """Returns whether a worker should terminate.
+    assert self.getName() == worker_id
 
-    """
-    return self.pool.ShouldWorkerTerminate(self)
+  def ShouldTerminate(self):
+    """Returns whether this worker should terminate.
 
-  def _HasRunningTaskUnlocked(self):
-    """Returns whether this worker is currently running a task.
+    Should only be called from within L{RunTask}.
 
     """
-    return (self._current_task is not None)
+    self.pool._lock.acquire()
+    try:
+      assert self._HasRunningTaskUnlocked()
+      return self.pool._ShouldWorkerTerminateUnlocked(self)
+    finally:
+      self.pool._lock.release()
 
-  def HasRunningTask(self):
-    """Returns whether this worker is currently running a task.
+  def GetCurrentPriority(self):
+    """Returns the priority of the current task.
+
+    Should only be called from within L{RunTask}.
 
     """
     self.pool._lock.acquire()
     try:
-      return self._HasRunningTaskUnlocked()
+      assert self._HasRunningTaskUnlocked()
+
+      (priority, _, _) = self._current_task
+
+      return priority
     finally:
       self.pool._lock.release()
 
+  def SetTaskName(self, taskname):
+    """Sets the name of the current task.
+
+    Should only be called from within L{RunTask}.
+
+    @type taskname: string
+    @param taskname: Task's name
+
+    """
+    if taskname:
+      name = "%s/%s" % (self._worker_id, taskname)
+    else:
+      name = self._worker_id
+
+    # Set thread name
+    self.setName(name)
+
+  def _HasRunningTaskUnlocked(self):
+    """Returns whether this worker is currently running a task.
+
+    """
+    return (self._current_task is not None)
+
   def run(self):
     """Main thread function.
 
@@ -77,66 +134,85 @@ class BaseWorker(threading.Thread, object):
     """
     pool = self.pool
 
-    assert not self.HasRunningTask()
-
     while True:
+      assert self._current_task is None
+
+      defer = None
       try:
-        # We wait on lock to be told either terminate or do a task.
+        # Wait on lock to be told either to terminate or to do a task
         pool._lock.acquire()
         try:
-          if pool._ShouldWorkerTerminateUnlocked(self):
-            break
+          task = pool._WaitForTaskUnlocked(self)
 
-          # We only wait if there's no task for us.
-          if not pool._tasks:
-            logging.debug("Worker %s: waiting for tasks", self.worker_id)
+          if task is _TERMINATE:
+            # Told to terminate
+            break
 
-            # wait() releases the lock and sleeps until notified
-            pool._pool_to_worker.wait()
+          if task is None:
+            # Spurious notification, ignore
+            continue
 
-            logging.debug("Worker %s: notified while waiting", self.worker_id)
+          self._current_task = task
 
-            # Were we woken up in order to terminate?
-            if pool._ShouldWorkerTerminateUnlocked(self):
-              break
+          # No longer needed, dispose of reference
+          del task
 
-            if not pool._tasks:
-              # Spurious notification, ignore
-              continue
+          assert self._HasRunningTaskUnlocked()
 
-          # Get task from queue and tell pool about it
-          try:
-            self._current_task = pool._tasks.popleft()
-          finally:
-            pool._worker_to_pool.notifyAll()
         finally:
           pool._lock.release()
 
-        # Run the actual task
+        (priority, _, args) = self._current_task
         try:
-          logging.debug("Worker %s: starting task %r",
-                        self.worker_id, self._current_task)
-          self.RunTask(*self._current_task)
-          logging.debug("Worker %s: done with task %r",
-                        self.worker_id, self._current_task)
-        except:
-          logging.error("Worker %s: Caught unhandled exception",
-                        self.worker_id, exc_info=True)
+          # Run the actual task
+          assert defer is None
+          logging.debug("Starting task %r, priority %s", args, priority)
+          assert self.getName() == self._worker_id
+          try:
+            self.RunTask(*args) # pylint: disable-msg=W0142
+          finally:
+            self.SetTaskName(None)
+          logging.debug("Done with task %r, priority %s", args, priority)
+        except DeferTask, err:
+          defer = err
+
+          if defer.priority is None:
+            # Use same priority
+            defer.priority = priority
+
+          logging.debug("Deferring task %r, new priority %s",
+                        args, defer.priority)
+
+          assert self._HasRunningTaskUnlocked()
+        except: # pylint: disable-msg=W0702
+          logging.exception("Caught unhandled exception")
+
+        assert self._HasRunningTaskUnlocked()
       finally:
         # Notify pool
         pool._lock.acquire()
         try:
+          if defer:
+            assert self._current_task
+            # Schedule again for later run
+            (_, _, args) = self._current_task
+            pool._AddTaskUnlocked(args, defer.priority)
+
           if self._current_task:
             self._current_task = None
             pool._worker_to_pool.notifyAll()
         finally:
           pool._lock.release()
 
-    logging.debug("Worker %s: terminates", self.worker_id)
+      assert not self._HasRunningTaskUnlocked()
+
+    logging.debug("Terminates")
 
   def RunTask(self, *args):
     """Function called to start a task.
 
+    This needs to be implemented by child classes.
+
     """
     raise NotImplementedError()
 
@@ -146,19 +222,18 @@ class WorkerPool(object):
 
   This class is thread-safe.
 
-  Tasks are guaranteed to be started in the order in which they're added to the
-  pool. Due to the nature of threading, they're not guaranteed to finish in the
-  same order.
+  Tasks are started in order of priority (lowest value first); tasks of
+  equal priority are started in the order in which they were added to the
+  pool. Due to threading, they're not guaranteed to finish in that order.
 
   """
-  def __init__(self, num_workers, worker_class):
+  def __init__(self, name, num_workers, worker_class):
     """Constructor for worker pool.
 
-    Args:
-    - num_workers: Number of workers to be started (dynamic resizing is not
-                   yet implemented)
-    - worker_class: Class to be instantiated for workers; should derive from
-                    BaseWorker
+    @param name: prefix for the identifiers of this pool's workers
+    @param num_workers: number of workers to be started
+        (dynamic resizing is not yet implemented)
+    @param worker_class: the class to be instantiated for workers;
+        should derive from L{BaseWorker}
 
     """
     # Some of these variables are accessed by BaseWorker
@@ -167,6 +242,7 @@ class WorkerPool(object):
     self._pool_to_worker = threading.Condition(self._lock)
     self._worker_to_pool = threading.Condition(self._lock)
     self._worker_class = worker_class
+    self._name = name
     self._last_worker_id = 0
     self._workers = []
     self._quiescing = False
@@ -175,56 +251,140 @@ class WorkerPool(object):
     self._termworkers = []
 
     # Queued tasks
-    self._tasks = collections.deque()
+    self._counter = 0
+    self._tasks = []
 
     # Start workers
     self.Resize(num_workers)
 
   # TODO: Implement dynamic resizing?
 
-  def AddTask(self, *args):
+  def _WaitWhileQuiescingUnlocked(self):
+    """Wait until the worker pool has finished quiescing.
+
+    """
+    while self._quiescing:
+      self._pool_to_pool.wait()
+
+  def _AddTaskUnlocked(self, args, priority):
+    """Adds a task to the internal queue.
+
+    @type args: sequence
+    @param args: Arguments passed to L{BaseWorker.RunTask}
+    @type priority: number
+    @param priority: Task priority
+
+    """
+    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
+    assert isinstance(priority, (int, long)), "Priority must be numeric"
+
+    # This counter is used to ensure elements are processed in their
+    # incoming order. For processing they're sorted by priority and then
+    # counter.
+    self._counter += 1
+
+    heapq.heappush(self._tasks, (priority, self._counter, args))
+
+    # Notify a waiting worker
+    self._pool_to_worker.notify()
+
+  def AddTask(self, args, priority=_DEFAULT_PRIORITY):
     """Adds a task to the queue.
 
-    Args:
-    - *args: Arguments passed to BaseWorker.RunTask
+    @type args: sequence
+    @param args: arguments passed to L{BaseWorker.RunTask}
+    @type priority: number
+    @param priority: Task priority
 
     """
     self._lock.acquire()
     try:
-      # Don't add new tasks while we're quiescing
-      while self._quiescing:
-        self._pool_to_pool.wait()
-
-      # Add task to internal queue
-      self._tasks.append(args)
-
-      # Wake one idling worker up
-      self._pool_to_worker.notify()
+      self._WaitWhileQuiescingUnlocked()
+      self._AddTaskUnlocked(args, priority)
     finally:
       self._lock.release()
 
-  def _ShouldWorkerTerminateUnlocked(self, worker):
-    """Returns whether a worker should terminate.
+  def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
+    """Add a list of tasks to the queue.
+
+    @type tasks: list of tuples
+    @param tasks: list of args passed to L{BaseWorker.RunTask}
+    @type priority: number or list of numbers
+    @param priority: Priority for all added tasks or a list with the priority
+                     for each task
 
     """
-    return (worker in self._termworkers)
+    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
+      "Each task must be a sequence"
 
-  def ShouldWorkerTerminate(self, worker):
-    """Returns whether a worker should terminate.
+    assert (isinstance(priority, (int, long)) or
+            compat.all(isinstance(prio, (int, long)) for prio in priority)), \
+           "Priority must be numeric or be a list of numeric values"
+
+    if isinstance(priority, (int, long)):
+      priority = [priority] * len(tasks)
+    elif len(priority) != len(tasks):
+      raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
+                                   " number of tasks (%s)" %
+                                   (len(priority), len(tasks)))
 
-    """
     self._lock.acquire()
     try:
-      return self._ShouldWorkerTerminateUnlocked(self)
+      self._WaitWhileQuiescingUnlocked()
+
+      assert compat.all(isinstance(prio, (int, long)) for prio in priority)
+      assert len(tasks) == len(priority)
+
+      for args, priority in zip(tasks, priority):
+        self._AddTaskUnlocked(args, priority)
     finally:
       self._lock.release()
 
+  def _WaitForTaskUnlocked(self, worker):
+    """Waits for a task for a worker.
+
+    @type worker: L{BaseWorker}
+    @param worker: Worker thread
+
+    """
+    if self._ShouldWorkerTerminateUnlocked(worker):
+      return _TERMINATE
+
+    # We only wait if there's no task for us.
+    if not self._tasks:
+      logging.debug("Waiting for tasks")
+
+      # wait() releases the lock and sleeps until notified
+      self._pool_to_worker.wait()
+
+      logging.debug("Notified while waiting")
+
+      # Were we woken up in order to terminate?
+      if self._ShouldWorkerTerminateUnlocked(worker):
+        return _TERMINATE
+
+      if not self._tasks:
+        # Spurious notification, ignore
+        return None
+
+    # Get task from queue and tell pool about it
+    try:
+      return heapq.heappop(self._tasks)
+    finally:
+      self._worker_to_pool.notifyAll()
+
+  def _ShouldWorkerTerminateUnlocked(self, worker):
+    """Returns whether a worker should terminate.
+
+    """
+    return (worker in self._termworkers)
+
   def _HasRunningTasksUnlocked(self):
     """Checks whether there's a task running in a worker.
 
     """
     for worker in self._workers + self._termworkers:
-      if worker._HasRunningTaskUnlocked():
+      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
         return True
     return False
 
@@ -249,8 +409,12 @@ class WorkerPool(object):
       self._lock.release()
 
   def _NewWorkerIdUnlocked(self):
+    """Return an identifier for a new worker.
+
+    """
     self._last_worker_id += 1
-    return self._last_worker_id
+
+    return "%s%d" % (self._name, self._last_worker_id)
 
   def _ResizeUnlocked(self, num_workers):
     """Changes the number of workers.
@@ -303,7 +467,7 @@ class WorkerPool(object):
 
     elif current_count < num_workers:
       # Create (num_workers - current_count) new workers
-      for i in xrange(num_workers - current_count):
+      for _ in range(num_workers - current_count):
         worker = self._worker_class(self, self._NewWorkerIdUnlocked())
         self._workers.append(worker)
         worker.start()
@@ -311,8 +475,7 @@ class WorkerPool(object):
   def Resize(self, num_workers):
     """Changes the number of workers in the pool.
 
-    Args:
-    - num_workers: New number of workers
+    @param num_workers: the new number of workers
 
     """
     self._lock.acquire()