Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ 8062638d

History | View | Annotate | Download (13.3 kB)

1 76094e37 Michael Hanselmann
#
2 76094e37 Michael Hanselmann
#
3 76094e37 Michael Hanselmann
4 189d2714 Michael Hanselmann
# Copyright (C) 2008, 2009, 2010 Google Inc.
5 76094e37 Michael Hanselmann
#
6 76094e37 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 76094e37 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 76094e37 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 76094e37 Michael Hanselmann
# (at your option) any later version.
10 76094e37 Michael Hanselmann
#
11 76094e37 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 76094e37 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 76094e37 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 76094e37 Michael Hanselmann
# General Public License for more details.
15 76094e37 Michael Hanselmann
#
16 76094e37 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 76094e37 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 76094e37 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 76094e37 Michael Hanselmann
# 02110-1301, USA.
20 76094e37 Michael Hanselmann
21 76094e37 Michael Hanselmann
22 76094e37 Michael Hanselmann
"""Base classes for worker pools.
23 76094e37 Michael Hanselmann

24 76094e37 Michael Hanselmann
"""
25 76094e37 Michael Hanselmann
26 76094e37 Michael Hanselmann
import logging
27 76094e37 Michael Hanselmann
import threading
28 52c47e4e Michael Hanselmann
import heapq
29 76094e37 Michael Hanselmann
30 25e557a5 Guido Trotter
from ganeti import compat
31 52c47e4e Michael Hanselmann
from ganeti import errors
32 25e557a5 Guido Trotter
33 76094e37 Michael Hanselmann
34 21c5ad52 Michael Hanselmann
_TERMINATE = object()
35 52c47e4e Michael Hanselmann
_DEFAULT_PRIORITY = 0
36 52c47e4e Michael Hanselmann
37 52c47e4e Michael Hanselmann
38 52c47e4e Michael Hanselmann
class DeferTask(Exception):
39 52c47e4e Michael Hanselmann
  """Special exception class to defer a task.
40 52c47e4e Michael Hanselmann

41 52c47e4e Michael Hanselmann
  This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
42 52c47e4e Michael Hanselmann
  task. Optionally, the priority of the task can be changed.
43 52c47e4e Michael Hanselmann

44 52c47e4e Michael Hanselmann
  """
45 52c47e4e Michael Hanselmann
  def __init__(self, priority=None):
46 52c47e4e Michael Hanselmann
    """Initializes this class.
47 52c47e4e Michael Hanselmann

48 52c47e4e Michael Hanselmann
    @type priority: number
49 52c47e4e Michael Hanselmann
    @param priority: New task priority (None means no change)
50 52c47e4e Michael Hanselmann

51 52c47e4e Michael Hanselmann
    """
52 52c47e4e Michael Hanselmann
    Exception.__init__(self)
53 52c47e4e Michael Hanselmann
    self.priority = priority
54 21c5ad52 Michael Hanselmann
55 21c5ad52 Michael Hanselmann
56 76094e37 Michael Hanselmann
class BaseWorker(threading.Thread, object):
57 76094e37 Michael Hanselmann
  """Base worker class for worker pools.
58 76094e37 Michael Hanselmann

59 76094e37 Michael Hanselmann
  Users of a worker pool must override RunTask in a subclass.
60 76094e37 Michael Hanselmann

61 76094e37 Michael Hanselmann
  """
62 7260cfbe Iustin Pop
  # pylint: disable-msg=W0212
63 76094e37 Michael Hanselmann
  def __init__(self, pool, worker_id):
64 76094e37 Michael Hanselmann
    """Constructor for BaseWorker thread.
65 76094e37 Michael Hanselmann

66 116db7c7 Iustin Pop
    @param pool: the parent worker pool
67 116db7c7 Iustin Pop
    @param worker_id: identifier for this worker
68 76094e37 Michael Hanselmann

69 76094e37 Michael Hanselmann
    """
70 d16e6fd9 Michael Hanselmann
    super(BaseWorker, self).__init__(name=worker_id)
71 76094e37 Michael Hanselmann
    self.pool = pool
72 daba67c7 Michael Hanselmann
    self._worker_id = worker_id
73 76094e37 Michael Hanselmann
    self._current_task = None
