Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ 89e2b4d2

History | View | Annotate | Download (9.2 kB)

1 76094e37 Michael Hanselmann
#
2 76094e37 Michael Hanselmann
#
3 76094e37 Michael Hanselmann
4 76094e37 Michael Hanselmann
# Copyright (C) 2008 Google Inc.
5 76094e37 Michael Hanselmann
#
6 76094e37 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 76094e37 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 76094e37 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 76094e37 Michael Hanselmann
# (at your option) any later version.
10 76094e37 Michael Hanselmann
#
11 76094e37 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 76094e37 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 76094e37 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 76094e37 Michael Hanselmann
# General Public License for more details.
15 76094e37 Michael Hanselmann
#
16 76094e37 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 76094e37 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 76094e37 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 76094e37 Michael Hanselmann
# 02110-1301, USA.
20 76094e37 Michael Hanselmann
21 76094e37 Michael Hanselmann
22 76094e37 Michael Hanselmann
"""Base classes for worker pools.
23 76094e37 Michael Hanselmann

24 76094e37 Michael Hanselmann
"""
25 76094e37 Michael Hanselmann
26 76094e37 Michael Hanselmann
import collections
27 76094e37 Michael Hanselmann
import logging
28 76094e37 Michael Hanselmann
import threading
29 76094e37 Michael Hanselmann
30 76094e37 Michael Hanselmann
31 76094e37 Michael Hanselmann
class BaseWorker(threading.Thread, object):
32 76094e37 Michael Hanselmann
  """Base worker class for worker pools.
33 76094e37 Michael Hanselmann

34 76094e37 Michael Hanselmann
  Users of a worker pool must override RunTask in a subclass.
35 76094e37 Michael Hanselmann

36 76094e37 Michael Hanselmann
  """
37 7260cfbe Iustin Pop
  # pylint: disable-msg=W0212
38 76094e37 Michael Hanselmann
  def __init__(self, pool, worker_id):
39 76094e37 Michael Hanselmann
    """Constructor for BaseWorker thread.
40 76094e37 Michael Hanselmann

41 116db7c7 Iustin Pop
    @param pool: the parent worker pool
42 116db7c7 Iustin Pop
    @param worker_id: identifier for this worker
43 76094e37 Michael Hanselmann

44 76094e37 Michael Hanselmann
    """
45 76094e37 Michael Hanselmann
    super(BaseWorker, self).__init__()
46 76094e37 Michael Hanselmann
    self.pool = pool
47 76094e37 Michael Hanselmann
    self.worker_id = worker_id
48 76094e37 Michael Hanselmann
    self._current_task = None
49 76094e37 Michael Hanselmann
50 76094e37 Michael Hanselmann
  def ShouldTerminate(self):
51 76094e37 Michael Hanselmann
    """Returns whether a worker should terminate.
52 76094e37 Michael Hanselmann

53 76094e37 Michael Hanselmann
    """
54 76094e37 Michael Hanselmann
    return self.pool.ShouldWorkerTerminate(self)
55 76094e37 Michael Hanselmann
56 b3558df1 Michael Hanselmann
  def _HasRunningTaskUnlocked(self):
57 b3558df1 Michael Hanselmann
    """Returns whether this worker is currently running a task.
58 b3558df1 Michael Hanselmann

59 b3558df1 Michael Hanselmann
    """
60 b3558df1 Michael Hanselmann
    return (self._current_task is not None)
61 b3558df1 Michael Hanselmann
62 b3558df1 Michael Hanselmann
  def HasRunningTask(self):
63 b3558df1 Michael Hanselmann
    """Returns whether this worker is currently running a task.
64 b3558df1 Michael Hanselmann

65 b3558df1 Michael Hanselmann
    """
66 b3558df1 Michael Hanselmann
    self.pool._lock.acquire()
67 b3558df1 Michael Hanselmann
    try:
68 b3558df1 Michael Hanselmann
      return self._HasRunningTaskUnlocked()
69 b3558df1 Michael Hanselmann
    finally:
70 b3558df1 Michael Hanselmann
      self.pool._lock.release()
71 b3558df1 Michael Hanselmann
72 76094e37 Michael Hanselmann
  def run(self):
73 76094e37 Michael Hanselmann
    """Main thread function.
74 76094e37 Michael Hanselmann

75 76094e37 Michael Hanselmann
    Waits for new tasks to show up in the queue.
76 76094e37 Michael Hanselmann

77 76094e37 Michael Hanselmann
    """
78 76094e37 Michael Hanselmann
    pool = self.pool
