4 # Copyright (C) 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Base classes for worker pools.
30 from ganeti import errors
31 from ganeti import utils
34 class BaseWorker(threading.Thread, object):
35 """Base worker class for worker pools.
37 Users of a worker pool must override RunTask in a subclass.
40 def __init__(self, pool, worker_id):
41 """Constructor for BaseWorker thread.
44 - pool: Parent worker pool
45 - worker_id: Identifier for this worker
48 super(BaseWorker, self).__init__()
50 self.worker_id = worker_id
51 self._current_task = None
53 def ShouldTerminate(self):
54 """Returns whether a worker should terminate.
57 return self.pool.ShouldWorkerTerminate(self)
59 def _HasRunningTaskUnlocked(self):
60 """Returns whether this worker is currently running a task.
63 return (self._current_task is not None)
65 def HasRunningTask(self):
66 """Returns whether this worker is currently running a task.
69 self.pool._lock.acquire()
71 return self._HasRunningTaskUnlocked()
73 self.pool._lock.release()
76 """Main thread function.
78 Waits for new tasks to show up in the queue.
83 assert not self.HasRunningTask()
87 # We wait on lock to be told either terminate or do a task.
90 if pool._ShouldWorkerTerminateUnlocked(self):
93 # We only wait if there's no task for us.
95 logging.debug("Worker %s: waiting for tasks", self.worker_id)
97 # wait() releases the lock and sleeps until notified
100 logging.debug("Worker %s: notified while waiting", self.worker_id)
102 # Were we woken up in order to terminate?
103 if pool._ShouldWorkerTerminateUnlocked(self):
107 # Spurious notification, ignore
110 # Get task from queue and tell pool about it
112 self._current_task = pool._tasks.popleft()
114 pool._lock.notifyAll()
118 # Run the actual task
120 logging.debug("Worker %s: starting task %r",
121 self.worker_id, self._current_task)
122 self.RunTask(*self._current_task)
123 logging.debug("Worker %s: done with task %r",
124 self.worker_id, self._current_task)
126 logging.error("Worker %s: Caught unhandled exception",
127 self.worker_id, exc_info=True)
132 if self._current_task:
133 self._current_task = None
134 pool._lock.notifyAll()
138 logging.debug("Worker %s: terminates", self.worker_id)
140 def RunTask(self, *args):
141 """Function called to start a task.
144 raise NotImplementedError()
147 class WorkerPool(object):
148 """Worker pool with a queue.
150 This class is thread-safe.
152 Tasks are guaranteed to be started in the order in which they're added to the
153 pool. Due to the nature of threading, they're not guaranteed to finish in the
157 def __init__(self, num_workers, worker_class):
158 """Constructor for worker pool.
161 - num_workers: Number of workers to be started (dynamic resizing is not
163 - worker_class: Class to be instantiated for workers; should derive from
167 # Some of these variables are accessed by BaseWorker
168 self._lock = threading.Condition(threading.Lock())
169 self._worker_class = worker_class
170 self._last_worker_id = 0
172 self._quiescing = False
174 # Terminating workers
175 self._termworkers = []
178 self._tasks = collections.deque()
181 self.Resize(num_workers)
183 # TODO: Implement dynamic resizing?
185 def AddTask(self, *args):
186 """Adds a task to the queue.
189 - *args: Arguments passed to BaseWorker.RunTask
194 # Don't add new tasks while we're quiescing
195 while self._quiescing:
198 # Add task to internal queue
199 self._tasks.append(args)
204 def _ShouldWorkerTerminateUnlocked(self, worker):
205 """Returns whether a worker should terminate.
208 return (worker in self._termworkers)
210 def ShouldWorkerTerminate(self, worker):
211 """Returns whether a worker should terminate.
216 return self._ShouldWorkerTerminateUnlocked(self)
220 def _HasRunningTasksUnlocked(self):
221 """Checks whether there's a task running in a worker.
224 for worker in self._workers + self._termworkers:
225 if worker._HasRunningTaskUnlocked():
230 """Waits until the task queue is empty.
235 self._quiescing = True
237 # Wait while there are tasks pending or running
238 while self._tasks or self._HasRunningTasksUnlocked():
242 self._quiescing = False
244 # Make sure AddTasks continues in case it was waiting
245 self._lock.notifyAll()
249 def _NewWorkerIdUnlocked(self):
250 self._last_worker_id += 1
251 return self._last_worker_id
253 def _ResizeUnlocked(self, num_workers):
254 """Changes the number of workers.
257 assert num_workers >= 0, "num_workers must be >= 0"
259 logging.debug("Resizing to %s workers", num_workers)
261 current_count = len(self._workers)
263 if current_count == num_workers:
267 elif current_count > num_workers:
269 # Create copy of list to iterate over while lock isn't held.
270 termworkers = self._workers[:]
273 # TODO: Implement partial downsizing
274 raise NotImplementedError()
277 self._termworkers += termworkers
279 # Notify workers that something has changed
280 self._lock.notifyAll()
282 # Join all terminating workers
285 for worker in termworkers:
290 # Remove terminated threads. This could be done in a more efficient way
291 # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
292 # don't leave zombie threads around.
293 for worker in termworkers:
294 assert worker in self._termworkers, ("Worker not in list of"
295 " terminating workers")
296 if not worker.isAlive():
297 self._termworkers.remove(worker)
299 assert not self._termworkers, "Zombie worker detected"
301 elif current_count < num_workers:
302 # Create (num_workers - current_count) new workers
303 for i in xrange(num_workers - current_count):
304 worker = self._worker_class(self, self._NewWorkerIdUnlocked())
305 self._workers.append(worker)
308 def Resize(self, num_workers):
309 """Changes the number of workers in the pool.
312 - num_workers: New number of workers
317 return self._ResizeUnlocked(num_workers)
321 def TerminateWorkers(self):
322 """Terminate all worker threads.
324 Unstarted tasks will be ignored.
327 logging.debug("Terminating all workers")
331 self._ResizeUnlocked(0)
334 logging.debug("There are %s tasks left", len(self._tasks))
338 logging.debug("All workers terminated")