74 76094e37 Michael Hanselmann
75 daba67c7 Michael Hanselmann
    assert self.getName() == worker_id
76 daba67c7 Michael Hanselmann
77 76094e37 Michael Hanselmann
  def ShouldTerminate(self):
78 2f4e1516 Michael Hanselmann
    """Returns whether this worker should terminate.
79 2f4e1516 Michael Hanselmann

80 2f4e1516 Michael Hanselmann
    Should only be called from within L{RunTask}.
81 76094e37 Michael Hanselmann

82 76094e37 Michael Hanselmann
    """
83 2f4e1516 Michael Hanselmann
    self.pool._lock.acquire()
84 2f4e1516 Michael Hanselmann
    try:
85 2f4e1516 Michael Hanselmann
      assert self._HasRunningTaskUnlocked()
86 2f4e1516 Michael Hanselmann
      return self.pool._ShouldWorkerTerminateUnlocked(self)
87 2f4e1516 Michael Hanselmann
    finally:
88 2f4e1516 Michael Hanselmann
      self.pool._lock.release()
89 76094e37 Michael Hanselmann
90 52c47e4e Michael Hanselmann
  def GetCurrentPriority(self):
91 52c47e4e Michael Hanselmann
    """Returns the priority of the current task.
92 52c47e4e Michael Hanselmann

93 52c47e4e Michael Hanselmann
    Should only be called from within L{RunTask}.
94 52c47e4e Michael Hanselmann

95 52c47e4e Michael Hanselmann
    """
96 52c47e4e Michael Hanselmann
    self.pool._lock.acquire()
97 52c47e4e Michael Hanselmann
    try:
98 52c47e4e Michael Hanselmann
      assert self._HasRunningTaskUnlocked()
99 52c47e4e Michael Hanselmann
100 52c47e4e Michael Hanselmann
      (priority, _, _) = self._current_task
101 52c47e4e Michael Hanselmann
102 52c47e4e Michael Hanselmann
      return priority
103 52c47e4e Michael Hanselmann
    finally:
104 52c47e4e Michael Hanselmann
      self.pool._lock.release()
105 52c47e4e Michael Hanselmann
106 daba67c7 Michael Hanselmann
  def SetTaskName(self, taskname):
107 daba67c7 Michael Hanselmann
    """Sets the name of the current task.
108 daba67c7 Michael Hanselmann

109 daba67c7 Michael Hanselmann
    Should only be called from within L{RunTask}.
110 daba67c7 Michael Hanselmann

111 daba67c7 Michael Hanselmann
    @type taskname: string
112 daba67c7 Michael Hanselmann
    @param taskname: Task's name
113 daba67c7 Michael Hanselmann

114 daba67c7 Michael Hanselmann
    """
115 daba67c7 Michael Hanselmann
    if taskname:
116 daba67c7 Michael Hanselmann
      name = "%s/%s" % (self._worker_id, taskname)
117 daba67c7 Michael Hanselmann
    else:
118 daba67c7 Michael Hanselmann
      name = self._worker_id
119 daba67c7 Michael Hanselmann
120 daba67c7 Michael Hanselmann
    # Set thread name
121 daba67c7 Michael Hanselmann
    self.setName(name)
122 daba67c7 Michael Hanselmann
123 b3558df1 Michael Hanselmann
  def _HasRunningTaskUnlocked(self):
124 b3558df1 Michael Hanselmann
    """Returns whether this worker is currently running a task.
125 b3558df1 Michael Hanselmann

126 b3558df1 Michael Hanselmann
    """
127 b3558df1 Michael Hanselmann
    return (self._current_task is not None)
128 b3558df1 Michael Hanselmann
129 76094e37 Michael Hanselmann
  def run(self):
130 76094e37 Michael Hanselmann
    """Main thread function.
131 76094e37 Michael Hanselmann

132 76094e37 Michael Hanselmann
    Waits for new tasks to show up in the queue.
133 76094e37 Michael Hanselmann

134 76094e37 Michael Hanselmann
    """
135 76094e37 Michael Hanselmann
    pool = self.pool
136 76094e37 Michael Hanselmann
137 76094e37 Michael Hanselmann
    while True:
138 46d0a3d0 Michael Hanselmann
      assert self._current_task is None
139 52c47e4e Michael Hanselmann
140 52c47e4e Michael Hanselmann
      defer = None
141 76094e37 Michael Hanselmann
      try:
142 21c5ad52 Michael Hanselmann
        # Wait on lock to be told either to terminate or to do a task
143 76094e37 Michael Hanselmann
        pool._lock.acquire()
144 76094e37 Michael Hanselmann
        try:
145 21c5ad52 Michael Hanselmann
          task = pool._WaitForTaskUnlocked(self)
146 76094e37 Michael Hanselmann
147 21c5ad52 Michael Hanselmann
          if task is _TERMINATE:
148 21c5ad52 Michael Hanselmann
            # Told to terminate
149 21c5ad52 Michael Hanselmann
            break
150 b3558df1 Michael Hanselmann
151 21c5ad52 Michael Hanselmann
          if task is None:
152 21c5ad52 Michael Hanselmann
            # Spurious notification, ignore
153 21c5ad52 Michael Hanselmann
            continue
154 76094e37 Michael Hanselmann
155 21c5ad52 Michael Hanselmann
          self._current_task = task
156 76094e37 Michael Hanselmann
157 46d0a3d0 Michael Hanselmann
          # No longer needed, dispose of reference
158 46d0a3d0 Michael Hanselmann
          del task
159 46d0a3d0 Michael Hanselmann
160 21c5ad52 Michael Hanselmann
          assert self._HasRunningTaskUnlocked()
161 46d0a3d0 Michael Hanselmann
162 76094e37 Michael Hanselmann
        finally:
163 76094e37 Michael Hanselmann
          pool._lock.release()
164 76094e37 Michael Hanselmann
165 52c47e4e Michael Hanselmann
        (priority, _, args) = self._current_task
166 76094e37 Michael Hanselmann
        try:
167 52c47e4e Michael Hanselmann
          # Run the actual task
168 52c47e4e Michael Hanselmann
          assert defer is None
169 52c47e4e Michael Hanselmann
          logging.debug("Starting task %r, priority %s", args, priority)
170 daba67c7 Michael Hanselmann
          assert self.getName() == self._worker_id
171 daba67c7 Michael Hanselmann
          try:
172 c30421e0 Renรฉ Nussbaumer
            self.RunTask(*args) # pylint: disable-msg=W0142
173 daba67c7 Michael Hanselmann
          finally:
174 daba67c7 Michael Hanselmann
            self.SetTaskName(None)
175 52c47e4e Michael Hanselmann
          logging.debug("Done with task %r, priority %s", args, priority)
176 52c47e4e Michael Hanselmann
        except DeferTask, err:
177 52c47e4e Michael Hanselmann
          defer = err
178 52c47e4e Michael Hanselmann
179 52c47e4e Michael Hanselmann
          if defer.priority is None:
180 52c47e4e Michael Hanselmann
            # Use same priority
181 52c47e4e Michael Hanselmann
            defer.priority = priority
182 52c47e4e Michael Hanselmann
183 52c47e4e Michael Hanselmann
          logging.debug("Deferring task %r, new priority %s", defer.priority)
184 52c47e4e Michael Hanselmann
185 52c47e4e Michael Hanselmann
          assert self._HasRunningTaskUnlocked()
186 7260cfbe Iustin Pop
        except: # pylint: disable-msg=W0702
187 02fc74da Michael Hanselmann
          logging.exception("Caught unhandled exception")
188 c1cf1fe5 Michael Hanselmann
189 c1cf1fe5 Michael Hanselmann
        assert self._HasRunningTaskUnlocked()
190 76094e37 Michael Hanselmann
      finally:
191 76094e37 Michael Hanselmann
        # Notify pool
192 76094e37 Michael Hanselmann
        pool._lock.acquire()
193 76094e37 Michael Hanselmann
        try:
194 52c47e4e Michael Hanselmann
          if defer:
195 52c47e4e Michael Hanselmann
            assert self._current_task
196 52c47e4e Michael Hanselmann
            # Schedule again for later run
197 52c47e4e Michael Hanselmann
            (_, _, args) = self._current_task
198 52c47e4e Michael Hanselmann
            pool._AddTaskUnlocked(args, defer.priority)
