Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ b399ce1e

History | View | Annotate | Download (9 kB)

1 76094e37 Michael Hanselmann
#
2 76094e37 Michael Hanselmann
#
3 76094e37 Michael Hanselmann
4 76094e37 Michael Hanselmann
# Copyright (C) 2008 Google Inc.
5 76094e37 Michael Hanselmann
#
6 76094e37 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 76094e37 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 76094e37 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 76094e37 Michael Hanselmann
# (at your option) any later version.
10 76094e37 Michael Hanselmann
#
11 76094e37 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 76094e37 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 76094e37 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 76094e37 Michael Hanselmann
# General Public License for more details.
15 76094e37 Michael Hanselmann
#
16 76094e37 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 76094e37 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 76094e37 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 76094e37 Michael Hanselmann
# 02110-1301, USA.
20 76094e37 Michael Hanselmann
21 76094e37 Michael Hanselmann
22 76094e37 Michael Hanselmann
"""Base classes for worker pools.
23 76094e37 Michael Hanselmann

24 76094e37 Michael Hanselmann
"""
25 76094e37 Michael Hanselmann
26 76094e37 Michael Hanselmann
import collections
27 76094e37 Michael Hanselmann
import logging
28 76094e37 Michael Hanselmann
import threading
29 76094e37 Michael Hanselmann
30 76094e37 Michael Hanselmann
31 76094e37 Michael Hanselmann
class BaseWorker(threading.Thread, object):
32 76094e37 Michael Hanselmann
  """Base worker class for worker pools.
33 76094e37 Michael Hanselmann

34 76094e37 Michael Hanselmann
  Users of a worker pool must override RunTask in a subclass.
35 76094e37 Michael Hanselmann

36 76094e37 Michael Hanselmann
  """
37 76094e37 Michael Hanselmann
  def __init__(self, pool, worker_id):
38 76094e37 Michael Hanselmann
    """Constructor for BaseWorker thread.
39 76094e37 Michael Hanselmann

40 116db7c7 Iustin Pop
    @param pool: the parent worker pool
41 116db7c7 Iustin Pop
    @param worker_id: identifier for this worker
42 76094e37 Michael Hanselmann

43 76094e37 Michael Hanselmann
    """
44 76094e37 Michael Hanselmann
    super(BaseWorker, self).__init__()
45 76094e37 Michael Hanselmann
    self.pool = pool
46 76094e37 Michael Hanselmann
    self.worker_id = worker_id
47 76094e37 Michael Hanselmann
    self._current_task = None
48 76094e37 Michael Hanselmann
49 76094e37 Michael Hanselmann
  def ShouldTerminate(self):
50 76094e37 Michael Hanselmann
    """Returns whether a worker should terminate.
51 76094e37 Michael Hanselmann

52 76094e37 Michael Hanselmann
    """
53 76094e37 Michael Hanselmann
    return self.pool.ShouldWorkerTerminate(self)
54 76094e37 Michael Hanselmann
55 b3558df1 Michael Hanselmann
  def _HasRunningTaskUnlocked(self):
56 b3558df1 Michael Hanselmann
    """Returns whether this worker is currently running a task.
57 b3558df1 Michael Hanselmann

58 b3558df1 Michael Hanselmann
    """
59 b3558df1 Michael Hanselmann
    return (self._current_task is not None)
60 b3558df1 Michael Hanselmann
61 b3558df1 Michael Hanselmann
  def HasRunningTask(self):
62 b3558df1 Michael Hanselmann
    """Returns whether this worker is currently running a task.
63 b3558df1 Michael Hanselmann

64 b3558df1 Michael Hanselmann
    """
65 b3558df1 Michael Hanselmann
    self.pool._lock.acquire()
66 b3558df1 Michael Hanselmann
    try:
67 b3558df1 Michael Hanselmann
      return self._HasRunningTaskUnlocked()
68 b3558df1 Michael Hanselmann
    finally:
69 b3558df1 Michael Hanselmann
      self.pool._lock.release()
70 b3558df1 Michael Hanselmann
71 76094e37 Michael Hanselmann
  def run(self):
72 76094e37 Michael Hanselmann
    """Main thread function.
73 76094e37 Michael Hanselmann

74 76094e37 Michael Hanselmann
    Waits for new tasks to show up in the queue.
75 76094e37 Michael Hanselmann

76 76094e37 Michael Hanselmann
    """
77 76094e37 Michael Hanselmann
    pool = self.pool
