Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ 98dfcaff

History | View | Annotate | Download (13.3 kB)

1 76094e37 Michael Hanselmann
#
2 76094e37 Michael Hanselmann
#
3 76094e37 Michael Hanselmann
4 189d2714 Michael Hanselmann
# Copyright (C) 2008, 2009, 2010 Google Inc.
5 76094e37 Michael Hanselmann
#
6 76094e37 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 76094e37 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 76094e37 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 76094e37 Michael Hanselmann
# (at your option) any later version.
10 76094e37 Michael Hanselmann
#
11 76094e37 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 76094e37 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 76094e37 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 76094e37 Michael Hanselmann
# General Public License for more details.
15 76094e37 Michael Hanselmann
#
16 76094e37 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 76094e37 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 76094e37 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 76094e37 Michael Hanselmann
# 02110-1301, USA.
20 76094e37 Michael Hanselmann
21 76094e37 Michael Hanselmann
22 76094e37 Michael Hanselmann
"""Base classes for worker pools.
23 76094e37 Michael Hanselmann

24 76094e37 Michael Hanselmann
"""
25 76094e37 Michael Hanselmann
26 76094e37 Michael Hanselmann
import logging
27 76094e37 Michael Hanselmann
import threading
28 52c47e4e Michael Hanselmann
import heapq
29 76094e37 Michael Hanselmann
30 25e557a5 Guido Trotter
from ganeti import compat
31 52c47e4e Michael Hanselmann
from ganeti import errors
32 25e557a5 Guido Trotter
33 76094e37 Michael Hanselmann
34 21c5ad52 Michael Hanselmann
_TERMINATE = object()
35 52c47e4e Michael Hanselmann
_DEFAULT_PRIORITY = 0
36 52c47e4e Michael Hanselmann
37 52c47e4e Michael Hanselmann
38 52c47e4e Michael Hanselmann
class DeferTask(Exception):
39 52c47e4e Michael Hanselmann
  """Special exception class to defer a task.
40 52c47e4e Michael Hanselmann

41 52c47e4e Michael Hanselmann
  This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
42 52c47e4e Michael Hanselmann
  task. Optionally, the priority of the task can be changed.
43 52c47e4e Michael Hanselmann

44 52c47e4e Michael Hanselmann
  """
45 52c47e4e Michael Hanselmann
  def __init__(self, priority=None):
46 52c47e4e Michael Hanselmann
    """Initializes this class.
47 52c47e4e Michael Hanselmann

48 52c47e4e Michael Hanselmann
    @type priority: number
49 52c47e4e Michael Hanselmann
    @param priority: New task priority (None means no change)
50 52c47e4e Michael Hanselmann

51 52c47e4e Michael Hanselmann
    """
52 52c47e4e Michael Hanselmann
    Exception.__init__(self)
53 52c47e4e Michael Hanselmann
    self.priority = priority
54 21c5ad52 Michael Hanselmann
55 21c5ad52 Michael Hanselmann
56 76094e37 Michael Hanselmann
class BaseWorker(threading.Thread, object):
57 76094e37 Michael Hanselmann
  """Base worker class for worker pools.
58 76094e37 Michael Hanselmann

59 76094e37 Michael Hanselmann
  Users of a worker pool must override RunTask in a subclass.
60 76094e37 Michael Hanselmann

61 76094e37 Michael Hanselmann
  """
62 b459a848 Andrea Spadaccini
  # pylint: disable=W0212
63 76094e37 Michael Hanselmann
  def __init__(self, pool, worker_id):
64 76094e37 Michael Hanselmann
    """Constructor for BaseWorker thread.
65 76094e37 Michael Hanselmann

66 116db7c7 Iustin Pop
    @param pool: the parent worker pool
67 116db7c7 Iustin Pop
    @param worker_id: identifier for this worker
68 76094e37 Michael Hanselmann

69 76094e37 Michael Hanselmann
    """
70 d16e6fd9 Michael Hanselmann
    super(BaseWorker, self).__init__(name=worker_id)
71 76094e37 Michael Hanselmann
    self.pool = pool
72 daba67c7 Michael Hanselmann
    self._worker_id = worker_id
73 76094e37 Michael Hanselmann
    self._current_task = None
