Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ 8044bf65

History | View | Annotate | Download (9.9 kB)

1 76094e37 Michael Hanselmann
#
2 76094e37 Michael Hanselmann
#
3 76094e37 Michael Hanselmann
4 189d2714 Michael Hanselmann
# Copyright (C) 2008, 2009, 2010 Google Inc.
5 76094e37 Michael Hanselmann
#
6 76094e37 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 76094e37 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 76094e37 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 76094e37 Michael Hanselmann
# (at your option) any later version.
10 76094e37 Michael Hanselmann
#
11 76094e37 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 76094e37 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 76094e37 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 76094e37 Michael Hanselmann
# General Public License for more details.
15 76094e37 Michael Hanselmann
#
16 76094e37 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 76094e37 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 76094e37 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 76094e37 Michael Hanselmann
# 02110-1301, USA.
20 76094e37 Michael Hanselmann
21 76094e37 Michael Hanselmann
22 76094e37 Michael Hanselmann
"""Base classes for worker pools.
23 76094e37 Michael Hanselmann

24 76094e37 Michael Hanselmann
"""
25 76094e37 Michael Hanselmann
26 76094e37 Michael Hanselmann
import collections
27 76094e37 Michael Hanselmann
import logging
28 76094e37 Michael Hanselmann
import threading
29 76094e37 Michael Hanselmann
30 25e557a5 Guido Trotter
from ganeti import compat
31 25e557a5 Guido Trotter
32 76094e37 Michael Hanselmann
33 21c5ad52 Michael Hanselmann
_TERMINATE = object()
34 21c5ad52 Michael Hanselmann
35 21c5ad52 Michael Hanselmann
36 76094e37 Michael Hanselmann
class BaseWorker(threading.Thread, object):
37 76094e37 Michael Hanselmann
  """Base worker class for worker pools.
38 76094e37 Michael Hanselmann

39 76094e37 Michael Hanselmann
  Users of a worker pool must override RunTask in a subclass.
40 76094e37 Michael Hanselmann

41 76094e37 Michael Hanselmann
  """
42 7260cfbe Iustin Pop
  # pylint: disable-msg=W0212
43 76094e37 Michael Hanselmann
  def __init__(self, pool, worker_id):
44 76094e37 Michael Hanselmann
    """Constructor for BaseWorker thread.
45 76094e37 Michael Hanselmann

46 116db7c7 Iustin Pop
    @param pool: the parent worker pool
47 116db7c7 Iustin Pop
    @param worker_id: identifier for this worker
48 76094e37 Michael Hanselmann

49 76094e37 Michael Hanselmann
    """
50 d16e6fd9 Michael Hanselmann
    super(BaseWorker, self).__init__(name=worker_id)
51 76094e37 Michael Hanselmann
    self.pool = pool
52 76094e37 Michael Hanselmann
    self._current_task = None
53 76094e37 Michael Hanselmann
54 76094e37 Michael Hanselmann
  def ShouldTerminate(self):
55 2f4e1516 Michael Hanselmann
    """Returns whether this worker should terminate.
56 2f4e1516 Michael Hanselmann

57 2f4e1516 Michael Hanselmann
    Should only be called from within L{RunTask}.
58 76094e37 Michael Hanselmann

59 76094e37 Michael Hanselmann
    """
60 2f4e1516 Michael Hanselmann
    self.pool._lock.acquire()
61 2f4e1516 Michael Hanselmann
    try:
62 2f4e1516 Michael Hanselmann
      assert self._HasRunningTaskUnlocked()
63 2f4e1516 Michael Hanselmann
      return self.pool._ShouldWorkerTerminateUnlocked(self)
64 2f4e1516 Michael Hanselmann
    finally:
65 2f4e1516 Michael Hanselmann
      self.pool._lock.release()
66 76094e37 Michael Hanselmann
67 b3558df1 Michael Hanselmann
  def _HasRunningTaskUnlocked(self):
68 b3558df1 Michael Hanselmann
    """Returns whether this worker is currently running a task.
69 b3558df1 Michael Hanselmann

70 b3558df1 Michael Hanselmann
    """
71 b3558df1 Michael Hanselmann
    return (self._current_task is not None)
72 b3558df1 Michael Hanselmann
73 76094e37 Michael Hanselmann
  def run(self):
