Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ 19b9ba9a

History | View | Annotate | Download (10.5 kB)

1 76094e37 Michael Hanselmann
#
2 76094e37 Michael Hanselmann
#
3 76094e37 Michael Hanselmann
4 189d2714 Michael Hanselmann
# Copyright (C) 2008, 2009, 2010 Google Inc.
5 76094e37 Michael Hanselmann
#
6 76094e37 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 76094e37 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 76094e37 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 76094e37 Michael Hanselmann
# (at your option) any later version.
10 76094e37 Michael Hanselmann
#
11 76094e37 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 76094e37 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 76094e37 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 76094e37 Michael Hanselmann
# General Public License for more details.
15 76094e37 Michael Hanselmann
#
16 76094e37 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 76094e37 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 76094e37 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 76094e37 Michael Hanselmann
# 02110-1301, USA.
20 76094e37 Michael Hanselmann
21 76094e37 Michael Hanselmann
22 76094e37 Michael Hanselmann
"""Base classes for worker pools.
23 76094e37 Michael Hanselmann

24 76094e37 Michael Hanselmann
"""
25 76094e37 Michael Hanselmann
26 76094e37 Michael Hanselmann
import collections
27 76094e37 Michael Hanselmann
import logging
28 76094e37 Michael Hanselmann
import threading
29 76094e37 Michael Hanselmann
30 25e557a5 Guido Trotter
from ganeti import compat
31 25e557a5 Guido Trotter
32 76094e37 Michael Hanselmann
33 21c5ad52 Michael Hanselmann
_TERMINATE = object()
34 21c5ad52 Michael Hanselmann
35 21c5ad52 Michael Hanselmann
36 76094e37 Michael Hanselmann
class BaseWorker(threading.Thread, object):
37 76094e37 Michael Hanselmann
  """Base worker class for worker pools.
38 76094e37 Michael Hanselmann

39 76094e37 Michael Hanselmann
  Users of a worker pool must override RunTask in a subclass.
40 76094e37 Michael Hanselmann

41 76094e37 Michael Hanselmann
  """
42 7260cfbe Iustin Pop
  # pylint: disable-msg=W0212
43 76094e37 Michael Hanselmann
  def __init__(self, pool, worker_id):
44 76094e37 Michael Hanselmann
    """Constructor for BaseWorker thread.
45 76094e37 Michael Hanselmann

46 116db7c7 Iustin Pop
    @param pool: the parent worker pool
47 116db7c7 Iustin Pop
    @param worker_id: identifier for this worker
48 76094e37 Michael Hanselmann

49 76094e37 Michael Hanselmann
    """
50 d16e6fd9 Michael Hanselmann
    super(BaseWorker, self).__init__(name=worker_id)
51 76094e37 Michael Hanselmann
    self.pool = pool
52 daba67c7 Michael Hanselmann
    self._worker_id = worker_id
53 76094e37 Michael Hanselmann
    self._current_task = None
54 76094e37 Michael Hanselmann
55 daba67c7 Michael Hanselmann
    assert self.getName() == worker_id
56 daba67c7 Michael Hanselmann
57 76094e37 Michael Hanselmann
  def ShouldTerminate(self):
58 2f4e1516 Michael Hanselmann
    """Returns whether this worker should terminate.
59 2f4e1516 Michael Hanselmann

60 2f4e1516 Michael Hanselmann
    Should only be called from within L{RunTask}.
61 76094e37 Michael Hanselmann

62 76094e37 Michael Hanselmann
    """
63 2f4e1516 Michael Hanselmann
    self.pool._lock.acquire()
64 2f4e1516 Michael Hanselmann
    try:
65 2f4e1516 Michael Hanselmann
      assert self._HasRunningTaskUnlocked()
66 2f4e1516 Michael Hanselmann
      return self.pool._ShouldWorkerTerminateUnlocked(self)
67 2f4e1516 Michael Hanselmann
    finally:
68 2f4e1516 Michael Hanselmann
      self.pool._lock.release()
69 76094e37 Michael Hanselmann
70 daba67c7 Michael Hanselmann
  def SetTaskName(self, taskname):
71 daba67c7 Michael Hanselmann
    """Sets the name of the current task.
72 daba67c7 Michael Hanselmann

73 daba67c7 Michael Hanselmann
    Should only be called from within L{RunTask}.
74 daba67c7 Michael Hanselmann

75 daba67c7 Michael Hanselmann
    @type taskname: string
76 daba67c7 Michael Hanselmann
    @param taskname: Task's name
77 daba67c7 Michael Hanselmann

78 daba67c7 Michael Hanselmann
    """
79 daba67c7 Michael Hanselmann
    if taskname:
80 daba67c7 Michael Hanselmann
      name = "%s/%s" % (self._worker_id, taskname)
81 daba67c7 Michael Hanselmann
    else:
82 daba67c7 Michael Hanselmann
      name = self._worker_id
83 daba67c7 Michael Hanselmann
84 daba67c7 Michael Hanselmann
    # Set thread name
85 daba67c7 Michael Hanselmann
    self.setName(name)
86 daba67c7 Michael Hanselmann
87 b3558df1 Michael Hanselmann
  def _HasRunningTaskUnlocked(self):
88 b3558df1 Michael Hanselmann
    """Returns whether this worker is currently running a task.
89 b3558df1 Michael Hanselmann

90 b3558df1 Michael Hanselmann
    """
91 b3558df1 Michael Hanselmann
    return (self._current_task is not None)
92 b3558df1 Michael Hanselmann
93 76094e37 Michael Hanselmann
  def run(self):
94 76094e37 Michael Hanselmann
    """Main thread function.
95 76094e37 Michael Hanselmann

96 76094e37 Michael Hanselmann
    Waits for new tasks to show up in the queue.
97 76094e37 Michael Hanselmann

98 76094e37 Michael Hanselmann
    """
99 76094e37 Michael Hanselmann
    pool = self.pool
100 76094e37 Michael Hanselmann
101 76094e37 Michael Hanselmann
    while True:
102 46d0a3d0 Michael Hanselmann
      assert self._current_task is None
103 76094e37 Michael Hanselmann
      try:
104 21c5ad52 Michael Hanselmann
        # Wait on lock to be told either to terminate or to do a task
105 76094e37 Michael Hanselmann
        pool._lock.acquire()
106 76094e37 Michael Hanselmann
        try:
107 21c5ad52 Michael Hanselmann
          task = pool._WaitForTaskUnlocked(self)
108 76094e37 Michael Hanselmann
109 21c5ad52 Michael Hanselmann
          if task is _TERMINATE:
110 21c5ad52 Michael Hanselmann
            # Told to terminate
111 21c5ad52 Michael Hanselmann
            break
112 b3558df1 Michael Hanselmann
113 21c5ad52 Michael Hanselmann
          if task is None:
114 21c5ad52 Michael Hanselmann
            # Spurious notification, ignore
115 21c5ad52 Michael Hanselmann
            continue
116 76094e37 Michael Hanselmann
117 21c5ad52 Michael Hanselmann
          self._current_task = task
118 76094e37 Michael Hanselmann
119 46d0a3d0 Michael Hanselmann
          # No longer needed, dispose of reference
120 46d0a3d0 Michael Hanselmann
          del task
121 46d0a3d0 Michael Hanselmann
122 21c5ad52 Michael Hanselmann
          assert self._HasRunningTaskUnlocked()
123 46d0a3d0 Michael Hanselmann
124 76094e37 Michael Hanselmann
        finally:
125 76094e37 Michael Hanselmann
          pool._lock.release()
126 76094e37 Michael Hanselmann
127 76094e37 Michael Hanselmann
        # Run the actual task