74 76094e37 Michael Hanselmann
75 daba67c7 Michael Hanselmann
    assert self.getName() == worker_id
76 daba67c7 Michael Hanselmann
77 76094e37 Michael Hanselmann
  def ShouldTerminate(self):
78 2f4e1516 Michael Hanselmann
    """Returns whether this worker should terminate.
79 2f4e1516 Michael Hanselmann

80 2f4e1516 Michael Hanselmann
    Should only be called from within L{RunTask}.
81 76094e37 Michael Hanselmann

82 76094e37 Michael Hanselmann
    """
83 2f4e1516 Michael Hanselmann
    self.pool._lock.acquire()
84 2f4e1516 Michael Hanselmann
    try:
85 2f4e1516 Michael Hanselmann
      assert self._HasRunningTaskUnlocked()
86 2f4e1516 Michael Hanselmann
      return self.pool._ShouldWorkerTerminateUnlocked(self)
87 2f4e1516 Michael Hanselmann
    finally:
88 2f4e1516 Michael Hanselmann
      self.pool._lock.release()
89 76094e37 Michael Hanselmann
90 52c47e4e Michael Hanselmann
  def GetCurrentPriority(self):
91 52c47e4e Michael Hanselmann
    """Returns the priority of the current task.
92 52c47e4e Michael Hanselmann

93 52c47e4e Michael Hanselmann
    Should only be called from within L{RunTask}.
94 52c47e4e Michael Hanselmann

95 52c47e4e Michael Hanselmann
    """
96 52c47e4e Michael Hanselmann
    self.pool._lock.acquire()
97 52c47e4e Michael Hanselmann
    try:
98 52c47e4e Michael Hanselmann
      assert self._HasRunningTaskUnlocked()
99 52c47e4e Michael Hanselmann
100 52c47e4e Michael Hanselmann
      (priority, _, _) = self._current_task
101 52c47e4e Michael Hanselmann
102 52c47e4e Michael Hanselmann
      return priority
103 52c47e4e Michael Hanselmann
    finally:
104 52c47e4e Michael Hanselmann
      self.pool._lock.release()
105 52c47e4e Michael Hanselmann
106 daba67c7 Michael Hanselmann
  def SetTaskName(self, taskname):
107 daba67c7 Michael Hanselmann
    """Sets the name of the current task.
108 daba67c7 Michael Hanselmann

109 daba67c7 Michael Hanselmann
    Should only be called from within L{RunTask}.
110 daba67c7 Michael Hanselmann

111 daba67c7 Michael Hanselmann
    @type taskname: string
112 daba67c7 Michael Hanselmann
    @param taskname: Task's name
113 daba67c7 Michael Hanselmann

114 daba67c7 Michael Hanselmann
    """
115 daba67c7 Michael Hanselmann
    if taskname:
116 daba67c7 Michael Hanselmann
      name = "%s/%s" % (self._worker_id, taskname)
117 daba67c7 Michael Hanselmann
    else:
118 daba67c7 Michael Hanselmann
      name = self._worker_id
119 daba67c7 Michael Hanselmann
120 daba67c7 Michael Hanselmann
    # Set thread name
121 daba67c7 Michael Hanselmann
    self.setName(name)
122 daba67c7 Michael Hanselmann
123 b3558df1 Michael Hanselmann
  def _HasRunningTaskUnlocked(self):
124 b3558df1 Michael Hanselmann
    """Returns whether this worker is currently running a task.
125 b3558df1 Michael Hanselmann

126 b3558df1 Michael Hanselmann
    """
127 b3558df1 Michael Hanselmann
    return (self._current_task is not None)
128 b3558df1 Michael Hanselmann
129 76094e37 Michael Hanselmann
  def run(self):
130 76094e37 Michael Hanselmann
    """Main thread function.
131 76094e37 Michael Hanselmann

132 76094e37 Michael Hanselmann
    Waits for new tasks to show up in the queue.
133 76094e37 Michael Hanselmann

134 76094e37 Michael Hanselmann
    """
135 76094e37 Michael Hanselmann
    pool = self.pool
136 76094e37 Michael Hanselmann
137 76094e37 Michael Hanselmann
    while True:
138 46d0a3d0 Michael Hanselmann
      assert self._current_task is None
