4 # Copyright (C) 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Base classes for worker pools.
30 from ganeti import errors
31 from ganeti import utils
34 class BaseWorker(threading.Thread, object):
35 """Base worker class for worker pools.
37 Users of a worker pool must override RunTask in a subclass.
40 def __init__(self, pool, worker_id):
41 """Constructor for BaseWorker thread.
44 - pool: Parent worker pool
45 - worker_id: Identifier for this worker
48 super(BaseWorker, self).__init__()
50 self.worker_id = worker_id
52 # Also used by WorkerPool
53 self._current_task = None
55 def ShouldTerminate(self):
56 """Returns whether a worker should terminate.
59 return self.pool.ShouldWorkerTerminate(self)
62 """Main thread function.
64 Waits for new tasks to show up in the queue.
69 assert self._current_task is None
73 # We wait on lock to be told either terminate or do a task.
76 if pool._ShouldWorkerTerminateUnlocked(self):
79 # We only wait if there's no task for us.
81 # wait() releases the lock and sleeps until notified
84 # Were we woken up in order to terminate?
85 if pool._ShouldWorkerTerminateUnlocked(self):
89 # Spurious notification, ignore
92 # Get task from queue and tell pool about it
94 self._current_task = pool._tasks.popleft()
96 pool._lock.notifyAll()
100 # Run the actual task
102 self.RunTask(*self._current_task)
104 logging.error("Worker %s: Caught unhandled exception",
105 self.worker_id, exc_info=True)
107 self._current_task = None
112 pool._lock.notifyAll()
116 def RunTask(self, *args):
117 """Function called to start a task.
120 raise NotImplementedError()
123 class WorkerPool(object):
124 """Worker pool with a queue.
126 This class is thread-safe.
128 Tasks are guaranteed to be started in the order in which they're added to the
129 pool. Due to the nature of threading, they're not guaranteed to finish in the
133 def __init__(self, num_workers, worker_class):
134 """Constructor for worker pool.
137 - num_workers: Number of workers to be started (dynamic resizing is not
139 - worker_class: Class to be instantiated for workers; should derive from
143 # Some of these variables are accessed by BaseWorker
144 self._lock = threading.Condition(threading.Lock())
145 self._worker_class = worker_class
146 self._last_worker_id = 0
148 self._quiescing = False
150 # Terminating workers
151 self._termworkers = []
154 self._tasks = collections.deque()
157 self.Resize(num_workers)
159 # TODO: Implement dynamic resizing?
161 def AddTask(self, *args):
162 """Adds a task to the queue.
165 - *args: Arguments passed to BaseWorker.RunTask
170 # Don't add new tasks while we're quiescing
171 while self._quiescing:
174 # Add task to internal queue
175 self._tasks.append(args)
180 def _ShouldWorkerTerminateUnlocked(self, worker):
181 """Returns whether a worker should terminate.
184 return (worker in self._termworkers)
186 def ShouldWorkerTerminate(self, worker):
187 """Returns whether a worker should terminate.
192 return self._ShouldWorkerTerminateUnlocked(self)
196 def _HasRunningTasksUnlocked(self):
197 """Checks whether there's a task running in a worker.
200 for worker in self._workers + self._termworkers:
201 if worker._current_task is not None:
206 """Waits until the task queue is empty.
211 self._quiescing = True
213 # Wait while there are tasks pending or running
214 while self._tasks or self._HasRunningTasksUnlocked():
218 self._quiescing = False
220 # Make sure AddTasks continues in case it was waiting
221 self._lock.notifyAll()
225 def _NewWorkerIdUnlocked(self):
226 self._last_worker_id += 1
227 return self._last_worker_id
229 def _ResizeUnlocked(self, num_workers):
230 """Changes the number of workers.
233 assert num_workers >= 0, "num_workers must be >= 0"
235 logging.debug("Resizing to %s workers", num_workers)
237 current_count = len(self._workers)
239 if current_count == num_workers:
243 elif current_count > num_workers:
245 # Create copy of list to iterate over while lock isn't held.
246 termworkers = self._workers[:]
249 # TODO: Implement partial downsizing
250 raise NotImplementedError()
253 self._termworkers += termworkers
255 # Notify workers that something has changed
256 self._lock.notifyAll()
258 # Join all terminating workers
261 for worker in termworkers:
266 # Remove terminated threads. This could be done in a more efficient way
267 # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
268 # don't leave zombie threads around.
269 for worker in termworkers:
270 assert worker in self._termworkers, ("Worker not in list of"
271 " terminating workers")
272 if not worker.isAlive():
273 self._termworkers.remove(worker)
275 assert not self._termworkers, "Zombie worker detected"
277 elif current_count < num_workers:
278 # Create (num_workers - current_count) new workers
279 for i in xrange(num_workers - current_count):
280 worker = self._worker_class(self, self._NewWorkerIdUnlocked())
281 self._workers.append(worker)
284 def Resize(self, num_workers):
285 """Changes the number of workers in the pool.
288 - num_workers: New number of workers
293 return self._ResizeUnlocked(num_workers)
297 def TerminateWorkers(self):
298 """Terminate all worker threads.
300 Unstarted tasks will be ignored.
303 logging.debug("Terminating all workers")
307 self._ResizeUnlocked(0)
310 logging.debug("There are %s tasks left", len(self._tasks))
314 logging.debug("All workers terminated")