74 76094e37 Michael Hanselmann
    """Main thread function.
75 76094e37 Michael Hanselmann

76 76094e37 Michael Hanselmann
    Waits for new tasks to show up in the queue.
77 76094e37 Michael Hanselmann

78 76094e37 Michael Hanselmann
    """
79 76094e37 Michael Hanselmann
    pool = self.pool
80 76094e37 Michael Hanselmann
81 76094e37 Michael Hanselmann
    while True:
82 46d0a3d0 Michael Hanselmann
      assert self._current_task is None
83 76094e37 Michael Hanselmann
      try:
84 21c5ad52 Michael Hanselmann
        # Wait on lock to be told either to terminate or to do a task
85 76094e37 Michael Hanselmann
        pool._lock.acquire()
86 76094e37 Michael Hanselmann
        try:
87 21c5ad52 Michael Hanselmann
          task = pool._WaitForTaskUnlocked(self)
88 76094e37 Michael Hanselmann
89 21c5ad52 Michael Hanselmann
          if task is _TERMINATE:
90 21c5ad52 Michael Hanselmann
            # Told to terminate
91 21c5ad52 Michael Hanselmann
            break
92 b3558df1 Michael Hanselmann
93 21c5ad52 Michael Hanselmann
          if task is None:
94 21c5ad52 Michael Hanselmann
            # Spurious notification, ignore
95 21c5ad52 Michael Hanselmann
            continue
96 76094e37 Michael Hanselmann
97 21c5ad52 Michael Hanselmann
          self._current_task = task
98 76094e37 Michael Hanselmann
99 46d0a3d0 Michael Hanselmann
          # No longer needed, dispose of reference
100 46d0a3d0 Michael Hanselmann
          del task
101 46d0a3d0 Michael Hanselmann
102 21c5ad52 Michael Hanselmann
          assert self._HasRunningTaskUnlocked()
103 46d0a3d0 Michael Hanselmann
104 76094e37 Michael Hanselmann
        finally:
105 76094e37 Michael Hanselmann
          pool._lock.release()
106 76094e37 Michael Hanselmann
107 76094e37 Michael Hanselmann
        # Run the actual task
108 76094e37 Michael Hanselmann
        try:
109 02fc74da Michael Hanselmann
          logging.debug("Starting task %r", self._current_task)
110 76094e37 Michael Hanselmann
          self.RunTask(*self._current_task)
111 02fc74da Michael Hanselmann
          logging.debug("Done with task %r", self._current_task)
112 7260cfbe Iustin Pop
        except: # pylint: disable-msg=W0702
113 02fc74da Michael Hanselmann
          logging.exception("Caught unhandled exception")
114 c1cf1fe5 Michael Hanselmann
115 c1cf1fe5 Michael Hanselmann
        assert self._HasRunningTaskUnlocked()
116 76094e37 Michael Hanselmann
      finally:
117 76094e37 Michael Hanselmann
        # Notify pool
118 76094e37 Michael Hanselmann
        pool._lock.acquire()
119 76094e37 Michael Hanselmann
        try:
120 b3558df1 Michael Hanselmann
          if self._current_task:
121 b3558df1 Michael Hanselmann
            self._current_task = None
122 53b1d12b Michael Hanselmann
            pool._worker_to_pool.notifyAll()
123 76094e37 Michael Hanselmann
        finally:
124 76094e37 Michael Hanselmann
          pool._lock.release()
125 76094e37 Michael Hanselmann
126 c1cf1fe5 Michael Hanselmann
      assert not self._HasRunningTaskUnlocked()
127 c1cf1fe5 Michael Hanselmann
128 02fc74da Michael Hanselmann
    logging.debug("Terminates")
129 b3558df1 Michael Hanselmann
130 76094e37 Michael Hanselmann
  def RunTask(self, *args):
131 76094e37 Michael Hanselmann
    """Function called to start a task.
132 76094e37 Michael Hanselmann

133 116db7c7 Iustin Pop
    This needs to be implemented by child classes.
134 116db7c7 Iustin Pop

135 76094e37 Michael Hanselmann
    """
136 76094e37 Michael Hanselmann
    raise NotImplementedError()
