4 # Copyright (C) 2008, 2009, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Base classes for worker pools.
30 from ganeti import compat
class BaseWorker(threading.Thread, object):
    """Base worker class for worker pools.

    Users of a worker pool must override RunTask in a subclass.

    """
    # pylint: disable-msg=W0212
    def __init__(self, pool, worker_id):
        """Constructor for BaseWorker thread.

        @param pool: the parent worker pool
        @param worker_id: identifier for this worker; also used as the
            initial thread name

        """
        super(BaseWorker, self).__init__(name=worker_id)
        # The pool's lock and helpers are reached through self.pool
        # (see ShouldTerminate and run).
        self.pool = pool
        self._worker_id = worker_id
        self._current_task = None

        assert self.getName() == worker_id

    def ShouldTerminate(self):
        """Returns whether this worker should terminate.

        Should only be called from within L{RunTask}.

        @rtype: bool

        """
        self.pool._lock.acquire()
        try:
            assert self._HasRunningTaskUnlocked()
            return self.pool._ShouldWorkerTerminateUnlocked(self)
        finally:
            # Always hand the pool lock back, even if the assertion fails
            self.pool._lock.release()

    def SetTaskName(self, taskname):
        """Sets the name of the current task.

        Should only be called from within L{RunTask}.

        @type taskname: string
        @param taskname: Task's name; a false value (e.g. C{None}) resets
            the thread name to the plain worker identifier

        """
        if taskname:
            name = "%s/%s" % (self._worker_id, taskname)
        else:
            name = self._worker_id

        # Set thread name
        self.setName(name)

    def _HasRunningTaskUnlocked(self):
        """Returns whether this worker is currently running a task.

        """
        return (self._current_task is not None)

    def run(self):
        """Main thread function.

        Waits for new tasks to show up in the queue, runs them one at a
        time and exits once the pool reports that this worker should
        terminate (module-level C{_TERMINATE} sentinel).

        """
        pool = self.pool

        while True:
            assert self._current_task is None
            try:
                # Wait on lock to be told either to terminate or to do a task
                pool._lock.acquire()
                try:
                    task = pool._WaitForTaskUnlocked(self)

                    if task is _TERMINATE:
                        # Told to terminate
                        break

                    if task is None:
                        # Spurious notification, ignore
                        continue

                    self._current_task = task

                    # No longer needed, dispose of reference
                    del task

                    assert self._HasRunningTaskUnlocked()

                finally:
                    pool._lock.release()

                # Run the actual task (without holding the pool lock)
                try:
                    logging.debug("Starting task %r", self._current_task)
                    assert self.getName() == self._worker_id
                    try:
                        self.RunTask(*self._current_task)
                    finally:
                        # Restore the plain worker name for the next task
                        self.SetTaskName(None)
                    logging.debug("Done with task %r", self._current_task)
                except: # pylint: disable-msg=W0702
                    # Deliberately broad: a failing task must not kill the worker
                    logging.exception("Caught unhandled exception")

                assert self._HasRunningTaskUnlocked()
            finally:
                # Notify pool
                pool._lock.acquire()
                try:
                    if self._current_task:
                        self._current_task = None
                        # Wakes anyone waiting for running tasks to finish
                        pool._worker_to_pool.notifyAll()
                finally:
                    pool._lock.release()

            assert not self._HasRunningTaskUnlocked()

        logging.debug("Terminates")

    def RunTask(self, *args):
        """Function called to start a task.

        This needs to be implemented by child classes.

        @raise NotImplementedError: always, in this base class

        """
        raise NotImplementedError()