139 52c47e4e Michael Hanselmann
140 52c47e4e Michael Hanselmann
      defer = None
141 76094e37 Michael Hanselmann
      try:
142 21c5ad52 Michael Hanselmann
        # Wait on lock to be told either to terminate or to do a task
143 76094e37 Michael Hanselmann
        pool._lock.acquire()
144 76094e37 Michael Hanselmann
        try:
145 21c5ad52 Michael Hanselmann
          task = pool._WaitForTaskUnlocked(self)
146 76094e37 Michael Hanselmann
147 21c5ad52 Michael Hanselmann
          if task is _TERMINATE:
148 21c5ad52 Michael Hanselmann
            # Told to terminate
149 21c5ad52 Michael Hanselmann
            break
150 b3558df1 Michael Hanselmann
151 21c5ad52 Michael Hanselmann
          if task is None:
152 21c5ad52 Michael Hanselmann
            # Spurious notification, ignore
153 21c5ad52 Michael Hanselmann
            continue
154 76094e37 Michael Hanselmann
155 21c5ad52 Michael Hanselmann
          self._current_task = task
156 76094e37 Michael Hanselmann
157 46d0a3d0 Michael Hanselmann
          # No longer needed, dispose of reference
158 46d0a3d0 Michael Hanselmann
          del task
159 46d0a3d0 Michael Hanselmann
160 21c5ad52 Michael Hanselmann
          assert self._HasRunningTaskUnlocked()
161 46d0a3d0 Michael Hanselmann
162 76094e37 Michael Hanselmann
        finally:
163 76094e37 Michael Hanselmann
          pool._lock.release()
164 76094e37 Michael Hanselmann
165 52c47e4e Michael Hanselmann
        (priority, _, args) = self._current_task
166 76094e37 Michael Hanselmann
        try:
167 52c47e4e Michael Hanselmann
          # Run the actual task
168 52c47e4e Michael Hanselmann
          assert defer is None
169 52c47e4e Michael Hanselmann
          logging.debug("Starting task %r, priority %s", args, priority)
170 daba67c7 Michael Hanselmann
          assert self.getName() == self._worker_id
171 daba67c7 Michael Hanselmann
          try:
172 b459a848 Andrea Spadaccini
            self.RunTask(*args) # pylint: disable=W0142
173 daba67c7 Michael Hanselmann
          finally:
174 daba67c7 Michael Hanselmann
            self.SetTaskName(None)
175 52c47e4e Michael Hanselmann
          logging.debug("Done with task %r, priority %s", args, priority)
176 52c47e4e Michael Hanselmann
        except DeferTask, err:
177 52c47e4e Michael Hanselmann
          defer = err
178 52c47e4e Michael Hanselmann
179 52c47e4e Michael Hanselmann
          if defer.priority is None:
180 52c47e4e Michael Hanselmann
            # Use same priority
181 52c47e4e Michael Hanselmann
            defer.priority = priority
182 52c47e4e Michael Hanselmann
183 e1ea54e9 Michael Hanselmann
          logging.debug("Deferring task %r, new priority %s",
184 e1ea54e9 Michael Hanselmann
                        args, defer.priority)
185 52c47e4e Michael Hanselmann
186 52c47e4e Michael Hanselmann
          assert self._HasRunningTaskUnlocked()
187 b459a848 Andrea Spadaccini
        except: # pylint: disable=W0702
188 02fc74da Michael Hanselmann
          logging.exception("Caught unhandled exception")
189 c1cf1fe5 Michael Hanselmann
190 c1cf1fe5 Michael Hanselmann
        assert self._HasRunningTaskUnlocked()
191 76094e37 Michael Hanselmann
      finally:
192 76094e37 Michael Hanselmann
        # Notify pool
193 76094e37 Michael Hanselmann
        pool._lock.acquire()
194 76094e37 Michael Hanselmann
        try:
195 52c47e4e Michael Hanselmann
          if defer:
196 52c47e4e Michael Hanselmann
            assert self._current_task
197 52c47e4e Michael Hanselmann
            # Schedule again for later run
198 52c47e4e Michael Hanselmann
            (_, _, args) = self._current_task