78 76094e37 Michael Hanselmann
79 b3558df1 Michael Hanselmann
    assert not self.HasRunningTask()
80 76094e37 Michael Hanselmann
81 76094e37 Michael Hanselmann
    while True:
82 76094e37 Michael Hanselmann
      try:
83 76094e37 Michael Hanselmann
        # We wait on lock to be told either terminate or do a task.
84 76094e37 Michael Hanselmann
        pool._lock.acquire()
85 76094e37 Michael Hanselmann
        try:
86 76094e37 Michael Hanselmann
          if pool._ShouldWorkerTerminateUnlocked(self):
87 76094e37 Michael Hanselmann
            break
88 76094e37 Michael Hanselmann
89 76094e37 Michael Hanselmann
          # We only wait if there's no task for us.
90 76094e37 Michael Hanselmann
          if not pool._tasks:
91 b3558df1 Michael Hanselmann
            logging.debug("Worker %s: waiting for tasks", self.worker_id)
92 b3558df1 Michael Hanselmann
93 76094e37 Michael Hanselmann
            # wait() releases the lock and sleeps until notified
94 53b1d12b Michael Hanselmann
            pool._pool_to_worker.wait()
95 76094e37 Michael Hanselmann
96 b3558df1 Michael Hanselmann
            logging.debug("Worker %s: notified while waiting", self.worker_id)
97 b3558df1 Michael Hanselmann
98 76094e37 Michael Hanselmann
            # Were we woken up in order to terminate?
99 76094e37 Michael Hanselmann
            if pool._ShouldWorkerTerminateUnlocked(self):
100 76094e37 Michael Hanselmann
              break
101 76094e37 Michael Hanselmann
102 76094e37 Michael Hanselmann
            if not pool._tasks:
103 76094e37 Michael Hanselmann
              # Spurious notification, ignore
104 76094e37 Michael Hanselmann
              continue
105 76094e37 Michael Hanselmann
106 76094e37 Michael Hanselmann
          # Get task from queue and tell pool about it
107 76094e37 Michael Hanselmann
          try:
108 76094e37 Michael Hanselmann
            self._current_task = pool._tasks.popleft()
109 76094e37 Michael Hanselmann
          finally:
110 53b1d12b Michael Hanselmann
            pool._worker_to_pool.notifyAll()
111 76094e37 Michael Hanselmann
        finally:
112 76094e37 Michael Hanselmann
          pool._lock.release()
113 76094e37 Michael Hanselmann
114 76094e37 Michael Hanselmann
        # Run the actual task
115 76094e37 Michael Hanselmann
        try:
116 b3558df1 Michael Hanselmann
          logging.debug("Worker %s: starting task %r",
117 b3558df1 Michael Hanselmann
                        self.worker_id, self._current_task)
118 76094e37 Michael Hanselmann
          self.RunTask(*self._current_task)
119 b3558df1 Michael Hanselmann
          logging.debug("Worker %s: done with task %r",
120 b3558df1 Michael Hanselmann
                        self.worker_id, self._current_task)
121 76094e37 Michael Hanselmann
        except:
122 76094e37 Michael Hanselmann
          logging.error("Worker %s: Caught unhandled exception",
123 76094e37 Michael Hanselmann
                        self.worker_id, exc_info=True)
124 76094e37 Michael Hanselmann
      finally:
125 76094e37 Michael Hanselmann
        # Notify pool
126 76094e37 Michael Hanselmann
        pool._lock.acquire()
127 76094e37 Michael Hanselmann
        try:
128 b3558df1 Michael Hanselmann
          if self._current_task:
129 b3558df1 Michael Hanselmann
            self._current_task = None
130 53b1d12b Michael Hanselmann
            pool._worker_to_pool.notifyAll()
131 76094e37 Michael Hanselmann
        finally:
132 76094e37 Michael Hanselmann
          pool._lock.release()
133 76094e37 Michael Hanselmann
134 b3558df1 Michael Hanselmann
    logging.debug("Worker %s: terminates", self.worker_id)
135 b3558df1 Michael Hanselmann
136 76094e37 Michael Hanselmann
  def RunTask(self, *args):
137 76094e37 Michael Hanselmann
    """Function called to start a task.
138 76094e37 Michael Hanselmann

139 116db7c7 Iustin Pop
    This needs to be implemented by child classes.
140 116db7c7 Iustin Pop

141 76094e37 Michael Hanselmann
    """
142 76094e37 Michael Hanselmann
    raise NotImplementedError()
