4 # Copyright (C) 2008, 2009, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Base classes for worker pools.
30 from ganeti import compat
31 from ganeti import errors
class DeferTask(Exception):
  """Exception raised from L{BaseWorker.RunTask} to postpone a task.

  When a worker catches it, the task is put back into the pool's queue
  instead of being treated as finished. The task may optionally be
  re-queued with a different priority.

  """
  def __init__(self, priority=None):
    """Initializes this class.

    @type priority: number
    @param priority: Priority to re-queue the task with; C{None} keeps the
        task's current priority

    """
    self.priority = priority
    Exception.__init__(self)
class BaseWorker(threading.Thread, object):
  """Base worker class for worker pools.

  Users of a worker pool must override RunTask in a subclass.

  Each worker is a thread that repeatedly takes the next task from its parent
  pool's queue (see L{run}) and passes the task's arguments to L{RunTask}.
  The pool's underscore-prefixed members are accessed directly, hence the
  pylint W0212 suppression below.

  """
  # pylint: disable=W0212
  def __init__(self, pool, worker_id):
    """Constructor for BaseWorker thread.

    @param pool: the parent worker pool
    @param worker_id: identifier for this worker; also used as the initial
        thread name

    """
    super(BaseWorker, self).__init__(name=worker_id)
    self.pool = pool
    self._worker_id = worker_id
    # (priority, counter, args) tuple of the task being run, None when idle
    self._current_task = None

    assert self.getName() == worker_id

  def ShouldTerminate(self):
    """Returns whether this worker should terminate.

    Should only be called from within L{RunTask}.

    @rtype: bool
    @return: whether the pool has scheduled this worker for termination

    """
    self.pool._lock.acquire()
    try:
      assert self._HasRunningTaskUnlocked()
      return self.pool._ShouldWorkerTerminateUnlocked(self)
    finally:
      self.pool._lock.release()

  def GetCurrentPriority(self):
    """Returns the priority of the current task.

    Should only be called from within L{RunTask}.

    @return: priority the currently running task was queued with

    """
    self.pool._lock.acquire()
    try:
      assert self._HasRunningTaskUnlocked()

      (priority, _, _) = self._current_task

      return priority
    finally:
      self.pool._lock.release()

  def SetTaskName(self, taskname):
    """Sets the name of the current task.

    Should only be called from within L{RunTask}.

    @type taskname: string
    @param taskname: Task's name; a false value (e.g. C{None}) restores the
        plain worker ID as the thread name

    """
    if taskname:
      name = "%s/%s" % (self._worker_id, taskname)
    else:
      name = self._worker_id

    # Set thread name
    self.setName(name)

  def _HasRunningTaskUnlocked(self):
    """Returns whether this worker is currently running a task.

    Must be called with the pool's lock held.

    """
    return (self._current_task is not None)

  def run(self):
    """Main thread function.

    Waits for new tasks to show up in the queue.

    Loops until the pool tells this worker to terminate. Each task is run
    without holding the pool lock. A L{DeferTask} raised by L{RunTask} puts
    the task back into the queue; any other exception is logged and then
    ignored, so the worker keeps running.

    """
    pool = self.pool

    while True:
      assert self._current_task is None

      # Holds the DeferTask instance if the task asked to be re-queued
      defer = None
      try:
        # Wait on lock to be told either to terminate or to do a task
        pool._lock.acquire()
        try:
          task = pool._WaitForTaskUnlocked(self)

          if task is _TERMINATE:
            # Told to terminate
            break

          if task is None:
            # Spurious notification, ignore
            continue

          self._current_task = task

          # No longer needed, dispose of reference
          del task

          assert self._HasRunningTaskUnlocked()

        finally:
          pool._lock.release()

        (priority, _, args) = self._current_task
        try:
          # Run the actual task
          assert defer is None
          logging.debug("Starting task %r, priority %s", args, priority)
          assert self.getName() == self._worker_id
          try:
            self.RunTask(*args) # pylint: disable=W0142
          finally:
            # Restore the plain thread name even if the task raised
            self.SetTaskName(None)
          logging.debug("Done with task %r, priority %s", args, priority)
        except DeferTask, err:
          defer = err

          if defer.priority is None:
            # Use same priority
            defer.priority = priority

          logging.debug("Deferring task %r, new priority %s",
                        args, defer.priority)

          assert self._HasRunningTaskUnlocked()
        except: # pylint: disable=W0702
          # Deliberately broad: a failing task must not kill the worker
          logging.exception("Caught unhandled exception")

        assert self._HasRunningTaskUnlocked()
      finally:
        # Notify pool
        pool._lock.acquire()
        try:
          if defer:
            assert self._current_task
            # Schedule again for later run
            (_, _, args) = self._current_task
            pool._AddTaskUnlocked(args, defer.priority)

          if self._current_task:
            self._current_task = None
            # Wake up anyone waiting for tasks to finish (e.g. Quiesce)
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

      assert not self._HasRunningTaskUnlocked()

    logging.debug("Terminates")

  def RunTask(self, *args):
    """Function called to start a task.

    This needs to be implemented by child classes.

    @param args: the argument sequence the task was queued with, unpacked
    @raise DeferTask: may be raised to re-queue the task

    """
    raise NotImplementedError()