199 52c47e4e Michael Hanselmann
            pool._AddTaskUnlocked(args, defer.priority)
200 52c47e4e Michael Hanselmann
201 b3558df1 Michael Hanselmann
          if self._current_task:
202 b3558df1 Michael Hanselmann
            self._current_task = None
203 53b1d12b Michael Hanselmann
            pool._worker_to_pool.notifyAll()
204 76094e37 Michael Hanselmann
        finally:
205 76094e37 Michael Hanselmann
          pool._lock.release()
206 76094e37 Michael Hanselmann
207 c1cf1fe5 Michael Hanselmann
      assert not self._HasRunningTaskUnlocked()
208 c1cf1fe5 Michael Hanselmann
209 02fc74da Michael Hanselmann
    logging.debug("Terminates")
210 b3558df1 Michael Hanselmann
211 76094e37 Michael Hanselmann
  def RunTask(self, *args):
212 76094e37 Michael Hanselmann
    """Function called to start a task.
213 76094e37 Michael Hanselmann

214 116db7c7 Iustin Pop
    This needs to be implemented by child classes.
215 116db7c7 Iustin Pop

216 76094e37 Michael Hanselmann
    """
217 76094e37 Michael Hanselmann
    raise NotImplementedError()
218 76094e37 Michael Hanselmann
219 76094e37 Michael Hanselmann
220 76094e37 Michael Hanselmann
class WorkerPool(object):
221 76094e37 Michael Hanselmann
  """Worker pool with a queue.
222 76094e37 Michael Hanselmann

223 76094e37 Michael Hanselmann
  This class is thread-safe.
224 76094e37 Michael Hanselmann

225 116db7c7 Iustin Pop
  Tasks are guaranteed to be started in the order in which they're
226 116db7c7 Iustin Pop
  added to the pool. Due to the nature of threading, they're not
227 116db7c7 Iustin Pop
  guaranteed to finish in the same order.
228 76094e37 Michael Hanselmann

229 76094e37 Michael Hanselmann
  """
230 89e2b4d2 Michael Hanselmann
  def __init__(self, name, num_workers, worker_class):
231 76094e37 Michael Hanselmann
    """Constructor for worker pool.
232 76094e37 Michael Hanselmann

233 116db7c7 Iustin Pop
    @param num_workers: number of workers to be started
234 116db7c7 Iustin Pop
        (dynamic resizing is not yet implemented)
235 116db7c7 Iustin Pop
    @param worker_class: the class to be instantiated for workers;
236 116db7c7 Iustin Pop
        should derive from L{BaseWorker}
237 76094e37 Michael Hanselmann

238 76094e37 Michael Hanselmann
    """
239 76094e37 Michael Hanselmann
    # Some of these variables are accessed by BaseWorker
240 53b1d12b Michael Hanselmann
    self._lock = threading.Lock()
241 53b1d12b Michael Hanselmann
    self._pool_to_pool = threading.Condition(self._lock)
242 53b1d12b Michael Hanselmann
    self._pool_to_worker = threading.Condition(self._lock)
243 53b1d12b Michael Hanselmann
    self._worker_to_pool = threading.Condition(self._lock)
244 76094e37 Michael Hanselmann
    self._worker_class = worker_class
245 89e2b4d2 Michael Hanselmann
    self._name = name
246 76094e37 Michael Hanselmann
    self._last_worker_id = 0
247 76094e37 Michael Hanselmann
    self._workers = []
248 76094e37 Michael Hanselmann
    self._quiescing = False
249 76094e37 Michael Hanselmann
250 76094e37 Michael Hanselmann
    # Terminating workers
251 76094e37 Michael Hanselmann
    self._termworkers = []
252 76094e37 Michael Hanselmann
253 76094e37 Michael Hanselmann
    # Queued tasks
254 52c47e4e Michael Hanselmann
    self._counter = 0
255 52c47e4e Michael Hanselmann
    self._tasks = []
256 76094e37 Michael Hanselmann
257 76094e37 Michael Hanselmann
    # Start workers
258 76094e37 Michael Hanselmann
    self.Resize(num_workers)
259 76094e37 Michael Hanselmann
260 76094e37 Michael Hanselmann
  # TODO: Implement dynamic resizing?