143 76094e37 Michael Hanselmann
144 76094e37 Michael Hanselmann
145 76094e37 Michael Hanselmann
class WorkerPool(object):
146 76094e37 Michael Hanselmann
  """Worker pool with a queue.
147 76094e37 Michael Hanselmann

148 76094e37 Michael Hanselmann
  This class is thread-safe.
149 76094e37 Michael Hanselmann

150 116db7c7 Iustin Pop
  Tasks are guaranteed to be started in the order in which they're
151 116db7c7 Iustin Pop
  added to the pool. Due to the nature of threading, they're not
152 116db7c7 Iustin Pop
  guaranteed to finish in the same order.
153 76094e37 Michael Hanselmann

154 76094e37 Michael Hanselmann
  """
155 76094e37 Michael Hanselmann
  def __init__(self, num_workers, worker_class):
156 76094e37 Michael Hanselmann
    """Constructor for worker pool.
157 76094e37 Michael Hanselmann

158 116db7c7 Iustin Pop
    @param num_workers: number of workers to be started
159 116db7c7 Iustin Pop
        (dynamic resizing is not yet implemented)
160 116db7c7 Iustin Pop
    @param worker_class: the class to be instantiated for workers;
161 116db7c7 Iustin Pop
        should derive from L{BaseWorker}
162 76094e37 Michael Hanselmann

163 76094e37 Michael Hanselmann
    """
164 76094e37 Michael Hanselmann
    # Some of these variables are accessed by BaseWorker
165 53b1d12b Michael Hanselmann
    self._lock = threading.Lock()
166 53b1d12b Michael Hanselmann
    self._pool_to_pool = threading.Condition(self._lock)
167 53b1d12b Michael Hanselmann
    self._pool_to_worker = threading.Condition(self._lock)
168 53b1d12b Michael Hanselmann
    self._worker_to_pool = threading.Condition(self._lock)
169 76094e37 Michael Hanselmann
    self._worker_class = worker_class
170 76094e37 Michael Hanselmann
    self._last_worker_id = 0
171 76094e37 Michael Hanselmann
    self._workers = []
172 76094e37 Michael Hanselmann
    self._quiescing = False
173 76094e37 Michael Hanselmann
174 76094e37 Michael Hanselmann
    # Terminating workers
175 76094e37 Michael Hanselmann
    self._termworkers = []
176 76094e37 Michael Hanselmann
177 76094e37 Michael Hanselmann
    # Queued tasks
178 76094e37 Michael Hanselmann
    self._tasks = collections.deque()
179 76094e37 Michael Hanselmann
180 76094e37 Michael Hanselmann
    # Start workers
181 76094e37 Michael Hanselmann
    self.Resize(num_workers)
182 76094e37 Michael Hanselmann
183 76094e37 Michael Hanselmann
  # TODO: Implement dynamic resizing?
184 76094e37 Michael Hanselmann
185 76094e37 Michael Hanselmann
  def AddTask(self, *args):
186 76094e37 Michael Hanselmann
    """Adds a task to the queue.
187 76094e37 Michael Hanselmann

188 116db7c7 Iustin Pop
    @param args: arguments passed to L{BaseWorker.RunTask}
189 76094e37 Michael Hanselmann

190 76094e37 Michael Hanselmann
    """
191 76094e37 Michael Hanselmann
    self._lock.acquire()
192 76094e37 Michael Hanselmann
    try:
193 76094e37 Michael Hanselmann
      # Don't add new tasks while we're quiescing
194 76094e37 Michael Hanselmann
      while self._quiescing:
195 53b1d12b Michael Hanselmann
        self._pool_to_pool.wait()
196 76094e37 Michael Hanselmann
197 76094e37 Michael Hanselmann
      # Add task to internal queue
198 76094e37 Michael Hanselmann
      self._tasks.append(args)
199 53b1d12b Michael Hanselmann
200 53b1d12b Michael Hanselmann
      # Wake one idling worker up
201 53b1d12b Michael Hanselmann
      self._pool_to_worker.notify()
202 76094e37 Michael Hanselmann
    finally:
203 76094e37 Michael Hanselmann
      self._lock.release()
204 76094e37 Michael Hanselmann
205 76094e37 Michael Hanselmann
  def _ShouldWorkerTerminateUnlocked(self, worker):
206 76094e37 Michael Hanselmann
    """Returns whether a worker should terminate.
207 76094e37 Michael Hanselmann

208 76094e37 Michael Hanselmann
    """
