X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/b2e8a4d92aaa66d4123a654d86c449b22e953efe..61413377c7a1c624c4a5f2c2ec55dbc3d8691896:/lib/workerpool.py diff --git a/lib/workerpool.py b/lib/workerpool.py index 33ef932..d276b18 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -23,14 +23,34 @@ """ -import collections import logging import threading +import heapq from ganeti import compat +from ganeti import errors _TERMINATE = object() +_DEFAULT_PRIORITY = 0 + + +class DeferTask(Exception): + """Special exception class to defer a task. + + This class can be raised by L{BaseWorker.RunTask} to defer the execution of a + task. Optionally, the priority of the task can be changed. + + """ + def __init__(self, priority=None): + """Initializes this class. + + @type priority: number + @param priority: New task priority (None means no change) + + """ + Exception.__init__(self) + self.priority = priority class BaseWorker(threading.Thread, object): @@ -49,8 +69,11 @@ class BaseWorker(threading.Thread, object): """ super(BaseWorker, self).__init__(name=worker_id) self.pool = pool + self._worker_id = worker_id self._current_task = None + assert self.getName() == worker_id + def ShouldTerminate(self): """Returns whether this worker should terminate. @@ -64,6 +87,39 @@ class BaseWorker(threading.Thread, object): finally: self.pool._lock.release() + def GetCurrentPriority(self): + """Returns the priority of the current task. + + Should only be called from within L{RunTask}. + + """ + self.pool._lock.acquire() + try: + assert self._HasRunningTaskUnlocked() + + (priority, _, _) = self._current_task + + return priority + finally: + self.pool._lock.release() + + def SetTaskName(self, taskname): + """Sets the name of the current task. + + Should only be called from within L{RunTask}. + + @type taskname: string + @param taskname: Task's name + + """ + if taskname: + name = "%s/%s" % (self._worker_id, taskname) + else: + name = self._worker_id + + # Set thread name + self.setName(name) + def _HasRunningTaskUnlocked(self): """Returns whether this worker is currently running a task. @@ -78,9 +134,10 @@ class BaseWorker(threading.Thread, object): """ pool = self.pool - assert self._current_task is None - while True: + assert self._current_task is None + + defer = None try: # Wait on lock to be told either to terminate or to do a task pool._lock.acquire() @@ -97,15 +154,36 @@ class BaseWorker(threading.Thread, object): self._current_task = task + # No longer needed, dispose of reference + del task + assert self._HasRunningTaskUnlocked() + finally: pool._lock.release() - # Run the actual task + (priority, _, args) = self._current_task try: - logging.debug("Starting task %r", self._current_task) - self.RunTask(*self._current_task) - logging.debug("Done with task %r", self._current_task) + # Run the actual task + assert defer is None + logging.debug("Starting task %r, priority %s", args, priority) + assert self.getName() == self._worker_id + try: + self.RunTask(*args) # pylint: disable-msg=W0142 + finally: + self.SetTaskName(None) + logging.debug("Done with task %r, priority %s", args, priority) + except DeferTask, err: + defer = err + + if defer.priority is None: + # Use same priority + defer.priority = priority + + logging.debug("Deferring task %r, new priority %s", + args, defer.priority) + + assert self._HasRunningTaskUnlocked() except: # pylint: disable-msg=W0702 logging.exception("Caught unhandled exception") @@ -114,6 +192,12 @@ class BaseWorker(threading.Thread, object): # Notify pool pool._lock.acquire() try: + if defer: + assert self._current_task + # Schedule again for later run + (_, _, args) = self._current_task + pool._AddTaskUnlocked(args, defer.priority) + if self._current_task: self._current_task = None pool._worker_to_pool.notifyAll() @@ -167,7 +251,8 @@ class WorkerPool(object): self._termworkers = [] # Queued tasks - self._tasks = collections.deque() + self._counter = 0 + self._tasks = [] # Start workers self.Resize(num_workers) @@ -181,44 +266,77 @@ class WorkerPool(object): while self._quiescing: self._pool_to_pool.wait() - def _AddTaskUnlocked(self, args): + def _AddTaskUnlocked(self, args, priority): + """Adds a task to the internal queue. + + @type args: sequence + @param args: Arguments passed to L{BaseWorker.RunTask} + @type priority: number + @param priority: Task priority + + """ assert isinstance(args, (tuple, list)), "Arguments must be a sequence" + assert isinstance(priority, (int, long)), "Priority must be numeric" + + # This counter is used to ensure elements are processed in their + # incoming order. For processing they're sorted by priority and then + # counter. + self._counter += 1 - self._tasks.append(args) + heapq.heappush(self._tasks, (priority, self._counter, args)) # Notify a waiting worker self._pool_to_worker.notify() - def AddTask(self, args): + def AddTask(self, args, priority=_DEFAULT_PRIORITY): """Adds a task to the queue. @type args: sequence @param args: arguments passed to L{BaseWorker.RunTask} + @type priority: number + @param priority: Task priority """ self._lock.acquire() try: self._WaitWhileQuiescingUnlocked() - self._AddTaskUnlocked(args) + self._AddTaskUnlocked(args, priority) finally: self._lock.release() - def AddManyTasks(self, tasks): + def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY): """Add a list of tasks to the queue. @type tasks: list of tuples @param tasks: list of args passed to L{BaseWorker.RunTask} + @type priority: number or list of numbers + @param priority: Priority for all added tasks or a list with the priority + for each task """ assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \ "Each task must be a sequence" + assert (isinstance(priority, (int, long)) or + compat.all(isinstance(prio, (int, long)) for prio in priority)), \ + "Priority must be numeric or be a list of numeric values" + + if isinstance(priority, (int, long)): + priority = [priority] * len(tasks) + elif len(priority) != len(tasks): + raise errors.ProgrammerError("Number of priorities (%s) doesn't match" + " number of tasks (%s)" % + (len(priority), len(tasks))) + self._lock.acquire() try: self._WaitWhileQuiescingUnlocked() - for args in tasks: - self._AddTaskUnlocked(args) + assert compat.all(isinstance(prio, (int, long)) for prio in priority) + assert len(tasks) == len(priority) + + for args, priority in zip(tasks, priority): + self._AddTaskUnlocked(args, priority) finally: self._lock.release() @@ -251,7 +369,7 @@ class WorkerPool(object): # Get task from queue and tell pool about it try: - return self._tasks.popleft() + return heapq.heappop(self._tasks) finally: self._worker_to_pool.notifyAll()