Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ 415feb2e

History | View | Annotate | Download (14.3 kB)

1 76094e37 Michael Hanselmann
#
2 76094e37 Michael Hanselmann
#
3 76094e37 Michael Hanselmann
4 189d2714 Michael Hanselmann
# Copyright (C) 2008, 2009, 2010 Google Inc.
5 76094e37 Michael Hanselmann
#
6 76094e37 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 76094e37 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 76094e37 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 76094e37 Michael Hanselmann
# (at your option) any later version.
10 76094e37 Michael Hanselmann
#
11 76094e37 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 76094e37 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 76094e37 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 76094e37 Michael Hanselmann
# General Public License for more details.
15 76094e37 Michael Hanselmann
#
16 76094e37 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 76094e37 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 76094e37 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 76094e37 Michael Hanselmann
# 02110-1301, USA.
20 76094e37 Michael Hanselmann
21 76094e37 Michael Hanselmann
22 76094e37 Michael Hanselmann
"""Base classes for worker pools.
23 76094e37 Michael Hanselmann

24 76094e37 Michael Hanselmann
"""
25 76094e37 Michael Hanselmann
26 76094e37 Michael Hanselmann
import logging
27 76094e37 Michael Hanselmann
import threading
28 52c47e4e Michael Hanselmann
import heapq
29 76094e37 Michael Hanselmann
30 25e557a5 Guido Trotter
from ganeti import compat
31 52c47e4e Michael Hanselmann
from ganeti import errors
32 25e557a5 Guido Trotter
33 76094e37 Michael Hanselmann
34 21c5ad52 Michael Hanselmann
_TERMINATE = object()
35 52c47e4e Michael Hanselmann
_DEFAULT_PRIORITY = 0
36 52c47e4e Michael Hanselmann
37 52c47e4e Michael Hanselmann
38 52c47e4e Michael Hanselmann
class DeferTask(Exception):
39 52c47e4e Michael Hanselmann
  """Special exception class to defer a task.
40 52c47e4e Michael Hanselmann

41 52c47e4e Michael Hanselmann
  This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
42 52c47e4e Michael Hanselmann
  task. Optionally, the priority of the task can be changed.
43 52c47e4e Michael Hanselmann

44 52c47e4e Michael Hanselmann
  """
45 52c47e4e Michael Hanselmann
  def __init__(self, priority=None):
46 52c47e4e Michael Hanselmann
    """Initializes this class.
47 52c47e4e Michael Hanselmann

48 52c47e4e Michael Hanselmann
    @type priority: number
49 52c47e4e Michael Hanselmann
    @param priority: New task priority (None means no change)
50 52c47e4e Michael Hanselmann

51 52c47e4e Michael Hanselmann
    """
52 52c47e4e Michael Hanselmann
    Exception.__init__(self)
53 52c47e4e Michael Hanselmann
    self.priority = priority
54 21c5ad52 Michael Hanselmann
55 21c5ad52 Michael Hanselmann
56 76094e37 Michael Hanselmann
class BaseWorker(threading.Thread, object):
57 76094e37 Michael Hanselmann
  """Base worker class for worker pools.
58 76094e37 Michael Hanselmann

59 76094e37 Michael Hanselmann
  Users of a worker pool must override RunTask in a subclass.
60 76094e37 Michael Hanselmann

61 76094e37 Michael Hanselmann
  """
62 b459a848 Andrea Spadaccini
  # pylint: disable=W0212
63 76094e37 Michael Hanselmann
  def __init__(self, pool, worker_id):
64 76094e37 Michael Hanselmann
    """Constructor for BaseWorker thread.
65 76094e37 Michael Hanselmann

66 116db7c7 Iustin Pop
    @param pool: the parent worker pool
67 116db7c7 Iustin Pop
    @param worker_id: identifier for this worker
68 76094e37 Michael Hanselmann

69 76094e37 Michael Hanselmann
    """
70 d16e6fd9 Michael Hanselmann
    super(BaseWorker, self).__init__(name=worker_id)
71 76094e37 Michael Hanselmann
    self.pool = pool
72 daba67c7 Michael Hanselmann
    self._worker_id = worker_id
73 76094e37 Michael Hanselmann
    self._current_task = None
74 76094e37 Michael Hanselmann
75 daba67c7 Michael Hanselmann
    assert self.getName() == worker_id
76 daba67c7 Michael Hanselmann
77 76094e37 Michael Hanselmann
  def ShouldTerminate(self):