209 76094e37 Michael Hanselmann
    return (worker in self._termworkers)
210 76094e37 Michael Hanselmann
211 76094e37 Michael Hanselmann
  def ShouldWorkerTerminate(self, worker):
212 76094e37 Michael Hanselmann
    """Returns whether a worker should terminate.
213 76094e37 Michael Hanselmann

214 76094e37 Michael Hanselmann
    """
215 76094e37 Michael Hanselmann
    self._lock.acquire()
216 76094e37 Michael Hanselmann
    try:
217 805f0c07 Iustin Pop
      return self._ShouldWorkerTerminateUnlocked(worker)
218 76094e37 Michael Hanselmann
    finally:
219 76094e37 Michael Hanselmann
      self._lock.release()
220 76094e37 Michael Hanselmann
221 76094e37 Michael Hanselmann
  def _HasRunningTasksUnlocked(self):
222 76094e37 Michael Hanselmann
    """Checks whether there's a task running in a worker.
223 76094e37 Michael Hanselmann

224 76094e37 Michael Hanselmann
    """
225 76094e37 Michael Hanselmann
    for worker in self._workers + self._termworkers:
226 b3558df1 Michael Hanselmann
      if worker._HasRunningTaskUnlocked():
227 76094e37 Michael Hanselmann
        return True
228 76094e37 Michael Hanselmann
    return False
229 76094e37 Michael Hanselmann
230 76094e37 Michael Hanselmann
  def Quiesce(self):
231 76094e37 Michael Hanselmann
    """Waits until the task queue is empty.
232 76094e37 Michael Hanselmann

233 76094e37 Michael Hanselmann
    """
234 76094e37 Michael Hanselmann
    self._lock.acquire()
235 76094e37 Michael Hanselmann
    try:
236 76094e37 Michael Hanselmann
      self._quiescing = True
237 76094e37 Michael Hanselmann
238 76094e37 Michael Hanselmann
      # Wait while there are tasks pending or running
239 76094e37 Michael Hanselmann
      while self._tasks or self._HasRunningTasksUnlocked():
240 53b1d12b Michael Hanselmann
        self._worker_to_pool.wait()
241 76094e37 Michael Hanselmann
242 76094e37 Michael Hanselmann
    finally:
243 76094e37 Michael Hanselmann
      self._quiescing = False
244 76094e37 Michael Hanselmann
245 76094e37 Michael Hanselmann
      # Make sure AddTasks continues in case it was waiting
246 53b1d12b Michael Hanselmann
      self._pool_to_pool.notifyAll()
247 76094e37 Michael Hanselmann
248 76094e37 Michael Hanselmann
      self._lock.release()
249 76094e37 Michael Hanselmann
250 76094e37 Michael Hanselmann
  def _NewWorkerIdUnlocked(self):
251 116db7c7 Iustin Pop
    """Return an identifier for a new worker.
252 116db7c7 Iustin Pop

253 116db7c7 Iustin Pop
    """
254 76094e37 Michael Hanselmann
    self._last_worker_id += 1
255 76094e37 Michael Hanselmann
    return self._last_worker_id
256 76094e37 Michael Hanselmann
257 76094e37 Michael Hanselmann
  def _ResizeUnlocked(self, num_workers):
258 76094e37 Michael Hanselmann
    """Changes the number of workers.
259 76094e37 Michael Hanselmann

260 76094e37 Michael Hanselmann
    """
261 76094e37 Michael Hanselmann
    assert num_workers >= 0, "num_workers must be >= 0"
262 76094e37 Michael Hanselmann
263 76094e37 Michael Hanselmann
    logging.debug("Resizing to %s workers", num_workers)
264 76094e37 Michael Hanselmann
265 76094e37 Michael Hanselmann
    current_count = len(self._workers)
266 76094e37 Michael Hanselmann
267 76094e37 Michael Hanselmann
    if current_count == num_workers:
268 76094e37 Michael Hanselmann
      # Nothing to do
269 76094e37 Michael Hanselmann
      pass
270 76094e37 Michael Hanselmann
271 76094e37 Michael Hanselmann
    elif current_count > num_workers:
272 76094e37 Michael Hanselmann
      if num_workers == 0:
273 76094e37 Michael Hanselmann
        # Create copy of list to iterate over while lock isn't held.
274 76094e37 Michael Hanselmann
        termworkers = self._workers[:]
275 76094e37 Michael Hanselmann
        del self._workers[:]