199 52c47e4e Michael Hanselmann
200 b3558df1 Michael Hanselmann
          if self._current_task:
201 b3558df1 Michael Hanselmann
            self._current_task = None
202 53b1d12b Michael Hanselmann
            pool._worker_to_pool.notifyAll()
203 76094e37 Michael Hanselmann
        finally:
204 76094e37 Michael Hanselmann
          pool._lock.release()
205 76094e37 Michael Hanselmann
206 c1cf1fe5 Michael Hanselmann
      assert not self._HasRunningTaskUnlocked()
207 c1cf1fe5 Michael Hanselmann
208 02fc74da Michael Hanselmann
    logging.debug("Terminates")
209 b3558df1 Michael Hanselmann
210 76094e37 Michael Hanselmann
  def RunTask(self, *args):
211 76094e37 Michael Hanselmann
    """Function called to start a task.
212 76094e37 Michael Hanselmann

213 116db7c7 Iustin Pop
    This needs to be implemented by child classes.
214 116db7c7 Iustin Pop

215 76094e37 Michael Hanselmann
    """
216 76094e37 Michael Hanselmann
    raise NotImplementedError()
217 76094e37 Michael Hanselmann
218 76094e37 Michael Hanselmann
219 76094e37 Michael Hanselmann
class WorkerPool(object):
220 76094e37 Michael Hanselmann
  """Worker pool with a queue.
221 76094e37 Michael Hanselmann

222 76094e37 Michael Hanselmann
  This class is thread-safe.
223 76094e37 Michael Hanselmann

224 116db7c7 Iustin Pop
  Tasks are guaranteed to be started in the order in which they're
225 116db7c7 Iustin Pop
  added to the pool. Due to the nature of threading, they're not
226 116db7c7 Iustin Pop
  guaranteed to finish in the same order.
227 76094e37 Michael Hanselmann

228 76094e37 Michael Hanselmann
  """
229 89e2b4d2 Michael Hanselmann
  def __init__(self, name, num_workers, worker_class):
230 76094e37 Michael Hanselmann
    """Constructor for worker pool.
231 76094e37 Michael Hanselmann

232 116db7c7 Iustin Pop
    @param num_workers: number of workers to be started
233 116db7c7 Iustin Pop
        (dynamic resizing is not yet implemented)
234 116db7c7 Iustin Pop
    @param worker_class: the class to be instantiated for workers;
235 116db7c7 Iustin Pop
        should derive from L{BaseWorker}
236 76094e37 Michael Hanselmann

237 76094e37 Michael Hanselmann
    """
238 76094e37 Michael Hanselmann
    # Some of these variables are accessed by BaseWorker
239 53b1d12b Michael Hanselmann
    self._lock = threading.Lock()
240 53b1d12b Michael Hanselmann
    self._pool_to_pool = threading.Condition(self._lock)
241 53b1d12b Michael Hanselmann
    self._pool_to_worker = threading.Condition(self._lock)
242 53b1d12b Michael Hanselmann
    self._worker_to_pool = threading.Condition(self._lock)
243 76094e37 Michael Hanselmann
    self._worker_class = worker_class
244 89e2b4d2 Michael Hanselmann
    self._name = name
245 76094e37 Michael Hanselmann
    self._last_worker_id = 0
246 76094e37 Michael Hanselmann
    self._workers = []
247 76094e37 Michael Hanselmann
    self._quiescing = False
248 76094e37 Michael Hanselmann
249 76094e37 Michael Hanselmann
    # Terminating workers
250 76094e37 Michael Hanselmann
    self._termworkers = []
251 76094e37 Michael Hanselmann
252 76094e37 Michael Hanselmann
    # Queued tasks
253 52c47e4e Michael Hanselmann
    self._counter = 0
254 52c47e4e Michael Hanselmann
    self._tasks = []
255 76094e37 Michael Hanselmann
256 76094e37 Michael Hanselmann
    # Start workers
257 76094e37 Michael Hanselmann
    self.Resize(num_workers)
258 76094e37 Michael Hanselmann
259 76094e37 Michael Hanselmann
  # TODO: Implement dynamic resizing?