class WorkerPool(object):
  """Worker pool with a queue.

  This class is thread-safe.

  Queued tasks are kept in a heap ordered by priority (lower values are
  started first) and, for equal priorities, by insertion order. Tasks of the
  same priority are therefore started in the order in which they were added.
  Due to the nature of threading, they're not guaranteed to finish in the
  same order.

  """
  def __init__(self, name, num_workers, worker_class):
    """Constructor for worker pool.

    @param name: prefix for the worker thread names
    @param num_workers: number of workers to be started
        (L{Resize} can grow the pool later; shrinking is only supported
        down to zero workers)
    @param worker_class: the class to be instantiated for workers;
        should derive from L{BaseWorker}

    """
    # Some of these variables are accessed by BaseWorker
    self._lock = threading.Lock()
    self._pool_to_pool = threading.Condition(self._lock)
    self._pool_to_worker = threading.Condition(self._lock)
    self._worker_to_pool = threading.Condition(self._lock)
    self._worker_class = worker_class
    self._name = name
    self._last_worker_id = 0
    self._workers = []
    self._quiescing = False
    self._active = True

    # Terminating workers
    self._termworkers = []

    # Queued tasks, a heap of (priority, counter, args) tuples
    self._counter = 0
    self._tasks = []

    # Start workers
    self.Resize(num_workers)

  # TODO: Implement dynamic resizing?

  def _WaitWhileQuiescingUnlocked(self):
    """Wait until the worker pool has finished quiescing.

    Must be called with C{self._lock} held.

    """
    while self._quiescing:
      self._pool_to_pool.wait()

  def _AddTaskUnlocked(self, args, priority):
    """Adds a task to the internal queue.

    Must be called with C{self._lock} held.

    @type args: sequence
    @param args: Arguments passed to L{BaseWorker.RunTask}
    @type priority: number
    @param priority: Task priority

    """
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
    assert isinstance(priority, (int, long)), "Priority must be numeric"

    # This counter is used to ensure elements are processed in their
    # incoming order. For processing they're sorted by priority and then
    # counter.
    self._counter += 1

    heapq.heappush(self._tasks, (priority, self._counter, args))

    # Notify a waiting worker
    self._pool_to_worker.notify()

  def AddTask(self, args, priority=_DEFAULT_PRIORITY):
    """Adds a task to the queue.

    Blocks while the pool is quiescing (see L{Quiesce}).

    @type args: sequence
    @param args: arguments passed to L{BaseWorker.RunTask}
    @type priority: number
    @param priority: Task priority

    """
    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()
      self._AddTaskUnlocked(args, priority)
    finally:
      self._lock.release()

  def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
    """Add a list of tasks to the queue.

    Blocks while the pool is quiescing (see L{Quiesce}). All tasks are added
    while holding the lock once.

    @type tasks: list of tuples
    @param tasks: list of args passed to L{BaseWorker.RunTask}
    @type priority: number or list of numbers
    @param priority: Priority for all added tasks or a list with the priority
                     for each task
    @raise errors.ProgrammerError: if a priority list is given whose length
        differs from the number of tasks

    """
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
      "Each task must be a sequence"

    assert (isinstance(priority, (int, long)) or
            compat.all(isinstance(prio, (int, long)) for prio in priority)), \
           "Priority must be numeric or be a list of numeric values"

    if isinstance(priority, (int, long)):
      priority = [priority] * len(tasks)
    elif len(priority) != len(tasks):
      raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
                                   " number of tasks (%s)" %
                                   (len(priority), len(tasks)))

    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()

      assert compat.all(isinstance(prio, (int, long)) for prio in priority)
      assert len(tasks) == len(priority)

      # Distinct loop variable so the priority list isn't shadowed
      for args, prio in zip(tasks, priority):
        self._AddTaskUnlocked(args, prio)
    finally:
      self._lock.release()

  def SetActive(self, active):
    """Enable/disable processing of tasks.

    This is different from L{Quiesce} in the sense that this function just
    changes an internal flag and doesn't wait for the queue to be empty. Tasks
    already being processed continue normally, but no new tasks will be
    started. New tasks can still be added.

    @type active: bool
    @param active: Whether tasks should be processed

    """
    self._lock.acquire()
    try:
      self._active = active

      if active:
        # Tell all workers to continue processing
        self._pool_to_worker.notifyAll()
    finally:
      self._lock.release()

  def _WaitForTaskUnlocked(self, worker):
    """Waits for a task for a worker.

    Must be called with C{self._lock} held; the lock is released while
    waiting.

    @type worker: L{BaseWorker}
    @param worker: Worker thread
    @return: C{_TERMINATE} if the worker should terminate, otherwise the
        C{(priority, counter, args)} tuple of the next task

    """
    if self._ShouldWorkerTerminateUnlocked(worker):
      return _TERMINATE

    # We only wait if there's no task for us.
    if not (self._active and self._tasks):
      logging.debug("Waiting for tasks")

      while True:
        # wait() releases the lock and sleeps until notified
        self._pool_to_worker.wait()

        logging.debug("Notified while waiting")

        # Were we woken up in order to terminate?
        if self._ShouldWorkerTerminateUnlocked(worker):
          return _TERMINATE

        # Just loop if pool is not processing tasks at this time
        if self._active and self._tasks:
          break

    # Get task from queue and tell pool about it
    try:
      return heapq.heappop(self._tasks)
    finally:
      self._worker_to_pool.notifyAll()

  def _ShouldWorkerTerminateUnlocked(self, worker):
    """Returns whether a worker should terminate.

    """
    return (worker in self._termworkers)

  def _HasRunningTasksUnlocked(self):
    """Checks whether there's a task running in a worker.

    Both active and terminating workers are checked.

    """
    for worker in self._workers + self._termworkers:
      if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212
        return True
    return False

  def HasRunningTasks(self):
    """Checks whether there's at least one task running.

    """
    self._lock.acquire()
    try:
      return self._HasRunningTasksUnlocked()
    finally:
      self._lock.release()

  def Quiesce(self):
    """Waits until the task queue is empty and no task is running.

    While waiting, L{AddTask} and L{AddManyTasks} block. Note that queued
    tasks are only consumed while the pool is active (see L{SetActive}).

    """
    self._lock.acquire()
    try:
      self._quiescing = True

      # Wait while there are tasks pending or running
      while self._tasks or self._HasRunningTasksUnlocked():
        self._worker_to_pool.wait()

    finally:
      self._quiescing = False

      # Make sure AddTasks continues in case it was waiting
      self._pool_to_pool.notifyAll()

      self._lock.release()

  def _NewWorkerIdUnlocked(self):
    """Return an identifier for a new worker.

    """
    self._last_worker_id += 1

    return "%s%d" % (self._name, self._last_worker_id)

  def _ResizeUnlocked(self, num_workers):
    """Changes the number of workers.

    Must be called with C{self._lock} held. The lock is temporarily released
    while waiting for terminating workers to exit.

    @param num_workers: the new number of workers
    @raise NotImplementedError: when shrinking to a non-zero number of workers

    """
    assert num_workers >= 0, "num_workers must be >= 0"

    logging.debug("Resizing to %s workers", num_workers)

    current_count = len(self._workers)

    if current_count == num_workers:
      # Nothing to do
      pass

    elif current_count > num_workers:
      if num_workers == 0:
        # Create copy of list to iterate over while lock isn't held.
        termworkers = self._workers[:]
        del self._workers[:]
      else:
        # TODO: Implement partial downsizing
        raise NotImplementedError()

      self._termworkers += termworkers

      # Notify workers that something has changed
      self._pool_to_worker.notifyAll()

      # Join all terminating workers
      self._lock.release()
      try:
        for worker in termworkers:
          logging.debug("Waiting for thread %s", worker.getName())
          worker.join()
      finally:
        self._lock.acquire()

      # Remove terminated threads. This could be done in a more efficient way
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
      # don't leave zombie threads around.
      for worker in termworkers:
        assert worker in self._termworkers, ("Worker not in list of"
                                             " terminating workers")
        if not worker.isAlive():
          self._termworkers.remove(worker)

      assert not self._termworkers, "Zombie worker detected"

    elif current_count < num_workers:
      # Create (num_workers - current_count) new workers
      for _ in range(num_workers - current_count):
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
        # Start the thread before registering it. If start() fails, no
        # never-started thread is left in self._workers, where a later
        # resize to zero would fail when join()ing it. This ordering is safe
        # because BaseWorker.run only looks at pool state after acquiring
        # self._lock, which is held here.
        worker.start()
        self._workers.append(worker)

  def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    @param num_workers: the new number of workers
    @raise NotImplementedError: when shrinking to a non-zero number of workers

    """
    self._lock.acquire()
    try:
      return self._ResizeUnlocked(num_workers)
    finally:
      self._lock.release()

  def TerminateWorkers(self):
    """Terminate all worker threads.

    Unstarted tasks will be ignored; they remain queued and are not run.

    """
    logging.debug("Terminating all workers")

    self._lock.acquire()
    try:
      self._ResizeUnlocked(0)

      if self._tasks:
        logging.debug("There are %s tasks left", len(self._tasks))
    finally:
      self._lock.release()

    logging.debug("All workers terminated")