# Copyright (C) 2008 Google Inc.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Base classes for worker pools.
30 from ganeti import compat
class BaseWorker(threading.Thread, object):
  """Base worker class for worker pools.

  Users of a worker pool must override RunTask in a subclass.

  A worker is a thread which repeatedly takes the oldest queued task from
  its parent pool and passes the task's arguments to L{RunTask}. All shared
  state is protected by the pool's lock.

  """
  # pylint: disable-msg=W0212
  def __init__(self, pool, worker_id):
    """Constructor for BaseWorker thread.

    @param pool: the parent worker pool
    @param worker_id: identifier for this worker; used as the thread name

    """
    super(BaseWorker, self).__init__(name=worker_id)
    self.pool = pool
    # Argument tuple of the task being processed, None while idle
    self._current_task = None

  def ShouldTerminate(self):
    """Returns whether a worker should terminate.

    The decision is delegated to the parent pool.

    """
    return self.pool.ShouldWorkerTerminate(self)

  def _HasRunningTaskUnlocked(self):
    """Returns whether this worker is currently running a task.

    The caller must hold the pool's lock.

    """
    return (self._current_task is not None)

  def HasRunningTask(self):
    """Returns whether this worker is currently running a task.

    Acquires the pool's lock for the duration of the check.

    """
    self.pool._lock.acquire()
    try:
      return self._HasRunningTaskUnlocked()
    finally:
      self.pool._lock.release()

  def run(self):
    """Main thread function.

    Waits for new tasks to show up in the queue.

    The loop ends once the pool reports that this worker should terminate.
    Exceptions raised by L{RunTask} are logged and do not stop the worker.

    """
    pool = self.pool

    assert not self.HasRunningTask()

    while True:
      try:
        # We wait on lock to be told either terminate or do a task.
        pool._lock.acquire()
        try:
          if pool._ShouldWorkerTerminateUnlocked(self):
            break

          # We only wait if there's no task for us.
          if not pool._tasks:
            logging.debug("Waiting for tasks")

            # wait() releases the lock and sleeps until notified
            pool._pool_to_worker.wait()

            logging.debug("Notified while waiting")

            # Were we woken up in order to terminate?
            if pool._ShouldWorkerTerminateUnlocked(self):
              break

            if not pool._tasks:
              # Spurious notification, ignore
              continue

          # Get task from queue and tell pool about it
          try:
            self._current_task = pool._tasks.popleft()
          finally:
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

        # Run the actual task
        try:
          logging.debug("Starting task %r", self._current_task)
          self.RunTask(*self._current_task)
          logging.debug("Done with task %r", self._current_task)
        except: # pylint: disable-msg=W0702
          logging.exception("Caught unhandled exception")
      finally:
        # Notify pool
        pool._lock.acquire()
        try:
          # Compare against None instead of relying on truthiness: a task
          # without arguments is an empty tuple, which is false in a boolean
          # context and would otherwise never be cleared, leaving this
          # worker marked as busy.
          if self._current_task is not None:
            self._current_task = None
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

    logging.debug("Terminates")

  def RunTask(self, *args):
    """Function called to start a task.

    This needs to be implemented by child classes.

    @param args: the arguments the task was queued with
    @raise NotImplementedError: always, unless overridden in a subclass

    """
    raise NotImplementedError()