79 76094e37 Michael Hanselmann
80 b3558df1 Michael Hanselmann
    assert not self.HasRunningTask()
81 76094e37 Michael Hanselmann
82 76094e37 Michael Hanselmann
    while True:
83 76094e37 Michael Hanselmann
      try:
84 76094e37 Michael Hanselmann
        # We wait on lock to be told either terminate or do a task.
85 76094e37 Michael Hanselmann
        pool._lock.acquire()
86 76094e37 Michael Hanselmann
        try:
87 76094e37 Michael Hanselmann
          if pool._ShouldWorkerTerminateUnlocked(self):
88 76094e37 Michael Hanselmann
            break
89 76094e37 Michael Hanselmann
90 76094e37 Michael Hanselmann
          # We only wait if there's no task for us.
91 76094e37 Michael Hanselmann
          if not pool._tasks:
92 b3558df1 Michael Hanselmann
            logging.debug("Worker %s: waiting for tasks", self.worker_id)
93 b3558df1 Michael Hanselmann
94 76094e37 Michael Hanselmann
            # wait() releases the lock and sleeps until notified
95 53b1d12b Michael Hanselmann
            pool._pool_to_worker.wait()
96 76094e37 Michael Hanselmann
97 b3558df1 Michael Hanselmann
            logging.debug("Worker %s: notified while waiting", self.worker_id)
98 b3558df1 Michael Hanselmann
99 76094e37 Michael Hanselmann
            # Were we woken up in order to terminate?
100 76094e37 Michael Hanselmann
            if pool._ShouldWorkerTerminateUnlocked(self):
101 76094e37 Michael Hanselmann
              break
102 76094e37 Michael Hanselmann
103 76094e37 Michael Hanselmann
            if not pool._tasks:
104 76094e37 Michael Hanselmann
              # Spurious notification, ignore
105 76094e37 Michael Hanselmann
              continue
106 76094e37 Michael Hanselmann
107 76094e37 Michael Hanselmann
          # Get task from queue and tell pool about it
108 76094e37 Michael Hanselmann
          try:
109 76094e37 Michael Hanselmann
            self._current_task = pool._tasks.popleft()
110 76094e37 Michael Hanselmann
          finally:
111 53b1d12b Michael Hanselmann
            pool._worker_to_pool.notifyAll()
112 76094e37 Michael Hanselmann
        finally:
113 76094e37 Michael Hanselmann
          pool._lock.release()
114 76094e37 Michael Hanselmann
115 76094e37 Michael Hanselmann
        # Run the actual task
116 76094e37 Michael Hanselmann
        try:
117 b3558df1 Michael Hanselmann
          logging.debug("Worker %s: starting task %r",
118 b3558df1 Michael Hanselmann
                        self.worker_id, self._current_task)
119 76094e37 Michael Hanselmann
          self.RunTask(*self._current_task)
120 b3558df1 Michael Hanselmann
          logging.debug("Worker %s: done with task %r",
121 b3558df1 Michael Hanselmann
                        self.worker_id, self._current_task)
122 7260cfbe Iustin Pop
        except: # pylint: disable-msg=W0702
123 76094e37 Michael Hanselmann
          logging.error("Worker %s: Caught unhandled exception",
124 76094e37 Michael Hanselmann
                        self.worker_id, exc_info=True)
125 76094e37 Michael Hanselmann
      finally:
126 76094e37 Michael Hanselmann
        # Notify pool
127 76094e37 Michael Hanselmann
        pool._lock.acquire()
128 76094e37 Michael Hanselmann
        try:
129 b3558df1 Michael Hanselmann
          if self._current_task:
130 b3558df1 Michael Hanselmann
            self._current_task = None
131 53b1d12b Michael Hanselmann
            pool._worker_to_pool.notifyAll()
132 76094e37 Michael Hanselmann
        finally:
133 76094e37 Michael Hanselmann
          pool._lock.release()
134 76094e37 Michael Hanselmann
135 b3558df1 Michael Hanselmann
    logging.debug("Worker %s: terminates", self.worker_id)
136 b3558df1 Michael Hanselmann
137 76094e37 Michael Hanselmann
  def RunTask(self, *args):
138 76094e37 Michael Hanselmann
    """Function called to start a task.
139 76094e37 Michael Hanselmann

140 116db7c7 Iustin Pop
    This needs to be implemented by child classes.
141 116db7c7 Iustin Pop

142 76094e37 Michael Hanselmann
    """
