# Copyright (C) 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Base classes for worker pools.

"""
import collections
import logging
import threading

from ganeti import compat
# Sentinel handed to a worker by WorkerPool._WaitForTaskUnlocked to make it
# leave its main loop; it is only ever compared by identity.
_TERMINATE = object()


class BaseWorker(threading.Thread, object):
    """Base worker class for worker pools.

    Users of a worker pool must override RunTask in a subclass.

    """
    # pylint: disable-msg=W0212
    def __init__(self, pool, worker_id):
        """Constructor for BaseWorker thread.

        @param pool: the parent worker pool
        @param worker_id: identifier for this worker; also used as the
            thread name

        """
        super(BaseWorker, self).__init__(name=worker_id)
        self.pool = pool
        # Argument sequence of the task being run, or None while idle. A task
        # may legitimately be an empty sequence (a task added without
        # arguments), so "idle" must always be tested with "is None".
        self._current_task = None

    def ShouldTerminate(self):
        """Returns whether this worker should terminate.

        Should only be called from within L{RunTask}.

        """
        with self.pool._lock:
            assert self._HasRunningTaskUnlocked()
            return self.pool._ShouldWorkerTerminateUnlocked(self)

    def _HasRunningTaskUnlocked(self):
        """Returns whether this worker is currently running a task.

        Must be called with the pool lock held.

        """
        return self._current_task is not None

    def run(self):
        """Main thread function.

        Waits for new tasks to show up in the queue and runs them one at a
        time until the pool tells this worker to terminate.

        """
        pool = self.pool

        assert self._current_task is None

        while True:
            try:
                # Wait on lock to be told either to terminate or to do a task
                with pool._lock:
                    task = pool._WaitForTaskUnlocked(self)

                    if task is _TERMINATE:
                        # Told to terminate
                        break

                    if task is None:
                        # Spurious notification, ignore
                        continue

                    self._current_task = task

                    assert self._HasRunningTaskUnlocked()

                # Run the actual task without holding the pool lock
                try:
                    logging.debug("Starting task %r", self._current_task)
                    self.RunTask(*self._current_task)
                    logging.debug("Done with task %r", self._current_task)
                except:  # pylint: disable-msg=W0702
                    # Deliberate catch-all: a failing task must not kill the
                    # worker thread.
                    logging.exception("Caught unhandled exception")
            finally:
                # Mark this worker idle and notify the pool (Quiesce waits on
                # this). "is not None" matters: an argument-less task is ()
                # and would otherwise never be cleared.
                with pool._lock:
                    if self._current_task is not None:
                        self._current_task = None
                        pool._worker_to_pool.notify_all()

        logging.debug("Terminates")

    def RunTask(self, *args):
        """Function called to start a task.

        This needs to be implemented by child classes.

        """
        raise NotImplementedError()
class WorkerPool(object):
    """Worker pool with a queue.

    This class is thread-safe.

    Tasks are guaranteed to be started in the order in which they're
    added to the pool. Due to the nature of threading, they're not
    guaranteed to finish in the same order.

    """
    def __init__(self, name, num_workers, worker_class):
        """Constructor for worker pool.

        @param name: prefix used to build worker thread names
        @param num_workers: number of workers to be started
            (dynamic resizing is not yet implemented)
        @param worker_class: the class to be instantiated for workers;
            should derive from L{BaseWorker}

        """
        # Some of these variables are accessed by BaseWorker
        self._lock = threading.Lock()
        self._pool_to_pool = threading.Condition(self._lock)
        self._pool_to_worker = threading.Condition(self._lock)
        self._worker_to_pool = threading.Condition(self._lock)
        self._worker_class = worker_class
        self._name = name
        self._last_worker_id = 0
        self._workers = []
        self._quiescing = False

        # Terminating workers
        self._termworkers = []

        # Queued tasks
        self._tasks = collections.deque()

        # Start workers
        self.Resize(num_workers)

    # TODO: Implement dynamic resizing?

    def _WaitWhileQuiescingUnlocked(self):
        """Wait until the worker pool has finished quiescing.

        Must be called with the pool lock held.

        """
        while self._quiescing:
            self._pool_to_pool.wait()

    def _AddTaskUnlocked(self, args):
        """Appends a task to the queue and wakes up one waiting worker.

        Must be called with the pool lock held.

        """
        assert isinstance(args, (tuple, list)), "Arguments must be a sequence"

        self._tasks.append(args)

        # Notify a waiting worker
        self._pool_to_worker.notify()

    def AddTask(self, *args):
        """Adds a task to the queue.

        Blocks while the pool is quiescing.

        @param args: arguments passed to L{BaseWorker.RunTask}

        """
        with self._lock:
            # Don't add new tasks while we're quiescing
            self._WaitWhileQuiescingUnlocked()
            self._AddTaskUnlocked(args)

    def AddManyTasks(self, tasks):
        """Add a list of tasks to the queue.

        Blocks while the pool is quiescing.

        @type tasks: list of tuples
        @param tasks: list of args passed to L{BaseWorker.RunTask}

        """
        # Materialize first: validating a one-shot iterable would otherwise
        # consume it and no task would be queued.
        tasks = list(tasks)

        assert all(isinstance(task, (tuple, list)) for task in tasks), \
            "Each task must be a sequence"

        with self._lock:
            # Don't add new tasks while we're quiescing
            self._WaitWhileQuiescingUnlocked()

            for args in tasks:
                self._AddTaskUnlocked(args)

    def _WaitForTaskUnlocked(self, worker):
        """Waits for a task for a worker.

        Must be called with the pool lock held; the lock is released while
        waiting for a notification.

        @type worker: L{BaseWorker}
        @param worker: Worker thread
        @return: C{_TERMINATE} if the worker should exit, C{None} on a
            spurious wakeup, otherwise the argument sequence of the next task

        """
        if self._ShouldWorkerTerminateUnlocked(worker):
            return _TERMINATE

        # We only wait if there's no task for us.
        if not self._tasks:
            logging.debug("Waiting for tasks")

            # wait() releases the lock and sleeps until notified
            self._pool_to_worker.wait()

            logging.debug("Notified while waiting")

            # Were we woken up in order to terminate?
            if self._ShouldWorkerTerminateUnlocked(worker):
                return _TERMINATE

            if not self._tasks:
                # Spurious notification, ignore
                return None

        # Get task from queue and tell pool about it
        try:
            return self._tasks.popleft()
        finally:
            self._worker_to_pool.notify_all()

    def _ShouldWorkerTerminateUnlocked(self, worker):
        """Returns whether a worker should terminate.

        Must be called with the pool lock held.

        """
        return worker in self._termworkers

    def _HasRunningTasksUnlocked(self):
        """Checks whether there's a task running in a worker.

        Must be called with the pool lock held.

        """
        return any(worker._HasRunningTaskUnlocked()  # pylint: disable-msg=W0212
                   for worker in self._workers + self._termworkers)

    def Quiesce(self):
        """Waits until the task queue is empty.

        Also waits for running tasks to finish. New tasks are held back by
        L{AddTask}/L{AddManyTasks} until quiescing is over.

        """
        with self._lock:
            self._quiescing = True
            try:
                # Wait while there are tasks pending or running
                while self._tasks or self._HasRunningTasksUnlocked():
                    self._worker_to_pool.wait()
            finally:
                self._quiescing = False

                # Make sure AddTasks continues in case it was waiting
                self._pool_to_pool.notify_all()

    def _NewWorkerIdUnlocked(self):
        """Return an identifier for a new worker.

        The identifier is the pool name followed by a per-pool counter.

        """
        self._last_worker_id += 1

        return "%s%d" % (self._name, self._last_worker_id)

    def _ResizeUnlocked(self, num_workers):
        """Changes the number of workers.

        Must be called with the pool lock held; the lock is temporarily
        released while joining terminating workers.

        @raise NotImplementedError: when shrinking to a non-zero size

        """
        assert num_workers >= 0, "num_workers must be >= 0"

        logging.debug("Resizing to %s workers", num_workers)

        current_count = len(self._workers)

        if current_count == num_workers:
            # Nothing to do
            pass

        elif current_count > num_workers:
            if num_workers == 0:
                # Create copy of list to iterate over while lock isn't held.
                termworkers = self._workers[:]
                del self._workers[:]
            else:
                # TODO: Implement partial downsizing
                raise NotImplementedError()

            self._termworkers += termworkers

            # Notify workers that something has changed
            self._pool_to_worker.notify_all()

            # Join all terminating workers. The lock has to be released here
            # because workers acquire it on their way out of the run loop.
            self._lock.release()
            try:
                for worker in termworkers:
                    logging.debug("Waiting for thread %s", worker.name)
                    worker.join()
            finally:
                self._lock.acquire()

            # Remove terminated threads. This could be done in a more efficient
            # way (del self._termworkers[:]), but checking worker.is_alive()
            # makes sure we don't leave zombie threads around.
            for worker in termworkers:
                assert worker in self._termworkers, ("Worker not in list of"
                                                     " terminating workers")
                if not worker.is_alive():
                    self._termworkers.remove(worker)

            assert not self._termworkers, "Zombie worker detected"

        elif current_count < num_workers:
            # Create (num_workers - current_count) new workers
            for _ in range(num_workers - current_count):
                worker = self._worker_class(self, self._NewWorkerIdUnlocked())
                self._workers.append(worker)
                worker.start()

    def Resize(self, num_workers):
        """Changes the number of workers in the pool.

        @param num_workers: the new number of workers

        """
        with self._lock:
            return self._ResizeUnlocked(num_workers)

    def TerminateWorkers(self):
        """Terminate all worker threads.

        Unstarted tasks will be ignored.

        """
        logging.debug("Terminating all workers")

        with self._lock:
            self._ResizeUnlocked(0)

            if self._tasks:
                logging.debug("There are %s tasks left", len(self._tasks))

        logging.debug("All workers terminated")