Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ ccedb11b

History | View | Annotate | Download (9.8 kB)

1 76094e37 Michael Hanselmann
#
2 76094e37 Michael Hanselmann
#
3 76094e37 Michael Hanselmann
4 189d2714 Michael Hanselmann
# Copyright (C) 2008, 2009, 2010 Google Inc.
5 76094e37 Michael Hanselmann
#
6 76094e37 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 76094e37 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 76094e37 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 76094e37 Michael Hanselmann
# (at your option) any later version.
10 76094e37 Michael Hanselmann
#
11 76094e37 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 76094e37 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 76094e37 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 76094e37 Michael Hanselmann
# General Public License for more details.
15 76094e37 Michael Hanselmann
#
16 76094e37 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 76094e37 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 76094e37 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 76094e37 Michael Hanselmann
# 02110-1301, USA.
20 76094e37 Michael Hanselmann
21 76094e37 Michael Hanselmann
22 76094e37 Michael Hanselmann
"""Base classes for worker pools.
23 76094e37 Michael Hanselmann

24 76094e37 Michael Hanselmann
"""
25 76094e37 Michael Hanselmann
26 76094e37 Michael Hanselmann
import collections
27 76094e37 Michael Hanselmann
import logging
28 76094e37 Michael Hanselmann
import threading
29 76094e37 Michael Hanselmann
30 25e557a5 Guido Trotter
from ganeti import compat
31 25e557a5 Guido Trotter
32 76094e37 Michael Hanselmann
33 21c5ad52 Michael Hanselmann
_TERMINATE = object()
34 21c5ad52 Michael Hanselmann
35 21c5ad52 Michael Hanselmann
36 76094e37 Michael Hanselmann
class BaseWorker(threading.Thread, object):
37 76094e37 Michael Hanselmann
  """Base worker class for worker pools.
38 76094e37 Michael Hanselmann

39 76094e37 Michael Hanselmann
  Users of a worker pool must override RunTask in a subclass.
40 76094e37 Michael Hanselmann

41 76094e37 Michael Hanselmann
  """
42 7260cfbe Iustin Pop
  # pylint: disable-msg=W0212
43 76094e37 Michael Hanselmann
  def __init__(self, pool, worker_id):
44 76094e37 Michael Hanselmann
    """Constructor for BaseWorker thread.
45 76094e37 Michael Hanselmann

46 116db7c7 Iustin Pop
    @param pool: the parent worker pool
47 116db7c7 Iustin Pop
    @param worker_id: identifier for this worker
48 76094e37 Michael Hanselmann

49 76094e37 Michael Hanselmann
    """
50 d16e6fd9 Michael Hanselmann
    super(BaseWorker, self).__init__(name=worker_id)
51 76094e37 Michael Hanselmann
    self.pool = pool
52 76094e37 Michael Hanselmann
    self._current_task = None
53 76094e37 Michael Hanselmann
54 76094e37 Michael Hanselmann
  def ShouldTerminate(self):
55 76094e37 Michael Hanselmann
    """Returns whether a worker should terminate.
56 76094e37 Michael Hanselmann

57 76094e37 Michael Hanselmann
    """
58 76094e37 Michael Hanselmann
    return self.pool.ShouldWorkerTerminate(self)
59 76094e37 Michael Hanselmann
60 b3558df1 Michael Hanselmann
  def _HasRunningTaskUnlocked(self):
61 b3558df1 Michael Hanselmann
    """Returns whether this worker is currently running a task.
62 b3558df1 Michael Hanselmann

63 b3558df1 Michael Hanselmann
    """
64 b3558df1 Michael Hanselmann
    return (self._current_task is not None)
65 b3558df1 Michael Hanselmann
66 76094e37 Michael Hanselmann
  def run(self):
67 76094e37 Michael Hanselmann
    """Main thread function.
68 76094e37 Michael Hanselmann

69 76094e37 Michael Hanselmann
    Waits for new tasks to show up in the queue.
70 76094e37 Michael Hanselmann

71 76094e37 Michael Hanselmann
    """
72 76094e37 Michael Hanselmann
    pool = self.pool
73 76094e37 Michael Hanselmann
74 ccedb11b Michael Hanselmann
    assert self._current_task is None