137 76094e37 Michael Hanselmann
138 76094e37 Michael Hanselmann
139 76094e37 Michael Hanselmann
class WorkerPool(object):
140 76094e37 Michael Hanselmann
  """Worker pool with a queue.
141 76094e37 Michael Hanselmann

142 76094e37 Michael Hanselmann
  This class is thread-safe.
143 76094e37 Michael Hanselmann

144 116db7c7 Iustin Pop
  Tasks are guaranteed to be started in the order in which they're
145 116db7c7 Iustin Pop
  added to the pool. Due to the nature of threading, they're not
146 116db7c7 Iustin Pop
  guaranteed to finish in the same order.
147 76094e37 Michael Hanselmann

148 76094e37 Michael Hanselmann
  """
149 89e2b4d2 Michael Hanselmann
  def __init__(self, name, num_workers, worker_class):
150 76094e37 Michael Hanselmann
    """Constructor for worker pool.
151 76094e37 Michael Hanselmann

152 116db7c7 Iustin Pop
    @param num_workers: number of workers to be started
153 116db7c7 Iustin Pop
        (dynamic resizing is not yet implemented)
154 116db7c7 Iustin Pop
    @param worker_class: the class to be instantiated for workers;
155 116db7c7 Iustin Pop
        should derive from L{BaseWorker}
156 76094e37 Michael Hanselmann

157 76094e37 Michael Hanselmann
    """
158 76094e37 Michael Hanselmann
    # Some of these variables are accessed by BaseWorker
159 53b1d12b Michael Hanselmann
    self._lock = threading.Lock()
160 53b1d12b Michael Hanselmann
    self._pool_to_pool = threading.Condition(self._lock)
161 53b1d12b Michael Hanselmann
    self._pool_to_worker = threading.Condition(self._lock)
162 53b1d12b Michael Hanselmann
    self._worker_to_pool = threading.Condition(self._lock)
163 76094e37 Michael Hanselmann
    self._worker_class = worker_class
164 89e2b4d2 Michael Hanselmann
    self._name = name
165 76094e37 Michael Hanselmann
    self._last_worker_id = 0
166 76094e37 Michael Hanselmann
    self._workers = []
167 76094e37 Michael Hanselmann
    self._quiescing = False
168 76094e37 Michael Hanselmann
169 76094e37 Michael Hanselmann
    # Terminating workers
170 76094e37 Michael Hanselmann
    self._termworkers = []
171 76094e37 Michael Hanselmann
172 76094e37 Michael Hanselmann
    # Queued tasks
173 76094e37 Michael Hanselmann
    self._tasks = collections.deque()
174 76094e37 Michael Hanselmann
175 76094e37 Michael Hanselmann
    # Start workers
176 76094e37 Michael Hanselmann
    self.Resize(num_workers)
177 76094e37 Michael Hanselmann
178 76094e37 Michael Hanselmann
  # TODO: Implement dynamic resizing?
179 76094e37 Michael Hanselmann
180 c2a8e8ba Guido Trotter
  def _WaitWhileQuiescingUnlocked(self):
181 c2a8e8ba Guido Trotter
    """Wait until the worker pool has finished quiescing.
182 c2a8e8ba Guido Trotter

183 c2a8e8ba Guido Trotter
    """
184 c2a8e8ba Guido Trotter
    while self._quiescing:
185 c2a8e8ba Guido Trotter
      self._pool_to_pool.wait()
186 c2a8e8ba Guido Trotter
187 189d2714 Michael Hanselmann
  def _AddTaskUnlocked(self, args):
188 189d2714 Michael Hanselmann
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
189 189d2714 Michael Hanselmann
190 189d2714 Michael Hanselmann
    self._tasks.append(args)
191 189d2714 Michael Hanselmann
192 189d2714 Michael Hanselmann
    # Notify a waiting worker
193 189d2714 Michael Hanselmann
    self._pool_to_worker.notify()
194 189d2714 Michael Hanselmann
195 b2e8a4d9 Michael Hanselmann
  def AddTask(self, args):
196 76094e37 Michael Hanselmann
    """Adds a task to the queue.
197 76094e37 Michael Hanselmann

198 b2e8a4d9 Michael Hanselmann
    @type args: sequence
199 116db7c7 Iustin Pop
    @param args: arguments passed to L{BaseWorker.RunTask}
200 76094e37 Michael Hanselmann

201 76094e37 Michael Hanselmann
    """