260 76094e37 Michael Hanselmann
261 c2a8e8ba Guido Trotter
  def _WaitWhileQuiescingUnlocked(self):
262 c2a8e8ba Guido Trotter
    """Wait until the worker pool has finished quiescing.
263 c2a8e8ba Guido Trotter

264 c2a8e8ba Guido Trotter
    """
265 c2a8e8ba Guido Trotter
    while self._quiescing:
266 c2a8e8ba Guido Trotter
      self._pool_to_pool.wait()
267 c2a8e8ba Guido Trotter
268 52c47e4e Michael Hanselmann
  def _AddTaskUnlocked(self, args, priority):
269 52c47e4e Michael Hanselmann
    """Adds a task to the internal queue.
270 52c47e4e Michael Hanselmann

271 52c47e4e Michael Hanselmann
    @type args: sequence
272 52c47e4e Michael Hanselmann
    @param args: Arguments passed to L{BaseWorker.RunTask}
273 52c47e4e Michael Hanselmann
    @type priority: number
274 52c47e4e Michael Hanselmann
    @param priority: Task priority
275 52c47e4e Michael Hanselmann

276 52c47e4e Michael Hanselmann
    """
277 189d2714 Michael Hanselmann
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
278 52c47e4e Michael Hanselmann
    assert isinstance(priority, (int, long)), "Priority must be numeric"
279 189d2714 Michael Hanselmann
280 52c47e4e Michael Hanselmann
    # This counter is used to ensure elements are processed in their
281 52c47e4e Michael Hanselmann
    # incoming order. For processing they're sorted by priority and then
282 52c47e4e Michael Hanselmann
    # counter.
283 52c47e4e Michael Hanselmann
    self._counter += 1
284 52c47e4e Michael Hanselmann
285 52c47e4e Michael Hanselmann
    heapq.heappush(self._tasks, (priority, self._counter, args))
286 189d2714 Michael Hanselmann
287 189d2714 Michael Hanselmann
    # Notify a waiting worker
288 189d2714 Michael Hanselmann
    self._pool_to_worker.notify()
289 189d2714 Michael Hanselmann
290 52c47e4e Michael Hanselmann
  def AddTask(self, args, priority=_DEFAULT_PRIORITY):
291 76094e37 Michael Hanselmann
    """Adds a task to the queue.
292 76094e37 Michael Hanselmann

293 b2e8a4d9 Michael Hanselmann
    @type args: sequence
294 116db7c7 Iustin Pop
    @param args: arguments passed to L{BaseWorker.RunTask}
295 52c47e4e Michael Hanselmann
    @type priority: number
296 52c47e4e Michael Hanselmann
    @param priority: Task priority
297 76094e37 Michael Hanselmann

298 76094e37 Michael Hanselmann
    """
299 76094e37 Michael Hanselmann
    self._lock.acquire()
300 76094e37 Michael Hanselmann
    try:
301 c2a8e8ba Guido Trotter
      self._WaitWhileQuiescingUnlocked()
302 52c47e4e Michael Hanselmann
      self._AddTaskUnlocked(args, priority)
303 76094e37 Michael Hanselmann
    finally:
304 76094e37 Michael Hanselmann
      self._lock.release()
305 76094e37 Michael Hanselmann
306 52c47e4e Michael Hanselmann
  def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
307 c2a8e8ba Guido Trotter
    """Add a list of tasks to the queue.
308 c2a8e8ba Guido Trotter

309 c2a8e8ba Guido Trotter
    @type tasks: list of tuples
310 c2a8e8ba Guido Trotter
    @param tasks: list of args passed to L{BaseWorker.RunTask}
311 52c47e4e Michael Hanselmann
    @type priority: number or list of numbers
312 52c47e4e Michael Hanselmann
    @param priority: Priority for all added tasks or a list with the priority
313 52c47e4e Michael Hanselmann
                     for each task
314 c2a8e8ba Guido Trotter

315 c2a8e8ba Guido Trotter
    """
316 25e557a5 Guido Trotter
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
317 25e557a5 Guido Trotter
      "Each task must be a sequence"
318 25e557a5 Guido Trotter
319 52c47e4e Michael Hanselmann
    assert (isinstance(priority, (int, long)) or
320 52c47e4e Michael Hanselmann
            compat.all(isinstance(prio, (int, long)) for prio in priority)), \