75 76094e37 Michael Hanselmann
76 76094e37 Michael Hanselmann
    while True:
77 76094e37 Michael Hanselmann
      try:
78 21c5ad52 Michael Hanselmann
        # Wait on lock to be told either to terminate or to do a task
79 76094e37 Michael Hanselmann
        pool._lock.acquire()
80 76094e37 Michael Hanselmann
        try:
81 21c5ad52 Michael Hanselmann
          task = pool._WaitForTaskUnlocked(self)
82 76094e37 Michael Hanselmann
83 21c5ad52 Michael Hanselmann
          if task is _TERMINATE:
84 21c5ad52 Michael Hanselmann
            # Told to terminate
85 21c5ad52 Michael Hanselmann
            break
86 b3558df1 Michael Hanselmann
87 21c5ad52 Michael Hanselmann
          if task is None:
88 21c5ad52 Michael Hanselmann
            # Spurious notification, ignore
89 21c5ad52 Michael Hanselmann
            continue
90 76094e37 Michael Hanselmann
91 21c5ad52 Michael Hanselmann
          self._current_task = task
92 76094e37 Michael Hanselmann
93 21c5ad52 Michael Hanselmann
          assert self._HasRunningTaskUnlocked()
94 76094e37 Michael Hanselmann
        finally:
95 76094e37 Michael Hanselmann
          pool._lock.release()
96 76094e37 Michael Hanselmann
97 76094e37 Michael Hanselmann
        # Run the actual task
98 76094e37 Michael Hanselmann
        try:
99 02fc74da Michael Hanselmann
          logging.debug("Starting task %r", self._current_task)
100 76094e37 Michael Hanselmann
          self.RunTask(*self._current_task)
101 02fc74da Michael Hanselmann
          logging.debug("Done with task %r", self._current_task)
102 7260cfbe Iustin Pop
        except: # pylint: disable-msg=W0702
103 02fc74da Michael Hanselmann
          logging.exception("Caught unhandled exception")
104 76094e37 Michael Hanselmann
      finally:
105 76094e37 Michael Hanselmann
        # Notify pool
106 76094e37 Michael Hanselmann
        pool._lock.acquire()
107 76094e37 Michael Hanselmann
        try:
108 b3558df1 Michael Hanselmann
          if self._current_task:
109 b3558df1 Michael Hanselmann
            self._current_task = None
110 53b1d12b Michael Hanselmann
            pool._worker_to_pool.notifyAll()
111 76094e37 Michael Hanselmann
        finally:
112 76094e37 Michael Hanselmann
          pool._lock.release()
113 76094e37 Michael Hanselmann
114 02fc74da Michael Hanselmann
    logging.debug("Terminates")
115 b3558df1 Michael Hanselmann
116 76094e37 Michael Hanselmann
  def RunTask(self, *args):
117 76094e37 Michael Hanselmann
    """Function called to start a task.
118 76094e37 Michael Hanselmann

119 116db7c7 Iustin Pop
    This needs to be implemented by child classes.
120 116db7c7 Iustin Pop

121 76094e37 Michael Hanselmann
    """
122 76094e37 Michael Hanselmann
    raise NotImplementedError()
123 76094e37 Michael Hanselmann
124 76094e37 Michael Hanselmann
125 76094e37 Michael Hanselmann
class WorkerPool(object):
126 76094e37 Michael Hanselmann
  """Worker pool with a queue.
127 76094e37 Michael Hanselmann

128 76094e37 Michael Hanselmann
  This class is thread-safe.
129 76094e37 Michael Hanselmann

130 116db7c7 Iustin Pop
  Tasks are guaranteed to be started in the order in which they're
131 116db7c7 Iustin Pop
  added to the pool. Due to the nature of threading, they're not
132 116db7c7 Iustin Pop
  guaranteed to finish in the same order.
133 76094e37 Michael Hanselmann

134 76094e37 Michael Hanselmann
  """
135 89e2b4d2 Michael Hanselmann
  def __init__(self, name, num_workers, worker_class):