78 2f4e1516 Michael Hanselmann
    """Returns whether this worker should terminate.
79 2f4e1516 Michael Hanselmann

80 2f4e1516 Michael Hanselmann
    Should only be called from within L{RunTask}.
81 76094e37 Michael Hanselmann

82 76094e37 Michael Hanselmann
    """
83 2f4e1516 Michael Hanselmann
    self.pool._lock.acquire()
84 2f4e1516 Michael Hanselmann
    try:
85 2f4e1516 Michael Hanselmann
      assert self._HasRunningTaskUnlocked()
86 2f4e1516 Michael Hanselmann
      return self.pool._ShouldWorkerTerminateUnlocked(self)
87 2f4e1516 Michael Hanselmann
    finally:
88 2f4e1516 Michael Hanselmann
      self.pool._lock.release()
89 76094e37 Michael Hanselmann
90 52c47e4e Michael Hanselmann
  def GetCurrentPriority(self):
91 52c47e4e Michael Hanselmann
    """Returns the priority of the current task.
92 52c47e4e Michael Hanselmann

93 52c47e4e Michael Hanselmann
    Should only be called from within L{RunTask}.
94 52c47e4e Michael Hanselmann

95 52c47e4e Michael Hanselmann
    """
96 52c47e4e Michael Hanselmann
    self.pool._lock.acquire()
97 52c47e4e Michael Hanselmann
    try:
98 52c47e4e Michael Hanselmann
      assert self._HasRunningTaskUnlocked()
99 52c47e4e Michael Hanselmann
100 52c47e4e Michael Hanselmann
      (priority, _, _) = self._current_task
101 52c47e4e Michael Hanselmann
102 52c47e4e Michael Hanselmann
      return priority
103 52c47e4e Michael Hanselmann
    finally:
104 52c47e4e Michael Hanselmann
      self.pool._lock.release()
105 52c47e4e Michael Hanselmann
106 daba67c7 Michael Hanselmann
  def SetTaskName(self, taskname):
107 daba67c7 Michael Hanselmann
    """Sets the name of the current task.
108 daba67c7 Michael Hanselmann

109 daba67c7 Michael Hanselmann
    Should only be called from within L{RunTask}.
110 daba67c7 Michael Hanselmann

111 daba67c7 Michael Hanselmann
    @type taskname: string
112 daba67c7 Michael Hanselmann
    @param taskname: Task's name
113 daba67c7 Michael Hanselmann

114 daba67c7 Michael Hanselmann
    """
115 daba67c7 Michael Hanselmann
    if taskname:
116 daba67c7 Michael Hanselmann
      name = "%s/%s" % (self._worker_id, taskname)
117 daba67c7 Michael Hanselmann
    else:
118 daba67c7 Michael Hanselmann
      name = self._worker_id
119 daba67c7 Michael Hanselmann
120 daba67c7 Michael Hanselmann
    # Set thread name
121 daba67c7 Michael Hanselmann
    self.setName(name)
122 daba67c7 Michael Hanselmann
123 b3558df1 Michael Hanselmann
  def _HasRunningTaskUnlocked(self):
124 b3558df1 Michael Hanselmann
    """Returns whether this worker is currently running a task.
125 b3558df1 Michael Hanselmann

126 b3558df1 Michael Hanselmann
    """
127 b3558df1 Michael Hanselmann
    return (self._current_task is not None)
128 b3558df1 Michael Hanselmann
129 76094e37 Michael Hanselmann
  def run(self):
130 76094e37 Michael Hanselmann
    """Main thread function.
131 76094e37 Michael Hanselmann

132 76094e37 Michael Hanselmann
    Waits for new tasks to show up in the queue.
133 76094e37 Michael Hanselmann

134 76094e37 Michael Hanselmann
    """
135 76094e37 Michael Hanselmann
    pool = self.pool
136 76094e37 Michael Hanselmann
137 76094e37 Michael Hanselmann
    while True:
138 46d0a3d0 Michael Hanselmann
      assert self._current_task is None
139 52c47e4e Michael Hanselmann
140 52c47e4e Michael Hanselmann
      defer = None
141 76094e37 Michael Hanselmann
      try:
142 21c5ad52 Michael Hanselmann
        # Wait on lock to be told either to terminate or to do a task
143 76094e37 Michael Hanselmann
        pool._lock.acquire()
144 76094e37 Michael Hanselmann
        try:
145 21c5ad52 Michael Hanselmann
          task = pool._WaitForTaskUnlocked(self)
