4 # Copyright (C) 2008, 2009, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Base classes for worker pools.
30 from ganeti import compat
class BaseWorker(threading.Thread, object):
    """Base worker class for worker pools.

    Users of a worker pool must override RunTask in a subclass.

    A worker repeatedly asks its pool for a task (while holding the pool
    lock), runs it through L{RunTask} with the lock released and then
    tells the pool that it is idle again.

    """
    # pylint: disable-msg=W0212
    def __init__(self, pool, worker_id):
        """Constructor for BaseWorker thread.

        @param pool: the parent worker pool
        @param worker_id: identifier for this worker; used as thread name

        """
        super(BaseWorker, self).__init__(name=worker_id)
        self.pool = pool
        # Arguments of the task being executed, None while idle
        self._current_task = None

    def ShouldTerminate(self):
        """Returns whether this worker should terminate.

        Should only be called from within L{RunTask}.

        @return: whatever the pool's C{_ShouldWorkerTerminateUnlocked}
            reports for this worker

        """
        self.pool._lock.acquire()
        try:
            assert self._HasRunningTaskUnlocked()
            return self.pool._ShouldWorkerTerminateUnlocked(self)
        finally:
            self.pool._lock.release()

    def _HasRunningTaskUnlocked(self):
        """Returns whether this worker is currently running a task.

        """
        return (self._current_task is not None)

    def run(self):
        """Main thread function.

        Waits for new tasks to show up in the queue.

        Exceptions raised by L{RunTask} are logged and otherwise ignored,
        so a failing task does not stop the worker.

        """
        pool = self.pool

        while True:
            assert self._current_task is None
            try:
                # Wait on lock to be told either to terminate or to do a task
                pool._lock.acquire()
                try:
                    task = pool._WaitForTaskUnlocked(self)

                    if task is _TERMINATE:
                        # Told to terminate
                        break

                    if task is None:
                        # Spurious notification, ignore
                        continue

                    self._current_task = task

                    # No longer needed, dispose of reference
                    del task

                    assert self._HasRunningTaskUnlocked()
                finally:
                    pool._lock.release()

                # Run the actual task
                try:
                    logging.debug("Starting task %r", self._current_task)
                    self.RunTask(*self._current_task)
                    logging.debug("Done with task %r", self._current_task)
                except: # pylint: disable-msg=W0702
                    # Deliberately broad: a worker must survive any task
                    logging.exception("Caught unhandled exception")

                assert self._HasRunningTaskUnlocked()
            finally:
                # Notify pool
                pool._lock.acquire()
                try:
                    # Compare against None instead of relying on truthiness:
                    # an empty argument sequence is a valid task but is
                    # falsy, and would otherwise never be cleared.
                    if self._current_task is not None:
                        self._current_task = None
                        pool._worker_to_pool.notifyAll()
                finally:
                    pool._lock.release()

            assert not self._HasRunningTaskUnlocked()

        logging.debug("Terminates")

    def RunTask(self, *args):
        """Function called to start a task.

        This needs to be implemented by child classes.

        @param args: the arguments the task was queued with
        @raise NotImplementedError: always, unless overridden

        """
        raise NotImplementedError()
