X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/25e557a59ed2643ee6b0a9fb6f6cc71fd31c3e28..cf572b1378a61db132d84a5e03b0e5a157848caa:/lib/workerpool.py diff --git a/lib/workerpool.py b/lib/workerpool.py index 0ca9155..8db03c7 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2008 Google Inc. +# Copyright (C) 2008, 2009, 2010 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -23,11 +23,34 @@ """ -import collections import logging import threading +import heapq from ganeti import compat +from ganeti import errors + + +_TERMINATE = object() +_DEFAULT_PRIORITY = 0 + + +class DeferTask(Exception): + """Special exception class to defer a task. + + This class can be raised by L{BaseWorker.RunTask} to defer the execution of a + task. Optionally, the priority of the task can be changed. + + """ + def __init__(self, priority=None): + """Initializes this class. + + @type priority: number + @param priority: New task priority (None means no change) + + """ + Exception.__init__(self) + self.priority = priority class BaseWorker(threading.Thread, object): @@ -36,7 +59,7 @@ class BaseWorker(threading.Thread, object): Users of a worker pool must override RunTask in a subclass. """ - # pylint: disable-msg=W0212 + # pylint: disable=W0212 def __init__(self, pool, worker_id): """Constructor for BaseWorker thread. @@ -46,30 +69,63 @@ class BaseWorker(threading.Thread, object): """ super(BaseWorker, self).__init__(name=worker_id) self.pool = pool + self._worker_id = worker_id self._current_task = None - def ShouldTerminate(self): - """Returns whether a worker should terminate. + assert self.getName() == worker_id - """ - return self.pool.ShouldWorkerTerminate(self) + def ShouldTerminate(self): + """Returns whether this worker should terminate. - def _HasRunningTaskUnlocked(self): - """Returns whether this worker is currently running a task. + Should only be called from within L{RunTask}. """ - return (self._current_task is not None) + self.pool._lock.acquire() + try: + assert self._HasRunningTaskUnlocked() + return self.pool._ShouldWorkerTerminateUnlocked(self) + finally: + self.pool._lock.release() - def HasRunningTask(self): - """Returns whether this worker is currently running a task. + def GetCurrentPriority(self): + """Returns the priority of the current task. + + Should only be called from within L{RunTask}. """ self.pool._lock.acquire() try: - return self._HasRunningTaskUnlocked() + assert self._HasRunningTaskUnlocked() + + (priority, _, _) = self._current_task + + return priority finally: self.pool._lock.release() + def SetTaskName(self, taskname): + """Sets the name of the current task. + + Should only be called from within L{RunTask}. + + @type taskname: string + @param taskname: Task's name + + """ + if taskname: + name = "%s/%s" % (self._worker_id, taskname) + else: + name = self._worker_id + + # Set thread name + self.setName(name) + + def _HasRunningTaskUnlocked(self): + """Returns whether this worker is currently running a task. + + """ + return (self._current_task is not None) + def run(self): """Main thread function. @@ -78,58 +134,78 @@ class BaseWorker(threading.Thread, object): """ pool = self.pool - assert not self.HasRunningTask() - while True: + assert self._current_task is None + + defer = None try: - # We wait on lock to be told either terminate or do a task. + # Wait on lock to be told either to terminate or to do a task pool._lock.acquire() try: - if pool._ShouldWorkerTerminateUnlocked(self): - break + task = pool._WaitForTaskUnlocked(self) - # We only wait if there's no task for us. - if not pool._tasks: - logging.debug("Waiting for tasks") + if task is _TERMINATE: + # Told to terminate + break - # wait() releases the lock and sleeps until notified - pool._pool_to_worker.wait() + if task is None: + # Spurious notification, ignore + continue - logging.debug("Notified while waiting") + self._current_task = task - # Were we woken up in order to terminate? - if pool._ShouldWorkerTerminateUnlocked(self): - break + # No longer needed, dispose of reference + del task - if not pool._tasks: - # Spurious notification, ignore - continue + assert self._HasRunningTaskUnlocked() - # Get task from queue and tell pool about it - try: - self._current_task = pool._tasks.popleft() - finally: - pool._worker_to_pool.notifyAll() finally: pool._lock.release() - # Run the actual task + (priority, _, args) = self._current_task try: - logging.debug("Starting task %r", self._current_task) - self.RunTask(*self._current_task) - logging.debug("Done with task %r", self._current_task) - except: # pylint: disable-msg=W0702 + # Run the actual task + assert defer is None + logging.debug("Starting task %r, priority %s", args, priority) + assert self.getName() == self._worker_id + try: + self.RunTask(*args) # pylint: disable=W0142 + finally: + self.SetTaskName(None) + logging.debug("Done with task %r, priority %s", args, priority) + except DeferTask, err: + defer = err + + if defer.priority is None: + # Use same priority + defer.priority = priority + + logging.debug("Deferring task %r, new priority %s", + args, defer.priority) + + assert self._HasRunningTaskUnlocked() + except: # pylint: disable=W0702 logging.exception("Caught unhandled exception") + + assert self._HasRunningTaskUnlocked() finally: # Notify pool pool._lock.acquire() try: + if defer: + assert self._current_task + # Schedule again for later run + (_, _, args) = self._current_task + pool._AddTaskUnlocked(args, defer.priority) + if self._current_task: self._current_task = None pool._worker_to_pool.notifyAll() finally: pool._lock.release() + assert not self._HasRunningTaskUnlocked() + logging.debug("Terminates") def RunTask(self, *args): @@ -170,12 +246,14 @@ class WorkerPool(object): self._last_worker_id = 0 self._workers = [] self._quiescing = False + self._active = True # Terminating workers self._termworkers = [] # Queued tasks - self._tasks = collections.deque() + self._counter = 0 + self._tasks = [] # Start workers self.Resize(num_workers) @@ -189,69 +267,161 @@ class WorkerPool(object): while self._quiescing: self._pool_to_pool.wait() - def AddTask(self, *args): + def _AddTaskUnlocked(self, args, priority): + """Adds a task to the internal queue. + + @type args: sequence + @param args: Arguments passed to L{BaseWorker.RunTask} + @type priority: number + @param priority: Task priority + + """ + assert isinstance(args, (tuple, list)), "Arguments must be a sequence" + assert isinstance(priority, (int, long)), "Priority must be numeric" + + # This counter is used to ensure elements are processed in their + # incoming order. For processing they're sorted by priority and then + # counter. + self._counter += 1 + + heapq.heappush(self._tasks, (priority, self._counter, args)) + + # Notify a waiting worker + self._pool_to_worker.notify() + + def AddTask(self, args, priority=_DEFAULT_PRIORITY): """Adds a task to the queue. + @type args: sequence @param args: arguments passed to L{BaseWorker.RunTask} + @type priority: number + @param priority: Task priority """ self._lock.acquire() try: self._WaitWhileQuiescingUnlocked() - - self._tasks.append(args) - - # Wake one idling worker up - self._pool_to_worker.notify() + self._AddTaskUnlocked(args, priority) finally: self._lock.release() - def AddManyTasks(self, tasks): + def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY): """Add a list of tasks to the queue. @type tasks: list of tuples @param tasks: list of args passed to L{BaseWorker.RunTask} + @type priority: number or list of numbers + @param priority: Priority for all added tasks or a list with the priority + for each task """ assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \ "Each task must be a sequence" + assert (isinstance(priority, (int, long)) or + compat.all(isinstance(prio, (int, long)) for prio in priority)), \ + "Priority must be numeric or be a list of numeric values" + + if isinstance(priority, (int, long)): + priority = [priority] * len(tasks) + elif len(priority) != len(tasks): + raise errors.ProgrammerError("Number of priorities (%s) doesn't match" + " number of tasks (%s)" % + (len(priority), len(tasks))) + self._lock.acquire() try: self._WaitWhileQuiescingUnlocked() - self._tasks.extend(tasks) + assert compat.all(isinstance(prio, (int, long)) for prio in priority) + assert len(tasks) == len(priority) - for _ in tasks: - self._pool_to_worker.notify() + for args, priority in zip(tasks, priority): + self._AddTaskUnlocked(args, priority) finally: self._lock.release() - def _ShouldWorkerTerminateUnlocked(self, worker): - """Returns whether a worker should terminate. + def SetActive(self, active): + """Enable/disable processing of tasks. - """ - return (worker in self._termworkers) + This is different from L{Quiesce} in the sense that this function just + changes an internal flag and doesn't wait for the queue to be empty. Tasks + already being processed continue normally, but no new tasks will be + started. New tasks can still be added. - def ShouldWorkerTerminate(self, worker): - """Returns whether a worker should terminate. + @type active: bool + @param active: Whether tasks should be processed """ self._lock.acquire() try: - return self._ShouldWorkerTerminateUnlocked(worker) + self._active = active + + if active: + # Tell all workers to continue processing + self._pool_to_worker.notifyAll() finally: self._lock.release() + def _WaitForTaskUnlocked(self, worker): + """Waits for a task for a worker. + + @type worker: L{BaseWorker} + @param worker: Worker thread + + """ + if self._ShouldWorkerTerminateUnlocked(worker): + return _TERMINATE + + # We only wait if there's no task for us. + if not (self._active and self._tasks): + logging.debug("Waiting for tasks") + + while True: + # wait() releases the lock and sleeps until notified + self._pool_to_worker.wait() + + logging.debug("Notified while waiting") + + # Were we woken up in order to terminate? + if self._ShouldWorkerTerminateUnlocked(worker): + return _TERMINATE + + # Just loop if pool is not processing tasks at this time + if self._active and self._tasks: + break + + # Get task from queue and tell pool about it + try: + return heapq.heappop(self._tasks) + finally: + self._worker_to_pool.notifyAll() + + def _ShouldWorkerTerminateUnlocked(self, worker): + """Returns whether a worker should terminate. + + """ + return (worker in self._termworkers) + def _HasRunningTasksUnlocked(self): """Checks whether there's a task running in a worker. """ for worker in self._workers + self._termworkers: - if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212 + if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212 return True return False + def HasRunningTasks(self): + """Checks whether there's at least one task running. + + """ + self._lock.acquire() + try: + return self._HasRunningTasksUnlocked() + finally: + self._lock.release() + def Quiesce(self): """Waits until the task queue is empty.