261 76094e37 Michael Hanselmann
262 c2a8e8ba Guido Trotter
  def _WaitWhileQuiescingUnlocked(self):
263 c2a8e8ba Guido Trotter
    """Wait until the worker pool has finished quiescing.
264 c2a8e8ba Guido Trotter

265 c2a8e8ba Guido Trotter
    """
266 c2a8e8ba Guido Trotter
    while self._quiescing:
267 c2a8e8ba Guido Trotter
      self._pool_to_pool.wait()
268 c2a8e8ba Guido Trotter
269 52c47e4e Michael Hanselmann
  def _AddTaskUnlocked(self, args, priority):
270 52c47e4e Michael Hanselmann
    """Adds a task to the internal queue.
271 52c47e4e Michael Hanselmann

272 52c47e4e Michael Hanselmann
    @type args: sequence
273 52c47e4e Michael Hanselmann
    @param args: Arguments passed to L{BaseWorker.RunTask}
274 52c47e4e Michael Hanselmann
    @type priority: number
275 52c47e4e Michael Hanselmann
    @param priority: Task priority
276 52c47e4e Michael Hanselmann

277 52c47e4e Michael Hanselmann
    """
278 189d2714 Michael Hanselmann
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
279 52c47e4e Michael Hanselmann
    assert isinstance(priority, (int, long)), "Priority must be numeric"
280 189d2714 Michael Hanselmann
281 52c47e4e Michael Hanselmann
    # This counter is used to ensure elements are processed in their
282 52c47e4e Michael Hanselmann
    # incoming order. For processing they're sorted by priority and then
283 52c47e4e Michael Hanselmann
    # counter.
284 52c47e4e Michael Hanselmann
    self._counter += 1
285 52c47e4e Michael Hanselmann
286 52c47e4e Michael Hanselmann
    heapq.heappush(self._tasks, (priority, self._counter, args))
287 189d2714 Michael Hanselmann
288 189d2714 Michael Hanselmann
    # Notify a waiting worker
289 189d2714 Michael Hanselmann
    self._pool_to_worker.notify()
290 189d2714 Michael Hanselmann
291 52c47e4e Michael Hanselmann
  def AddTask(self, args, priority=_DEFAULT_PRIORITY):
292 76094e37 Michael Hanselmann
    """Adds a task to the queue.
293 76094e37 Michael Hanselmann

294 b2e8a4d9 Michael Hanselmann
    @type args: sequence
295 116db7c7 Iustin Pop
    @param args: arguments passed to L{BaseWorker.RunTask}
296 52c47e4e Michael Hanselmann
    @type priority: number
297 52c47e4e Michael Hanselmann
    @param priority: Task priority
298 76094e37 Michael Hanselmann

299 76094e37 Michael Hanselmann
    """
300 76094e37 Michael Hanselmann
    self._lock.acquire()
301 76094e37 Michael Hanselmann
    try:
302 c2a8e8ba Guido Trotter
      self._WaitWhileQuiescingUnlocked()
303 52c47e4e Michael Hanselmann
      self._AddTaskUnlocked(args, priority)
304 76094e37 Michael Hanselmann
    finally:
305 76094e37 Michael Hanselmann
      self._lock.release()
306 76094e37 Michael Hanselmann
307 52c47e4e Michael Hanselmann
  def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
308 c2a8e8ba Guido Trotter
    """Add a list of tasks to the queue.
309 c2a8e8ba Guido Trotter

310 c2a8e8ba Guido Trotter
    @type tasks: list of tuples
311 c2a8e8ba Guido Trotter
    @param tasks: list of args passed to L{BaseWorker.RunTask}
312 52c47e4e Michael Hanselmann
    @type priority: number or list of numbers
313 52c47e4e Michael Hanselmann
    @param priority: Priority for all added tasks or a list with the priority
314 52c47e4e Michael Hanselmann
                     for each task
315 c2a8e8ba Guido Trotter

316 c2a8e8ba Guido Trotter
    """
317 25e557a5 Guido Trotter
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
318 25e557a5 Guido Trotter
      "Each task must be a sequence"
319 25e557a5 Guido Trotter
320 52c47e4e Michael Hanselmann
    assert (isinstance(priority, (int, long)) or
321 52c47e4e Michael Hanselmann
            compat.all(isinstance(prio, (int, long)) for prio in priority)), \
