"""
-import collections
import logging
import threading
+import heapq
from ganeti import compat
+from ganeti import errors
_TERMINATE = object()
+_DEFAULT_PRIORITY = 0
+
+
+class DeferTask(Exception):
+ """Special exception class to defer a task.
+
+ This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
+ task. Optionally, the priority of the task can be changed.
+
+ """
+ def __init__(self, priority=None):
+ """Initializes this class.
+
+ @type priority: number
+ @param priority: New task priority (None means no change)
+
+ """
+ Exception.__init__(self)
+ self.priority = priority
class BaseWorker(threading.Thread, object):
Users of a worker pool must override RunTask in a subclass.
"""
- # pylint: disable-msg=W0212
+ # pylint: disable=W0212
def __init__(self, pool, worker_id):
"""Constructor for BaseWorker thread.
"""
super(BaseWorker, self).__init__(name=worker_id)
self.pool = pool
+ self._worker_id = worker_id
self._current_task = None
+ assert self.getName() == worker_id
+
def ShouldTerminate(self):
"""Returns whether this worker should terminate.
finally:
self.pool._lock.release()
+ def GetCurrentPriority(self):
+ """Returns the priority of the current task.
+
+ Should only be called from within L{RunTask}.
+
+ """
+ self.pool._lock.acquire()
+ try:
+ assert self._HasRunningTaskUnlocked()
+
+ (priority, _, _) = self._current_task
+
+ return priority
+ finally:
+ self.pool._lock.release()
+
+ def SetTaskName(self, taskname):
+ """Sets the name of the current task.
+
+ Should only be called from within L{RunTask}.
+
+ @type taskname: string
+ @param taskname: Task's name
+
+ """
+ if taskname:
+ name = "%s/%s" % (self._worker_id, taskname)
+ else:
+ name = self._worker_id
+
+ # Set thread name
+ self.setName(name)
+
def _HasRunningTaskUnlocked(self):
"""Returns whether this worker is currently running a task.
"""
pool = self.pool
- assert self._current_task is None
-
while True:
+ assert self._current_task is None
+
+ defer = None
try:
# Wait on lock to be told either to terminate or to do a task
pool._lock.acquire()
self._current_task = task
+ # No longer needed, dispose of reference
+ del task
+
assert self._HasRunningTaskUnlocked()
+
finally:
pool._lock.release()
- # Run the actual task
+ (priority, _, args) = self._current_task
try:
- logging.debug("Starting task %r", self._current_task)
- self.RunTask(*self._current_task)
- logging.debug("Done with task %r", self._current_task)
- except: # pylint: disable-msg=W0702
+ # Run the actual task
+ assert defer is None
+ logging.debug("Starting task %r, priority %s", args, priority)
+ assert self.getName() == self._worker_id
+ try:
+ self.RunTask(*args) # pylint: disable=W0142
+ finally:
+ self.SetTaskName(None)
+ logging.debug("Done with task %r, priority %s", args, priority)
+ except DeferTask, err:
+ defer = err
+
+ if defer.priority is None:
+ # Use same priority
+ defer.priority = priority
+
+ logging.debug("Deferring task %r, new priority %s",
+ args, defer.priority)
+
+ assert self._HasRunningTaskUnlocked()
+ except: # pylint: disable=W0702
logging.exception("Caught unhandled exception")
assert self._HasRunningTaskUnlocked()
# Notify pool
pool._lock.acquire()
try:
+ if defer:
+ assert self._current_task
+ # Schedule again for later run
+ (_, _, args) = self._current_task
+ pool._AddTaskUnlocked(args, defer.priority)
+
if self._current_task:
self._current_task = None
pool._worker_to_pool.notifyAll()
self._last_worker_id = 0
self._workers = []
self._quiescing = False
+ self._active = True
# Terminating workers
self._termworkers = []
# Queued tasks
- self._tasks = collections.deque()
+ self._counter = 0
+ self._tasks = []
# Start workers
self.Resize(num_workers)
while self._quiescing:
self._pool_to_pool.wait()
- def _AddTaskUnlocked(self, args):
+ def _AddTaskUnlocked(self, args, priority):
+ """Adds a task to the internal queue.
+
+ @type args: sequence
+ @param args: Arguments passed to L{BaseWorker.RunTask}
+ @type priority: number
+ @param priority: Task priority
+
+ """
assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
+ assert isinstance(priority, (int, long)), "Priority must be numeric"
+
+ # This counter is used to ensure elements are processed in their
+ # incoming order. For processing they're sorted by priority and then
+ # counter.
+ self._counter += 1
- self._tasks.append(args)
+ heapq.heappush(self._tasks, (priority, self._counter, args))
# Notify a waiting worker
self._pool_to_worker.notify()
- def AddTask(self, args):
+ def AddTask(self, args, priority=_DEFAULT_PRIORITY):
"""Adds a task to the queue.
@type args: sequence
@param args: arguments passed to L{BaseWorker.RunTask}
+ @type priority: number
+ @param priority: Task priority
"""
self._lock.acquire()
try:
self._WaitWhileQuiescingUnlocked()
- self._AddTaskUnlocked(args)
+ self._AddTaskUnlocked(args, priority)
finally:
self._lock.release()
- def AddManyTasks(self, tasks):
+ def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
"""Add a list of tasks to the queue.
@type tasks: list of tuples
@param tasks: list of args passed to L{BaseWorker.RunTask}
+ @type priority: number or list of numbers
+ @param priority: Priority for all added tasks or a list with the priority
+ for each task
"""
assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
"Each task must be a sequence"
+ assert (isinstance(priority, (int, long)) or
+ compat.all(isinstance(prio, (int, long)) for prio in priority)), \
+ "Priority must be numeric or be a list of numeric values"
+
+ if isinstance(priority, (int, long)):
+ priority = [priority] * len(tasks)
+ elif len(priority) != len(tasks):
+ raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
+ " number of tasks (%s)" %
+ (len(priority), len(tasks)))
+
self._lock.acquire()
try:
self._WaitWhileQuiescingUnlocked()
- for args in tasks:
- self._AddTaskUnlocked(args)
+ assert compat.all(isinstance(prio, (int, long)) for prio in priority)
+ assert len(tasks) == len(priority)
+
+ for args, priority in zip(tasks, priority):
+ self._AddTaskUnlocked(args, priority)
+ finally:
+ self._lock.release()
+
+ def SetActive(self, active):
+ """Enable/disable processing of tasks.
+
+ This is different from L{Quiesce} in the sense that this function just
+ changes an internal flag and doesn't wait for the queue to be empty. Tasks
+ already being processed continue normally, but no new tasks will be
+ started. New tasks can still be added.
+
+ @type active: bool
+ @param active: Whether tasks should be processed
+
+ """
+ self._lock.acquire()
+ try:
+ self._active = active
+
+ if active:
+ # Tell all workers to continue processing
+ self._pool_to_worker.notifyAll()
finally:
self._lock.release()
return _TERMINATE
# We only wait if there's no task for us.
- if not self._tasks:
+ if not (self._active and self._tasks):
logging.debug("Waiting for tasks")
- # wait() releases the lock and sleeps until notified
- self._pool_to_worker.wait()
+ while True:
+ # wait() releases the lock and sleeps until notified
+ self._pool_to_worker.wait()
- logging.debug("Notified while waiting")
+ logging.debug("Notified while waiting")
- # Were we woken up in order to terminate?
- if self._ShouldWorkerTerminateUnlocked(worker):
- return _TERMINATE
+ # Were we woken up in order to terminate?
+ if self._ShouldWorkerTerminateUnlocked(worker):
+ return _TERMINATE
- if not self._tasks:
- # Spurious notification, ignore
- return None
+ # Just loop if pool is not processing tasks at this time
+ if self._active and self._tasks:
+ break
# Get task from queue and tell pool about it
try:
- return self._tasks.popleft()
+ return heapq.heappop(self._tasks)
finally:
self._worker_to_pool.notifyAll()
"""
for worker in self._workers + self._termworkers:
- if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
+ if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212
return True
return False
+ def HasRunningTasks(self):
+ """Checks whether there's at least one task running.
+
+ """
+ self._lock.acquire()
+ try:
+ return self._HasRunningTasksUnlocked()
+ finally:
+ self._lock.release()
+
def Quiesce(self):
"""Waits until the task queue is empty.