146 76094e37 Michael Hanselmann
147 21c5ad52 Michael Hanselmann
          if task is _TERMINATE:
148 21c5ad52 Michael Hanselmann
            # Told to terminate
149 21c5ad52 Michael Hanselmann
            break
150 b3558df1 Michael Hanselmann
151 21c5ad52 Michael Hanselmann
          if task is None:
152 21c5ad52 Michael Hanselmann
            # Spurious notification, ignore
153 21c5ad52 Michael Hanselmann
            continue
154 76094e37 Michael Hanselmann
155 21c5ad52 Michael Hanselmann
          self._current_task = task
156 76094e37 Michael Hanselmann
157 46d0a3d0 Michael Hanselmann
          # No longer needed, dispose of reference
158 46d0a3d0 Michael Hanselmann
          del task
159 46d0a3d0 Michael Hanselmann
160 21c5ad52 Michael Hanselmann
          assert self._HasRunningTaskUnlocked()
161 46d0a3d0 Michael Hanselmann
162 76094e37 Michael Hanselmann
        finally:
163 76094e37 Michael Hanselmann
          pool._lock.release()
164 76094e37 Michael Hanselmann
165 52c47e4e Michael Hanselmann
        (priority, _, args) = self._current_task
166 76094e37 Michael Hanselmann
        try:
167 52c47e4e Michael Hanselmann
          # Run the actual task
168 52c47e4e Michael Hanselmann
          assert defer is None
169 52c47e4e Michael Hanselmann
          logging.debug("Starting task %r, priority %s", args, priority)
170 daba67c7 Michael Hanselmann
          assert self.getName() == self._worker_id
171 daba67c7 Michael Hanselmann
          try:
172 b459a848 Andrea Spadaccini
            self.RunTask(*args) # pylint: disable=W0142
173 daba67c7 Michael Hanselmann
          finally:
174 daba67c7 Michael Hanselmann
            self.SetTaskName(None)
175 52c47e4e Michael Hanselmann
          logging.debug("Done with task %r, priority %s", args, priority)
176 52c47e4e Michael Hanselmann
        except DeferTask, err:
177 52c47e4e Michael Hanselmann
          defer = err
178 52c47e4e Michael Hanselmann
179 52c47e4e Michael Hanselmann
          if defer.priority is None:
180 52c47e4e Michael Hanselmann
            # Use same priority
181 52c47e4e Michael Hanselmann
            defer.priority = priority
182 52c47e4e Michael Hanselmann
183 e1ea54e9 Michael Hanselmann
          logging.debug("Deferring task %r, new priority %s",
184 e1ea54e9 Michael Hanselmann
                        args, defer.priority)
185 52c47e4e Michael Hanselmann
186 52c47e4e Michael Hanselmann
          assert self._HasRunningTaskUnlocked()
187 b459a848 Andrea Spadaccini
        except: # pylint: disable=W0702
188 02fc74da Michael Hanselmann
          logging.exception("Caught unhandled exception")
189 c1cf1fe5 Michael Hanselmann
190 c1cf1fe5 Michael Hanselmann
        assert self._HasRunningTaskUnlocked()
191 76094e37 Michael Hanselmann
      finally:
192 76094e37 Michael Hanselmann
        # Notify pool
193 76094e37 Michael Hanselmann
        pool._lock.acquire()
194 76094e37 Michael Hanselmann
        try:
195 52c47e4e Michael Hanselmann
          if defer:
196 52c47e4e Michael Hanselmann
            assert self._current_task
197 52c47e4e Michael Hanselmann
            # Schedule again for later run
198 52c47e4e Michael Hanselmann
            (_, _, args) = self._current_task
199 52c47e4e Michael Hanselmann
            pool._AddTaskUnlocked(args, defer.priority)
200 52c47e4e Michael Hanselmann
201 b3558df1 Michael Hanselmann
          if self._current_task:
202 b3558df1 Michael Hanselmann
            self._current_task = None
203 53b1d12b Michael Hanselmann
            pool._worker_to_pool.notifyAll()
204 76094e37 Michael Hanselmann
        finally:
205 76094e37 Michael Hanselmann
          pool._lock.release()
206 76094e37 Michael Hanselmann
207 c1cf1fe5 Michael Hanselmann
      assert not self._HasRunningTaskUnlocked()
208 c1cf1fe5 Michael Hanselmann
209 02fc74da Michael Hanselmann
    logging.debug("Terminates")
210 b3558df1 Michael Hanselmann
211 76094e37 Michael Hanselmann
  def RunTask(self, *args):