class WorkerPool(object):
    """Worker pool with a queue.

    This class is thread-safe.

    Tasks are guaranteed to be started in the order in which they're
    added to the pool. Due to the nature of threading, they're not
    guaranteed to finish in the same order.

    Methods whose name ends in C{Unlocked} expect the caller to hold the
    pool lock.

    """
    def __init__(self, name, num_workers, worker_class):
        """Constructor for worker pool.

        @param name: prefix used to build the identifier of each worker
        @param num_workers: number of workers to be started
            (dynamic resizing is not yet implemented)
        @param worker_class: the class to be instantiated for workers;
            should derive from L{BaseWorker}

        """
        # Some of these variables are accessed by BaseWorker
        self._lock = threading.Lock()
        # All three conditions share the pool lock
        self._pool_to_pool = threading.Condition(self._lock)
        self._pool_to_worker = threading.Condition(self._lock)
        self._worker_to_pool = threading.Condition(self._lock)
        self._worker_class = worker_class
        self._name = name
        self._last_worker_id = 0
        self._workers = []
        self._quiescing = False

        # Terminating workers
        self._termworkers = []

        # Queued tasks
        self._tasks = collections.deque()

        # Start workers
        self.Resize(num_workers)

    # TODO: Implement dynamic resizing?

    def _WaitWhileQuiescingUnlocked(self):
        """Wait until the worker pool has finished quiescing.

        """
        while self._quiescing:
            self._pool_to_pool.wait()

    def _AddTaskUnlocked(self, args):
        """Appends a task to the queue and wakes up one waiting worker.

        @type args: sequence
        @param args: arguments passed to L{BaseWorker.RunTask}

        """
        assert isinstance(args, (tuple, list)), "Arguments must be a sequence"

        self._tasks.append(args)

        # Notify a waiting worker
        self._pool_to_worker.notify()

    def AddTask(self, args):
        """Adds a task to the queue.

        Blocks while the pool is quiescing.

        @type args: sequence
        @param args: arguments passed to L{BaseWorker.RunTask}

        """
        self._lock.acquire()
        try:
            self._WaitWhileQuiescingUnlocked()
            self._AddTaskUnlocked(args)
        finally:
            self._lock.release()

    def AddManyTasks(self, tasks):
        """Add a list of tasks to the queue.

        Blocks while the pool is quiescing; all tasks are queued under a
        single acquisition of the pool lock.

        @type tasks: list of tuples
        @param tasks: list of args passed to L{BaseWorker.RunTask}

        """
        assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
            "Each task must be a sequence"

        self._lock.acquire()
        try:
            self._WaitWhileQuiescingUnlocked()

            for args in tasks:
                self._AddTaskUnlocked(args)
        finally:
            self._lock.release()

    def _WaitForTaskUnlocked(self, worker):
        """Waits for a task for a worker.

        @type worker: L{BaseWorker}
        @param worker: Worker thread
        @return: the arguments of the next task, C{_TERMINATE} if the
            worker has to terminate, or None after a wake-up that found
            neither a task nor a termination request

        """
        if self._ShouldWorkerTerminateUnlocked(worker):
            return _TERMINATE

        # We only wait if there's no task for us.
        if not self._tasks:
            logging.debug("Waiting for tasks")

            # wait() releases the lock and sleeps until notified
            self._pool_to_worker.wait()

            logging.debug("Notified while waiting")

            # Were we woken up in order to terminate?
            if self._ShouldWorkerTerminateUnlocked(worker):
                return _TERMINATE

            if not self._tasks:
                # Spurious notification, ignore
                return None

        # Get task from queue and tell pool about it
        try:
            return self._tasks.popleft()
        finally:
            self._worker_to_pool.notifyAll()

    def _ShouldWorkerTerminateUnlocked(self, worker):
        """Returns whether a worker should terminate.

        """
        return (worker in self._termworkers)

    def _HasRunningTasksUnlocked(self):
        """Checks whether there's a task running in a worker.

        Both active and terminating workers are taken into account.

        """
        for worker in self._workers + self._termworkers:
            if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
                return True
        return False

    def Quiesce(self):
        """Waits until the task queue is empty.

        While waiting, L{AddTask} and L{AddManyTasks} block; they are
        released again when this method returns.

        """
        self._lock.acquire()
        try:
            self._quiescing = True

            # Wait while there are tasks pending or running
            while self._tasks or self._HasRunningTasksUnlocked():
                self._worker_to_pool.wait()

        finally:
            self._quiescing = False

            # Make sure AddTasks continues in case it was waiting
            self._pool_to_pool.notifyAll()

            self._lock.release()

    def _NewWorkerIdUnlocked(self):
        """Return an identifier for a new worker.

        """
        self._last_worker_id += 1

        return "%s%d" % (self._name, self._last_worker_id)

    def _ResizeUnlocked(self, num_workers):
        """Changes the number of workers.

        Must be called with the pool lock held. The lock is released
        temporarily while waiting for terminating workers to exit.

        @param num_workers: the new number of workers
        @raise NotImplementedError: when shrinking to a size other than 0

        """
        assert num_workers >= 0, "num_workers must be >= 0"

        logging.debug("Resizing to %s workers", num_workers)

        current_count = len(self._workers)

        if current_count == num_workers:
            # Nothing to do
            pass

        elif current_count > num_workers:
            if num_workers == 0:
                # Create copy of list to iterate over while lock isn't held.
                termworkers = self._workers[:]
                del self._workers[:]
            else:
                # TODO: Implement partial downsizing
                raise NotImplementedError()

            self._termworkers += termworkers

            # Notify workers that something has changed
            self._pool_to_worker.notifyAll()

            # Join all terminating workers
            self._lock.release()
            try:
                for worker in termworkers:
                    logging.debug("Waiting for thread %s", worker.getName())
                    worker.join()
            finally:
                self._lock.acquire()

            # Remove terminated threads. This could be done in a more
            # efficient way (del self._termworkers[:]), but checking whether
            # the worker is alive makes sure we don't leave zombie threads
            # around.
            for worker in termworkers:
                assert worker in self._termworkers, ("Worker not in list of"
                                                     " terminating workers")
                # Thread.isAlive() no longer exists on Python >= 3.9; prefer
                # is_alive() and fall back to the old spelling only on
                # interpreters that lack it.
                is_alive = getattr(worker, "is_alive", None) or worker.isAlive
                if not is_alive():
                    self._termworkers.remove(worker)

            assert not self._termworkers, "Zombie worker detected"

        elif current_count < num_workers:
            # Create (num_workers - current_count) new workers
            for _ in range(num_workers - current_count):
                worker = self._worker_class(self, self._NewWorkerIdUnlocked())
                self._workers.append(worker)
                worker.start()

    def Resize(self, num_workers):
        """Changes the number of workers in the pool.

        @param num_workers: the new number of workers

        """
        self._lock.acquire()
        try:
            return self._ResizeUnlocked(num_workers)
        finally:
            self._lock.release()

    def TerminateWorkers(self):
        """Terminate all worker threads.

        Unstarted tasks will be ignored.

        """
        logging.debug("Terminating all workers")

        self._lock.acquire()
        try:
            self._ResizeUnlocked(0)

            if self._tasks:
                logging.debug("There are %s tasks left", len(self._tasks))
        finally:
            self._lock.release()

        logging.debug("All workers terminated")