#
#
-# Copyright (C) 2008 Google Inc.
+# Copyright (C) 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
import logging
import threading
+from ganeti import compat
+
+
+_TERMINATE = object()
+
class BaseWorker(threading.Thread, object):
"""Base worker class for worker pools.
"""
super(BaseWorker, self).__init__(name=worker_id)
self.pool = pool
+ self._worker_id = worker_id
self._current_task = None
+ assert self.getName() == worker_id
+
def ShouldTerminate(self):
- """Returns whether a worker should terminate.
+ """Returns whether this worker should terminate.
+
+ Should only be called from within L{RunTask}.
"""
- return self.pool.ShouldWorkerTerminate(self)
+ self.pool._lock.acquire()
+ try:
+ assert self._HasRunningTaskUnlocked()
+ return self.pool._ShouldWorkerTerminateUnlocked(self)
+ finally:
+ self.pool._lock.release()
- def _HasRunningTaskUnlocked(self):
- """Returns whether this worker is currently running a task.
+ def SetTaskName(self, taskname):
+ """Sets the name of the current task.
+
+ Should only be called from within L{RunTask}.
+
+ @type taskname: string
+ @param taskname: Task's name
"""
- return (self._current_task is not None)
+ if taskname:
+ name = "%s/%s" % (self._worker_id, taskname)
+ else:
+ name = self._worker_id
+
+ # Set thread name
+ self.setName(name)
- def HasRunningTask(self):
+ def _HasRunningTaskUnlocked(self):
"""Returns whether this worker is currently running a task.
"""
- self.pool._lock.acquire()
- try:
- return self._HasRunningTaskUnlocked()
- finally:
- self.pool._lock.release()
+ return (self._current_task is not None)
def run(self):
"""Main thread function.
"""
pool = self.pool
- assert not self.HasRunningTask()
-
while True:
+ assert self._current_task is None
try:
- # We wait on lock to be told either terminate or do a task.
+ # Wait on lock to be told either to terminate or to do a task
pool._lock.acquire()
try:
- if pool._ShouldWorkerTerminateUnlocked(self):
- break
+ task = pool._WaitForTaskUnlocked(self)
- # We only wait if there's no task for us.
- if not pool._tasks:
- logging.debug("Waiting for tasks")
+ if task is _TERMINATE:
+ # Told to terminate
+ break
- # wait() releases the lock and sleeps until notified
- pool._pool_to_worker.wait()
+ if task is None:
+ # Spurious notification, ignore
+ continue
- logging.debug("Notified while waiting")
+ self._current_task = task
- # Were we woken up in order to terminate?
- if pool._ShouldWorkerTerminateUnlocked(self):
- break
+ # No longer needed, dispose of reference
+ del task
- if not pool._tasks:
- # Spurious notification, ignore
- continue
+ assert self._HasRunningTaskUnlocked()
- # Get task from queue and tell pool about it
- try:
- self._current_task = pool._tasks.popleft()
- finally:
- pool._worker_to_pool.notifyAll()
finally:
pool._lock.release()
# Run the actual task
try:
logging.debug("Starting task %r", self._current_task)
- self.RunTask(*self._current_task)
+ assert self.getName() == self._worker_id
+ try:
+ self.RunTask(*self._current_task)
+ finally:
+ self.SetTaskName(None)
logging.debug("Done with task %r", self._current_task)
except: # pylint: disable-msg=W0702
logging.exception("Caught unhandled exception")
+
+ assert self._HasRunningTaskUnlocked()
finally:
# Notify pool
pool._lock.acquire()
finally:
pool._lock.release()
+ assert not self._HasRunningTaskUnlocked()
+
logging.debug("Terminates")
def RunTask(self, *args):
# TODO: Implement dynamic resizing?
- def AddTask(self, *args):
+ def _WaitWhileQuiescingUnlocked(self):
+ """Wait until the worker pool has finished quiescing.
+
+ """
+ while self._quiescing:
+ self._pool_to_pool.wait()
+
+ def _AddTaskUnlocked(self, args):
+ assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
+
+ self._tasks.append(args)
+
+ # Notify a waiting worker
+ self._pool_to_worker.notify()
+
+ def AddTask(self, args):
"""Adds a task to the queue.
+ @type args: sequence
@param args: arguments passed to L{BaseWorker.RunTask}
"""
self._lock.acquire()
try:
- # Don't add new tasks while we're quiescing
- while self._quiescing:
- self._pool_to_pool.wait()
+ self._WaitWhileQuiescingUnlocked()
+ self._AddTaskUnlocked(args)
+ finally:
+ self._lock.release()
- # Add task to internal queue
- self._tasks.append(args)
+ def AddManyTasks(self, tasks):
+ """Add a list of tasks to the queue.
- # Wake one idling worker up
- self._pool_to_worker.notify()
+ @type tasks: list of tuples
+ @param tasks: list of args passed to L{BaseWorker.RunTask}
+
+ """
+ assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
+ "Each task must be a sequence"
+
+ self._lock.acquire()
+ try:
+ self._WaitWhileQuiescingUnlocked()
+
+ for args in tasks:
+ self._AddTaskUnlocked(args)
finally:
self._lock.release()
- def _ShouldWorkerTerminateUnlocked(self, worker):
- """Returns whether a worker should terminate.
+ def _WaitForTaskUnlocked(self, worker):
+ """Waits for a task for a worker.
+
+ @type worker: L{BaseWorker}
+ @param worker: Worker thread
"""
- return (worker in self._termworkers)
+ if self._ShouldWorkerTerminateUnlocked(worker):
+ return _TERMINATE
- def ShouldWorkerTerminate(self, worker):
- """Returns whether a worker should terminate.
+ # We only wait if there's no task for us.
+ if not self._tasks:
+ logging.debug("Waiting for tasks")
- """
- self._lock.acquire()
+ # wait() releases the lock and sleeps until notified
+ self._pool_to_worker.wait()
+
+ logging.debug("Notified while waiting")
+
+ # Were we woken up in order to terminate?
+ if self._ShouldWorkerTerminateUnlocked(worker):
+ return _TERMINATE
+
+ if not self._tasks:
+ # Spurious notification, ignore
+ return None
+
+ # Get task from queue and tell pool about it
try:
- return self._ShouldWorkerTerminateUnlocked(worker)
+ return self._tasks.popleft()
finally:
- self._lock.release()
+ self._worker_to_pool.notifyAll()
+
+ def _ShouldWorkerTerminateUnlocked(self, worker):
+ """Returns whether a worker should terminate.
+
+ """
+ return (worker in self._termworkers)
def _HasRunningTasksUnlocked(self):
"""Checks whether there's a task running in a worker.