322 52c47e4e Michael Hanselmann
           "Priority must be numeric or be a list of numeric values"
323 52c47e4e Michael Hanselmann
324 52c47e4e Michael Hanselmann
    if isinstance(priority, (int, long)):
325 52c47e4e Michael Hanselmann
      priority = [priority] * len(tasks)
326 52c47e4e Michael Hanselmann
    elif len(priority) != len(tasks):
327 52c47e4e Michael Hanselmann
      raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
328 52c47e4e Michael Hanselmann
                                   " number of tasks (%s)" %
329 52c47e4e Michael Hanselmann
                                   (len(priority), len(tasks)))
330 52c47e4e Michael Hanselmann
331 c2a8e8ba Guido Trotter
    self._lock.acquire()
332 c2a8e8ba Guido Trotter
    try:
333 c2a8e8ba Guido Trotter
      self._WaitWhileQuiescingUnlocked()
334 c2a8e8ba Guido Trotter
335 52c47e4e Michael Hanselmann
      assert compat.all(isinstance(prio, (int, long)) for prio in priority)
336 52c47e4e Michael Hanselmann
      assert len(tasks) == len(priority)
337 52c47e4e Michael Hanselmann
338 52c47e4e Michael Hanselmann
      for args, priority in zip(tasks, priority):
339 52c47e4e Michael Hanselmann
        self._AddTaskUnlocked(args, priority)
340 c2a8e8ba Guido Trotter
    finally:
341 c2a8e8ba Guido Trotter
      self._lock.release()
342 c2a8e8ba Guido Trotter
343 21c5ad52 Michael Hanselmann
  def _WaitForTaskUnlocked(self, worker):
344 21c5ad52 Michael Hanselmann
    """Waits for a task for a worker.
345 21c5ad52 Michael Hanselmann

346 21c5ad52 Michael Hanselmann
    @type worker: L{BaseWorker}
347 21c5ad52 Michael Hanselmann
    @param worker: Worker thread
348 21c5ad52 Michael Hanselmann

349 21c5ad52 Michael Hanselmann
    """
350 21c5ad52 Michael Hanselmann
    if self._ShouldWorkerTerminateUnlocked(worker):
351 21c5ad52 Michael Hanselmann
      return _TERMINATE
352 21c5ad52 Michael Hanselmann
353 21c5ad52 Michael Hanselmann
    # We only wait if there's no task for us.
354 21c5ad52 Michael Hanselmann
    if not self._tasks:
355 21c5ad52 Michael Hanselmann
      logging.debug("Waiting for tasks")
356 21c5ad52 Michael Hanselmann
357 21c5ad52 Michael Hanselmann
      # wait() releases the lock and sleeps until notified
358 21c5ad52 Michael Hanselmann
      self._pool_to_worker.wait()
359 21c5ad52 Michael Hanselmann
360 21c5ad52 Michael Hanselmann
      logging.debug("Notified while waiting")
361 21c5ad52 Michael Hanselmann
362 21c5ad52 Michael Hanselmann
      # Were we woken up in order to terminate?
363 21c5ad52 Michael Hanselmann
      if self._ShouldWorkerTerminateUnlocked(worker):
364 21c5ad52 Michael Hanselmann
        return _TERMINATE
365 21c5ad52 Michael Hanselmann
366 21c5ad52 Michael Hanselmann
      if not self._tasks:
367 21c5ad52 Michael Hanselmann
        # Spurious notification, ignore
368 21c5ad52 Michael Hanselmann
        return None
369 21c5ad52 Michael Hanselmann
370 21c5ad52 Michael Hanselmann
    # Get task from queue and tell pool about it
371 21c5ad52 Michael Hanselmann
    try:
372 52c47e4e Michael Hanselmann
      return heapq.heappop(self._tasks)
373 21c5ad52 Michael Hanselmann
    finally:
374 21c5ad52 Michael Hanselmann
      self._worker_to_pool.notifyAll()
375 21c5ad52 Michael Hanselmann
376 76094e37 Michael Hanselmann
  def _ShouldWorkerTerminateUnlocked(self, worker):