212 76094e37 Michael Hanselmann
    """Function called to start a task.
213 76094e37 Michael Hanselmann

214 116db7c7 Iustin Pop
    This needs to be implemented by child classes.
215 116db7c7 Iustin Pop

216 76094e37 Michael Hanselmann
    """
217 76094e37 Michael Hanselmann
    raise NotImplementedError()
218 76094e37 Michael Hanselmann
219 76094e37 Michael Hanselmann
220 76094e37 Michael Hanselmann
class WorkerPool(object):
221 76094e37 Michael Hanselmann
  """Worker pool with a queue.
222 76094e37 Michael Hanselmann

223 76094e37 Michael Hanselmann
  This class is thread-safe.
224 76094e37 Michael Hanselmann

225 116db7c7 Iustin Pop
  Tasks are guaranteed to be started in the order in which they're
226 116db7c7 Iustin Pop
  added to the pool. Due to the nature of threading, they're not
227 116db7c7 Iustin Pop
  guaranteed to finish in the same order.
228 76094e37 Michael Hanselmann

229 76094e37 Michael Hanselmann
  """
230 89e2b4d2 Michael Hanselmann
  def __init__(self, name, num_workers, worker_class):
231 76094e37 Michael Hanselmann
    """Constructor for worker pool.
232 76094e37 Michael Hanselmann

233 116db7c7 Iustin Pop
    @param num_workers: number of workers to be started
234 116db7c7 Iustin Pop
        (dynamic resizing is not yet implemented)
235 116db7c7 Iustin Pop
    @param worker_class: the class to be instantiated for workers;
236 116db7c7 Iustin Pop
        should derive from L{BaseWorker}
237 76094e37 Michael Hanselmann

238 76094e37 Michael Hanselmann
    """
239 76094e37 Michael Hanselmann
    # Some of these variables are accessed by BaseWorker
240 53b1d12b Michael Hanselmann
    self._lock = threading.Lock()
241 53b1d12b Michael Hanselmann
    self._pool_to_pool = threading.Condition(self._lock)
242 53b1d12b Michael Hanselmann
    self._pool_to_worker = threading.Condition(self._lock)
243 53b1d12b Michael Hanselmann
    self._worker_to_pool = threading.Condition(self._lock)
244 76094e37 Michael Hanselmann
    self._worker_class = worker_class
245 89e2b4d2 Michael Hanselmann
    self._name = name
246 76094e37 Michael Hanselmann
    self._last_worker_id = 0
247 76094e37 Michael Hanselmann
    self._workers = []
248 76094e37 Michael Hanselmann
    self._quiescing = False
249 27caa993 Michael Hanselmann
    self._active = True
250 76094e37 Michael Hanselmann
251 76094e37 Michael Hanselmann
    # Terminating workers
252 76094e37 Michael Hanselmann
    self._termworkers = []
253 76094e37 Michael Hanselmann
254 76094e37 Michael Hanselmann
    # Queued tasks
255 52c47e4e Michael Hanselmann
    self._counter = 0
256 52c47e4e Michael Hanselmann
    self._tasks = []
257 76094e37 Michael Hanselmann
258 76094e37 Michael Hanselmann
    # Start workers
259 76094e37 Michael Hanselmann
    self.Resize(num_workers)
260 76094e37 Michael Hanselmann
261 76094e37 Michael Hanselmann
  # TODO: Implement dynamic resizing?
262 76094e37 Michael Hanselmann
263 c2a8e8ba Guido Trotter
  def _WaitWhileQuiescingUnlocked(self):
264 c2a8e8ba Guido Trotter
    """Wait until the worker pool has finished quiescing.
265 c2a8e8ba Guido Trotter

266 c2a8e8ba Guido Trotter
    """
267 c2a8e8ba Guido Trotter
    while self._quiescing:
268 c2a8e8ba Guido Trotter
      self._pool_to_pool.wait()
269 c2a8e8ba Guido Trotter
270 52c47e4e Michael Hanselmann
  def _AddTaskUnlocked(self, args, priority):
271 52c47e4e Michael Hanselmann
    """Adds a task to the internal queue.
272 52c47e4e Michael Hanselmann

273 52c47e4e Michael Hanselmann
    @type args: sequence
274 52c47e4e Michael Hanselmann
    @param args: Arguments passed to L{BaseWorker.RunTask}
275 52c47e4e Michael Hanselmann
    @type priority: number
276 52c47e4e Michael Hanselmann
    @param priority: Task priority
277 52c47e4e Michael Hanselmann

278 52c47e4e Michael Hanselmann
    """