321 52c47e4e Michael Hanselmann
           "Priority must be numeric or be a list of numeric values"
322 52c47e4e Michael Hanselmann
323 52c47e4e Michael Hanselmann
    if isinstance(priority, (int, long)):
324 52c47e4e Michael Hanselmann
      priority = [priority] * len(tasks)
325 52c47e4e Michael Hanselmann
    elif len(priority) != len(tasks):
326 52c47e4e Michael Hanselmann
      raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
327 52c47e4e Michael Hanselmann
                                   " number of tasks (%s)" %
328 52c47e4e Michael Hanselmann
                                   (len(priority), len(tasks)))
329 52c47e4e Michael Hanselmann
330 c2a8e8ba Guido Trotter
    self._lock.acquire()
331 c2a8e8ba Guido Trotter
    try:
332 c2a8e8ba Guido Trotter
      self._WaitWhileQuiescingUnlocked()
333 c2a8e8ba Guido Trotter
334 52c47e4e Michael Hanselmann
      assert compat.all(isinstance(prio, (int, long)) for prio in priority)
335 52c47e4e Michael Hanselmann
      assert len(tasks) == len(priority)
336 52c47e4e Michael Hanselmann
337 52c47e4e Michael Hanselmann
      for args, priority in zip(tasks, priority):
338 52c47e4e Michael Hanselmann
        self._AddTaskUnlocked(args, priority)
339 c2a8e8ba Guido Trotter
    finally:
340 c2a8e8ba Guido Trotter
      self._lock.release()
341 c2a8e8ba Guido Trotter
342 21c5ad52 Michael Hanselmann
  def _WaitForTaskUnlocked(self, worker):
343 21c5ad52 Michael Hanselmann
    """Waits for a task for a worker.
344 21c5ad52 Michael Hanselmann

345 21c5ad52 Michael Hanselmann
    @type worker: L{BaseWorker}
346 21c5ad52 Michael Hanselmann
    @param worker: Worker thread
347 21c5ad52 Michael Hanselmann

348 21c5ad52 Michael Hanselmann
    """
349 21c5ad52 Michael Hanselmann
    if self._ShouldWorkerTerminateUnlocked(worker):
350 21c5ad52 Michael Hanselmann
      return _TERMINATE
351 21c5ad52 Michael Hanselmann
352 21c5ad52 Michael Hanselmann
    # We only wait if there's no task for us.
353 21c5ad52 Michael Hanselmann
    if not self._tasks:
354 21c5ad52 Michael Hanselmann
      logging.debug("Waiting for tasks")
355 21c5ad52 Michael Hanselmann
356 21c5ad52 Michael Hanselmann
      # wait() releases the lock and sleeps until notified
357 21c5ad52 Michael Hanselmann
      self._pool_to_worker.wait()
358 21c5ad52 Michael Hanselmann
359 21c5ad52 Michael Hanselmann
      logging.debug("Notified while waiting")
360 21c5ad52 Michael Hanselmann
361 21c5ad52 Michael Hanselmann
      # Were we woken up in order to terminate?
362 21c5ad52 Michael Hanselmann
      if self._ShouldWorkerTerminateUnlocked(worker):
363 21c5ad52 Michael Hanselmann
        return _TERMINATE
364 21c5ad52 Michael Hanselmann
365 21c5ad52 Michael Hanselmann
      if not self._tasks:
366 21c5ad52 Michael Hanselmann
        # Spurious notification, ignore
367 21c5ad52 Michael Hanselmann
        return None
368 21c5ad52 Michael Hanselmann
369 21c5ad52 Michael Hanselmann
    # Get task from queue and tell pool about it
370 21c5ad52 Michael Hanselmann
    try:
371 52c47e4e Michael Hanselmann
      return heapq.heappop(self._tasks)
372 21c5ad52 Michael Hanselmann
    finally:
373 21c5ad52 Michael Hanselmann
      self._worker_to_pool.notifyAll()
374 21c5ad52 Michael Hanselmann
375 76094e37 Michael Hanselmann
  def _ShouldWorkerTerminateUnlocked(self, worker):
