X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/ad1bf20c8f1a500bfbd6430465168fa9491be2de..c113a9ab55b9b4a825f3b8f2ea1644d2ec3e2f24:/lib/workerpool.py?ds=inline diff --git a/lib/workerpool.py b/lib/workerpool.py index 54b3fb7..8db03c7 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2008 Google Inc. +# Copyright (C) 2008, 2009, 2010 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -23,9 +23,34 @@ """ -import collections import logging import threading +import heapq + +from ganeti import compat +from ganeti import errors + + +_TERMINATE = object() +_DEFAULT_PRIORITY = 0 + + +class DeferTask(Exception): + """Special exception class to defer a task. + + This class can be raised by L{BaseWorker.RunTask} to defer the execution of a + task. Optionally, the priority of the task can be changed. + + """ + def __init__(self, priority=None): + """Initializes this class. + + @type priority: number + @param priority: New task priority (None means no change) + + """ + Exception.__init__(self) + self.priority = priority class BaseWorker(threading.Thread, object): @@ -34,6 +59,7 @@ class BaseWorker(threading.Thread, object): Users of a worker pool must override RunTask in a subclass. """ + # pylint: disable=W0212 def __init__(self, pool, worker_id): """Constructor for BaseWorker thread. @@ -41,33 +67,65 @@ class BaseWorker(threading.Thread, object): @param worker_id: identifier for this worker """ - super(BaseWorker, self).__init__() + super(BaseWorker, self).__init__(name=worker_id) self.pool = pool - self.worker_id = worker_id + self._worker_id = worker_id self._current_task = None - def ShouldTerminate(self): - """Returns whether a worker should terminate. + assert self.getName() == worker_id - """ - return self.pool.ShouldWorkerTerminate(self) + def ShouldTerminate(self): + """Returns whether this worker should terminate. - def _HasRunningTaskUnlocked(self): - """Returns whether this worker is currently running a task. + Should only be called from within L{RunTask}. """ - return (self._current_task is not None) + self.pool._lock.acquire() + try: + assert self._HasRunningTaskUnlocked() + return self.pool._ShouldWorkerTerminateUnlocked(self) + finally: + self.pool._lock.release() - def HasRunningTask(self): - """Returns whether this worker is currently running a task. + def GetCurrentPriority(self): + """Returns the priority of the current task. + + Should only be called from within L{RunTask}. """ self.pool._lock.acquire() try: - return self._HasRunningTaskUnlocked() + assert self._HasRunningTaskUnlocked() + + (priority, _, _) = self._current_task + + return priority finally: self.pool._lock.release() + def SetTaskName(self, taskname): + """Sets the name of the current task. + + Should only be called from within L{RunTask}. + + @type taskname: string + @param taskname: Task's name + + """ + if taskname: + name = "%s/%s" % (self._worker_id, taskname) + else: + name = self._worker_id + + # Set thread name + self.setName(name) + + def _HasRunningTaskUnlocked(self): + """Returns whether this worker is currently running a task. + + """ + return (self._current_task is not None) + def run(self): """Main thread function. @@ -76,62 +134,79 @@ class BaseWorker(threading.Thread, object): """ pool = self.pool - assert not self.HasRunningTask() - while True: + assert self._current_task is None + + defer = None try: - # We wait on lock to be told either terminate or do a task. + # Wait on lock to be told either to terminate or to do a task pool._lock.acquire() try: - if pool._ShouldWorkerTerminateUnlocked(self): - break + task = pool._WaitForTaskUnlocked(self) - # We only wait if there's no task for us. - if not pool._tasks: - logging.debug("Worker %s: waiting for tasks", self.worker_id) + if task is _TERMINATE: + # Told to terminate + break - # wait() releases the lock and sleeps until notified - pool._pool_to_worker.wait() + if task is None: + # Spurious notification, ignore + continue - logging.debug("Worker %s: notified while waiting", self.worker_id) + self._current_task = task - # Were we woken up in order to terminate? - if pool._ShouldWorkerTerminateUnlocked(self): - break + # No longer needed, dispose of reference + del task - if not pool._tasks: - # Spurious notification, ignore - continue + assert self._HasRunningTaskUnlocked() - # Get task from queue and tell pool about it - try: - self._current_task = pool._tasks.popleft() - finally: - pool._worker_to_pool.notifyAll() finally: pool._lock.release() - # Run the actual task + (priority, _, args) = self._current_task try: - logging.debug("Worker %s: starting task %r", - self.worker_id, self._current_task) - self.RunTask(*self._current_task) - logging.debug("Worker %s: done with task %r", - self.worker_id, self._current_task) - except: - logging.error("Worker %s: Caught unhandled exception", - self.worker_id, exc_info=True) + # Run the actual task + assert defer is None + logging.debug("Starting task %r, priority %s", args, priority) + assert self.getName() == self._worker_id + try: + self.RunTask(*args) # pylint: disable=W0142 + finally: + self.SetTaskName(None) + logging.debug("Done with task %r, priority %s", args, priority) + except DeferTask, err: + defer = err + + if defer.priority is None: + # Use same priority + defer.priority = priority + + logging.debug("Deferring task %r, new priority %s", + args, defer.priority) + + assert self._HasRunningTaskUnlocked() + except: # pylint: disable=W0702 + logging.exception("Caught unhandled exception") + + assert self._HasRunningTaskUnlocked() finally: # Notify pool pool._lock.acquire() try: + if defer: + assert self._current_task + # Schedule again for later run + (_, _, args) = self._current_task + pool._AddTaskUnlocked(args, defer.priority) + if self._current_task: self._current_task = None pool._worker_to_pool.notifyAll() finally: pool._lock.release() - logging.debug("Worker %s: terminates", self.worker_id) + assert not self._HasRunningTaskUnlocked() + + logging.debug("Terminates") def RunTask(self, *args): """Function called to start a task. @@ -152,7 +227,7 @@ class WorkerPool(object): guaranteed to finish in the same order. """ - def __init__(self, num_workers, worker_class): + def __init__(self, name, num_workers, worker_class): """Constructor for worker pool. @param num_workers: number of workers to be started @@ -167,66 +242,186 @@ class WorkerPool(object): self._pool_to_worker = threading.Condition(self._lock) self._worker_to_pool = threading.Condition(self._lock) self._worker_class = worker_class + self._name = name self._last_worker_id = 0 self._workers = [] self._quiescing = False + self._active = True # Terminating workers self._termworkers = [] # Queued tasks - self._tasks = collections.deque() + self._counter = 0 + self._tasks = [] # Start workers self.Resize(num_workers) # TODO: Implement dynamic resizing? - def AddTask(self, *args): + def _WaitWhileQuiescingUnlocked(self): + """Wait until the worker pool has finished quiescing. + + """ + while self._quiescing: + self._pool_to_pool.wait() + + def _AddTaskUnlocked(self, args, priority): + """Adds a task to the internal queue. + + @type args: sequence + @param args: Arguments passed to L{BaseWorker.RunTask} + @type priority: number + @param priority: Task priority + + """ + assert isinstance(args, (tuple, list)), "Arguments must be a sequence" + assert isinstance(priority, (int, long)), "Priority must be numeric" + + # This counter is used to ensure elements are processed in their + # incoming order. For processing they're sorted by priority and then + # counter. + self._counter += 1 + + heapq.heappush(self._tasks, (priority, self._counter, args)) + + # Notify a waiting worker + self._pool_to_worker.notify() + + def AddTask(self, args, priority=_DEFAULT_PRIORITY): """Adds a task to the queue. + @type args: sequence @param args: arguments passed to L{BaseWorker.RunTask} + @type priority: number + @param priority: Task priority """ self._lock.acquire() try: - # Don't add new tasks while we're quiescing - while self._quiescing: - self._pool_to_pool.wait() + self._WaitWhileQuiescingUnlocked() + self._AddTaskUnlocked(args, priority) + finally: + self._lock.release() + + def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY): + """Add a list of tasks to the queue. - # Add task to internal queue - self._tasks.append(args) + @type tasks: list of tuples + @param tasks: list of args passed to L{BaseWorker.RunTask} + @type priority: number or list of numbers + @param priority: Priority for all added tasks or a list with the priority + for each task - # Wake one idling worker up - self._pool_to_worker.notify() + """ + assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \ + "Each task must be a sequence" + + assert (isinstance(priority, (int, long)) or + compat.all(isinstance(prio, (int, long)) for prio in priority)), \ + "Priority must be numeric or be a list of numeric values" + + if isinstance(priority, (int, long)): + priority = [priority] * len(tasks) + elif len(priority) != len(tasks): + raise errors.ProgrammerError("Number of priorities (%s) doesn't match" + " number of tasks (%s)" % + (len(priority), len(tasks))) + + self._lock.acquire() + try: + self._WaitWhileQuiescingUnlocked() + + assert compat.all(isinstance(prio, (int, long)) for prio in priority) + assert len(tasks) == len(priority) + + for args, priority in zip(tasks, priority): + self._AddTaskUnlocked(args, priority) finally: self._lock.release() - def _ShouldWorkerTerminateUnlocked(self, worker): - """Returns whether a worker should terminate. + def SetActive(self, active): + """Enable/disable processing of tasks. - """ - return (worker in self._termworkers) + This is different from L{Quiesce} in the sense that this function just + changes an internal flag and doesn't wait for the queue to be empty. Tasks + already being processed continue normally, but no new tasks will be + started. New tasks can still be added. - def ShouldWorkerTerminate(self, worker): - """Returns whether a worker should terminate. + @type active: bool + @param active: Whether tasks should be processed """ self._lock.acquire() try: - return self._ShouldWorkerTerminateUnlocked(worker) + self._active = active + + if active: + # Tell all workers to continue processing + self._pool_to_worker.notifyAll() finally: self._lock.release() + def _WaitForTaskUnlocked(self, worker): + """Waits for a task for a worker. + + @type worker: L{BaseWorker} + @param worker: Worker thread + + """ + if self._ShouldWorkerTerminateUnlocked(worker): + return _TERMINATE + + # We only wait if there's no task for us. + if not (self._active and self._tasks): + logging.debug("Waiting for tasks") + + while True: + # wait() releases the lock and sleeps until notified + self._pool_to_worker.wait() + + logging.debug("Notified while waiting") + + # Were we woken up in order to terminate? + if self._ShouldWorkerTerminateUnlocked(worker): + return _TERMINATE + + # Just loop if pool is not processing tasks at this time + if self._active and self._tasks: + break + + # Get task from queue and tell pool about it + try: + return heapq.heappop(self._tasks) + finally: + self._worker_to_pool.notifyAll() + + def _ShouldWorkerTerminateUnlocked(self, worker): + """Returns whether a worker should terminate. + + """ + return (worker in self._termworkers) + def _HasRunningTasksUnlocked(self): """Checks whether there's a task running in a worker. """ for worker in self._workers + self._termworkers: - if worker._HasRunningTaskUnlocked(): + if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212 return True return False + def HasRunningTasks(self): + """Checks whether there's at least one task running. + + """ + self._lock.acquire() + try: + return self._HasRunningTasksUnlocked() + finally: + self._lock.release() + def Quiesce(self): """Waits until the task queue is empty. @@ -252,7 +447,8 @@ class WorkerPool(object): """ self._last_worker_id += 1 - return self._last_worker_id + + return "%s%d" % (self._name, self._last_worker_id) def _ResizeUnlocked(self, num_workers): """Changes the number of workers. @@ -305,7 +501,7 @@ class WorkerPool(object): elif current_count < num_workers: # Create (num_workers - current_count) new workers - for _ in xrange(num_workers - current_count): + for _ in range(num_workers - current_count): worker = self._worker_class(self, self._NewWorkerIdUnlocked()) self._workers.append(worker) worker.start()