202 76094e37 Michael Hanselmann
    self._lock.acquire()
203 76094e37 Michael Hanselmann
    try:
204 c2a8e8ba Guido Trotter
      self._WaitWhileQuiescingUnlocked()
205 189d2714 Michael Hanselmann
      self._AddTaskUnlocked(args)
206 76094e37 Michael Hanselmann
    finally:
207 76094e37 Michael Hanselmann
      self._lock.release()
208 76094e37 Michael Hanselmann
209 c2a8e8ba Guido Trotter
  def AddManyTasks(self, tasks):
210 c2a8e8ba Guido Trotter
    """Add a list of tasks to the queue.
211 c2a8e8ba Guido Trotter

212 c2a8e8ba Guido Trotter
    @type tasks: list of tuples
213 c2a8e8ba Guido Trotter
    @param tasks: list of args passed to L{BaseWorker.RunTask}
214 c2a8e8ba Guido Trotter

215 c2a8e8ba Guido Trotter
    """
216 25e557a5 Guido Trotter
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
217 25e557a5 Guido Trotter
      "Each task must be a sequence"
218 25e557a5 Guido Trotter
219 c2a8e8ba Guido Trotter
    self._lock.acquire()
220 c2a8e8ba Guido Trotter
    try:
221 c2a8e8ba Guido Trotter
      self._WaitWhileQuiescingUnlocked()
222 c2a8e8ba Guido Trotter
223 189d2714 Michael Hanselmann
      for args in tasks:
224 189d2714 Michael Hanselmann
        self._AddTaskUnlocked(args)
225 c2a8e8ba Guido Trotter
    finally:
226 c2a8e8ba Guido Trotter
      self._lock.release()
227 c2a8e8ba Guido Trotter
228 21c5ad52 Michael Hanselmann
  def _WaitForTaskUnlocked(self, worker):
229 21c5ad52 Michael Hanselmann
    """Waits for a task for a worker.
230 21c5ad52 Michael Hanselmann

231 21c5ad52 Michael Hanselmann
    @type worker: L{BaseWorker}
232 21c5ad52 Michael Hanselmann
    @param worker: Worker thread
233 21c5ad52 Michael Hanselmann

234 21c5ad52 Michael Hanselmann
    """
235 21c5ad52 Michael Hanselmann
    if self._ShouldWorkerTerminateUnlocked(worker):
236 21c5ad52 Michael Hanselmann
      return _TERMINATE
237 21c5ad52 Michael Hanselmann
238 21c5ad52 Michael Hanselmann
    # We only wait if there's no task for us.
239 21c5ad52 Michael Hanselmann
    if not self._tasks:
240 21c5ad52 Michael Hanselmann
      logging.debug("Waiting for tasks")
241 21c5ad52 Michael Hanselmann
242 21c5ad52 Michael Hanselmann
      # wait() releases the lock and sleeps until notified
243 21c5ad52 Michael Hanselmann
      self._pool_to_worker.wait()
244 21c5ad52 Michael Hanselmann
245 21c5ad52 Michael Hanselmann
      logging.debug("Notified while waiting")
246 21c5ad52 Michael Hanselmann
247 21c5ad52 Michael Hanselmann
      # Were we woken up in order to terminate?
248 21c5ad52 Michael Hanselmann
      if self._ShouldWorkerTerminateUnlocked(worker):
249 21c5ad52 Michael Hanselmann
        return _TERMINATE
250 21c5ad52 Michael Hanselmann
251 21c5ad52 Michael Hanselmann
      if not self._tasks:
252 21c5ad52 Michael Hanselmann
        # Spurious notification, ignore
253 21c5ad52 Michael Hanselmann
        return None
254 21c5ad52 Michael Hanselmann
255 21c5ad52 Michael Hanselmann
    # Get task from queue and tell pool about it
256 21c5ad52 Michael Hanselmann
    try:
257 21c5ad52 Michael Hanselmann
      return self._tasks.popleft()
258 21c5ad52 Michael Hanselmann
    finally:
259 21c5ad52 Michael Hanselmann
      self._worker_to_pool.notifyAll()