128 76094e37 Michael Hanselmann
        try:
129 02fc74da Michael Hanselmann
          logging.debug("Starting task %r", self._current_task)
130 daba67c7 Michael Hanselmann
          assert self.getName() == self._worker_id
131 daba67c7 Michael Hanselmann
          try:
132 daba67c7 Michael Hanselmann
            self.RunTask(*self._current_task)
133 daba67c7 Michael Hanselmann
          finally:
134 daba67c7 Michael Hanselmann
            self.SetTaskName(None)
135 02fc74da Michael Hanselmann
          logging.debug("Done with task %r", self._current_task)
136 7260cfbe Iustin Pop
        except: # pylint: disable-msg=W0702
137 02fc74da Michael Hanselmann
          logging.exception("Caught unhandled exception")
138 c1cf1fe5 Michael Hanselmann
139 c1cf1fe5 Michael Hanselmann
        assert self._HasRunningTaskUnlocked()
140 76094e37 Michael Hanselmann
      finally:
141 76094e37 Michael Hanselmann
        # Notify pool
142 76094e37 Michael Hanselmann
        pool._lock.acquire()
143 76094e37 Michael Hanselmann
        try:
144 b3558df1 Michael Hanselmann
          if self._current_task:
145 b3558df1 Michael Hanselmann
            self._current_task = None
146 53b1d12b Michael Hanselmann
            pool._worker_to_pool.notifyAll()
147 76094e37 Michael Hanselmann
        finally:
148 76094e37 Michael Hanselmann
          pool._lock.release()
149 76094e37 Michael Hanselmann
150 c1cf1fe5 Michael Hanselmann
      assert not self._HasRunningTaskUnlocked()
151 c1cf1fe5 Michael Hanselmann
152 02fc74da Michael Hanselmann
    logging.debug("Terminates")
153 b3558df1 Michael Hanselmann
154 76094e37 Michael Hanselmann
  def RunTask(self, *args):
155 76094e37 Michael Hanselmann
    """Function called to start a task.
156 76094e37 Michael Hanselmann

157 116db7c7 Iustin Pop
    This needs to be implemented by child classes.
158 116db7c7 Iustin Pop

159 76094e37 Michael Hanselmann
    """
160 76094e37 Michael Hanselmann
    raise NotImplementedError()
161 76094e37 Michael Hanselmann
162 76094e37 Michael Hanselmann
163 76094e37 Michael Hanselmann
class WorkerPool(object):
164 76094e37 Michael Hanselmann
  """Worker pool with a queue.
165 76094e37 Michael Hanselmann

166 76094e37 Michael Hanselmann
  This class is thread-safe.
167 76094e37 Michael Hanselmann

168 116db7c7 Iustin Pop
  Tasks are guaranteed to be started in the order in which they're
169 116db7c7 Iustin Pop
  added to the pool. Due to the nature of threading, they're not
170 116db7c7 Iustin Pop
  guaranteed to finish in the same order.
171 76094e37 Michael Hanselmann

172 76094e37 Michael Hanselmann
  """
173 89e2b4d2 Michael Hanselmann
  def __init__(self, name, num_workers, worker_class):
174 76094e37 Michael Hanselmann
    """Constructor for worker pool.
175 76094e37 Michael Hanselmann

176 116db7c7 Iustin Pop
    @param num_workers: number of workers to be started
177 116db7c7 Iustin Pop
        (dynamic resizing is not yet implemented)
178 116db7c7 Iustin Pop
    @param worker_class: the class to be instantiated for workers;
179 116db7c7 Iustin Pop
        should derive from L{BaseWorker}
180 76094e37 Michael Hanselmann

181 76094e37 Michael Hanselmann
    """
182 76094e37 Michael Hanselmann
    # Some of these variables are accessed by BaseWorker
183 53b1d12b Michael Hanselmann
    self._lock = threading.Lock()