136 76094e37 Michael Hanselmann
    """Constructor for worker pool.
137 76094e37 Michael Hanselmann

138 116db7c7 Iustin Pop
    @param num_workers: number of workers to be started
139 116db7c7 Iustin Pop
        (dynamic resizing is not yet implemented)
140 116db7c7 Iustin Pop
    @param worker_class: the class to be instantiated for workers;
141 116db7c7 Iustin Pop
        should derive from L{BaseWorker}
142 76094e37 Michael Hanselmann

143 76094e37 Michael Hanselmann
    """
144 76094e37 Michael Hanselmann
    # Some of these variables are accessed by BaseWorker
145 53b1d12b Michael Hanselmann
    self._lock = threading.Lock()
146 53b1d12b Michael Hanselmann
    self._pool_to_pool = threading.Condition(self._lock)
147 53b1d12b Michael Hanselmann
    self._pool_to_worker = threading.Condition(self._lock)
148 53b1d12b Michael Hanselmann
    self._worker_to_pool = threading.Condition(self._lock)
149 76094e37 Michael Hanselmann
    self._worker_class = worker_class
150 89e2b4d2 Michael Hanselmann
    self._name = name
151 76094e37 Michael Hanselmann
    self._last_worker_id = 0
152 76094e37 Michael Hanselmann
    self._workers = []
153 76094e37 Michael Hanselmann
    self._quiescing = False
154 76094e37 Michael Hanselmann
155 76094e37 Michael Hanselmann
    # Terminating workers
156 76094e37 Michael Hanselmann
    self._termworkers = []
157 76094e37 Michael Hanselmann
158 76094e37 Michael Hanselmann
    # Queued tasks
159 76094e37 Michael Hanselmann
    self._tasks = collections.deque()
160 76094e37 Michael Hanselmann
161 76094e37 Michael Hanselmann
    # Start workers
162 76094e37 Michael Hanselmann
    self.Resize(num_workers)
163 76094e37 Michael Hanselmann
164 76094e37 Michael Hanselmann
  # TODO: Implement dynamic resizing?
165 76094e37 Michael Hanselmann
166 c2a8e8ba Guido Trotter
  def _WaitWhileQuiescingUnlocked(self):
167 c2a8e8ba Guido Trotter
    """Wait until the worker pool has finished quiescing.
168 c2a8e8ba Guido Trotter

169 c2a8e8ba Guido Trotter
    """
170 c2a8e8ba Guido Trotter
    while self._quiescing:
171 c2a8e8ba Guido Trotter
      self._pool_to_pool.wait()
172 c2a8e8ba Guido Trotter
173 189d2714 Michael Hanselmann
  def _AddTaskUnlocked(self, args):
174 189d2714 Michael Hanselmann
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
175 189d2714 Michael Hanselmann
176 189d2714 Michael Hanselmann
    self._tasks.append(args)
177 189d2714 Michael Hanselmann
178 189d2714 Michael Hanselmann
    # Notify a waiting worker
179 189d2714 Michael Hanselmann
    self._pool_to_worker.notify()
180 189d2714 Michael Hanselmann
181 76094e37 Michael Hanselmann
  def AddTask(self, *args):
182 76094e37 Michael Hanselmann
    """Adds a task to the queue.
183 76094e37 Michael Hanselmann

184 116db7c7 Iustin Pop
    @param args: arguments passed to L{BaseWorker.RunTask}
185 76094e37 Michael Hanselmann

186 76094e37 Michael Hanselmann
    """
187 76094e37 Michael Hanselmann
    self._lock.acquire()
188 76094e37 Michael Hanselmann
    try:
189 c2a8e8ba Guido Trotter
      self._WaitWhileQuiescingUnlocked()
190 189d2714 Michael Hanselmann
      self._AddTaskUnlocked(args)
191 76094e37 Michael Hanselmann
    finally:
192 76094e37 Michael Hanselmann
      self._lock.release()
193 76094e37 Michael Hanselmann
194 c2a8e8ba Guido Trotter
  def AddManyTasks(self, tasks):
195 c2a8e8ba Guido Trotter
    """Add a list of tasks to the queue.
196 c2a8e8ba Guido Trotter

197 c2a8e8ba Guido Trotter
    @type tasks: list of tuples
198 c2a8e8ba Guido Trotter
    @param tasks: list of args passed to L{BaseWorker.RunTask}
199 c2a8e8ba Guido Trotter

200 c2a8e8ba Guido Trotter
    """