class WorkerPool(object):
  """Worker pool with a queue.

  This class is thread-safe.

  Tasks are guaranteed to be started in the order in which they're
  added to the pool. Due to the nature of threading, they're not
  guaranteed to finish in the same order.

  """
  def __init__(self, name, num_workers, worker_class):
    """Constructor for worker pool.

    @param name: prefix used when building worker identifiers
    @param num_workers: number of workers to be started
                        (dynamic resizing is not yet implemented)
    @param worker_class: the class to be instantiated for workers;
                         should derive from L{BaseWorker}

    """
    # Some of these variables are accessed by BaseWorker
    self._lock = threading.Lock()
    # All three conditions share the pool lock
    self._pool_to_pool = threading.Condition(self._lock)
    self._pool_to_worker = threading.Condition(self._lock)
    self._worker_to_pool = threading.Condition(self._lock)
    self._worker_class = worker_class
    self._name = name
    self._last_worker_id = 0
    self._workers = []
    self._quiescing = False

    # Terminating workers
    self._termworkers = []

    # Queued tasks
    self._tasks = collections.deque()

    # Start workers
    self.Resize(num_workers)

  # TODO: Implement dynamic resizing?

  def _WaitWhileQuiescingUnlocked(self):
    """Wait until the worker pool has finished quiescing.

    The caller must hold the pool lock; it is released while waiting.

    """
    while self._quiescing:
      self._pool_to_pool.wait()

  def AddTask(self, *args):
    """Adds a task to the queue.

    Blocks while the pool is quiescing.

    @param args: arguments passed to L{BaseWorker.RunTask}

    """
    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()

      self._tasks.append(args)

      # Wake one idling worker up
      self._pool_to_worker.notify()
    finally:
      self._lock.release()

  def AddManyTasks(self, tasks):
    """Add a list of tasks to the queue.

    Blocks while the pool is quiescing.

    @type tasks: list of tuples
    @param tasks: list of args passed to L{BaseWorker.RunTask}

    """
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
      "Each task must be a sequence"

    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()

      self._tasks.extend(tasks)

      # Wake up one idling worker per added task; a single notification
      # would leave all other idle workers asleep while tasks are pending
      for _ in tasks:
        self._pool_to_worker.notify()
    finally:
      self._lock.release()

  def _ShouldWorkerTerminateUnlocked(self, worker):
    """Returns whether a worker should terminate.

    The caller must hold the pool lock.

    @param worker: the worker to check

    """
    return (worker in self._termworkers)

  def ShouldWorkerTerminate(self, worker):
    """Returns whether a worker should terminate.

    Acquires the pool lock for the duration of the check.

    @param worker: the worker to check

    """
    self._lock.acquire()
    try:
      return self._ShouldWorkerTerminateUnlocked(worker)
    finally:
      self._lock.release()

  def _HasRunningTasksUnlocked(self):
    """Checks whether there's a task running in a worker.

    Both active and terminating workers are considered. The caller must hold
    the pool lock.

    """
    for worker in self._workers + self._termworkers:
      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
        return True
    return False

  def Quiesce(self):
    """Waits until the task queue is empty.

    While waiting, callers adding tasks are blocked; they are released again
    once no task is pending or running anymore.

    """
    self._lock.acquire()
    try:
      self._quiescing = True

      # Wait while there are tasks pending or running
      while self._tasks or self._HasRunningTasksUnlocked():
        self._worker_to_pool.wait()

    finally:
      self._quiescing = False

      # Make sure AddTasks continues in case it was waiting
      self._pool_to_pool.notifyAll()

      self._lock.release()

  def _NewWorkerIdUnlocked(self):
    """Return an identifier for a new worker.

    The identifier is the pool name followed by a running number. The caller
    must hold the pool lock.

    """
    self._last_worker_id += 1

    return "%s%d" % (self._name, self._last_worker_id)

  def _ResizeUnlocked(self, num_workers):
    """Changes the number of workers.

    Must be called with the pool lock held. When shutting workers down, the
    lock is released temporarily while waiting for their threads to finish.

    @param num_workers: the new number of workers
    @raise NotImplementedError: when asked to reduce the number of workers
        to anything but zero

    """
    assert num_workers >= 0, "num_workers must be >= 0"

    logging.debug("Resizing to %s workers", num_workers)

    current_count = len(self._workers)

    if current_count == num_workers:
      # Nothing to do
      pass

    elif current_count > num_workers:
      if num_workers == 0:
        # Create copy of list to iterate over while lock isn't held.
        termworkers = self._workers[:]
        del self._workers[:]
      else:
        # TODO: Implement partial downsizing
        raise NotImplementedError()

      self._termworkers += termworkers

      # Notify workers that something has changed
      self._pool_to_worker.notifyAll()

      # Join all terminating workers
      self._lock.release()
      try:
        for worker in termworkers:
          logging.debug("Waiting for thread %s", worker.getName())
          worker.join()
      finally:
        self._lock.acquire()

      # Remove terminated threads. This could be done in a more efficient way
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
      # don't leave zombie threads around.
      for worker in termworkers:
        assert worker in self._termworkers, ("Worker not in list of"
                                             " terminating workers")
        if not worker.isAlive():
          self._termworkers.remove(worker)

      assert not self._termworkers, "Zombie worker detected"

    elif current_count < num_workers:
      # Create (num_workers - current_count) new workers
      for _ in range(num_workers - current_count):
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
        self._workers.append(worker)
        worker.start()

  def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    @param num_workers: the new number of workers

    """
    self._lock.acquire()
    try:
      return self._ResizeUnlocked(num_workers)
    finally:
      self._lock.release()

  def TerminateWorkers(self):
    """Terminate all worker threads.

    Unstarted tasks will be ignored.

    """
    logging.debug("Terminating all workers")

    self._lock.acquire()
    try:
      self._ResizeUnlocked(0)

      if self._tasks:
        logging.debug("There are %s tasks left", len(self._tasks))
    finally:
      self._lock.release()

    logging.debug("All workers terminated")