376 76094e37 Michael Hanselmann
    """Returns whether a worker should terminate.
377 76094e37 Michael Hanselmann

378 76094e37 Michael Hanselmann
    """
379 76094e37 Michael Hanselmann
    return (worker in self._termworkers)
380 76094e37 Michael Hanselmann
381 76094e37 Michael Hanselmann
  def _HasRunningTasksUnlocked(self):
382 76094e37 Michael Hanselmann
    """Checks whether there's a task running in a worker.
383 76094e37 Michael Hanselmann

384 76094e37 Michael Hanselmann
    """
385 76094e37 Michael Hanselmann
    for worker in self._workers + self._termworkers:
386 7260cfbe Iustin Pop
      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
387 76094e37 Michael Hanselmann
        return True
388 76094e37 Michael Hanselmann
    return False
389 76094e37 Michael Hanselmann
390 76094e37 Michael Hanselmann
  def Quiesce(self):
391 76094e37 Michael Hanselmann
    """Waits until the task queue is empty.
392 76094e37 Michael Hanselmann

393 76094e37 Michael Hanselmann
    """
394 76094e37 Michael Hanselmann
    self._lock.acquire()
395 76094e37 Michael Hanselmann
    try:
396 76094e37 Michael Hanselmann
      self._quiescing = True
397 76094e37 Michael Hanselmann
398 76094e37 Michael Hanselmann
      # Wait while there are tasks pending or running
399 76094e37 Michael Hanselmann
      while self._tasks or self._HasRunningTasksUnlocked():
400 53b1d12b Michael Hanselmann
        self._worker_to_pool.wait()
401 76094e37 Michael Hanselmann
402 76094e37 Michael Hanselmann
    finally:
403 76094e37 Michael Hanselmann
      self._quiescing = False
404 76094e37 Michael Hanselmann
405 76094e37 Michael Hanselmann
      # Make sure AddTasks continues in case it was waiting
406 53b1d12b Michael Hanselmann
      self._pool_to_pool.notifyAll()
407 76094e37 Michael Hanselmann
408 76094e37 Michael Hanselmann
      self._lock.release()
409 76094e37 Michael Hanselmann
410 76094e37 Michael Hanselmann
  def _NewWorkerIdUnlocked(self):
411 116db7c7 Iustin Pop
    """Return an identifier for a new worker.
412 116db7c7 Iustin Pop

413 116db7c7 Iustin Pop
    """
414 76094e37 Michael Hanselmann
    self._last_worker_id += 1
415 89e2b4d2 Michael Hanselmann
416 89e2b4d2 Michael Hanselmann
    return "%s%d" % (self._name, self._last_worker_id)
417 76094e37 Michael Hanselmann
418 76094e37 Michael Hanselmann
  def _ResizeUnlocked(self, num_workers):
419 76094e37 Michael Hanselmann
    """Changes the number of workers.
420 76094e37 Michael Hanselmann

421 76094e37 Michael Hanselmann
    """
422 76094e37 Michael Hanselmann
    assert num_workers >= 0, "num_workers must be >= 0"
423 76094e37 Michael Hanselmann
424 76094e37 Michael Hanselmann
    logging.debug("Resizing to %s workers", num_workers)
425 76094e37 Michael Hanselmann
426 76094e37 Michael Hanselmann
    current_count = len(self._workers)
427 76094e37 Michael Hanselmann
428 76094e37 Michael Hanselmann
    if current_count == num_workers:
429 76094e37 Michael Hanselmann
      # Nothing to do
430 76094e37 Michael Hanselmann
      pass
431 76094e37 Michael Hanselmann
432 76094e37 Michael Hanselmann
    elif current_count > num_workers:
433 76094e37 Michael Hanselmann
      if num_workers == 0:
434 76094e37 Michael Hanselmann
        # Create copy of list to iterate over while lock isn't held.
435 76094e37 Michael Hanselmann
        termworkers = self._workers[:]
436 76094e37 Michael Hanselmann
        del self._workers[:]
437 76094e37 Michael Hanselmann
      else:
438 76094e37 Michael Hanselmann
        # TODO: Implement partial downsizing
439 76094e37 Michael Hanselmann
        raise NotImplementedError()