276 76094e37 Michael Hanselmann
      else:
277 76094e37 Michael Hanselmann
        # TODO: Implement partial downsizing
278 76094e37 Michael Hanselmann
        raise NotImplementedError()
279 76094e37 Michael Hanselmann
        #termworkers = ...
280 76094e37 Michael Hanselmann
281 76094e37 Michael Hanselmann
      self._termworkers += termworkers
282 76094e37 Michael Hanselmann
283 76094e37 Michael Hanselmann
      # Notify workers that something has changed
284 53b1d12b Michael Hanselmann
      self._pool_to_worker.notifyAll()
285 76094e37 Michael Hanselmann
286 76094e37 Michael Hanselmann
      # Join all terminating workers
287 76094e37 Michael Hanselmann
      self._lock.release()
288 76094e37 Michael Hanselmann
      try:
289 76094e37 Michael Hanselmann
        for worker in termworkers:
290 c0a8eb9e Michael Hanselmann
          logging.debug("Waiting for thread %s", worker.getName())
291 76094e37 Michael Hanselmann
          worker.join()
292 76094e37 Michael Hanselmann
      finally:
293 76094e37 Michael Hanselmann
        self._lock.acquire()
294 76094e37 Michael Hanselmann
295 76094e37 Michael Hanselmann
      # Remove terminated threads. This could be done in a more efficient way
296 76094e37 Michael Hanselmann
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
297 76094e37 Michael Hanselmann
      # don't leave zombie threads around.
298 76094e37 Michael Hanselmann
      for worker in termworkers:
299 76094e37 Michael Hanselmann
        assert worker in self._termworkers, ("Worker not in list of"
300 76094e37 Michael Hanselmann
                                             " terminating workers")
301 76094e37 Michael Hanselmann
        if not worker.isAlive():
302 76094e37 Michael Hanselmann
          self._termworkers.remove(worker)
303 76094e37 Michael Hanselmann
304 76094e37 Michael Hanselmann
      assert not self._termworkers, "Zombie worker detected"
305 76094e37 Michael Hanselmann
306 76094e37 Michael Hanselmann
    elif current_count < num_workers:
307 76094e37 Michael Hanselmann
      # Create (num_workers - current_count) new workers
308 ad1bf20c Iustin Pop
      for _ in xrange(num_workers - current_count):
309 76094e37 Michael Hanselmann
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
310 76094e37 Michael Hanselmann
        self._workers.append(worker)
311 76094e37 Michael Hanselmann
        worker.start()
312 76094e37 Michael Hanselmann
313 76094e37 Michael Hanselmann
  def Resize(self, num_workers):
314 76094e37 Michael Hanselmann
    """Changes the number of workers in the pool.
315 76094e37 Michael Hanselmann

316 116db7c7 Iustin Pop
    @param num_workers: the new number of workers
317 76094e37 Michael Hanselmann

318 76094e37 Michael Hanselmann
    """
319 76094e37 Michael Hanselmann
    self._lock.acquire()
320 76094e37 Michael Hanselmann
    try:
321 76094e37 Michael Hanselmann
      return self._ResizeUnlocked(num_workers)
322 76094e37 Michael Hanselmann
    finally:
323 76094e37 Michael Hanselmann
      self._lock.release()
324 76094e37 Michael Hanselmann
325 76094e37 Michael Hanselmann
  def TerminateWorkers(self):
326 76094e37 Michael Hanselmann
    """Terminate all worker threads.
327 76094e37 Michael Hanselmann

328 76094e37 Michael Hanselmann
    Unstarted tasks will be ignored.
329 76094e37 Michael Hanselmann

330 76094e37 Michael Hanselmann
    """
331 76094e37 Michael Hanselmann
    logging.debug("Terminating all workers")
332 76094e37 Michael Hanselmann
333 76094e37 Michael Hanselmann
    self._lock.acquire()
334 76094e37 Michael Hanselmann
    try:
335 76094e37 Michael Hanselmann
      self._ResizeUnlocked(0)
336 76094e37 Michael Hanselmann
337 76094e37 Michael Hanselmann
      if self._tasks:
338 76094e37 Michael Hanselmann
        logging.debug("There are %s tasks left", len(self._tasks))
339 76094e37 Michael Hanselmann
    finally:
340 76094e37 Michael Hanselmann
      self._lock.release()
341 76094e37 Michael Hanselmann
342 76094e37 Michael Hanselmann
    logging.debug("All workers terminated")