377 76094e37 Michael Hanselmann
    """Returns whether a worker should terminate.
378 76094e37 Michael Hanselmann

379 76094e37 Michael Hanselmann
    """
380 76094e37 Michael Hanselmann
    return (worker in self._termworkers)
381 76094e37 Michael Hanselmann
382 76094e37 Michael Hanselmann
  def _HasRunningTasksUnlocked(self):
383 76094e37 Michael Hanselmann
    """Checks whether there's a task running in a worker.
384 76094e37 Michael Hanselmann

385 76094e37 Michael Hanselmann
    """
386 76094e37 Michael Hanselmann
    for worker in self._workers + self._termworkers:
387 b459a848 Andrea Spadaccini
      if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212
388 76094e37 Michael Hanselmann
        return True
389 76094e37 Michael Hanselmann
    return False
390 76094e37 Michael Hanselmann
391 76094e37 Michael Hanselmann
  def Quiesce(self):
392 76094e37 Michael Hanselmann
    """Waits until the task queue is empty.
393 76094e37 Michael Hanselmann

394 76094e37 Michael Hanselmann
    """
395 76094e37 Michael Hanselmann
    self._lock.acquire()
396 76094e37 Michael Hanselmann
    try:
397 76094e37 Michael Hanselmann
      self._quiescing = True
398 76094e37 Michael Hanselmann
399 76094e37 Michael Hanselmann
      # Wait while there are tasks pending or running
400 76094e37 Michael Hanselmann
      while self._tasks or self._HasRunningTasksUnlocked():
401 53b1d12b Michael Hanselmann
        self._worker_to_pool.wait()
402 76094e37 Michael Hanselmann
403 76094e37 Michael Hanselmann
    finally:
404 76094e37 Michael Hanselmann
      self._quiescing = False
405 76094e37 Michael Hanselmann
406 76094e37 Michael Hanselmann
      # Make sure AddTasks continues in case it was waiting
407 53b1d12b Michael Hanselmann
      self._pool_to_pool.notifyAll()
408 76094e37 Michael Hanselmann
409 76094e37 Michael Hanselmann
      self._lock.release()
410 76094e37 Michael Hanselmann
411 76094e37 Michael Hanselmann
  def _NewWorkerIdUnlocked(self):
412 116db7c7 Iustin Pop
    """Return an identifier for a new worker.
413 116db7c7 Iustin Pop

414 116db7c7 Iustin Pop
    """
415 76094e37 Michael Hanselmann
    self._last_worker_id += 1
416 89e2b4d2 Michael Hanselmann
417 89e2b4d2 Michael Hanselmann
    return "%s%d" % (self._name, self._last_worker_id)
418 76094e37 Michael Hanselmann
419 76094e37 Michael Hanselmann
  def _ResizeUnlocked(self, num_workers):
420 76094e37 Michael Hanselmann
    """Changes the number of workers.
421 76094e37 Michael Hanselmann

422 76094e37 Michael Hanselmann
    """
423 76094e37 Michael Hanselmann
    assert num_workers >= 0, "num_workers must be >= 0"
424 76094e37 Michael Hanselmann
425 76094e37 Michael Hanselmann
    logging.debug("Resizing to %s workers", num_workers)
426 76094e37 Michael Hanselmann
427 76094e37 Michael Hanselmann
    current_count = len(self._workers)
428 76094e37 Michael Hanselmann
429 76094e37 Michael Hanselmann
    if current_count == num_workers:
430 76094e37 Michael Hanselmann
      # Nothing to do
431 76094e37 Michael Hanselmann
      pass
432 76094e37 Michael Hanselmann
433 76094e37 Michael Hanselmann
    elif current_count > num_workers:
434 76094e37 Michael Hanselmann
      if num_workers == 0:
435 76094e37 Michael Hanselmann
        # Create copy of list to iterate over while lock isn't held.
436 76094e37 Michael Hanselmann
        termworkers = self._workers[:]
437 76094e37 Michael Hanselmann
        del self._workers[:]
438 76094e37 Michael Hanselmann
      else:
439 76094e37 Michael Hanselmann
        # TODO: Implement partial downsizing
440 76094e37 Michael Hanselmann
        raise NotImplementedError()
