#
#
-# Copyright (C) 2008 Google Inc.
+# Copyright (C) 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
"""
-import collections
import logging
import threading
+import heapq
+
+from ganeti import compat
+from ganeti import errors
+
+
+_TERMINATE = object()
+_DEFAULT_PRIORITY = 0
+
+
+class DeferTask(Exception):
+ """Special exception class to defer a task.
+
+ This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
+ task. Optionally, the priority of the task can be changed.
+
+ @ivar priority: Requested new priority, or None to keep the current one
+
+ """
+ def __init__(self, priority=None):
+ """Initializes this class.
+
+ @type priority: number
+ @param priority: New task priority (None means no change)
+
+ """
+ Exception.__init__(self)
+ # Read by the worker loop (BaseWorker.run) when it re-queues the task
+ self.priority = priority
class BaseWorker(threading.Thread, object):
Users of a worker pool must override RunTask in a subclass.
"""
+ # pylint: disable=W0212
def __init__(self, pool, worker_id):
"""Constructor for BaseWorker thread.
@param worker_id: identifier for this worker
"""
- super(BaseWorker, self).__init__()
+ super(BaseWorker, self).__init__(name=worker_id)
self.pool = pool
- self.worker_id = worker_id
+ self._worker_id = worker_id
self._current_task = None
- def ShouldTerminate(self):
- """Returns whether a worker should terminate.
+ assert self.getName() == worker_id
- """
- return self.pool.ShouldWorkerTerminate(self)
+ def ShouldTerminate(self):
+ """Returns whether this worker should terminate.
- def _HasRunningTaskUnlocked(self):
- """Returns whether this worker is currently running a task.
+ Should only be called from within L{RunTask}.
+
+ @return: Result of the pool's C{_ShouldWorkerTerminateUnlocked} for this
+ worker
"""
- return (self._current_task is not None)
+ # Take the pool lock around the two *Unlocked helper calls
+ self.pool._lock.acquire()
+ try:
+ assert self._HasRunningTaskUnlocked()
+ return self.pool._ShouldWorkerTerminateUnlocked(self)
+ finally:
+ self.pool._lock.release()
- def HasRunningTask(self):
- """Returns whether this worker is currently running a task.
+ def GetCurrentPriority(self):
+ """Returns the priority of the current task.
+
+ Should only be called from within L{RunTask}.
+
+ @return: First element of the C{(priority, counter, args)} tuple stored in
+ C{self._current_task}
"""
self.pool._lock.acquire()
try:
- return self._HasRunningTaskUnlocked()
+ assert self._HasRunningTaskUnlocked()
+
+ # Tasks are queued by the pool as (priority, counter, args) tuples
+ (priority, _, _) = self._current_task
+
+ return priority
finally:
self.pool._lock.release()
+ def SetTaskName(self, taskname):
+ """Sets the name of the current task.
+
+ Should only be called from within L{RunTask}.
+
+ The thread is renamed to C{"<worker id>/<taskname>"}. A false value (such
+ as None or an empty string) resets the thread name to the plain worker ID.
+
+ @type taskname: string
+ @param taskname: Task's name
+
+ """
+ if taskname:
+ name = "%s/%s" % (self._worker_id, taskname)
+ else:
+ # No task name given, fall back to the bare worker ID
+ name = self._worker_id
+
+ # Set thread name
+ self.setName(name)
+
+ def _HasRunningTaskUnlocked(self):
+ """Returns whether this worker is currently running a task.
+
+ @rtype: bool
+ @return: True if C{self._current_task} is not None
+
+ """
+ return (self._current_task is not None)
+
def run(self):
"""Main thread function.
+
+ Repeatedly asks the pool for a task, runs it through L{RunTask} and, if
+ L{DeferTask} was raised, puts it back on the pool's queue. The loop ends
+ once the pool hands out the C{_TERMINATE} marker.
+
"""
pool = self.pool
- assert not self.HasRunningTask()
-
while True:
+ assert self._current_task is None
+
+ # Set to the DeferTask instance if RunTask asks to be rescheduled
+ defer = None
try:
- # We wait on lock to be told either terminate or do a task.
+ # Wait on lock to be told either to terminate or to do a task
pool._lock.acquire()
try:
- if pool._ShouldWorkerTerminateUnlocked(self):
- break
+ task = pool._WaitForTaskUnlocked(self)
- # We only wait if there's no task for us.
- if not pool._tasks:
- logging.debug("Worker %s: waiting for tasks", self.worker_id)
+ if task is _TERMINATE:
+ # Told to terminate
+ break
- # wait() releases the lock and sleeps until notified
- pool._pool_to_worker.wait()
+ # NOTE(review): _WaitForTaskUnlocked as shown in this change returns
+ # either _TERMINATE or a queued tuple; confirm whether None can occur
+ if task is None:
+ # Spurious notification, ignore
+ continue
- logging.debug("Worker %s: notified while waiting", self.worker_id)
+ self._current_task = task
- # Were we woken up in order to terminate?
- if pool._ShouldWorkerTerminateUnlocked(self):
- break
+ # No longer needed, dispose of reference
+ del task
- if not pool._tasks:
- # Spurious notification, ignore
- continue
+ assert self._HasRunningTaskUnlocked()
- # Get task from queue and tell pool about it
- try:
- self._current_task = pool._tasks.popleft()
- finally:
- pool._worker_to_pool.notifyAll()
finally:
pool._lock.release()
- # Run the actual task
+ # Unpack the (priority, counter, args) tuple taken from the pool's queue
+ (priority, _, args) = self._current_task
try:
- logging.debug("Worker %s: starting task %r",
- self.worker_id, self._current_task)
- self.RunTask(*self._current_task)
- logging.debug("Worker %s: done with task %r",
- self.worker_id, self._current_task)
- except:
- logging.error("Worker %s: Caught unhandled exception",
- self.worker_id, exc_info=True)
+ # Run the actual task
+ assert defer is None
+ logging.debug("Starting task %r, priority %s", args, priority)
+ assert self.getName() == self._worker_id
+ try:
+ self.RunTask(*args) # pylint: disable=W0142
+ finally:
+ # Reset the thread name in case RunTask called SetTaskName
+ self.SetTaskName(None)
+ logging.debug("Done with task %r, priority %s", args, priority)
+ except DeferTask, err:
+ defer = err
+
+ if defer.priority is None:
+ # Use same priority
+ defer.priority = priority
+
+ logging.debug("Deferring task %r, new priority %s",
+ args, defer.priority)
+
+ assert self._HasRunningTaskUnlocked()
+ except: # pylint: disable=W0702
+ logging.exception("Caught unhandled exception")
+
+ assert self._HasRunningTaskUnlocked()
finally:
# Notify pool
pool._lock.acquire()
try:
+ if defer:
+ assert self._current_task
+ # Schedule again for later run
+ (_, _, args) = self._current_task
+ pool._AddTaskUnlocked(args, defer.priority)
+
if self._current_task:
self._current_task = None
pool._worker_to_pool.notifyAll()
finally:
pool._lock.release()
- logging.debug("Worker %s: terminates", self.worker_id)
+ assert not self._HasRunningTaskUnlocked()
+
+ logging.debug("Terminates")
def RunTask(self, *args):
"""Function called to start a task.
guaranteed to finish in the same order.
"""
- def __init__(self, num_workers, worker_class):
+ def __init__(self, name, num_workers, worker_class):
"""Constructor for worker pool.
@param num_workers: number of workers to be started
self._pool_to_worker = threading.Condition(self._lock)
self._worker_to_pool = threading.Condition(self._lock)
self._worker_class = worker_class
+ self._name = name
self._last_worker_id = 0
self._workers = []
self._quiescing = False
+ self._active = True
# Terminating workers
self._termworkers = []
# Queued tasks
- self._tasks = collections.deque()
+ self._counter = 0
+ self._tasks = []
# Start workers
self.Resize(num_workers)
# TODO: Implement dynamic resizing?
- def AddTask(self, *args):
+ def _WaitWhileQuiescingUnlocked(self):
+ """Wait until the worker pool has finished quiescing.
+
+ Blocks on C{self._pool_to_pool} for as long as C{self._quiescing} is set.
+ The callers visible in this change (L{AddTask}, L{AddManyTasks}) acquire
+ C{self._lock} before calling this method.
+
+ """
+ # NOTE(review): _pool_to_pool is presumably a condition on self._lock, like
+ # the two conditions created in the constructor; confirm
+ while self._quiescing:
+ self._pool_to_pool.wait()
+
+ def _AddTaskUnlocked(self, args, priority):
+ """Adds a task to the internal queue.
+
+ The callers visible in this change (L{AddTask}, L{AddManyTasks} and
+ L{BaseWorker.run}) acquire the pool's lock before calling this method.
+
+ @type args: sequence
+ @param args: Arguments passed to L{BaseWorker.RunTask}
+ @type priority: number
+ @param priority: Task priority
+
+ """
+ assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
+ assert isinstance(priority, (int, long)), "Priority must be numeric"
+
+ # This counter is used to ensure elements are processed in their
+ # incoming order. For processing they're sorted by priority and then
+ # counter.
+ self._counter += 1
+
+ # heapq keeps the smallest tuple first, so numerically lower priority
+ # values are popped before higher ones
+ heapq.heappush(self._tasks, (priority, self._counter, args))
+
+ # Notify a waiting worker
+ self._pool_to_worker.notify()
+
+ def AddTask(self, args, priority=_DEFAULT_PRIORITY):
"""Adds a task to the queue.
+ @type args: sequence
@param args: arguments passed to L{BaseWorker.RunTask}
+ @type priority: number
+ @param priority: Task priority
"""
self._lock.acquire()
try:
- # Don't add new tasks while we're quiescing
- while self._quiescing:
- self._pool_to_pool.wait()
+ # Don't add new tasks while the pool is quiescing; queue the task and
+ # notify a worker afterwards
+ self._WaitWhileQuiescingUnlocked()
+ self._AddTaskUnlocked(args, priority)
+ finally:
+ self._lock.release()
+
+ def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
+ """Add a list of tasks to the queue.
- # Add task to internal queue
- self._tasks.append(args)
+ @type tasks: list of tuples
+ @param tasks: list of args passed to L{BaseWorker.RunTask}
+ @type priority: number or list of numbers
+ @param priority: Priority for all added tasks or a list with the priority
+ for each task
+ @raise errors.ProgrammerError: If a list of priorities is given whose
+ length differs from the number of tasks
- # Wake one idling worker up
- self._pool_to_worker.notify()
+ """
+ assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
+ "Each task must be a sequence"
+
+ assert (isinstance(priority, (int, long)) or
+ compat.all(isinstance(prio, (int, long)) for prio in priority)), \
+ "Priority must be numeric or be a list of numeric values"
+
+ # A single number is expanded to one priority per task
+ if isinstance(priority, (int, long)):
+ priority = [priority] * len(tasks)
+ elif len(priority) != len(tasks):
+ raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
+ " number of tasks (%s)" %
+ (len(priority), len(tasks)))
+
+ self._lock.acquire()
+ try:
+ self._WaitWhileQuiescingUnlocked()
+
+ assert compat.all(isinstance(prio, (int, long)) for prio in priority)
+ assert len(tasks) == len(priority)
+
+ # Use a separate loop variable so the "priority" list is not rebound to
+ # a single number while it is being iterated over
+ for (args, prio) in zip(tasks, priority):
+ self._AddTaskUnlocked(args, prio)
finally:
self._lock.release()
- def _ShouldWorkerTerminateUnlocked(self, worker):
- """Returns whether a worker should terminate.
+ def SetActive(self, active):
+ """Enable/disable processing of tasks.
- """
- return (worker in self._termworkers)
+ This is different from L{Quiesce} in the sense that this function just
+ changes an internal flag and doesn't wait for the queue to be empty. Tasks
+ already being processed continue normally, but no new tasks will be
+ started. New tasks can still be added.
- def ShouldWorkerTerminate(self, worker):
- """Returns whether a worker should terminate.
+ @type active: bool
+ @param active: Whether tasks should be processed
"""
self._lock.acquire()
try:
- return self._ShouldWorkerTerminateUnlocked(worker)
+ # Workers check this flag in _WaitForTaskUnlocked before taking a task
+ self._active = active
+
+ if active:
+ # Tell all workers to continue processing
+ self._pool_to_worker.notifyAll()
finally:
self._lock.release()
+ def _WaitForTaskUnlocked(self, worker):
+ """Waits for a task for a worker.
+
+ @type worker: L{BaseWorker}
+ @param worker: Worker thread
+ @return: C{_TERMINATE} if the worker is listed in C{self._termworkers},
+ otherwise the next C{(priority, counter, args)} tuple popped from the
+ task queue
+
+ """
+ if self._ShouldWorkerTerminateUnlocked(worker):
+ return _TERMINATE
+
+ # We only wait if there's no task for us.
+ if not (self._active and self._tasks):
+ logging.debug("Waiting for tasks")
+
+ while True:
+ # wait() releases the lock and sleeps until notified
+ self._pool_to_worker.wait()
+
+ logging.debug("Notified while waiting")
+
+ # Were we woken up in order to terminate?
+ if self._ShouldWorkerTerminateUnlocked(worker):
+ return _TERMINATE
+
+ # Just loop if pool is not processing tasks at this time
+ if self._active and self._tasks:
+ break
+
+ # Get task from queue and tell pool about it
+ try:
+ # heappop returns the smallest (priority, counter, args) tuple
+ return heapq.heappop(self._tasks)
+ finally:
+ self._worker_to_pool.notifyAll()
+
+ def _ShouldWorkerTerminateUnlocked(self, worker):
+ """Returns whether a worker should terminate.
+
+ @param worker: Worker thread to look up
+ @rtype: bool
+ @return: True if the worker is contained in C{self._termworkers}
+
+ """
+ return (worker in self._termworkers)
+
def _HasRunningTasksUnlocked(self):
"""Checks whether there's a task running in a worker.
+
+ @rtype: bool
+ @return: True as soon as one worker reports a running task, else False
+
"""
+ # Look at both the regular workers and the ones listed as terminating
for worker in self._workers + self._termworkers:
- if worker._HasRunningTaskUnlocked():
+ if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212
return True
return False
+ def HasRunningTasks(self):
+ """Checks whether there's at least one task running.
+
+ Acquires the pool's lock and returns the result of
+ L{_HasRunningTasksUnlocked}.
+
+ @rtype: bool
+
+ """
+ self._lock.acquire()
+ try:
+ return self._HasRunningTasksUnlocked()
+ finally:
+ self._lock.release()
+
def Quiesce(self):
"""Waits until the task queue is empty.
"""
self._last_worker_id += 1
- return self._last_worker_id
+
+ return "%s%d" % (self._name, self._last_worker_id)
def _ResizeUnlocked(self, num_workers):
"""Changes the number of workers.