279 189d2714 Michael Hanselmann
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
280 52c47e4e Michael Hanselmann
    assert isinstance(priority, (int, long)), "Priority must be numeric"
281 189d2714 Michael Hanselmann
282 52c47e4e Michael Hanselmann
    # This counter is used to ensure elements are processed in their
283 52c47e4e Michael Hanselmann
    # incoming order. For processing they're sorted by priority and then
284 52c47e4e Michael Hanselmann
    # counter.
285 52c47e4e Michael Hanselmann
    self._counter += 1
286 52c47e4e Michael Hanselmann
287 52c47e4e Michael Hanselmann
    heapq.heappush(self._tasks, (priority, self._counter, args))
288 189d2714 Michael Hanselmann
289 189d2714 Michael Hanselmann
    # Notify a waiting worker
290 189d2714 Michael Hanselmann
    self._pool_to_worker.notify()
291 189d2714 Michael Hanselmann
292 52c47e4e Michael Hanselmann
  def AddTask(self, args, priority=_DEFAULT_PRIORITY):
293 76094e37 Michael Hanselmann
    """Adds a task to the queue.
294 76094e37 Michael Hanselmann

295 b2e8a4d9 Michael Hanselmann
    @type args: sequence
296 116db7c7 Iustin Pop
    @param args: arguments passed to L{BaseWorker.RunTask}
297 52c47e4e Michael Hanselmann
    @type priority: number
298 52c47e4e Michael Hanselmann
    @param priority: Task priority
299 76094e37 Michael Hanselmann

300 76094e37 Michael Hanselmann
    """
301 76094e37 Michael Hanselmann
    self._lock.acquire()
302 76094e37 Michael Hanselmann
    try:
303 c2a8e8ba Guido Trotter
      self._WaitWhileQuiescingUnlocked()
304 52c47e4e Michael Hanselmann
      self._AddTaskUnlocked(args, priority)
305 76094e37 Michael Hanselmann
    finally:
306 76094e37 Michael Hanselmann
      self._lock.release()
307 76094e37 Michael Hanselmann
308 52c47e4e Michael Hanselmann
  def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
309 c2a8e8ba Guido Trotter
    """Add a list of tasks to the queue.
310 c2a8e8ba Guido Trotter

311 c2a8e8ba Guido Trotter
    @type tasks: list of tuples
312 c2a8e8ba Guido Trotter
    @param tasks: list of args passed to L{BaseWorker.RunTask}
313 52c47e4e Michael Hanselmann
    @type priority: number or list of numbers
314 52c47e4e Michael Hanselmann
    @param priority: Priority for all added tasks or a list with the priority
315 52c47e4e Michael Hanselmann
                     for each task
316 c2a8e8ba Guido Trotter

317 c2a8e8ba Guido Trotter
    """
318 25e557a5 Guido Trotter
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
319 25e557a5 Guido Trotter
      "Each task must be a sequence"
320 25e557a5 Guido Trotter
321 52c47e4e Michael Hanselmann
    assert (isinstance(priority, (int, long)) or
322 52c47e4e Michael Hanselmann
            compat.all(isinstance(prio, (int, long)) for prio in priority)), \
323 52c47e4e Michael Hanselmann
           "Priority must be numeric or be a list of numeric values"
324 52c47e4e Michael Hanselmann
325 52c47e4e Michael Hanselmann
    if isinstance(priority, (int, long)):
326 52c47e4e Michael Hanselmann
      priority = [priority] * len(tasks)
327 52c47e4e Michael Hanselmann
    elif len(priority) != len(tasks):
328 52c47e4e Michael Hanselmann
      raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
329 52c47e4e Michael Hanselmann
                                   " number of tasks (%s)" %
330 52c47e4e Michael Hanselmann
                                   (len(priority), len(tasks)))
331 52c47e4e Michael Hanselmann
332 c2a8e8ba Guido Trotter
    self._lock.acquire()
333 c2a8e8ba Guido Trotter
    try:
334 c2a8e8ba Guido Trotter
      self._WaitWhileQuiescingUnlocked()
335 c2a8e8ba Guido Trotter
336 52c47e4e Michael Hanselmann
      assert compat.all(isinstance(prio, (int, long)) for prio in priority)
337 52c47e4e Michael Hanselmann
      assert len(tasks) == len(priority)
338 52c47e4e Michael Hanselmann
339 52c47e4e Michael Hanselmann
      for args, priority in zip(tasks, priority):
