4 # Copyright (C) 2008, 2009, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Base classes for worker pools.
30 from ganeti import compat
36 class BaseWorker(threading.Thread, object):
37 """Base worker class for worker pools.
39 Users of a worker pool must override RunTask in a subclass.
42 # pylint: disable-msg=W0212
43 def __init__(self, pool, worker_id):
44 """Constructor for BaseWorker thread.
46 @param pool: the parent worker pool
47 @param worker_id: identifier for this worker
50 super(BaseWorker, self).__init__(name=worker_id)
52 self._current_task = None
54 def ShouldTerminate(self):
55 """Returns whether a worker should terminate.
58 return self.pool.ShouldWorkerTerminate(self)
60 def _HasRunningTaskUnlocked(self):
61 """Returns whether this worker is currently running a task.
64 return (self._current_task is not None)
67 """Main thread function.
69 Waits for new tasks to show up in the queue.
74 assert self._current_task is None
78 # Wait on lock to be told either to terminate or to do a task
81 task = pool._WaitForTaskUnlocked(self)
83 if task is _TERMINATE:
88 # Spurious notification, ignore
91 self._current_task = task
93 assert self._HasRunningTaskUnlocked()
99 logging.debug("Starting task %r", self._current_task)
100 self.RunTask(*self._current_task)
101 logging.debug("Done with task %r", self._current_task)
102 except: # pylint: disable-msg=W0702
103 logging.exception("Caught unhandled exception")
108 if self._current_task:
109 self._current_task = None
110 pool._worker_to_pool.notifyAll()
114 logging.debug("Terminates")
116 def RunTask(self, *args):
117 """Function called to start a task.
119 This needs to be implemented by child classes.
122 raise NotImplementedError()
125 class WorkerPool(object):
126 """Worker pool with a queue.
128 This class is thread-safe.
130 Tasks are guaranteed to be started in the order in which they're
131 added to the pool. Due to the nature of threading, they're not
132 guaranteed to finish in the same order.
135 def __init__(self, name, num_workers, worker_class):
136 """Constructor for worker pool.
138 @param num_workers: number of workers to be started
139 (dynamic resizing is not yet implemented)
140 @param worker_class: the class to be instantiated for workers;
141 should derive from L{BaseWorker}
144 # Some of these variables are accessed by BaseWorker
145 self._lock = threading.Lock()
146 self._pool_to_pool = threading.Condition(self._lock)
147 self._pool_to_worker = threading.Condition(self._lock)
148 self._worker_to_pool = threading.Condition(self._lock)
149 self._worker_class = worker_class
151 self._last_worker_id = 0
153 self._quiescing = False
155 # Terminating workers
156 self._termworkers = []
159 self._tasks = collections.deque()
162 self.Resize(num_workers)
164 # TODO: Implement dynamic resizing?
166 def _WaitWhileQuiescingUnlocked(self):
167 """Wait until the worker pool has finished quiescing.
170 while self._quiescing:
171 self._pool_to_pool.wait()
173 def _AddTaskUnlocked(self, args):
174 assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
176 self._tasks.append(args)
178 # Notify a waiting worker
179 self._pool_to_worker.notify()
181 def AddTask(self, *args):
182 """Adds a task to the queue.
184 @param args: arguments passed to L{BaseWorker.RunTask}
189 self._WaitWhileQuiescingUnlocked()
190 self._AddTaskUnlocked(args)
194 def AddManyTasks(self, tasks):
195 """Add a list of tasks to the queue.
197 @type tasks: list of tuples
198 @param tasks: list of args passed to L{BaseWorker.RunTask}
201 assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
202 "Each task must be a sequence"
206 self._WaitWhileQuiescingUnlocked()
209 self._AddTaskUnlocked(args)
213 def _WaitForTaskUnlocked(self, worker):
214 """Waits for a task for a worker.
216 @type worker: L{BaseWorker}
217 @param worker: Worker thread
220 if self._ShouldWorkerTerminateUnlocked(worker):
223 # We only wait if there's no task for us.
225 logging.debug("Waiting for tasks")
227 # wait() releases the lock and sleeps until notified
228 self._pool_to_worker.wait()
230 logging.debug("Notified while waiting")
232 # Were we woken up in order to terminate?
233 if self._ShouldWorkerTerminateUnlocked(worker):
237 # Spurious notification, ignore
240 # Get task from queue and tell pool about it
242 return self._tasks.popleft()
244 self._worker_to_pool.notifyAll()
246 def _ShouldWorkerTerminateUnlocked(self, worker):
247 """Returns whether a worker should terminate.
250 return (worker in self._termworkers)
252 def ShouldWorkerTerminate(self, worker):
253 """Returns whether a worker should terminate.
258 return self._ShouldWorkerTerminateUnlocked(worker)
262 def _HasRunningTasksUnlocked(self):
263 """Checks whether there's a task running in a worker.
266 for worker in self._workers + self._termworkers:
267 if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
272 """Waits until the task queue is empty.
277 self._quiescing = True
279 # Wait while there are tasks pending or running
280 while self._tasks or self._HasRunningTasksUnlocked():
281 self._worker_to_pool.wait()
284 self._quiescing = False
286 # Make sure AddTasks continues in case it was waiting
287 self._pool_to_pool.notifyAll()
291 def _NewWorkerIdUnlocked(self):
292 """Return an identifier for a new worker.
295 self._last_worker_id += 1
297 return "%s%d" % (self._name, self._last_worker_id)
299 def _ResizeUnlocked(self, num_workers):
300 """Changes the number of workers.
303 assert num_workers >= 0, "num_workers must be >= 0"
305 logging.debug("Resizing to %s workers", num_workers)
307 current_count = len(self._workers)
309 if current_count == num_workers:
313 elif current_count > num_workers:
315 # Create copy of list to iterate over while lock isn't held.
316 termworkers = self._workers[:]
319 # TODO: Implement partial downsizing
320 raise NotImplementedError()
323 self._termworkers += termworkers
325 # Notify workers that something has changed
326 self._pool_to_worker.notifyAll()
328 # Join all terminating workers
331 for worker in termworkers:
332 logging.debug("Waiting for thread %s", worker.getName())
337 # Remove terminated threads. This could be done in a more efficient way
338 # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
339 # don't leave zombie threads around.
340 for worker in termworkers:
341 assert worker in self._termworkers, ("Worker not in list of"
342 " terminating workers")
343 if not worker.isAlive():
344 self._termworkers.remove(worker)
346 assert not self._termworkers, "Zombie worker detected"
348 elif current_count < num_workers:
349 # Create (num_workers - current_count) new workers
350 for _ in range(num_workers - current_count):
351 worker = self._worker_class(self, self._NewWorkerIdUnlocked())
352 self._workers.append(worker)
355 def Resize(self, num_workers):
356 """Changes the number of workers in the pool.
358 @param num_workers: the new number of workers
363 return self._ResizeUnlocked(num_workers)
367 def TerminateWorkers(self):
368 """Terminate all worker threads.
370 Unstarted tasks will be ignored.
373 logging.debug("Terminating all workers")
377 self._ResizeUnlocked(0)
380 logging.debug("There are %s tasks left", len(self._tasks))
384 logging.debug("All workers terminated")