class WorkerPool(object):
    """Worker pool with a queue.

    This class is thread-safe.

    Tasks are guaranteed to be started in the order in which they're
    added to the pool. Due to the nature of threading, they're not
    guaranteed to finish in the same order.

    """
    def __init__(self, name, num_workers, worker_class):
        """Constructor for worker pool.

        @param name: prefix for worker identifiers (the pool appends a
            running counter, see L{_NewWorkerIdUnlocked})
        @param num_workers: number of workers to be started (see
            L{Resize}; shrinking to a non-zero size is not implemented)
        @param worker_class: the class to be instantiated for workers;
            should derive from L{BaseWorker}

        """
        # Some of these variables are accessed by BaseWorker
        self._lock = threading.Lock()
        # All three conditions share self._lock
        self._pool_to_pool = threading.Condition(self._lock)
        self._pool_to_worker = threading.Condition(self._lock)
        self._worker_to_pool = threading.Condition(self._lock)
        self._worker_class = worker_class
        self._name = name
        self._last_worker_id = 0
        # Active workers
        self._workers = []
        self._quiescing = False

        # Terminating workers
        self._termworkers = []

        # Queued tasks
        self._tasks = collections.deque()

        # Start workers
        self.Resize(num_workers)

    # TODO: Implement dynamic resizing?

    def _WaitWhileQuiescingUnlocked(self):
        """Wait until the worker pool has finished quiescing.

        Must be called with the pool lock held; waiting on the condition
        temporarily releases it.

        """
        while self._quiescing:
            self._pool_to_pool.wait()

    def _AddTaskUnlocked(self, args):
        """Queues one task and wakes up a single waiting worker.

        Must be called with the pool lock held.

        @type args: tuple or list
        @param args: arguments passed to L{BaseWorker.RunTask}

        """
        assert isinstance(args, (tuple, list)), "Arguments must be a sequence"

        self._tasks.append(args)

        # Notify a waiting worker
        self._pool_to_worker.notify()

    def AddTask(self, args):
        """Adds a task to the queue.

        Blocks while the pool is quiescing.

        @type args: sequence
        @param args: arguments passed to L{BaseWorker.RunTask}

        """
        self._lock.acquire()
        try:
            self._WaitWhileQuiescingUnlocked()
            self._AddTaskUnlocked(args)
        finally:
            self._lock.release()

    def AddManyTasks(self, tasks):
        """Add a list of tasks to the queue.

        Blocks while the pool is quiescing; all tasks are queued while
        holding the lock once.

        @type tasks: list of tuples
        @param tasks: list of args passed to L{BaseWorker.RunTask}

        """
        assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
            "Each task must be a sequence"

        self._lock.acquire()
        try:
            self._WaitWhileQuiescingUnlocked()

            for args in tasks:
                self._AddTaskUnlocked(args)
        finally:
            self._lock.release()

    def _WaitForTaskUnlocked(self, worker):
        """Waits for a task for a worker.

        Must be called with the pool lock held.

        @type worker: L{BaseWorker}
        @param worker: Worker thread
        @return: the next task's arguments, the module-level C{_TERMINATE}
            sentinel if the worker should exit, or C{None} after a wakeup
            that found no task

        """
        if self._ShouldWorkerTerminateUnlocked(worker):
            return _TERMINATE

        # We only wait if there's no task for us.
        if not self._tasks:
            logging.debug("Waiting for tasks")

            # wait() releases the lock and sleeps until notified
            self._pool_to_worker.wait()

            logging.debug("Notified while waiting")

            # Were we woken up in order to terminate?
            if self._ShouldWorkerTerminateUnlocked(worker):
                return _TERMINATE

            if not self._tasks:
                # Spurious notification, ignore
                return None

        # Get task from queue and tell pool about it
        try:
            return self._tasks.popleft()
        finally:
            self._worker_to_pool.notifyAll()

    def _ShouldWorkerTerminateUnlocked(self, worker):
        """Returns whether a worker should terminate.

        @rtype: bool

        """
        return (worker in self._termworkers)

    def _HasRunningTasksUnlocked(self):
        """Checks whether there's a task running in a worker.

        Both active and terminating workers are considered.

        """
        for worker in self._workers + self._termworkers:
            if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
                return True
        return False

    def Quiesce(self):
        """Waits until the task queue is empty.

        Also waits for running tasks to finish. New tasks are held back
        (see L{AddTask}) until quiescing is over.

        """
        self._lock.acquire()
        try:
            self._quiescing = True

            # Wait while there are tasks pending or running
            while self._tasks or self._HasRunningTasksUnlocked():
                self._worker_to_pool.wait()

        finally:
            self._quiescing = False

            # Make sure AddTasks continues in case it was waiting
            self._pool_to_pool.notifyAll()

            self._lock.release()

    def _NewWorkerIdUnlocked(self):
        """Return an identifier for a new worker.

        """
        self._last_worker_id += 1

        return "%s%d" % (self._name, self._last_worker_id)

    def _ResizeUnlocked(self, num_workers):
        """Changes the number of workers.

        Must be called with the pool lock held. The lock is temporarily
        released while joining terminating workers.

        @raise NotImplementedError: when asked to shrink to a non-zero size

        """
        assert num_workers >= 0, "num_workers must be >= 0"

        logging.debug("Resizing to %s workers", num_workers)

        current_count = len(self._workers)

        if current_count == num_workers:
            # Nothing to do
            pass

        elif current_count > num_workers:
            if num_workers == 0:
                # Create copy of list to iterate over while lock isn't held.
                termworkers = self._workers[:]
                del self._workers[:]
            else:
                # TODO: Implement partial downsizing
                raise NotImplementedError("Partial downsizing is not"
                                          " implemented")

            self._termworkers += termworkers

            # Notify workers that something has changed
            self._pool_to_worker.notifyAll()

            # Join all terminating workers; the lock must be released so
            # the workers can acquire it and notice they should exit
            self._lock.release()
            try:
                for worker in termworkers:
                    logging.debug("Waiting for thread %s", worker.getName())
                    worker.join()
            finally:
                self._lock.acquire()

            # Remove terminated threads. This could be done in a more efficient way
            # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
            # don't leave zombie threads around.
            for worker in termworkers:
                assert worker in self._termworkers, ("Worker not in list of"
                                                     " terminating workers")
                if not worker.isAlive():
                    self._termworkers.remove(worker)

            assert not self._termworkers, "Zombie worker detected"

        elif current_count < num_workers:
            # Create (num_workers - current_count) new workers
            for _ in range(num_workers - current_count):
                worker = self._worker_class(self, self._NewWorkerIdUnlocked())
                self._workers.append(worker)
                worker.start()

    def Resize(self, num_workers):
        """Changes the number of workers in the pool.

        @param num_workers: the new number of workers

        """
        self._lock.acquire()
        try:
            return self._ResizeUnlocked(num_workers)
        finally:
            self._lock.release()

    def TerminateWorkers(self):
        """Terminate all worker threads.

        Unstarted tasks will be ignored.

        """
        logging.debug("Terminating all workers")

        self._lock.acquire()
        try:
            self._ResizeUnlocked(0)

            if self._tasks:
                logging.debug("There are %s tasks left", len(self._tasks))
        finally:
            self._lock.release()

        logging.debug("All workers terminated")