184 53b1d12b Michael Hanselmann
    self._pool_to_pool = threading.Condition(self._lock)
185 53b1d12b Michael Hanselmann
    self._pool_to_worker = threading.Condition(self._lock)
186 53b1d12b Michael Hanselmann
    self._worker_to_pool = threading.Condition(self._lock)
187 76094e37 Michael Hanselmann
    self._worker_class = worker_class
188 89e2b4d2 Michael Hanselmann
    self._name = name
189 76094e37 Michael Hanselmann
    self._last_worker_id = 0
190 76094e37 Michael Hanselmann
    self._workers = []
191 76094e37 Michael Hanselmann
    self._quiescing = False
192 76094e37 Michael Hanselmann
193 76094e37 Michael Hanselmann
    # Terminating workers
194 76094e37 Michael Hanselmann
    self._termworkers = []
195 76094e37 Michael Hanselmann
196 76094e37 Michael Hanselmann
    # Queued tasks
197 76094e37 Michael Hanselmann
    self._tasks = collections.deque()
198 76094e37 Michael Hanselmann
199 76094e37 Michael Hanselmann
    # Start workers
200 76094e37 Michael Hanselmann
    self.Resize(num_workers)
201 76094e37 Michael Hanselmann
202 76094e37 Michael Hanselmann
  # TODO: Implement dynamic resizing?
203 76094e37 Michael Hanselmann
204 c2a8e8ba Guido Trotter
  def _WaitWhileQuiescingUnlocked(self):
205 c2a8e8ba Guido Trotter
    """Wait until the worker pool has finished quiescing.
206 c2a8e8ba Guido Trotter

207 c2a8e8ba Guido Trotter
    """
208 c2a8e8ba Guido Trotter
    while self._quiescing:
209 c2a8e8ba Guido Trotter
      self._pool_to_pool.wait()
210 c2a8e8ba Guido Trotter
211 189d2714 Michael Hanselmann
  def _AddTaskUnlocked(self, args):
212 189d2714 Michael Hanselmann
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
213 189d2714 Michael Hanselmann
214 189d2714 Michael Hanselmann
    self._tasks.append(args)
215 189d2714 Michael Hanselmann
216 189d2714 Michael Hanselmann
    # Notify a waiting worker
217 189d2714 Michael Hanselmann
    self._pool_to_worker.notify()
218 189d2714 Michael Hanselmann
219 b2e8a4d9 Michael Hanselmann
  def AddTask(self, args):
220 76094e37 Michael Hanselmann
    """Adds a task to the queue.
221 76094e37 Michael Hanselmann

222 b2e8a4d9 Michael Hanselmann
    @type args: sequence
223 116db7c7 Iustin Pop
    @param args: arguments passed to L{BaseWorker.RunTask}
224 76094e37 Michael Hanselmann

225 76094e37 Michael Hanselmann
    """
226 76094e37 Michael Hanselmann
    self._lock.acquire()
227 76094e37 Michael Hanselmann
    try:
228 c2a8e8ba Guido Trotter
      self._WaitWhileQuiescingUnlocked()
229 189d2714 Michael Hanselmann
      self._AddTaskUnlocked(args)
230 76094e37 Michael Hanselmann
    finally:
231 76094e37 Michael Hanselmann
      self._lock.release()
232 76094e37 Michael Hanselmann
233 c2a8e8ba Guido Trotter
  def AddManyTasks(self, tasks):
234 c2a8e8ba Guido Trotter
    """Add a list of tasks to the queue.
235 c2a8e8ba Guido Trotter

236 c2a8e8ba Guido Trotter
    @type tasks: list of tuples
237 c2a8e8ba Guido Trotter
    @param tasks: list of args passed to L{BaseWorker.RunTask}
238 c2a8e8ba Guido Trotter

239 c2a8e8ba Guido Trotter
    """
240 25e557a5 Guido Trotter
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
241 25e557a5 Guido Trotter
      "Each task must be a sequence"