201 25e557a5 Guido Trotter
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
202 25e557a5 Guido Trotter
      "Each task must be a sequence"
203 25e557a5 Guido Trotter
204 c2a8e8ba Guido Trotter
    self._lock.acquire()
205 c2a8e8ba Guido Trotter
    try:
206 c2a8e8ba Guido Trotter
      self._WaitWhileQuiescingUnlocked()
207 c2a8e8ba Guido Trotter
208 189d2714 Michael Hanselmann
      for args in tasks:
209 189d2714 Michael Hanselmann
        self._AddTaskUnlocked(args)
210 c2a8e8ba Guido Trotter
    finally:
211 c2a8e8ba Guido Trotter
      self._lock.release()
212 c2a8e8ba Guido Trotter
213 21c5ad52 Michael Hanselmann
  def _WaitForTaskUnlocked(self, worker):
214 21c5ad52 Michael Hanselmann
    """Waits for a task for a worker.
215 21c5ad52 Michael Hanselmann

216 21c5ad52 Michael Hanselmann
    @type worker: L{BaseWorker}
217 21c5ad52 Michael Hanselmann
    @param worker: Worker thread
218 21c5ad52 Michael Hanselmann

219 21c5ad52 Michael Hanselmann
    """
220 21c5ad52 Michael Hanselmann
    if self._ShouldWorkerTerminateUnlocked(worker):
221 21c5ad52 Michael Hanselmann
      return _TERMINATE
222 21c5ad52 Michael Hanselmann
223 21c5ad52 Michael Hanselmann
    # We only wait if there's no task for us.
224 21c5ad52 Michael Hanselmann
    if not self._tasks:
225 21c5ad52 Michael Hanselmann
      logging.debug("Waiting for tasks")
226 21c5ad52 Michael Hanselmann
227 21c5ad52 Michael Hanselmann
      # wait() releases the lock and sleeps until notified
228 21c5ad52 Michael Hanselmann
      self._pool_to_worker.wait()
229 21c5ad52 Michael Hanselmann
230 21c5ad52 Michael Hanselmann
      logging.debug("Notified while waiting")
231 21c5ad52 Michael Hanselmann
232 21c5ad52 Michael Hanselmann
      # Were we woken up in order to terminate?
233 21c5ad52 Michael Hanselmann
      if self._ShouldWorkerTerminateUnlocked(worker):
234 21c5ad52 Michael Hanselmann
        return _TERMINATE
235 21c5ad52 Michael Hanselmann
236 21c5ad52 Michael Hanselmann
      if not self._tasks:
237 21c5ad52 Michael Hanselmann
        # Spurious notification, ignore
238 21c5ad52 Michael Hanselmann
        return None
239 21c5ad52 Michael Hanselmann
240 21c5ad52 Michael Hanselmann
    # Get task from queue and tell pool about it
241 21c5ad52 Michael Hanselmann
    try:
242 21c5ad52 Michael Hanselmann
      return self._tasks.popleft()
243 21c5ad52 Michael Hanselmann
    finally:
244 21c5ad52 Michael Hanselmann
      self._worker_to_pool.notifyAll()
245 21c5ad52 Michael Hanselmann
246 76094e37 Michael Hanselmann
  def _ShouldWorkerTerminateUnlocked(self, worker):
247 76094e37 Michael Hanselmann
    """Returns whether a worker should terminate.
248 76094e37 Michael Hanselmann

249 76094e37 Michael Hanselmann
    """
250 76094e37 Michael Hanselmann
    return (worker in self._termworkers)
251 76094e37 Michael Hanselmann
252 76094e37 Michael Hanselmann
  def ShouldWorkerTerminate(self, worker):
253 76094e37 Michael Hanselmann
    """Returns whether a worker should terminate.
254 76094e37 Michael Hanselmann

255 76094e37 Michael Hanselmann
    """
256 76094e37 Michael Hanselmann
    self._lock.acquire()
257 76094e37 Michael Hanselmann
    try:
258 805f0c07 Iustin Pop
      return self._ShouldWorkerTerminateUnlocked(worker)
259 76094e37 Michael Hanselmann
    finally:
260 76094e37 Michael Hanselmann
      self._lock.release()
261 76094e37 Michael Hanselmann
262 76094e37 Michael Hanselmann
  def _HasRunningTasksUnlocked(self):
263 76094e37 Michael Hanselmann
    """Checks whether there's a task running in a worker.
264 76094e37 Michael Hanselmann

265 76094e37 Michael Hanselmann
    """
266 76094e37 Michael Hanselmann
    for worker in self._workers + self._termworkers:
267 7260cfbe Iustin Pop
      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
268 76094e37 Michael Hanselmann
        return True
269 76094e37 Michael Hanselmann
    return False
270 76094e37 Michael Hanselmann
271 76094e37 Michael Hanselmann
  def Quiesce(self):
272 76094e37 Michael Hanselmann
    """Waits until the task queue is empty.
273 76094e37 Michael Hanselmann

274 76094e37 Michael Hanselmann
    """
275 76094e37 Michael Hanselmann
    self._lock.acquire()
276 76094e37 Michael Hanselmann
    try:
277 76094e37 Michael Hanselmann
      self._quiescing = True
278 76094e37 Michael Hanselmann
279 76094e37 Michael Hanselmann
      # Wait while there are tasks pending or running
280 76094e37 Michael Hanselmann
      while self._tasks or self._HasRunningTasksUnlocked():
281 53b1d12b Michael Hanselmann
        self._worker_to_pool.wait()
282 76094e37 Michael Hanselmann
283 76094e37 Michael Hanselmann
    finally:
284 76094e37 Michael Hanselmann
      self._quiescing = False
285 76094e37 Michael Hanselmann
286 76094e37 Michael Hanselmann
      # Make sure AddTasks continues in case it was waiting
287 53b1d12b Michael Hanselmann
      self._pool_to_pool.notifyAll()
288 76094e37 Michael Hanselmann
289 76094e37 Michael Hanselmann
      self._lock.release()
290 76094e37 Michael Hanselmann
291 76094e37 Michael Hanselmann
  def _NewWorkerIdUnlocked(self):
292 116db7c7 Iustin Pop
    """Return an identifier for a new worker.
293 116db7c7 Iustin Pop

294 116db7c7 Iustin Pop
    """
295 76094e37 Michael Hanselmann
    self._last_worker_id += 1
296 89e2b4d2 Michael Hanselmann
297 89e2b4d2 Michael Hanselmann
    return "%s%d" % (self._name, self._last_worker_id)
298 76094e37 Michael Hanselmann
299 76094e37 Michael Hanselmann
  def _ResizeUnlocked(self, num_workers):
300 76094e37 Michael Hanselmann
    """Changes the number of workers.
301 76094e37 Michael Hanselmann

302 76094e37 Michael Hanselmann
    """
303 76094e37 Michael Hanselmann
    assert num_workers >= 0, "num_workers must be >= 0"
304 76094e37 Michael Hanselmann
305 76094e37 Michael Hanselmann
    logging.debug("Resizing to %s workers", num_workers)
306 76094e37 Michael Hanselmann
307 76094e37 Michael Hanselmann
    current_count = len(self._workers)
308 76094e37 Michael Hanselmann
309 76094e37 Michael Hanselmann
    if current_count == num_workers:
310 76094e37 Michael Hanselmann
      # Nothing to do
311 76094e37 Michael Hanselmann
      pass
312 76094e37 Michael Hanselmann
313 76094e37 Michael Hanselmann
    elif current_count > num_workers:
314 76094e37 Michael Hanselmann
      if num_workers == 0:
315 76094e37 Michael Hanselmann
        # Create copy of list to iterate over while lock isn't held.
316 76094e37 Michael Hanselmann
        termworkers = self._workers[:]
317 76094e37 Michael Hanselmann
        del self._workers[:]
318 76094e37 Michael Hanselmann
      else:
319 76094e37 Michael Hanselmann
        # TODO: Implement partial downsizing
320 76094e37 Michael Hanselmann
        raise NotImplementedError()
321 76094e37 Michael Hanselmann
        #termworkers = ...