340 52c47e4e Michael Hanselmann
        self._AddTaskUnlocked(args, priority)
341 c2a8e8ba Guido Trotter
    finally:
342 c2a8e8ba Guido Trotter
      self._lock.release()
343 c2a8e8ba Guido Trotter
344 27caa993 Michael Hanselmann
  def SetActive(self, active):
345 27caa993 Michael Hanselmann
    """Enable/disable processing of tasks.
346 27caa993 Michael Hanselmann

347 27caa993 Michael Hanselmann
    This is different from L{Quiesce} in the sense that this function just
348 27caa993 Michael Hanselmann
    changes an internal flag and doesn't wait for the queue to be empty. Tasks
349 27caa993 Michael Hanselmann
    already being processed continue normally, but no new tasks will be
350 27caa993 Michael Hanselmann
    started. New tasks can still be added.
351 27caa993 Michael Hanselmann

352 27caa993 Michael Hanselmann
    @type active: bool
353 27caa993 Michael Hanselmann
    @param active: Whether tasks should be processed
354 27caa993 Michael Hanselmann

355 27caa993 Michael Hanselmann
    """
356 27caa993 Michael Hanselmann
    self._lock.acquire()
357 27caa993 Michael Hanselmann
    try:
358 27caa993 Michael Hanselmann
      self._active = active
359 27caa993 Michael Hanselmann
360 27caa993 Michael Hanselmann
      if active:
361 27caa993 Michael Hanselmann
        # Tell all workers to continue processing
362 27caa993 Michael Hanselmann
        self._pool_to_worker.notifyAll()
363 27caa993 Michael Hanselmann
    finally:
364 27caa993 Michael Hanselmann
      self._lock.release()
365 27caa993 Michael Hanselmann
366 21c5ad52 Michael Hanselmann
  def _WaitForTaskUnlocked(self, worker):
367 21c5ad52 Michael Hanselmann
    """Waits for a task for a worker.
368 21c5ad52 Michael Hanselmann

369 21c5ad52 Michael Hanselmann
    @type worker: L{BaseWorker}
370 21c5ad52 Michael Hanselmann
    @param worker: Worker thread
371 21c5ad52 Michael Hanselmann

372 21c5ad52 Michael Hanselmann
    """
373 21c5ad52 Michael Hanselmann
    if self._ShouldWorkerTerminateUnlocked(worker):
374 21c5ad52 Michael Hanselmann
      return _TERMINATE
375 21c5ad52 Michael Hanselmann
376 21c5ad52 Michael Hanselmann
    # We only wait if there's no task for us.
377 27caa993 Michael Hanselmann
    if not (self._active and self._tasks):
378 21c5ad52 Michael Hanselmann
      logging.debug("Waiting for tasks")
379 21c5ad52 Michael Hanselmann
380 2db05c94 Michael Hanselmann
      while True:
381 2db05c94 Michael Hanselmann
        # wait() releases the lock and sleeps until notified
382 2db05c94 Michael Hanselmann
        self._pool_to_worker.wait()
383 21c5ad52 Michael Hanselmann
384 2db05c94 Michael Hanselmann
        logging.debug("Notified while waiting")
385 21c5ad52 Michael Hanselmann
386 2db05c94 Michael Hanselmann
        # Were we woken up in order to terminate?
387 2db05c94 Michael Hanselmann
        if self._ShouldWorkerTerminateUnlocked(worker):
388 2db05c94 Michael Hanselmann
          return _TERMINATE
389 21c5ad52 Michael Hanselmann
390 27caa993 Michael Hanselmann
        # Just loop if pool is not processing tasks at this time
391 27caa993 Michael Hanselmann
        if self._active and self._tasks:
392 2db05c94 Michael Hanselmann
          break
393 21c5ad52 Michael Hanselmann
394 21c5ad52 Michael Hanselmann
    # Get task from queue and tell pool about it
395 21c5ad52 Michael Hanselmann
    try:
396 52c47e4e Michael Hanselmann
      return heapq.heappop(self._tasks)
397 21c5ad52 Michael Hanselmann
    finally:
398 21c5ad52 Michael Hanselmann
      self._worker_to_pool.notifyAll()
399 21c5ad52 Michael Hanselmann
400 76094e37 Michael Hanselmann
  def _ShouldWorkerTerminateUnlocked(self, worker):
401 76094e37 Michael Hanselmann
    """Returns whether a worker should terminate.
402 76094e37 Michael Hanselmann

403 76094e37 Michael Hanselmann
    """