441 76094e37 Michael Hanselmann
        #termworkers = ...
442 76094e37 Michael Hanselmann
443 76094e37 Michael Hanselmann
      self._termworkers += termworkers
444 76094e37 Michael Hanselmann
445 76094e37 Michael Hanselmann
      # Notify workers that something has changed
446 53b1d12b Michael Hanselmann
      self._pool_to_worker.notifyAll()
447 76094e37 Michael Hanselmann
448 76094e37 Michael Hanselmann
      # Join all terminating workers
449 76094e37 Michael Hanselmann
      self._lock.release()
450 76094e37 Michael Hanselmann
      try:
451 76094e37 Michael Hanselmann
        for worker in termworkers:
452 c0a8eb9e Michael Hanselmann
          logging.debug("Waiting for thread %s", worker.getName())
453 76094e37 Michael Hanselmann
          worker.join()
454 76094e37 Michael Hanselmann
      finally:
455 76094e37 Michael Hanselmann
        self._lock.acquire()
456 76094e37 Michael Hanselmann
457 76094e37 Michael Hanselmann
      # Remove terminated threads. This could be done in a more efficient way
458 76094e37 Michael Hanselmann
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
459 76094e37 Michael Hanselmann
      # don't leave zombie threads around.
460 76094e37 Michael Hanselmann
      for worker in termworkers:
461 76094e37 Michael Hanselmann
        assert worker in self._termworkers, ("Worker not in list of"
462 76094e37 Michael Hanselmann
                                             " terminating workers")
463 76094e37 Michael Hanselmann
        if not worker.isAlive():
464 76094e37 Michael Hanselmann
          self._termworkers.remove(worker)
465 76094e37 Michael Hanselmann
466 76094e37 Michael Hanselmann
      assert not self._termworkers, "Zombie worker detected"
467 76094e37 Michael Hanselmann
468 76094e37 Michael Hanselmann
    elif current_count < num_workers:
469 76094e37 Michael Hanselmann
      # Create (num_workers - current_count) new workers
470 f1501b3f Michael Hanselmann
      for _ in range(num_workers - current_count):
471 76094e37 Michael Hanselmann
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
472 76094e37 Michael Hanselmann
        self._workers.append(worker)
473 76094e37 Michael Hanselmann
        worker.start()
474 76094e37 Michael Hanselmann
475 76094e37 Michael Hanselmann
  def Resize(self, num_workers):
476 76094e37 Michael Hanselmann
    """Changes the number of workers in the pool.
477 76094e37 Michael Hanselmann

478 116db7c7 Iustin Pop
    @param num_workers: the new number of workers
479 76094e37 Michael Hanselmann

480 76094e37 Michael Hanselmann
    """
481 76094e37 Michael Hanselmann
    self._lock.acquire()
482 76094e37 Michael Hanselmann
    try:
483 76094e37 Michael Hanselmann
      return self._ResizeUnlocked(num_workers)
484 76094e37 Michael Hanselmann
    finally:
485 76094e37 Michael Hanselmann
      self._lock.release()
486 76094e37 Michael Hanselmann
487 76094e37 Michael Hanselmann
  def TerminateWorkers(self):
488 76094e37 Michael Hanselmann
    """Terminate all worker threads.
489 76094e37 Michael Hanselmann

490 76094e37 Michael Hanselmann
    Unstarted tasks will be ignored.
491 76094e37 Michael Hanselmann

492 76094e37 Michael Hanselmann
    """
493 76094e37 Michael Hanselmann
    logging.debug("Terminating all workers")
494 76094e37 Michael Hanselmann
495 76094e37 Michael Hanselmann
    self._lock.acquire()
496 76094e37 Michael Hanselmann
    try:
497 76094e37 Michael Hanselmann
      self._ResizeUnlocked(0)
498 76094e37 Michael Hanselmann
499 76094e37 Michael Hanselmann
      if self._tasks:
500 76094e37 Michael Hanselmann
        logging.debug("There are %s tasks left", len(self._tasks))
501 76094e37 Michael Hanselmann
    finally:
502 76094e37 Michael Hanselmann
      self._lock.release()
503 76094e37 Michael Hanselmann
504 76094e37 Michael Hanselmann
    logging.debug("All workers terminated")