440 76094e37 Michael Hanselmann
        #termworkers = ...
441 76094e37 Michael Hanselmann
442 76094e37 Michael Hanselmann
      self._termworkers += termworkers
443 76094e37 Michael Hanselmann
444 76094e37 Michael Hanselmann
      # Notify workers that something has changed
445 53b1d12b Michael Hanselmann
      self._pool_to_worker.notifyAll()
446 76094e37 Michael Hanselmann
447 76094e37 Michael Hanselmann
      # Join all terminating workers
448 76094e37 Michael Hanselmann
      self._lock.release()
449 76094e37 Michael Hanselmann
      try:
450 76094e37 Michael Hanselmann
        for worker in termworkers:
451 c0a8eb9e Michael Hanselmann
          logging.debug("Waiting for thread %s", worker.getName())
452 76094e37 Michael Hanselmann
          worker.join()
453 76094e37 Michael Hanselmann
      finally:
454 76094e37 Michael Hanselmann
        self._lock.acquire()
455 76094e37 Michael Hanselmann
456 76094e37 Michael Hanselmann
      # Remove terminated threads. This could be done in a more efficient way
457 76094e37 Michael Hanselmann
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
458 76094e37 Michael Hanselmann
      # don't leave zombie threads around.
459 76094e37 Michael Hanselmann
      for worker in termworkers:
460 76094e37 Michael Hanselmann
        assert worker in self._termworkers, ("Worker not in list of"
461 76094e37 Michael Hanselmann
                                             " terminating workers")
462 76094e37 Michael Hanselmann
        if not worker.isAlive():
463 76094e37 Michael Hanselmann
          self._termworkers.remove(worker)
464 76094e37 Michael Hanselmann
465 76094e37 Michael Hanselmann
      assert not self._termworkers, "Zombie worker detected"
466 76094e37 Michael Hanselmann
467 76094e37 Michael Hanselmann
    elif current_count < num_workers:
468 76094e37 Michael Hanselmann
      # Create (num_workers - current_count) new workers
469 f1501b3f Michael Hanselmann
      for _ in range(num_workers - current_count):
470 76094e37 Michael Hanselmann
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
471 76094e37 Michael Hanselmann
        self._workers.append(worker)
472 76094e37 Michael Hanselmann
        worker.start()
473 76094e37 Michael Hanselmann
474 76094e37 Michael Hanselmann
  def Resize(self, num_workers):
475 76094e37 Michael Hanselmann
    """Changes the number of workers in the pool.
476 76094e37 Michael Hanselmann

477 116db7c7 Iustin Pop
    @param num_workers: the new number of workers
478 76094e37 Michael Hanselmann

479 76094e37 Michael Hanselmann
    """
480 76094e37 Michael Hanselmann
    self._lock.acquire()
481 76094e37 Michael Hanselmann
    try:
482 76094e37 Michael Hanselmann
      return self._ResizeUnlocked(num_workers)
483 76094e37 Michael Hanselmann
    finally:
484 76094e37 Michael Hanselmann
      self._lock.release()
485 76094e37 Michael Hanselmann
486 76094e37 Michael Hanselmann
  def TerminateWorkers(self):
487 76094e37 Michael Hanselmann
    """Terminate all worker threads.
488 76094e37 Michael Hanselmann

489 76094e37 Michael Hanselmann
    Unstarted tasks will be ignored.
490 76094e37 Michael Hanselmann

491 76094e37 Michael Hanselmann
    """
492 76094e37 Michael Hanselmann
    logging.debug("Terminating all workers")
493 76094e37 Michael Hanselmann
494 76094e37 Michael Hanselmann
    self._lock.acquire()
495 76094e37 Michael Hanselmann
    try:
496 76094e37 Michael Hanselmann
      self._ResizeUnlocked(0)
497 76094e37 Michael Hanselmann
498 76094e37 Michael Hanselmann
      if self._tasks:
499 76094e37 Michael Hanselmann
        logging.debug("There are %s tasks left", len(self._tasks))
500 76094e37 Michael Hanselmann
    finally:
501 76094e37 Michael Hanselmann
      self._lock.release()
502 76094e37 Michael Hanselmann
503 76094e37 Michael Hanselmann
    logging.debug("All workers terminated")