404 76094e37 Michael Hanselmann
    return (worker in self._termworkers)
405 76094e37 Michael Hanselmann
406 76094e37 Michael Hanselmann
  def _HasRunningTasksUnlocked(self):
407 76094e37 Michael Hanselmann
    """Checks whether there's a task running in a worker.
408 76094e37 Michael Hanselmann

409 76094e37 Michael Hanselmann
    """
410 76094e37 Michael Hanselmann
    for worker in self._workers + self._termworkers:
411 b459a848 Andrea Spadaccini
      if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212
412 76094e37 Michael Hanselmann
        return True
413 76094e37 Michael Hanselmann
    return False
414 76094e37 Michael Hanselmann
415 ef52306a Michael Hanselmann
  def HasRunningTasks(self):
416 ef52306a Michael Hanselmann
    """Checks whether there's at least one task running.
417 ef52306a Michael Hanselmann

418 ef52306a Michael Hanselmann
    """
419 ef52306a Michael Hanselmann
    self._lock.acquire()
420 ef52306a Michael Hanselmann
    try:
421 ef52306a Michael Hanselmann
      return self._HasRunningTasksUnlocked()
422 ef52306a Michael Hanselmann
    finally:
423 ef52306a Michael Hanselmann
      self._lock.release()
424 ef52306a Michael Hanselmann
425 76094e37 Michael Hanselmann
  def Quiesce(self):
426 76094e37 Michael Hanselmann
    """Waits until the task queue is empty.
427 76094e37 Michael Hanselmann

428 76094e37 Michael Hanselmann
    """
429 76094e37 Michael Hanselmann
    self._lock.acquire()
430 76094e37 Michael Hanselmann
    try:
431 76094e37 Michael Hanselmann
      self._quiescing = True
432 76094e37 Michael Hanselmann
433 76094e37 Michael Hanselmann
      # Wait while there are tasks pending or running
434 76094e37 Michael Hanselmann
      while self._tasks or self._HasRunningTasksUnlocked():
435 53b1d12b Michael Hanselmann
        self._worker_to_pool.wait()
436 76094e37 Michael Hanselmann
437 76094e37 Michael Hanselmann
    finally:
438 76094e37 Michael Hanselmann
      self._quiescing = False
439 76094e37 Michael Hanselmann
440 76094e37 Michael Hanselmann
      # Make sure AddTasks continues in case it was waiting
441 53b1d12b Michael Hanselmann
      self._pool_to_pool.notifyAll()
442 76094e37 Michael Hanselmann
443 76094e37 Michael Hanselmann
      self._lock.release()
444 76094e37 Michael Hanselmann
445 76094e37 Michael Hanselmann
  def _NewWorkerIdUnlocked(self):
446 116db7c7 Iustin Pop
    """Return an identifier for a new worker.
447 116db7c7 Iustin Pop

448 116db7c7 Iustin Pop
    """
449 76094e37 Michael Hanselmann
    self._last_worker_id += 1
450 89e2b4d2 Michael Hanselmann
451 89e2b4d2 Michael Hanselmann
    return "%s%d" % (self._name, self._last_worker_id)
452 76094e37 Michael Hanselmann
453 76094e37 Michael Hanselmann
  def _ResizeUnlocked(self, num_workers):
454 76094e37 Michael Hanselmann
    """Changes the number of workers.
455 76094e37 Michael Hanselmann

456 76094e37 Michael Hanselmann
    """
457 76094e37 Michael Hanselmann
    assert num_workers >= 0, "num_workers must be >= 0"
458 76094e37 Michael Hanselmann
459 76094e37 Michael Hanselmann
    logging.debug("Resizing to %s workers", num_workers)
460 76094e37 Michael Hanselmann
461 76094e37 Michael Hanselmann
    current_count = len(self._workers)
462 76094e37 Michael Hanselmann
463 76094e37 Michael Hanselmann
    if current_count == num_workers:
464 76094e37 Michael Hanselmann
      # Nothing to do
465 76094e37 Michael Hanselmann
      pass
466 76094e37 Michael Hanselmann
467 76094e37 Michael Hanselmann
    elif current_count > num_workers:
468 76094e37 Michael Hanselmann
      if num_workers == 0:
469 76094e37 Michael Hanselmann
        # Create copy of list to iterate over while lock isn't held.
470 76094e37 Michael Hanselmann
        termworkers = self._workers[:]