242 25e557a5 Guido Trotter
243 c2a8e8ba Guido Trotter
    self._lock.acquire()
244 c2a8e8ba Guido Trotter
    try:
245 c2a8e8ba Guido Trotter
      self._WaitWhileQuiescingUnlocked()
246 c2a8e8ba Guido Trotter
247 189d2714 Michael Hanselmann
      for args in tasks:
248 189d2714 Michael Hanselmann
        self._AddTaskUnlocked(args)
249 c2a8e8ba Guido Trotter
    finally:
250 c2a8e8ba Guido Trotter
      self._lock.release()
251 c2a8e8ba Guido Trotter
252 21c5ad52 Michael Hanselmann
  def _WaitForTaskUnlocked(self, worker):
253 21c5ad52 Michael Hanselmann
    """Waits for a task for a worker.
254 21c5ad52 Michael Hanselmann

255 21c5ad52 Michael Hanselmann
    @type worker: L{BaseWorker}
256 21c5ad52 Michael Hanselmann
    @param worker: Worker thread
257 21c5ad52 Michael Hanselmann

258 21c5ad52 Michael Hanselmann
    """
259 21c5ad52 Michael Hanselmann
    if self._ShouldWorkerTerminateUnlocked(worker):
260 21c5ad52 Michael Hanselmann
      return _TERMINATE
261 21c5ad52 Michael Hanselmann
262 21c5ad52 Michael Hanselmann
    # We only wait if there's no task for us.
263 21c5ad52 Michael Hanselmann
    if not self._tasks:
264 21c5ad52 Michael Hanselmann
      logging.debug("Waiting for tasks")
265 21c5ad52 Michael Hanselmann
266 21c5ad52 Michael Hanselmann
      # wait() releases the lock and sleeps until notified
267 21c5ad52 Michael Hanselmann
      self._pool_to_worker.wait()
268 21c5ad52 Michael Hanselmann
269 21c5ad52 Michael Hanselmann
      logging.debug("Notified while waiting")
270 21c5ad52 Michael Hanselmann
271 21c5ad52 Michael Hanselmann
      # Were we woken up in order to terminate?
272 21c5ad52 Michael Hanselmann
      if self._ShouldWorkerTerminateUnlocked(worker):
273 21c5ad52 Michael Hanselmann
        return _TERMINATE
274 21c5ad52 Michael Hanselmann
275 21c5ad52 Michael Hanselmann
      if not self._tasks:
276 21c5ad52 Michael Hanselmann
        # Spurious notification, ignore
277 21c5ad52 Michael Hanselmann
        return None
278 21c5ad52 Michael Hanselmann
279 21c5ad52 Michael Hanselmann
    # Get task from queue and tell pool about it
280 21c5ad52 Michael Hanselmann
    try:
281 21c5ad52 Michael Hanselmann
      return self._tasks.popleft()
282 21c5ad52 Michael Hanselmann
    finally:
283 21c5ad52 Michael Hanselmann
      self._worker_to_pool.notifyAll()
284 21c5ad52 Michael Hanselmann
285 76094e37 Michael Hanselmann
  def _ShouldWorkerTerminateUnlocked(self, worker):
286 76094e37 Michael Hanselmann
    """Returns whether a worker should terminate.
287 76094e37 Michael Hanselmann

288 76094e37 Michael Hanselmann
    """
289 76094e37 Michael Hanselmann
    return (worker in self._termworkers)
290 76094e37 Michael Hanselmann
291 76094e37 Michael Hanselmann
  def _HasRunningTasksUnlocked(self):
292 76094e37 Michael Hanselmann
    """Checks whether there's a task running in a worker.
293 76094e37 Michael Hanselmann

294 76094e37 Michael Hanselmann
    """
295 76094e37 Michael Hanselmann
    for worker in self._workers + self._termworkers:
296 7260cfbe Iustin Pop
      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
297 76094e37 Michael Hanselmann
        return True
298 76094e37 Michael Hanselmann
    return False
299 76094e37 Michael Hanselmann
300 76094e37 Michael Hanselmann
  def Quiesce(self):
301 76094e37 Michael Hanselmann
    """Waits until the task queue is empty.
302 76094e37 Michael Hanselmann

303 76094e37 Michael Hanselmann
    """
304 76094e37 Michael Hanselmann
    self._lock.acquire()
305 76094e37 Michael Hanselmann
    try:
306 76094e37 Michael Hanselmann
      self._quiescing = True
307 76094e37 Michael Hanselmann
308 76094e37 Michael Hanselmann
      # Wait while there are tasks pending or running
309 76094e37 Michael Hanselmann
      while self._tasks or self._HasRunningTasksUnlocked():
310 53b1d12b Michael Hanselmann
        self._worker_to_pool.wait()
311 76094e37 Michael Hanselmann
312 76094e37 Michael Hanselmann
    finally:
313 76094e37 Michael Hanselmann
      self._quiescing = False
314 76094e37 Michael Hanselmann
315 76094e37 Michael Hanselmann
      # Make sure AddTasks continues in case it was waiting
316 53b1d12b Michael Hanselmann
      self._pool_to_pool.notifyAll()
317 76094e37 Michael Hanselmann
318 76094e37 Michael Hanselmann
      self._lock.release()
319 76094e37 Michael Hanselmann
320 76094e37 Michael Hanselmann
  def _NewWorkerIdUnlocked(self):
321 116db7c7 Iustin Pop
    """Return an identifier for a new worker.
322 116db7c7 Iustin Pop

323 116db7c7 Iustin Pop
    """
324 76094e37 Michael Hanselmann
    self._last_worker_id += 1
325 89e2b4d2 Michael Hanselmann
326 89e2b4d2 Michael Hanselmann
    return "%s%d" % (self._name, self._last_worker_id)
327 76094e37 Michael Hanselmann
328 76094e37 Michael Hanselmann
  def _ResizeUnlocked(self, num_workers):
329 76094e37 Michael Hanselmann
    """Changes the number of workers.
330 76094e37 Michael Hanselmann

331 76094e37 Michael Hanselmann
    """
332 76094e37 Michael Hanselmann
    assert num_workers >= 0, "num_workers must be >= 0"
333 76094e37 Michael Hanselmann
334 76094e37 Michael Hanselmann
    logging.debug("Resizing to %s workers", num_workers)
335 76094e37 Michael Hanselmann
336 76094e37 Michael Hanselmann
    current_count = len(self._workers)
337 76094e37 Michael Hanselmann
338 76094e37 Michael Hanselmann
    if current_count == num_workers:
339 76094e37 Michael Hanselmann
      # Nothing to do
340 76094e37 Michael Hanselmann
      pass
341 76094e37 Michael Hanselmann
342 76094e37 Michael Hanselmann
    elif current_count > num_workers:
343 76094e37 Michael Hanselmann
      if num_workers == 0:
344 76094e37 Michael Hanselmann
        # Create copy of list to iterate over while lock isn't held.
345 76094e37 Michael Hanselmann
        termworkers = self._workers[:]
346 76094e37 Michael Hanselmann
        del self._workers[:]
347 76094e37 Michael Hanselmann
      else:
348 76094e37 Michael Hanselmann
        # TODO: Implement partial downsizing
349 76094e37 Michael Hanselmann
        raise NotImplementedError()
350 76094e37 Michael Hanselmann
        #termworkers = ...
351 76094e37 Michael Hanselmann
352 76094e37 Michael Hanselmann
      self._termworkers += termworkers
353 76094e37 Michael Hanselmann
354 76094e37 Michael Hanselmann
      # Notify workers that something has changed
355 53b1d12b Michael Hanselmann
      self._pool_to_worker.notifyAll()
