"""
-import collections
import logging
import threading
+import heapq
from ganeti import compat
+from ganeti import errors
_TERMINATE = object()
+# Priority used by WorkerPool.AddTask and WorkerPool.AddManyTasks when the
+# caller does not pass one explicitly
+_DEFAULT_PRIORITY = 0
+
+
+class DeferTask(Exception):
+ """Special exception class to defer a task.
+
+ This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
+ task. Optionally, the priority of the task can be changed.
+
+ The worker loop catches this exception and hands the task's arguments back
+ to the pool's queue. If C{priority} is None, the worker fills in the
+ priority the task was running with before re-queueing it.
+
+ @ivar priority: Priority requested for the deferred task, or None
+
+ """
+ def __init__(self, priority=None):
+ """Initializes this class.
+
+ @type priority: number
+ @param priority: New task priority (None means no change)
+
+ """
+ # The base exception is initialized without arguments; the requested
+ # priority is carried only as an attribute
+ Exception.__init__(self)
+ self.priority = priority
class BaseWorker(threading.Thread, object):
"""
super(BaseWorker, self).__init__(name=worker_id)
self.pool = pool
+ self._worker_id = worker_id
self._current_task = None
+ assert self.getName() == worker_id
+
def ShouldTerminate(self):
"""Returns whether this worker should terminate.
finally:
self.pool._lock.release()
+ def GetCurrentPriority(self):
+ """Returns the priority of the current task.
+
+ Should only be called from within L{RunTask}.
+
+ @return: First element of the current task's 3-tuple
+
+ """
+ # The current task is only inspected while holding the pool's lock
+ self.pool._lock.acquire()
+ try:
+ assert self._HasRunningTaskUnlocked()
+
+ # The remaining two elements of the task tuple are not needed here
+ (priority, _, _) = self._current_task
+
+ return priority
+ finally:
+ self.pool._lock.release()
+
+ def SetTaskName(self, taskname):
+ """Sets the name of the current task.
+
+ Should only be called from within L{RunTask}.
+
+ The thread is renamed to C{"<worker_id>/<taskname>"}. A false value
+ (None or an empty string) resets the thread name to the plain worker ID.
+
+ @type taskname: string
+ @param taskname: Task's name
+
+ """
+ if taskname:
+ name = "%s/%s" % (self._worker_id, taskname)
+ else:
+ # No task name given: fall back to the bare worker ID
+ name = self._worker_id
+
+ # Set thread name
+ self.setName(name)
+
def _HasRunningTaskUnlocked(self):
"""Returns whether this worker is currently running a task.
"""
pool = self.pool
- assert self._current_task is None
-
while True:
+ assert self._current_task is None
+
+ defer = None
try:
# Wait on lock to be told either to terminate or to do a task
pool._lock.acquire()
self._current_task = task
+ # No longer needed, dispose of reference
+ del task
+
assert self._HasRunningTaskUnlocked()
+
finally:
pool._lock.release()
- # Run the actual task
+ (priority, _, args) = self._current_task
try:
- logging.debug("Starting task %r", self._current_task)
- self.RunTask(*self._current_task)
- logging.debug("Done with task %r", self._current_task)
+ # Run the actual task
+ assert defer is None
+ logging.debug("Starting task %r, priority %s", args, priority)
+ assert self.getName() == self._worker_id
+ try:
+ self.RunTask(*args) # pylint: disable-msg=W0142
+ finally:
+ self.SetTaskName(None)
+ logging.debug("Done with task %r, priority %s", args, priority)
+ except DeferTask, err:
+ defer = err
+
+ if defer.priority is None:
+ # Use same priority
+ defer.priority = priority
+
+ logging.debug("Deferring task %r, new priority %s",
+ args, defer.priority)
+
+ assert self._HasRunningTaskUnlocked()
except: # pylint: disable-msg=W0702
logging.exception("Caught unhandled exception")
# Notify pool
pool._lock.acquire()
try:
+ if defer:
+ assert self._current_task
+ # Schedule again for later run
+ (_, _, args) = self._current_task
+ pool._AddTaskUnlocked(args, defer.priority)
+
if self._current_task:
self._current_task = None
pool._worker_to_pool.notifyAll()
self._termworkers = []
# Queued tasks
- self._tasks = collections.deque()
+ self._counter = 0
+ self._tasks = []
# Start workers
self.Resize(num_workers)
while self._quiescing:
self._pool_to_pool.wait()
- def _AddTaskUnlocked(self, args):
+ def _AddTaskUnlocked(self, args, priority):
+ """Adds a task to the internal queue.
+
+ The callers shown in this module acquire the pool's lock before calling
+ this method; it does no locking of its own.
+
+ @type args: sequence
+ @param args: Arguments passed to L{BaseWorker.RunTask}
+ @type priority: number
+ @param priority: Task priority
+
+ """
assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
+ assert isinstance(priority, (int, long)), "Priority must be numeric"
+
+ # This counter is used to ensure elements are processed in their
+ # incoming order. For processing they're sorted by priority and then
+ # counter.
+ self._counter += 1
- self._tasks.append(args)
+ # Queue entries are (priority, counter, args) tuples. heapq keeps the
+ # smallest tuple first, so numerically lower priorities are popped earlier.
+ heapq.heappush(self._tasks, (priority, self._counter, args))
# Notify a waiting worker
self._pool_to_worker.notify()
- def AddTask(self, args):
+ def AddTask(self, args, priority=_DEFAULT_PRIORITY):
"""Adds a task to the queue.
@type args: sequence
@param args: arguments passed to L{BaseWorker.RunTask}
+ @type priority: number
+ @param priority: Task priority (defaults to L{_DEFAULT_PRIORITY})
"""
self._lock.acquire()
try:
+ # Queue the task only after the quiescing wait has returned; both steps
+ # run under the pool lock
self._WaitWhileQuiescingUnlocked()
- self._AddTaskUnlocked(args)
+ self._AddTaskUnlocked(args, priority)
finally:
self._lock.release()
- def AddManyTasks(self, tasks):
+ def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
"""Add a list of tasks to the queue.
@type tasks: list of tuples
@param tasks: list of args passed to L{BaseWorker.RunTask}
+ @type priority: number or list of numbers
+ @param priority: Priority for all added tasks or a list with the priority
+ for each task
+ @raise errors.ProgrammerError: If a list of priorities is passed whose
+ length differs from the number of tasks
"""
assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
"Each task must be a sequence"
+ assert (isinstance(priority, (int, long)) or
+ compat.all(isinstance(prio, (int, long)) for prio in priority)), \
+ "Priority must be numeric or be a list of numeric values"
+
+ # Normalize to one priority per task: a single number is repeated for
+ # every task, a list must already match the tasks one-to-one
+ if isinstance(priority, (int, long)):
+ priority = [priority] * len(tasks)
+ elif len(priority) != len(tasks):
+ raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
+ " number of tasks (%s)" %
+ (len(priority), len(tasks)))
+
self._lock.acquire()
try:
self._WaitWhileQuiescingUnlocked()
- for args in tasks:
- self._AddTaskUnlocked(args)
+ assert compat.all(isinstance(prio, (int, long)) for prio in priority)
+ assert len(tasks) == len(priority)
+
+ # NOTE: the loop variable reuses the name "priority"; zip() is evaluated
+ # before the first rebinding, so the list of priorities is still read
+ # in full
+ for args, priority in zip(tasks, priority):
+ self._AddTaskUnlocked(args, priority)
finally:
self._lock.release()
# Get task from queue and tell pool about it
try:
- return self._tasks.popleft()
+ return heapq.heappop(self._tasks)
finally:
self._worker_to_pool.notifyAll()