X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/f1501b3f3bc8c6253e80d1b94aac2c4f46c25800..96e0d5cc0a39e253ef5b625a6b48547ed27ca701:/lib/workerpool.py diff --git a/lib/workerpool.py b/lib/workerpool.py index fa202d0..4736be5 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2008 Google Inc. +# Copyright (C) 2008, 2009, 2010 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -23,9 +23,34 @@ """ -import collections import logging import threading +import heapq + +from ganeti import compat +from ganeti import errors + + +_TERMINATE = object() +_DEFAULT_PRIORITY = 0 + + +class DeferTask(Exception): + """Special exception class to defer a task. + + This class can be raised by L{BaseWorker.RunTask} to defer the execution of a + task. Optionally, the priority of the task can be changed. + + """ + def __init__(self, priority=None): + """Initializes this class. + + @type priority: number + @param priority: New task priority (None means no change) + + """ + Exception.__init__(self) + self.priority = priority class BaseWorker(threading.Thread, object): @@ -34,6 +59,7 @@ class BaseWorker(threading.Thread, object): Users of a worker pool must override RunTask in a subclass. """ + # pylint: disable=W0212 def __init__(self, pool, worker_id): """Constructor for BaseWorker thread. @@ -41,33 +67,65 @@ class BaseWorker(threading.Thread, object): @param worker_id: identifier for this worker """ - super(BaseWorker, self).__init__() + super(BaseWorker, self).__init__(name=worker_id) self.pool = pool - self.worker_id = worker_id + self._worker_id = worker_id self._current_task = None - def ShouldTerminate(self): - """Returns whether a worker should terminate. + assert self.getName() == worker_id - """ - return self.pool.ShouldWorkerTerminate(self) + def ShouldTerminate(self): + """Returns whether this worker should terminate. - def _HasRunningTaskUnlocked(self): - """Returns whether this worker is currently running a task. + Should only be called from within L{RunTask}. """ - return (self._current_task is not None) + self.pool._lock.acquire() + try: + assert self._HasRunningTaskUnlocked() + return self.pool._ShouldWorkerTerminateUnlocked(self) + finally: + self.pool._lock.release() - def HasRunningTask(self): - """Returns whether this worker is currently running a task. + def GetCurrentPriority(self): + """Returns the priority of the current task. + + Should only be called from within L{RunTask}. """ self.pool._lock.acquire() try: - return self._HasRunningTaskUnlocked() + assert self._HasRunningTaskUnlocked() + + (priority, _, _) = self._current_task + + return priority finally: self.pool._lock.release() + def SetTaskName(self, taskname): + """Sets the name of the current task. + + Should only be called from within L{RunTask}. + + @type taskname: string + @param taskname: Task's name + + """ + if taskname: + name = "%s/%s" % (self._worker_id, taskname) + else: + name = self._worker_id + + # Set thread name + self.setName(name) + + def _HasRunningTaskUnlocked(self): + """Returns whether this worker is currently running a task. + + """ + return (self._current_task is not None) + def run(self): """Main thread function. @@ -76,62 +134,79 @@ class BaseWorker(threading.Thread, object): """ pool = self.pool - assert not self.HasRunningTask() - while True: + assert self._current_task is None + + defer = None try: - # We wait on lock to be told either terminate or do a task. + # Wait on lock to be told either to terminate or to do a task pool._lock.acquire() try: - if pool._ShouldWorkerTerminateUnlocked(self): - break + task = pool._WaitForTaskUnlocked(self) - # We only wait if there's no task for us. - if not pool._tasks: - logging.debug("Worker %s: waiting for tasks", self.worker_id) + if task is _TERMINATE: + # Told to terminate + break - # wait() releases the lock and sleeps until notified - pool._pool_to_worker.wait() + if task is None: + # Spurious notification, ignore + continue - logging.debug("Worker %s: notified while waiting", self.worker_id) + self._current_task = task - # Were we woken up in order to terminate? - if pool._ShouldWorkerTerminateUnlocked(self): - break + # No longer needed, dispose of reference + del task - if not pool._tasks: - # Spurious notification, ignore - continue + assert self._HasRunningTaskUnlocked() - # Get task from queue and tell pool about it - try: - self._current_task = pool._tasks.popleft() - finally: - pool._worker_to_pool.notifyAll() finally: pool._lock.release() - # Run the actual task + (priority, _, args) = self._current_task try: - logging.debug("Worker %s: starting task %r", - self.worker_id, self._current_task) - self.RunTask(*self._current_task) - logging.debug("Worker %s: done with task %r", - self.worker_id, self._current_task) - except: - logging.error("Worker %s: Caught unhandled exception", - self.worker_id, exc_info=True) + # Run the actual task + assert defer is None + logging.debug("Starting task %r, priority %s", args, priority) + assert self.getName() == self._worker_id + try: + self.RunTask(*args) # pylint: disable=W0142 + finally: + self.SetTaskName(None) + logging.debug("Done with task %r, priority %s", args, priority) + except DeferTask, err: + defer = err + + if defer.priority is None: + # Use same priority + defer.priority = priority + + logging.debug("Deferring task %r, new priority %s", + args, defer.priority) + + assert self._HasRunningTaskUnlocked() + except: # pylint: disable=W0702 + logging.exception("Caught unhandled exception") + + assert self._HasRunningTaskUnlocked() finally: # Notify pool pool._lock.acquire() try: + if defer: + assert self._current_task + # Schedule again for later run + (_, _, args) = self._current_task + pool._AddTaskUnlocked(args, defer.priority) + if self._current_task: self._current_task = None pool._worker_to_pool.notifyAll() finally: pool._lock.release() - logging.debug("Worker %s: terminates", self.worker_id) + assert not self._HasRunningTaskUnlocked() + + logging.debug("Terminates") def RunTask(self, *args): """Function called to start a task. @@ -152,7 +227,7 @@ class WorkerPool(object): guaranteed to finish in the same order. """ - def __init__(self, num_workers, worker_class): + def __init__(self, name, num_workers, worker_class): """Constructor for worker pool. @param num_workers: number of workers to be started @@ -167,6 +242,7 @@ class WorkerPool(object): self._pool_to_worker = threading.Condition(self._lock) self._worker_to_pool = threading.Condition(self._lock) self._worker_class = worker_class + self._name = name self._last_worker_id = 0 self._workers = [] self._quiescing = False @@ -175,55 +251,140 @@ class WorkerPool(object): self._termworkers = [] # Queued tasks - self._tasks = collections.deque() + self._counter = 0 + self._tasks = [] # Start workers self.Resize(num_workers) # TODO: Implement dynamic resizing? - def AddTask(self, *args): + def _WaitWhileQuiescingUnlocked(self): + """Wait until the worker pool has finished quiescing. + + """ + while self._quiescing: + self._pool_to_pool.wait() + + def _AddTaskUnlocked(self, args, priority): + """Adds a task to the internal queue. + + @type args: sequence + @param args: Arguments passed to L{BaseWorker.RunTask} + @type priority: number + @param priority: Task priority + + """ + assert isinstance(args, (tuple, list)), "Arguments must be a sequence" + assert isinstance(priority, (int, long)), "Priority must be numeric" + + # This counter is used to ensure elements are processed in their + # incoming order. For processing they're sorted by priority and then + # counter. + self._counter += 1 + + heapq.heappush(self._tasks, (priority, self._counter, args)) + + # Notify a waiting worker + self._pool_to_worker.notify() + + def AddTask(self, args, priority=_DEFAULT_PRIORITY): """Adds a task to the queue. + @type args: sequence @param args: arguments passed to L{BaseWorker.RunTask} + @type priority: number + @param priority: Task priority """ self._lock.acquire() try: - # Don't add new tasks while we're quiescing - while self._quiescing: - self._pool_to_pool.wait() - - # Add task to internal queue - self._tasks.append(args) - - # Wake one idling worker up - self._pool_to_worker.notify() + self._WaitWhileQuiescingUnlocked() + self._AddTaskUnlocked(args, priority) finally: self._lock.release() - def _ShouldWorkerTerminateUnlocked(self, worker): - """Returns whether a worker should terminate. + def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY): + """Add a list of tasks to the queue. + + @type tasks: list of tuples + @param tasks: list of args passed to L{BaseWorker.RunTask} + @type priority: number or list of numbers + @param priority: Priority for all added tasks or a list with the priority + for each task """ - return (worker in self._termworkers) + assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \ + "Each task must be a sequence" - def ShouldWorkerTerminate(self, worker): - """Returns whether a worker should terminate. + assert (isinstance(priority, (int, long)) or + compat.all(isinstance(prio, (int, long)) for prio in priority)), \ + "Priority must be numeric or be a list of numeric values" + + if isinstance(priority, (int, long)): + priority = [priority] * len(tasks) + elif len(priority) != len(tasks): + raise errors.ProgrammerError("Number of priorities (%s) doesn't match" + " number of tasks (%s)" % + (len(priority), len(tasks))) - """ self._lock.acquire() try: - return self._ShouldWorkerTerminateUnlocked(worker) + self._WaitWhileQuiescingUnlocked() + + assert compat.all(isinstance(prio, (int, long)) for prio in priority) + assert len(tasks) == len(priority) + + for args, priority in zip(tasks, priority): + self._AddTaskUnlocked(args, priority) finally: self._lock.release() + def _WaitForTaskUnlocked(self, worker): + """Waits for a task for a worker. + + @type worker: L{BaseWorker} + @param worker: Worker thread + + """ + if self._ShouldWorkerTerminateUnlocked(worker): + return _TERMINATE + + # We only wait if there's no task for us. + if not self._tasks: + logging.debug("Waiting for tasks") + + # wait() releases the lock and sleeps until notified + self._pool_to_worker.wait() + + logging.debug("Notified while waiting") + + # Were we woken up in order to terminate? + if self._ShouldWorkerTerminateUnlocked(worker): + return _TERMINATE + + if not self._tasks: + # Spurious notification, ignore + return None + + # Get task from queue and tell pool about it + try: + return heapq.heappop(self._tasks) + finally: + self._worker_to_pool.notifyAll() + + def _ShouldWorkerTerminateUnlocked(self, worker): + """Returns whether a worker should terminate. + + """ + return (worker in self._termworkers) + def _HasRunningTasksUnlocked(self): """Checks whether there's a task running in a worker. """ for worker in self._workers + self._termworkers: - if worker._HasRunningTaskUnlocked(): + if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212 return True return False @@ -252,7 +413,8 @@ class WorkerPool(object): """ self._last_worker_id += 1 - return self._last_worker_id + + return "%s%d" % (self._name, self._last_worker_id) def _ResizeUnlocked(self, num_workers): """Changes the number of workers.