143 76094e37 Michael Hanselmann
    raise NotImplementedError()
144 76094e37 Michael Hanselmann
145 76094e37 Michael Hanselmann
146 76094e37 Michael Hanselmann
class WorkerPool(object):
147 76094e37 Michael Hanselmann
  """Worker pool with a queue.
148 76094e37 Michael Hanselmann

149 76094e37 Michael Hanselmann
  This class is thread-safe.
150 76094e37 Michael Hanselmann

151 116db7c7 Iustin Pop
  Tasks are guaranteed to be started in the order in which they're
152 116db7c7 Iustin Pop
  added to the pool. Due to the nature of threading, they're not
153 116db7c7 Iustin Pop
  guaranteed to finish in the same order.
154 76094e37 Michael Hanselmann

155 76094e37 Michael Hanselmann
  """
156 89e2b4d2 Michael Hanselmann
  def __init__(self, name, num_workers, worker_class):
157 76094e37 Michael Hanselmann
    """Constructor for worker pool.
158 76094e37 Michael Hanselmann

159 116db7c7 Iustin Pop
    @param num_workers: number of workers to be started
160 116db7c7 Iustin Pop
        (dynamic resizing is not yet implemented)
161 116db7c7 Iustin Pop
    @param worker_class: the class to be instantiated for workers;
162 116db7c7 Iustin Pop
        should derive from L{BaseWorker}
163 76094e37 Michael Hanselmann

164 76094e37 Michael Hanselmann
    """
165 76094e37 Michael Hanselmann
    # Some of these variables are accessed by BaseWorker
166 53b1d12b Michael Hanselmann
    self._lock = threading.Lock()
167 53b1d12b Michael Hanselmann
    self._pool_to_pool = threading.Condition(self._lock)
168 53b1d12b Michael Hanselmann
    self._pool_to_worker = threading.Condition(self._lock)
169 53b1d12b Michael Hanselmann
    self._worker_to_pool = threading.Condition(self._lock)
170 76094e37 Michael Hanselmann
    self._worker_class = worker_class
171 89e2b4d2 Michael Hanselmann
    self._name = name
172 76094e37 Michael Hanselmann
    self._last_worker_id = 0
173 76094e37 Michael Hanselmann
    self._workers = []
174 76094e37 Michael Hanselmann
    self._quiescing = False
175 76094e37 Michael Hanselmann
176 76094e37 Michael Hanselmann
    # Terminating workers
177 76094e37 Michael Hanselmann
    self._termworkers = []
178 76094e37 Michael Hanselmann
179 76094e37 Michael Hanselmann
    # Queued tasks
180 76094e37 Michael Hanselmann
    self._tasks = collections.deque()
181 76094e37 Michael Hanselmann
182 76094e37 Michael Hanselmann
    # Start workers
183 76094e37 Michael Hanselmann
    self.Resize(num_workers)
184 76094e37 Michael Hanselmann
185 76094e37 Michael Hanselmann
  # TODO: Implement dynamic resizing?
186 76094e37 Michael Hanselmann
187 76094e37 Michael Hanselmann
  def AddTask(self, *args):
188 76094e37 Michael Hanselmann
    """Adds a task to the queue.
189 76094e37 Michael Hanselmann

190 116db7c7 Iustin Pop
    @param args: arguments passed to L{BaseWorker.RunTask}
191 76094e37 Michael Hanselmann

192 76094e37 Michael Hanselmann
    """
193 76094e37 Michael Hanselmann
    self._lock.acquire()
194 76094e37 Michael Hanselmann
    try:
195 76094e37 Michael Hanselmann
      # Don't add new tasks while we're quiescing
196 76094e37 Michael Hanselmann
      while self._quiescing:
197 53b1d12b Michael Hanselmann
        self._pool_to_pool.wait()
198 76094e37 Michael Hanselmann
199 76094e37 Michael Hanselmann
      # Add task to internal queue
200 76094e37 Michael Hanselmann
      self._tasks.append(args)
201 53b1d12b Michael Hanselmann
202 53b1d12b Michael Hanselmann
      # Wake one idling worker up
203 53b1d12b Michael Hanselmann
      self._pool_to_worker.notify()
204 76094e37 Michael Hanselmann
    finally:
205 76094e37 Michael Hanselmann
      self._lock.release()
206 76094e37 Michael Hanselmann
207 76094e37 Michael Hanselmann
  def _ShouldWorkerTerminateUnlocked(self, worker):