471 76094e37 Michael Hanselmann
        del self._workers[:]
472 76094e37 Michael Hanselmann
      else:
473 76094e37 Michael Hanselmann
        # TODO: Implement partial downsizing
474 76094e37 Michael Hanselmann
        raise NotImplementedError()
475 76094e37 Michael Hanselmann
        #termworkers = ...
476 76094e37 Michael Hanselmann
477 76094e37 Michael Hanselmann
      self._termworkers += termworkers
478 76094e37 Michael Hanselmann
479 76094e37 Michael Hanselmann
      # Notify workers that something has changed
480 53b1d12b Michael Hanselmann
      self._pool_to_worker.notifyAll()
481 76094e37 Michael Hanselmann
482 76094e37 Michael Hanselmann
      # Join all terminating workers
483 76094e37 Michael Hanselmann
      self._lock.release()
484 76094e37 Michael Hanselmann
      try:
485 76094e37 Michael Hanselmann
        for worker in termworkers:
486 c0a8eb9e Michael Hanselmann
          logging.debug("Waiting for thread %s", worker.getName())
487 76094e37 Michael Hanselmann
          worker.join()
488 76094e37 Michael Hanselmann
      finally:
489 76094e37 Michael Hanselmann
        self._lock.acquire()
490 76094e37 Michael Hanselmann
491 76094e37 Michael Hanselmann
      # Remove terminated threads. This could be done in a more efficient way
492 76094e37 Michael Hanselmann
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
493 76094e37 Michael Hanselmann
      # don't leave zombie threads around.
494 76094e37 Michael Hanselmann
      for worker in termworkers:
495 76094e37 Michael Hanselmann
        assert worker in self._termworkers, ("Worker not in list of"
496 76094e37 Michael Hanselmann
                                             " terminating workers")
497 76094e37 Michael Hanselmann
        if not worker.isAlive():
498 76094e37 Michael Hanselmann
          self._termworkers.remove(worker)
499 76094e37 Michael Hanselmann
500 76094e37 Michael Hanselmann
      assert not self._termworkers, "Zombie worker detected"
501 76094e37 Michael Hanselmann
502 76094e37 Michael Hanselmann
    elif current_count < num_workers:
503 76094e37 Michael Hanselmann
      # Create (num_workers - current_count) new workers
504 f1501b3f Michael Hanselmann
      for _ in range(num_workers - current_count):
505 76094e37 Michael Hanselmann
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
506 76094e37 Michael Hanselmann
        self._workers.append(worker)
507 76094e37 Michael Hanselmann
        worker.start()
508 76094e37 Michael Hanselmann
509 76094e37 Michael Hanselmann
  def Resize(self, num_workers):
510 76094e37 Michael Hanselmann
    """Changes the number of workers in the pool.
511 76094e37 Michael Hanselmann

512 116db7c7 Iustin Pop
    @param num_workers: the new number of workers
513 76094e37 Michael Hanselmann

514 76094e37 Michael Hanselmann
    """
515 76094e37 Michael Hanselmann
    self._lock.acquire()
516 76094e37 Michael Hanselmann
    try:
517 76094e37 Michael Hanselmann
      return self._ResizeUnlocked(num_workers)
518 76094e37 Michael Hanselmann
    finally:
519 76094e37 Michael Hanselmann
      self._lock.release()
520 76094e37 Michael Hanselmann
521 76094e37 Michael Hanselmann
  def TerminateWorkers(self):
522 76094e37 Michael Hanselmann
    """Terminate all worker threads.
523 76094e37 Michael Hanselmann

524 76094e37 Michael Hanselmann
    Unstarted tasks will be ignored.
525 76094e37 Michael Hanselmann

526 76094e37 Michael Hanselmann
    """
527 76094e37 Michael Hanselmann
    logging.debug("Terminating all workers")
528 76094e37 Michael Hanselmann
529 76094e37 Michael Hanselmann
    self._lock.acquire()
530 76094e37 Michael Hanselmann
    try:
531 76094e37 Michael Hanselmann
      self._ResizeUnlocked(0)
532 76094e37 Michael Hanselmann
533 76094e37 Michael Hanselmann
      if self._tasks:
534 76094e37 Michael Hanselmann
        logging.debug("There are %s tasks left", len(self._tasks))
535 76094e37 Michael Hanselmann
    finally:
536 76094e37 Michael Hanselmann
      self._lock.release()
537 76094e37 Michael Hanselmann
538 76094e37 Michael Hanselmann
    logging.debug("All workers terminated")