356 76094e37 Michael Hanselmann
357 76094e37 Michael Hanselmann
      # Join all terminating workers
358 76094e37 Michael Hanselmann
      self._lock.release()
359 76094e37 Michael Hanselmann
      try:
360 76094e37 Michael Hanselmann
        for worker in termworkers:
361 c0a8eb9e Michael Hanselmann
          logging.debug("Waiting for thread %s", worker.getName())
362 76094e37 Michael Hanselmann
          worker.join()
363 76094e37 Michael Hanselmann
      finally:
364 76094e37 Michael Hanselmann
        self._lock.acquire()
365 76094e37 Michael Hanselmann
366 76094e37 Michael Hanselmann
      # Remove terminated threads. This could be done in a more efficient way
367 76094e37 Michael Hanselmann
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
368 76094e37 Michael Hanselmann
      # don't leave zombie threads around.
369 76094e37 Michael Hanselmann
      for worker in termworkers:
370 76094e37 Michael Hanselmann
        assert worker in self._termworkers, ("Worker not in list of"
371 76094e37 Michael Hanselmann
                                             " terminating workers")
372 76094e37 Michael Hanselmann
        if not worker.isAlive():
373 76094e37 Michael Hanselmann
          self._termworkers.remove(worker)
374 76094e37 Michael Hanselmann
375 76094e37 Michael Hanselmann
      assert not self._termworkers, "Zombie worker detected"
376 76094e37 Michael Hanselmann
377 76094e37 Michael Hanselmann
    elif current_count < num_workers:
378 76094e37 Michael Hanselmann
      # Create (num_workers - current_count) new workers
379 f1501b3f Michael Hanselmann
      for _ in range(num_workers - current_count):
380 76094e37 Michael Hanselmann
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
381 76094e37 Michael Hanselmann
        self._workers.append(worker)
382 76094e37 Michael Hanselmann
        worker.start()
383 76094e37 Michael Hanselmann
384 76094e37 Michael Hanselmann
  def Resize(self, num_workers):
385 76094e37 Michael Hanselmann
    """Changes the number of workers in the pool.
386 76094e37 Michael Hanselmann

387 116db7c7 Iustin Pop
    @param num_workers: the new number of workers
388 76094e37 Michael Hanselmann

389 76094e37 Michael Hanselmann
    """
390 76094e37 Michael Hanselmann
    self._lock.acquire()
391 76094e37 Michael Hanselmann
    try:
392 76094e37 Michael Hanselmann
      return self._ResizeUnlocked(num_workers)
393 76094e37 Michael Hanselmann
    finally:
394 76094e37 Michael Hanselmann
      self._lock.release()
395 76094e37 Michael Hanselmann
396 76094e37 Michael Hanselmann
  def TerminateWorkers(self):
397 76094e37 Michael Hanselmann
    """Terminate all worker threads.
398 76094e37 Michael Hanselmann

399 76094e37 Michael Hanselmann
    Unstarted tasks will be ignored.
400 76094e37 Michael Hanselmann

401 76094e37 Michael Hanselmann
    """
402 76094e37 Michael Hanselmann
    logging.debug("Terminating all workers")
403 76094e37 Michael Hanselmann
404 76094e37 Michael Hanselmann
    self._lock.acquire()
405 76094e37 Michael Hanselmann
    try:
406 76094e37 Michael Hanselmann
      self._ResizeUnlocked(0)
407 76094e37 Michael Hanselmann
408 76094e37 Michael Hanselmann
      if self._tasks:
409 76094e37 Michael Hanselmann
        logging.debug("There are %s tasks left", len(self._tasks))
410 76094e37 Michael Hanselmann
    finally:
411 76094e37 Michael Hanselmann
      self._lock.release()
412 76094e37 Michael Hanselmann
413 76094e37 Michael Hanselmann
    logging.debug("All workers terminated")