208 76094e37 Michael Hanselmann
    """Returns whether a worker should terminate.
209 76094e37 Michael Hanselmann

210 76094e37 Michael Hanselmann
    """
211 76094e37 Michael Hanselmann
    return (worker in self._termworkers)
212 76094e37 Michael Hanselmann
213 76094e37 Michael Hanselmann
  def ShouldWorkerTerminate(self, worker):
214 76094e37 Michael Hanselmann
    """Returns whether a worker should terminate.
215 76094e37 Michael Hanselmann

216 76094e37 Michael Hanselmann
    """
217 76094e37 Michael Hanselmann
    self._lock.acquire()
218 76094e37 Michael Hanselmann
    try:
219 805f0c07 Iustin Pop
      return self._ShouldWorkerTerminateUnlocked(worker)
220 76094e37 Michael Hanselmann
    finally:
221 76094e37 Michael Hanselmann
      self._lock.release()
222 76094e37 Michael Hanselmann
223 76094e37 Michael Hanselmann
  def _HasRunningTasksUnlocked(self):
224 76094e37 Michael Hanselmann
    """Checks whether there's a task running in a worker.
225 76094e37 Michael Hanselmann

226 76094e37 Michael Hanselmann
    """
227 76094e37 Michael Hanselmann
    for worker in self._workers + self._termworkers:
228 7260cfbe Iustin Pop
      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
229 76094e37 Michael Hanselmann
        return True
230 76094e37 Michael Hanselmann
    return False
231 76094e37 Michael Hanselmann
232 76094e37 Michael Hanselmann
  def Quiesce(self):
233 76094e37 Michael Hanselmann
    """Waits until the task queue is empty.
234 76094e37 Michael Hanselmann

235 76094e37 Michael Hanselmann
    """
236 76094e37 Michael Hanselmann
    self._lock.acquire()
237 76094e37 Michael Hanselmann
    try:
238 76094e37 Michael Hanselmann
      self._quiescing = True
239 76094e37 Michael Hanselmann
240 76094e37 Michael Hanselmann
      # Wait while there are tasks pending or running
241 76094e37 Michael Hanselmann
      while self._tasks or self._HasRunningTasksUnlocked():
242 53b1d12b Michael Hanselmann
        self._worker_to_pool.wait()
243 76094e37 Michael Hanselmann
244 76094e37 Michael Hanselmann
    finally:
245 76094e37 Michael Hanselmann
      self._quiescing = False
246 76094e37 Michael Hanselmann
247 76094e37 Michael Hanselmann
      # Make sure AddTasks continues in case it was waiting
248 53b1d12b Michael Hanselmann
      self._pool_to_pool.notifyAll()
249 76094e37 Michael Hanselmann
250 76094e37 Michael Hanselmann
      self._lock.release()
251 76094e37 Michael Hanselmann
252 76094e37 Michael Hanselmann
  def _NewWorkerIdUnlocked(self):
253 116db7c7 Iustin Pop
    """Return an identifier for a new worker.
254 116db7c7 Iustin Pop

255 116db7c7 Iustin Pop
    """
256 76094e37 Michael Hanselmann
    self._last_worker_id += 1
257 89e2b4d2 Michael Hanselmann
258 89e2b4d2 Michael Hanselmann
    return "%s%d" % (self._name, self._last_worker_id)
259 76094e37 Michael Hanselmann
260 76094e37 Michael Hanselmann
  def _ResizeUnlocked(self, num_workers):
261 76094e37 Michael Hanselmann
    """Changes the number of workers.
262 76094e37 Michael Hanselmann

263 76094e37 Michael Hanselmann
    """
264 76094e37 Michael Hanselmann
    assert num_workers >= 0, "num_workers must be >= 0"
265 76094e37 Michael Hanselmann
266 76094e37 Michael Hanselmann
    logging.debug("Resizing to %s workers", num_workers)
267 76094e37 Michael Hanselmann
268 76094e37 Michael Hanselmann
    current_count = len(self._workers)
269 76094e37 Michael Hanselmann
270 76094e37 Michael Hanselmann
    if current_count == num_workers:
271 76094e37 Michael Hanselmann
      # Nothing to do
272 76094e37 Michael Hanselmann
      pass
273 76094e37 Michael Hanselmann
274 76094e37 Michael Hanselmann
    elif current_count > num_workers:
275 76094e37 Michael Hanselmann
      if num_workers == 0:
276 76094e37 Michael Hanselmann
        # Create copy of list to iterate over while lock isn't held.
277 76094e37 Michael Hanselmann
        termworkers = self._workers[:]