260 21c5ad52 Michael Hanselmann
261 76094e37 Michael Hanselmann
  def _ShouldWorkerTerminateUnlocked(self, worker):
262 76094e37 Michael Hanselmann
    """Returns whether a worker should terminate.
263 76094e37 Michael Hanselmann

264 76094e37 Michael Hanselmann
    """
265 76094e37 Michael Hanselmann
    return (worker in self._termworkers)
266 76094e37 Michael Hanselmann
267 76094e37 Michael Hanselmann
  def _HasRunningTasksUnlocked(self):
268 76094e37 Michael Hanselmann
    """Checks whether there's a task running in a worker.
269 76094e37 Michael Hanselmann

270 76094e37 Michael Hanselmann
    """
271 76094e37 Michael Hanselmann
    for worker in self._workers + self._termworkers:
272 7260cfbe Iustin Pop
      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
273 76094e37 Michael Hanselmann
        return True
274 76094e37 Michael Hanselmann
    return False
275 76094e37 Michael Hanselmann
276 76094e37 Michael Hanselmann
  def Quiesce(self):
277 76094e37 Michael Hanselmann
    """Waits until the task queue is empty.
278 76094e37 Michael Hanselmann

279 76094e37 Michael Hanselmann
    """
280 76094e37 Michael Hanselmann
    self._lock.acquire()
281 76094e37 Michael Hanselmann
    try:
282 76094e37 Michael Hanselmann
      self._quiescing = True
283 76094e37 Michael Hanselmann
284 76094e37 Michael Hanselmann
      # Wait while there are tasks pending or running
285 76094e37 Michael Hanselmann
      while self._tasks or self._HasRunningTasksUnlocked():
286 53b1d12b Michael Hanselmann
        self._worker_to_pool.wait()
287 76094e37 Michael Hanselmann
288 76094e37 Michael Hanselmann
    finally:
289 76094e37 Michael Hanselmann
      self._quiescing = False
290 76094e37 Michael Hanselmann
291 76094e37 Michael Hanselmann
      # Make sure AddTasks continues in case it was waiting
292 53b1d12b Michael Hanselmann
      self._pool_to_pool.notifyAll()
293 76094e37 Michael Hanselmann
294 76094e37 Michael Hanselmann
      self._lock.release()
295 76094e37 Michael Hanselmann
296 76094e37 Michael Hanselmann
  def _NewWorkerIdUnlocked(self):
297 116db7c7 Iustin Pop
    """Return an identifier for a new worker.
298 116db7c7 Iustin Pop

299 116db7c7 Iustin Pop
    """
300 76094e37 Michael Hanselmann
    self._last_worker_id += 1
301 89e2b4d2 Michael Hanselmann
302 89e2b4d2 Michael Hanselmann
    return "%s%d" % (self._name, self._last_worker_id)
303 76094e37 Michael Hanselmann
304 76094e37 Michael Hanselmann
  def _ResizeUnlocked(self, num_workers):
305 76094e37 Michael Hanselmann
    """Changes the number of workers.
306 76094e37 Michael Hanselmann

307 76094e37 Michael Hanselmann
    """
308 76094e37 Michael Hanselmann
    assert num_workers >= 0, "num_workers must be >= 0"
309 76094e37 Michael Hanselmann
310 76094e37 Michael Hanselmann
    logging.debug("Resizing to %s workers", num_workers)
311 76094e37 Michael Hanselmann
312 76094e37 Michael Hanselmann
    current_count = len(self._workers)
313 76094e37 Michael Hanselmann
314 76094e37 Michael Hanselmann
    if current_count == num_workers:
315 76094e37 Michael Hanselmann
      # Nothing to do
316 76094e37 Michael Hanselmann
      pass
317 76094e37 Michael Hanselmann
318 76094e37 Michael Hanselmann
    elif current_count > num_workers:
319 76094e37 Michael Hanselmann
      if num_workers == 0:
320 76094e37 Michael Hanselmann
        # Create copy of list to iterate over while lock isn't held.
321 76094e37 Michael Hanselmann
        termworkers = self._workers[:]
322 76094e37 Michael Hanselmann
        del self._workers[:]
323 76094e37 Michael Hanselmann
      else:
324 76094e37 Michael Hanselmann
        # TODO: Implement partial downsizing
325 76094e37 Michael Hanselmann
        raise NotImplementedError()