322 76094e37 Michael Hanselmann
323 76094e37 Michael Hanselmann
      self._termworkers += termworkers
324 76094e37 Michael Hanselmann
325 76094e37 Michael Hanselmann
      # Notify workers that something has changed
326 53b1d12b Michael Hanselmann
      self._pool_to_worker.notifyAll()
327 76094e37 Michael Hanselmann
328 76094e37 Michael Hanselmann
      # Join all terminating workers
329 76094e37 Michael Hanselmann
      self._lock.release()
330 76094e37 Michael Hanselmann
      try:
331 76094e37 Michael Hanselmann
        for worker in termworkers:
332 c0a8eb9e Michael Hanselmann
          logging.debug("Waiting for thread %s", worker.getName())
333 76094e37 Michael Hanselmann
          worker.join()
334 76094e37 Michael Hanselmann
      finally:
335 76094e37 Michael Hanselmann
        self._lock.acquire()
336 76094e37 Michael Hanselmann
337 76094e37 Michael Hanselmann
      # Remove terminated threads. This could be done in a more efficient way
338 76094e37 Michael Hanselmann
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
339 76094e37 Michael Hanselmann
      # don't leave zombie threads around.
340 76094e37 Michael Hanselmann
      for worker in termworkers:
341 76094e37 Michael Hanselmann
        assert worker in self._termworkers, ("Worker not in list of"
342 76094e37 Michael Hanselmann
                                             " terminating workers")
343 76094e37 Michael Hanselmann
        if not worker.isAlive():
344 76094e37 Michael Hanselmann
          self._termworkers.remove(worker)
345 76094e37 Michael Hanselmann
346 76094e37 Michael Hanselmann
      assert not self._termworkers, "Zombie worker detected"
347 76094e37 Michael Hanselmann
348 76094e37 Michael Hanselmann
    elif current_count < num_workers:
349 76094e37 Michael Hanselmann
      # Create (num_workers - current_count) new workers
350 f1501b3f Michael Hanselmann
      for _ in range(num_workers - current_count):
351 76094e37 Michael Hanselmann
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
352 76094e37 Michael Hanselmann
        self._workers.append(worker)
353 76094e37 Michael Hanselmann
        worker.start()
354 76094e37 Michael Hanselmann
355 76094e37 Michael Hanselmann
  def Resize(self, num_workers):
356 76094e37 Michael Hanselmann
    """Changes the number of workers in the pool.
357 76094e37 Michael Hanselmann

358 116db7c7 Iustin Pop
    @param num_workers: the new number of workers
359 76094e37 Michael Hanselmann

360 76094e37 Michael Hanselmann
    """
361 76094e37 Michael Hanselmann
    self._lock.acquire()
362 76094e37 Michael Hanselmann
    try:
363 76094e37 Michael Hanselmann
      return self._ResizeUnlocked(num_workers)
364 76094e37 Michael Hanselmann
    finally:
365 76094e37 Michael Hanselmann
      self._lock.release()
366 76094e37 Michael Hanselmann
367 76094e37 Michael Hanselmann
  def TerminateWorkers(self):
368 76094e37 Michael Hanselmann
    """Terminate all worker threads.
369 76094e37 Michael Hanselmann

370 76094e37 Michael Hanselmann
    Unstarted tasks will be ignored.
371 76094e37 Michael Hanselmann

372 76094e37 Michael Hanselmann
    """
373 76094e37 Michael Hanselmann
    logging.debug("Terminating all workers")
374 76094e37 Michael Hanselmann
375 76094e37 Michael Hanselmann
    self._lock.acquire()
376 76094e37 Michael Hanselmann
    try:
377 76094e37 Michael Hanselmann
      self._ResizeUnlocked(0)
378 76094e37 Michael Hanselmann
379 76094e37 Michael Hanselmann
      if self._tasks:
380 76094e37 Michael Hanselmann
        logging.debug("There are %s tasks left", len(self._tasks))
381 76094e37 Michael Hanselmann
    finally:
382 76094e37 Michael Hanselmann
      self._lock.release()
383 76094e37 Michael Hanselmann
384 76094e37 Michael Hanselmann
    logging.debug("All workers terminated")