278 76094e37 Michael Hanselmann
        del self._workers[:]
279 76094e37 Michael Hanselmann
      else:
280 76094e37 Michael Hanselmann
        # TODO: Implement partial downsizing
281 76094e37 Michael Hanselmann
        raise NotImplementedError()
282 76094e37 Michael Hanselmann
        #termworkers = ...
283 76094e37 Michael Hanselmann
284 76094e37 Michael Hanselmann
      self._termworkers += termworkers
285 76094e37 Michael Hanselmann
286 76094e37 Michael Hanselmann
      # Notify workers that something has changed
287 53b1d12b Michael Hanselmann
      self._pool_to_worker.notifyAll()
288 76094e37 Michael Hanselmann
289 76094e37 Michael Hanselmann
      # Join all terminating workers
290 76094e37 Michael Hanselmann
      self._lock.release()
291 76094e37 Michael Hanselmann
      try:
292 76094e37 Michael Hanselmann
        for worker in termworkers:
293 c0a8eb9e Michael Hanselmann
          logging.debug("Waiting for thread %s", worker.getName())
294 76094e37 Michael Hanselmann
          worker.join()
295 76094e37 Michael Hanselmann
      finally:
296 76094e37 Michael Hanselmann
        self._lock.acquire()
297 76094e37 Michael Hanselmann
298 76094e37 Michael Hanselmann
      # Remove terminated threads. This could be done in a more efficient way
299 76094e37 Michael Hanselmann
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
300 76094e37 Michael Hanselmann
      # don't leave zombie threads around.
301 76094e37 Michael Hanselmann
      for worker in termworkers:
302 76094e37 Michael Hanselmann
        assert worker in self._termworkers, ("Worker not in list of"
303 76094e37 Michael Hanselmann
                                             " terminating workers")
304 76094e37 Michael Hanselmann
        if not worker.isAlive():
305 76094e37 Michael Hanselmann
          self._termworkers.remove(worker)
306 76094e37 Michael Hanselmann
307 76094e37 Michael Hanselmann
      assert not self._termworkers, "Zombie worker detected"
308 76094e37 Michael Hanselmann
309 76094e37 Michael Hanselmann
    elif current_count < num_workers:
310 76094e37 Michael Hanselmann
      # Create (num_workers - current_count) new workers
311 f1501b3f Michael Hanselmann
      for _ in range(num_workers - current_count):
312 76094e37 Michael Hanselmann
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
313 76094e37 Michael Hanselmann
        self._workers.append(worker)
314 76094e37 Michael Hanselmann
        worker.start()
315 76094e37 Michael Hanselmann
316 76094e37 Michael Hanselmann
  def Resize(self, num_workers):
317 76094e37 Michael Hanselmann
    """Changes the number of workers in the pool.
318 76094e37 Michael Hanselmann

319 116db7c7 Iustin Pop
    @param num_workers: the new number of workers
320 76094e37 Michael Hanselmann

321 76094e37 Michael Hanselmann
    """
322 76094e37 Michael Hanselmann
    self._lock.acquire()
323 76094e37 Michael Hanselmann
    try:
324 76094e37 Michael Hanselmann
      return self._ResizeUnlocked(num_workers)
325 76094e37 Michael Hanselmann
    finally:
326 76094e37 Michael Hanselmann
      self._lock.release()
327 76094e37 Michael Hanselmann
328 76094e37 Michael Hanselmann
  def TerminateWorkers(self):
329 76094e37 Michael Hanselmann
    """Terminate all worker threads.
330 76094e37 Michael Hanselmann

331 76094e37 Michael Hanselmann
    Unstarted tasks will be ignored.
332 76094e37 Michael Hanselmann

333 76094e37 Michael Hanselmann
    """
334 76094e37 Michael Hanselmann
    logging.debug("Terminating all workers")
335 76094e37 Michael Hanselmann
336 76094e37 Michael Hanselmann
    self._lock.acquire()
337 76094e37 Michael Hanselmann
    try:
338 76094e37 Michael Hanselmann
      self._ResizeUnlocked(0)
339 76094e37 Michael Hanselmann
340 76094e37 Michael Hanselmann
      if self._tasks:
341 76094e37 Michael Hanselmann
        logging.debug("There are %s tasks left", len(self._tasks))
342 76094e37 Michael Hanselmann
    finally:
343 76094e37 Michael Hanselmann
      self._lock.release()
344 76094e37 Michael Hanselmann
345 76094e37 Michael Hanselmann
    logging.debug("All workers terminated")