326 76094e37 Michael Hanselmann
        #termworkers = ...
327 76094e37 Michael Hanselmann
328 76094e37 Michael Hanselmann
      self._termworkers += termworkers
329 76094e37 Michael Hanselmann
330 76094e37 Michael Hanselmann
      # Notify workers that something has changed
331 53b1d12b Michael Hanselmann
      self._pool_to_worker.notifyAll()
332 76094e37 Michael Hanselmann
333 76094e37 Michael Hanselmann
      # Join all terminating workers
334 76094e37 Michael Hanselmann
      self._lock.release()
335 76094e37 Michael Hanselmann
      try:
336 76094e37 Michael Hanselmann
        for worker in termworkers:
337 c0a8eb9e Michael Hanselmann
          logging.debug("Waiting for thread %s", worker.getName())
338 76094e37 Michael Hanselmann
          worker.join()
339 76094e37 Michael Hanselmann
      finally:
340 76094e37 Michael Hanselmann
        self._lock.acquire()
341 76094e37 Michael Hanselmann
342 76094e37 Michael Hanselmann
      # Remove terminated threads. This could be done in a more efficient way
343 76094e37 Michael Hanselmann
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
344 76094e37 Michael Hanselmann
      # don't leave zombie threads around.
345 76094e37 Michael Hanselmann
      for worker in termworkers:
346 76094e37 Michael Hanselmann
        assert worker in self._termworkers, ("Worker not in list of"
347 76094e37 Michael Hanselmann
                                             " terminating workers")
348 76094e37 Michael Hanselmann
        if not worker.isAlive():
349 76094e37 Michael Hanselmann
          self._termworkers.remove(worker)
350 76094e37 Michael Hanselmann
351 76094e37 Michael Hanselmann
      assert not self._termworkers, "Zombie worker detected"
352 76094e37 Michael Hanselmann
353 76094e37 Michael Hanselmann
    elif current_count < num_workers:
354 76094e37 Michael Hanselmann
      # Create (num_workers - current_count) new workers
355 f1501b3f Michael Hanselmann
      for _ in range(num_workers - current_count):
356 76094e37 Michael Hanselmann
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
357 76094e37 Michael Hanselmann
        self._workers.append(worker)
358 76094e37 Michael Hanselmann
        worker.start()
359 76094e37 Michael Hanselmann
360 76094e37 Michael Hanselmann
  def Resize(self, num_workers):
361 76094e37 Michael Hanselmann
    """Changes the number of workers in the pool.
362 76094e37 Michael Hanselmann

363 116db7c7 Iustin Pop
    @param num_workers: the new number of workers
364 76094e37 Michael Hanselmann

365 76094e37 Michael Hanselmann
    """
366 76094e37 Michael Hanselmann
    self._lock.acquire()
367 76094e37 Michael Hanselmann
    try:
368 76094e37 Michael Hanselmann
      return self._ResizeUnlocked(num_workers)
369 76094e37 Michael Hanselmann
    finally:
370 76094e37 Michael Hanselmann
      self._lock.release()
371 76094e37 Michael Hanselmann
372 76094e37 Michael Hanselmann
  def TerminateWorkers(self):
373 76094e37 Michael Hanselmann
    """Terminate all worker threads.
374 76094e37 Michael Hanselmann

375 76094e37 Michael Hanselmann
    Unstarted tasks will be ignored.
376 76094e37 Michael Hanselmann

377 76094e37 Michael Hanselmann
    """
378 76094e37 Michael Hanselmann
    logging.debug("Terminating all workers")
379 76094e37 Michael Hanselmann
380 76094e37 Michael Hanselmann
    self._lock.acquire()
381 76094e37 Michael Hanselmann
    try:
382 76094e37 Michael Hanselmann
      self._ResizeUnlocked(0)
383 76094e37 Michael Hanselmann
384 76094e37 Michael Hanselmann
      if self._tasks:
385 76094e37 Michael Hanselmann
        logging.debug("There are %s tasks left", len(self._tasks))
386 76094e37 Michael Hanselmann
    finally:
387 76094e37 Michael Hanselmann
      self._lock.release()
388 76094e37 Michael Hanselmann
389 76094e37 Michael Hanselmann
    logging.debug("All workers terminated")