X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/c30421e0437a2780a05bb85a63959c1d7abfb445..fac489a56410b927f19f81ef72755cab9f2b9d29:/lib/workerpool.py diff --git a/lib/workerpool.py b/lib/workerpool.py index 1f3bcd8..6b558ce 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -26,6 +26,7 @@ import logging import threading import heapq +import itertools from ganeti import compat from ganeti import errors @@ -53,13 +54,19 @@ class DeferTask(Exception): self.priority = priority +class NoSuchTask(Exception): + """Exception raised when a task can't be found. + + """ + + class BaseWorker(threading.Thread, object): """Base worker class for worker pools. Users of a worker pool must override RunTask in a subclass. """ - # pylint: disable-msg=W0212 + # pylint: disable=W0212 def __init__(self, pool, worker_id): """Constructor for BaseWorker thread. @@ -97,7 +104,7 @@ class BaseWorker(threading.Thread, object): try: assert self._HasRunningTaskUnlocked() - (priority, _, _) = self._current_task + (priority, _, _, _) = self._current_task return priority finally: @@ -126,6 +133,22 @@ class BaseWorker(threading.Thread, object): """ return (self._current_task is not None) + def _GetCurrentOrderAndTaskId(self): + """Returns the order and task ID of the current task. + + Should only be called from within L{RunTask}. + + """ + self.pool._lock.acquire() + try: + assert self._HasRunningTaskUnlocked() + + (_, order_id, task_id, _) = self._current_task + + return (order_id, task_id) + finally: + self.pool._lock.release() + def run(self): """Main thread function. 
@@ -162,14 +185,14 @@ class BaseWorker(threading.Thread, object): finally: pool._lock.release() - (priority, _, args) = self._current_task + (priority, _, _, args) = self._current_task try: # Run the actual task assert defer is None logging.debug("Starting task %r, priority %s", args, priority) assert self.getName() == self._worker_id try: - self.RunTask(*args) # pylint: disable-msg=W0142 + self.RunTask(*args) # pylint: disable=W0142 finally: self.SetTaskName(None) logging.debug("Done with task %r, priority %s", args, priority) @@ -180,10 +203,11 @@ class BaseWorker(threading.Thread, object): # Use same priority defer.priority = priority - logging.debug("Deferring task %r, new priority %s", defer.priority) + logging.debug("Deferring task %r, new priority %s", + args, defer.priority) assert self._HasRunningTaskUnlocked() - except: # pylint: disable-msg=W0702 + except: # pylint: disable=W0702 logging.exception("Caught unhandled exception") assert self._HasRunningTaskUnlocked() @@ -194,8 +218,8 @@ class BaseWorker(threading.Thread, object): if defer: assert self._current_task # Schedule again for later run - (_, _, args) = self._current_task - pool._AddTaskUnlocked(args, defer.priority) + (_, _, task_id, args) = self._current_task + pool._AddTaskUnlocked(args, defer.priority, task_id) if self._current_task: self._current_task = None @@ -225,6 +249,18 @@ class WorkerPool(object): added to the pool. Due to the nature of threading, they're not guaranteed to finish in the same order. + @type _tasks: list of tuples + @ivar _tasks: Each tuple has the format (priority, order ID, task ID, + arguments). Priority and order ID are numeric and essentially control the + sort order. The order ID is an increasing number denoting the order in + which tasks are added to the queue. The task ID is controlled by user of + workerpool, see L{AddTask} for details. 
The task arguments are C{None} for + abandoned tasks, otherwise a sequence of arguments to be passed to + L{BaseWorker.RunTask}. The list must fulfill the heap property (for use by + the C{heapq} module). + @type _taskdata: dict; (task IDs as keys, tuples as values) + @ivar _taskdata: Mapping from task IDs to entries in L{_tasks} + """ def __init__(self, name, num_workers, worker_class): """Constructor for worker pool. @@ -245,13 +281,15 @@ class WorkerPool(object): self._last_worker_id = 0 self._workers = [] self._quiescing = False + self._active = True # Terminating workers self._termworkers = [] # Queued tasks - self._counter = 0 + self._counter = itertools.count() self._tasks = [] + self._taskdata = {} # Start workers self.Resize(num_workers) @@ -265,45 +303,57 @@ class WorkerPool(object): while self._quiescing: self._pool_to_pool.wait() - def _AddTaskUnlocked(self, args, priority): + def _AddTaskUnlocked(self, args, priority, task_id): """Adds a task to the internal queue. @type args: sequence @param args: Arguments passed to L{BaseWorker.RunTask} @type priority: number @param priority: Task priority + @param task_id: Task ID """ assert isinstance(args, (tuple, list)), "Arguments must be a sequence" assert isinstance(priority, (int, long)), "Priority must be numeric" + assert task_id is None or isinstance(task_id, (int, long)), \ + "Task ID must be numeric or None" - # This counter is used to ensure elements are processed in their - # incoming order. For processing they're sorted by priority and then - # counter. - self._counter += 1 + task = [priority, self._counter.next(), task_id, args] - heapq.heappush(self._tasks, (priority, self._counter, args)) + if task_id is not None: + assert task_id not in self._taskdata + # Keep a reference to change priority later if necessary + self._taskdata[task_id] = task + + # A counter is used to ensure elements are processed in their incoming + # order. For processing they're sorted by priority and then counter.
+ heapq.heappush(self._tasks, task) # Notify a waiting worker self._pool_to_worker.notify() - def AddTask(self, args, priority=_DEFAULT_PRIORITY): + def AddTask(self, args, priority=_DEFAULT_PRIORITY, task_id=None): """Adds a task to the queue. @type args: sequence @param args: arguments passed to L{BaseWorker.RunTask} @type priority: number @param priority: Task priority + @param task_id: Task ID + @note: The task ID can be essentially anything that can be used as a + dictionary key. Callers, however, must ensure a task ID is unique while a + task is in the pool or while it might return to the pool due to deferring + using L{DeferTask}. """ self._lock.acquire() try: self._WaitWhileQuiescingUnlocked() - self._AddTaskUnlocked(args, priority) + self._AddTaskUnlocked(args, priority, task_id) finally: self._lock.release() - def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY): + def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY, task_id=None): """Add a list of tasks to the queue. @type tasks: list of tuples @@ -311,14 +361,18 @@ class WorkerPool(object): @type priority: number or list of numbers @param priority: Priority for all added tasks or a list with the priority for each task + @type task_id: list + @param task_id: List with the ID for each task + @note: See L{AddTask} for a note on task IDs. 
""" assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \ - "Each task must be a sequence" - + "Each task must be a sequence" assert (isinstance(priority, (int, long)) or compat.all(isinstance(prio, (int, long)) for prio in priority)), \ "Priority must be numeric or be a list of numeric values" + assert task_id is None or isinstance(task_id, (tuple, list)), \ + "Task IDs must be in a sequence" if isinstance(priority, (int, long)): priority = [priority] * len(tasks) @@ -327,15 +381,91 @@ class WorkerPool(object): " number of tasks (%s)" % (len(priority), len(tasks))) + if task_id is None: + task_id = [None] * len(tasks) + elif len(task_id) != len(tasks): + raise errors.ProgrammerError("Number of task IDs (%s) doesn't match" + " number of tasks (%s)" % + (len(task_id), len(tasks))) + self._lock.acquire() try: self._WaitWhileQuiescingUnlocked() assert compat.all(isinstance(prio, (int, long)) for prio in priority) assert len(tasks) == len(priority) + assert len(tasks) == len(task_id) + + for (args, prio, tid) in zip(tasks, priority, task_id): + self._AddTaskUnlocked(args, prio, tid) + finally: + self._lock.release() + + def ChangeTaskPriority(self, task_id, priority): + """Changes a task's priority. 
+ + @param task_id: Task ID + @type priority: number + @param priority: New task priority + @raise NoSuchTask: When the task referred by C{task_id} can not be found + (it may never have existed, may have already been processed, or is + currently running) + + """ + assert isinstance(priority, (int, long)), "Priority must be numeric" + + self._lock.acquire() + try: + logging.debug("About to change priority of task %s to %s", + task_id, priority) + + # Find old task + oldtask = self._taskdata.get(task_id, None) + if oldtask is None: + msg = "Task '%s' was not found" % task_id + logging.debug(msg) + raise NoSuchTask(msg) + + # Prepare new task + newtask = [priority] + oldtask[1:] + + # Mark old entry as abandoned (this doesn't change the sort order and + # therefore doesn't invalidate the heap property of L{self._tasks}). + # See also the priority queue implementation notes in the heapq docs. + oldtask[-1] = None + + # Change reference to new task entry and forget the old one + assert task_id is not None + self._taskdata[task_id] = newtask + + # Add a new task with the old number and arguments + heapq.heappush(self._tasks, newtask) + + # Notify a waiting worker + self._pool_to_worker.notify() + finally: + self._lock.release() + + def SetActive(self, active): + """Enable/disable processing of tasks. + + This is different from L{Quiesce} in the sense that this function just + changes an internal flag and doesn't wait for the queue to be empty. Tasks + already being processed continue normally, but no new tasks will be + started. New tasks can still be added.
+ + @type active: bool + @param active: Whether tasks should be processed + + """ + self._lock.acquire() + try: + self._active = active - for args, priority in zip(tasks, priority): - self._AddTaskUnlocked(args, priority) + if active: + # Tell all workers to continue processing + self._pool_to_worker.notifyAll() finally: self._lock.release() @@ -346,11 +476,32 @@ class WorkerPool(object): @param worker: Worker thread """ - if self._ShouldWorkerTerminateUnlocked(worker): - return _TERMINATE + while True: + if self._ShouldWorkerTerminateUnlocked(worker): + return _TERMINATE + + # If there's a pending task, return it immediately + if self._active and self._tasks: + # Get task from queue and tell pool about it + try: + task = heapq.heappop(self._tasks) + finally: + self._worker_to_pool.notifyAll() + + (_, _, task_id, args) = task + + # If the priority was changed, "args" is None + if args is None: + # Try again + logging.debug("Found abandoned task (%r)", task) + continue + + # Delete reference + if task_id is not None: + del self._taskdata[task_id] + + return task - # We only wait if there's no task for us. - if not self._tasks: logging.debug("Waiting for tasks") # wait() releases the lock and sleeps until notified @@ -358,20 +509,6 @@ class WorkerPool(object): logging.debug("Notified while waiting") - # Were we woken up in order to terminate? - if self._ShouldWorkerTerminateUnlocked(worker): - return _TERMINATE - - if not self._tasks: - # Spurious notification, ignore - return None - - # Get task from queue and tell pool about it - try: - return heapq.heappop(self._tasks) - finally: - self._worker_to_pool.notifyAll() - def _ShouldWorkerTerminateUnlocked(self, worker): """Returns whether a worker should terminate. 
@@ -383,10 +520,20 @@ class WorkerPool(object): """ for worker in self._workers + self._termworkers: - if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212 + if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212 return True return False + def HasRunningTasks(self): + """Checks whether there's at least one task running. + + """ + self._lock.acquire() + try: + return self._HasRunningTasksUnlocked() + finally: + self._lock.release() + def Quiesce(self): """Waits until the task queue is empty.