X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/21c5ad52c8a75d3781f2f0b2e9d7e5688559dbe8..905108aba23c253f17ad1a4f2bf4883b8108bda5:/lib/workerpool.py diff --git a/lib/workerpool.py b/lib/workerpool.py index 25f31b4..d276b18 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -23,14 +23,34 @@ """ -import collections import logging import threading +import heapq from ganeti import compat +from ganeti import errors _TERMINATE = object() +_DEFAULT_PRIORITY = 0 + + +class DeferTask(Exception): + """Special exception class to defer a task. + + This class can be raised by L{BaseWorker.RunTask} to defer the execution of a + task. Optionally, the priority of the task can be changed. + + """ + def __init__(self, priority=None): + """Initializes this class. + + @type priority: number + @param priority: New task priority (None means no change) + + """ + Exception.__init__(self) + self.priority = priority class BaseWorker(threading.Thread, object): @@ -49,30 +69,63 @@ class BaseWorker(threading.Thread, object): """ super(BaseWorker, self).__init__(name=worker_id) self.pool = pool + self._worker_id = worker_id self._current_task = None + assert self.getName() == worker_id + def ShouldTerminate(self): - """Returns whether a worker should terminate. + """Returns whether this worker should terminate. - """ - return self.pool.ShouldWorkerTerminate(self) - - def _HasRunningTaskUnlocked(self): - """Returns whether this worker is currently running a task. + Should only be called from within L{RunTask}. """ - return (self._current_task is not None) + self.pool._lock.acquire() + try: + assert self._HasRunningTaskUnlocked() + return self.pool._ShouldWorkerTerminateUnlocked(self) + finally: + self.pool._lock.release() - def HasRunningTask(self): - """Returns whether this worker is currently running a task. + def GetCurrentPriority(self): + """Returns the priority of the current task. + + Should only be called from within L{RunTask}. """ self.pool._lock.acquire() try: - return self._HasRunningTaskUnlocked() + assert self._HasRunningTaskUnlocked() + + (priority, _, _) = self._current_task + + return priority finally: self.pool._lock.release() + def SetTaskName(self, taskname): + """Sets the name of the current task. + + Should only be called from within L{RunTask}. + + @type taskname: string + @param taskname: Task's name + + """ + if taskname: + name = "%s/%s" % (self._worker_id, taskname) + else: + name = self._worker_id + + # Set thread name + self.setName(name) + + def _HasRunningTaskUnlocked(self): + """Returns whether this worker is currently running a task. + + """ + return (self._current_task is not None) + def run(self): """Main thread function. @@ -81,9 +134,10 @@ class BaseWorker(threading.Thread, object): """ pool = self.pool - assert not self.HasRunningTask() - while True: + assert self._current_task is None + + defer = None try: # Wait on lock to be told either to terminate or to do a task pool._lock.acquire() @@ -100,27 +154,58 @@ class BaseWorker(threading.Thread, object): self._current_task = task + # No longer needed, dispose of reference + del task + assert self._HasRunningTaskUnlocked() + finally: pool._lock.release() - # Run the actual task + (priority, _, args) = self._current_task try: - logging.debug("Starting task %r", self._current_task) - self.RunTask(*self._current_task) - logging.debug("Done with task %r", self._current_task) + # Run the actual task + assert defer is None + logging.debug("Starting task %r, priority %s", args, priority) + assert self.getName() == self._worker_id + try: + self.RunTask(*args) # pylint: disable-msg=W0142 + finally: + self.SetTaskName(None) + logging.debug("Done with task %r, priority %s", args, priority) + except DeferTask, err: + defer = err + + if defer.priority is None: + # Use same priority + defer.priority = priority + + logging.debug("Deferring task %r, new priority %s", + args, defer.priority) + + assert self._HasRunningTaskUnlocked() except: # pylint: disable-msg=W0702 logging.exception("Caught unhandled exception") + + assert self._HasRunningTaskUnlocked() finally: # Notify pool pool._lock.acquire() try: + if defer: + assert self._current_task + # Schedule again for later run + (_, _, args) = self._current_task + pool._AddTaskUnlocked(args, defer.priority) + if self._current_task: self._current_task = None pool._worker_to_pool.notifyAll() finally: pool._lock.release() + assert not self._HasRunningTaskUnlocked() + logging.debug("Terminates") def RunTask(self, *args): @@ -166,7 +251,8 @@ class WorkerPool(object): self._termworkers = [] # Queued tasks - self._tasks = collections.deque() + self._counter = 0 + self._tasks = [] # Start workers self.Resize(num_workers) @@ -180,43 +266,77 @@ class WorkerPool(object): while self._quiescing: self._pool_to_pool.wait() - def _AddTaskUnlocked(self, args): + def _AddTaskUnlocked(self, args, priority): + """Adds a task to the internal queue. + + @type args: sequence + @param args: Arguments passed to L{BaseWorker.RunTask} + @type priority: number + @param priority: Task priority + + """ assert isinstance(args, (tuple, list)), "Arguments must be a sequence" + assert isinstance(priority, (int, long)), "Priority must be numeric" - self._tasks.append(args) + # This counter is used to ensure elements are processed in their + # incoming order. For processing they're sorted by priority and then + # counter. + self._counter += 1 + + heapq.heappush(self._tasks, (priority, self._counter, args)) # Notify a waiting worker self._pool_to_worker.notify() - def AddTask(self, *args): + def AddTask(self, args, priority=_DEFAULT_PRIORITY): """Adds a task to the queue. + @type args: sequence @param args: arguments passed to L{BaseWorker.RunTask} + @type priority: number + @param priority: Task priority """ self._lock.acquire() try: self._WaitWhileQuiescingUnlocked() - self._AddTaskUnlocked(args) + self._AddTaskUnlocked(args, priority) finally: self._lock.release() - def AddManyTasks(self, tasks): + def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY): """Add a list of tasks to the queue. @type tasks: list of tuples @param tasks: list of args passed to L{BaseWorker.RunTask} + @type priority: number or list of numbers + @param priority: Priority for all added tasks or a list with the priority + for each task """ assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \ "Each task must be a sequence" + assert (isinstance(priority, (int, long)) or + compat.all(isinstance(prio, (int, long)) for prio in priority)), \ + "Priority must be numeric or be a list of numeric values" + + if isinstance(priority, (int, long)): + priority = [priority] * len(tasks) + elif len(priority) != len(tasks): + raise errors.ProgrammerError("Number of priorities (%s) doesn't match" + " number of tasks (%s)" % + (len(priority), len(tasks))) + self._lock.acquire() try: self._WaitWhileQuiescingUnlocked() - for args in tasks: - self._AddTaskUnlocked(args) + assert compat.all(isinstance(prio, (int, long)) for prio in priority) + assert len(tasks) == len(priority) + + for args, priority in zip(tasks, priority): + self._AddTaskUnlocked(args, priority) finally: self._lock.release() @@ -249,7 +369,7 @@ class WorkerPool(object): # Get task from queue and tell pool about it try: - return self._tasks.popleft() + return heapq.heappop(self._tasks) finally: self._worker_to_pool.notifyAll() @@ -259,16 +379,6 @@ class WorkerPool(object): """ return (worker in self._termworkers) - def ShouldWorkerTerminate(self, worker): - """Returns whether a worker should terminate. - - """ - self._lock.acquire() - try: - return self._ShouldWorkerTerminateUnlocked(worker) - finally: - self._lock.release() - def _HasRunningTasksUnlocked(self): """Checks whether there's a task running in a worker.