Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ 2034c70d

History | View | Annotate | Download (9.5 kB)

1 76094e37 Michael Hanselmann
#
2 76094e37 Michael Hanselmann
#
3 76094e37 Michael Hanselmann
4 76094e37 Michael Hanselmann
# Copyright (C) 2008 Google Inc.
5 76094e37 Michael Hanselmann
#
6 76094e37 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 76094e37 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 76094e37 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 76094e37 Michael Hanselmann
# (at your option) any later version.
10 76094e37 Michael Hanselmann
#
11 76094e37 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 76094e37 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 76094e37 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 76094e37 Michael Hanselmann
# General Public License for more details.
15 76094e37 Michael Hanselmann
#
16 76094e37 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 76094e37 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 76094e37 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 76094e37 Michael Hanselmann
# 02110-1301, USA.
20 76094e37 Michael Hanselmann
21 76094e37 Michael Hanselmann
22 76094e37 Michael Hanselmann
"""Base classes for worker pools.
23 76094e37 Michael Hanselmann

24 76094e37 Michael Hanselmann
"""
25 76094e37 Michael Hanselmann
26 76094e37 Michael Hanselmann
import collections
27 76094e37 Michael Hanselmann
import logging
28 76094e37 Michael Hanselmann
import threading
29 76094e37 Michael Hanselmann
30 25e557a5 Guido Trotter
from ganeti import compat
31 25e557a5 Guido Trotter
32 76094e37 Michael Hanselmann
33 76094e37 Michael Hanselmann
class BaseWorker(threading.Thread, object):
  """Base worker class for worker pools.

  Users of a worker pool must override RunTask in a subclass.

  """
  # pylint: disable-msg=W0212
  def __init__(self, pool, worker_id):
    """Constructor for BaseWorker thread.

    @param pool: the parent worker pool
    @param worker_id: identifier for this worker; used as the thread name

    """
    super(BaseWorker, self).__init__(name=worker_id)
    self.pool = pool
    # Argument tuple of the task currently being processed, None while idle
    self._current_task = None

  def ShouldTerminate(self):
    """Returns whether a worker should terminate.

    The decision is delegated to the parent pool.

    """
    return self.pool.ShouldWorkerTerminate(self)

  def _HasRunningTaskUnlocked(self):
    """Returns whether this worker is currently running a task.

    Does not acquire the pool lock itself.

    """
    return (self._current_task is not None)

  def HasRunningTask(self):
    """Returns whether this worker is currently running a task.

    Acquires the pool lock for the duration of the check.

    """
    self.pool._lock.acquire()
    try:
      return self._HasRunningTaskUnlocked()
    finally:
      self.pool._lock.release()

  def run(self):
    """Main thread function.

    Waits for new tasks to show up in the queue, passes each of them to
    L{RunTask} and leaves the loop once the pool reports that this worker
    should terminate. Exceptions raised by a task are logged and do not
    stop the worker.

    """
    pool = self.pool

    assert not self.HasRunningTask()

    while True:
      try:
        # We wait on lock to be told either terminate or do a task.
        pool._lock.acquire()
        try:
          if pool._ShouldWorkerTerminateUnlocked(self):
            break

          # We only wait if there's no task for us.
          if not pool._tasks:
            logging.debug("Waiting for tasks")

            # wait() releases the lock and sleeps until notified
            pool._pool_to_worker.wait()

            logging.debug("Notified while waiting")

            # Were we woken up in order to terminate?
            if pool._ShouldWorkerTerminateUnlocked(self):
              break

            if not pool._tasks:
              # Spurious notification, ignore
              continue

          # Get task from queue and tell pool about it
          try:
            self._current_task = pool._tasks.popleft()
          finally:
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

        # Run the actual task
        try:
          logging.debug("Starting task %r", self._current_task)
          self.RunTask(*self._current_task)
          logging.debug("Done with task %r", self._current_task)
        except: # pylint: disable-msg=W0702
          logging.exception("Caught unhandled exception")
      finally:
        # Notify pool
        pool._lock.acquire()
        try:
          # Compare against None instead of testing truthiness: a task queued
          # without arguments is the empty tuple, which is falsy, and would
          # otherwise leave this worker marked as busy forever.
          if self._current_task is not None:
            self._current_task = None
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

    logging.debug("Terminates")

  def RunTask(self, *args):
    """Function called to start a task.

    This needs to be implemented by child classes.

    @param args: the arguments the task was queued with
    @raise NotImplementedError: always, unless overridden

    """
    raise NotImplementedError()
142 76094e37 Michael Hanselmann
143 76094e37 Michael Hanselmann
144 76094e37 Michael Hanselmann
class WorkerPool(object):
  """Worker pool with a queue.

  This class is thread-safe.

  Tasks are guaranteed to be started in the order in which they're
  added to the pool. Due to the nature of threading, they're not
  guaranteed to finish in the same order.

  """
  def __init__(self, name, num_workers, worker_class):
    """Constructor for worker pool.

    @param name: prefix for worker identifiers; a running number is
        appended to it for every worker created
    @param num_workers: number of workers to be started
        (shrinking to a non-zero number of workers is not yet implemented)
    @param worker_class: the class to be instantiated for workers;
        should derive from L{BaseWorker}

    """
    # Some of these variables are accessed by BaseWorker
    self._lock = threading.Lock()
    # All three conditions share the same lock; they only differ in who
    # notifies whom.
    self._pool_to_pool = threading.Condition(self._lock)
    self._pool_to_worker = threading.Condition(self._lock)
    self._worker_to_pool = threading.Condition(self._lock)
    self._worker_class = worker_class
    self._name = name
    self._last_worker_id = 0
    self._workers = []
    self._quiescing = False

    # Terminating workers
    self._termworkers = []

    # Queued tasks
    self._tasks = collections.deque()

    # Start workers
    self.Resize(num_workers)

  # TODO: Implement dynamic resizing?

  def _WaitWhileQuiescingUnlocked(self):
    """Wait until the worker pool has finished quiescing.

    Must be called with the pool lock held; the lock is released while
    waiting.

    """
    while self._quiescing:
      self._pool_to_pool.wait()

  def AddTask(self, *args):
    """Adds a task to the queue.

    Blocks while the pool is quiescing.

    @param args: arguments passed to L{BaseWorker.RunTask}

    """
    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()

      self._tasks.append(args)

      # Wake one idling worker up
      self._pool_to_worker.notify()
    finally:
      self._lock.release()

  def AddManyTasks(self, tasks):
    """Add a list of tasks to the queue.

    Blocks while the pool is quiescing.

    @type tasks: list of tuples
    @param tasks: list of args passed to L{BaseWorker.RunTask}

    """
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
      "Each task must be a sequence"

    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()

      self._tasks.extend(tasks)

      # One notification per added task
      for _ in tasks:
        self._pool_to_worker.notify()
    finally:
      self._lock.release()

  def _ShouldWorkerTerminateUnlocked(self, worker):
    """Returns whether a worker should terminate.

    Does not acquire the pool lock itself.

    """
    return (worker in self._termworkers)

  def ShouldWorkerTerminate(self, worker):
    """Returns whether a worker should terminate.

    Acquires the pool lock for the duration of the check.

    """
    self._lock.acquire()
    try:
      return self._ShouldWorkerTerminateUnlocked(worker)
    finally:
      self._lock.release()

  def _HasRunningTasksUnlocked(self):
    """Checks whether there's a task running in a worker.

    Both active and terminating workers are considered.

    """
    for worker in self._workers + self._termworkers:
      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
        return True
    return False

  def Quiesce(self):
    """Waits until the task queue is empty and no task is running.

    While waiting, L{AddTask} and L{AddManyTasks} block.

    """
    self._lock.acquire()
    try:
      self._quiescing = True

      # Wait while there are tasks pending or running
      while self._tasks or self._HasRunningTasksUnlocked():
        self._worker_to_pool.wait()

    finally:
      self._quiescing = False

      # Make sure AddTask and AddManyTasks continue in case they were waiting
      self._pool_to_pool.notifyAll()

      self._lock.release()

  def _NewWorkerIdUnlocked(self):
    """Return an identifier for a new worker.

    The identifier is the pool name followed by a running number.

    """
    self._last_worker_id += 1

    return "%s%d" % (self._name, self._last_worker_id)

  def _ResizeUnlocked(self, num_workers):
    """Changes the number of workers.

    Must be called with the pool lock held. When shrinking, the lock is
    released while the terminating workers are joined and re-acquired
    afterwards.

    @param num_workers: the new number of workers
    @raise NotImplementedError: when asked to shrink to a non-zero number
        of workers

    """
    assert num_workers >= 0, "num_workers must be >= 0"

    logging.debug("Resizing to %s workers", num_workers)

    current_count = len(self._workers)

    if current_count == num_workers:
      # Nothing to do
      pass

    elif current_count > num_workers:
      if num_workers == 0:
        # Create copy of list to iterate over while lock isn't held.
        termworkers = self._workers[:]
        del self._workers[:]
      else:
        # TODO: Implement partial downsizing
        raise NotImplementedError()

      self._termworkers += termworkers

      # Notify workers that something has changed
      self._pool_to_worker.notifyAll()

      # Join all terminating workers
      self._lock.release()
      try:
        for worker in termworkers:
          logging.debug("Waiting for thread %s", worker.getName())
          worker.join()
      finally:
        self._lock.acquire()

      # Remove terminated threads. This could be done in a more efficient way
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
      # don't leave zombie threads around.
      for worker in termworkers:
        assert worker in self._termworkers, ("Worker not in list of"
                                             " terminating workers")
        if not worker.isAlive():
          self._termworkers.remove(worker)

      assert not self._termworkers, "Zombie worker detected"

    elif current_count < num_workers:
      # Create (num_workers - current_count) new workers
      for _ in range(num_workers - current_count):
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
        self._workers.append(worker)
        worker.start()

  def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    @param num_workers: the new number of workers

    """
    self._lock.acquire()
    try:
      return self._ResizeUnlocked(num_workers)
    finally:
      self._lock.release()

  def TerminateWorkers(self):
    """Terminate all worker threads.

    Unstarted tasks will be ignored.

    """
    logging.debug("Terminating all workers")

    self._lock.acquire()
    try:
      self._ResizeUnlocked(0)

      if self._tasks:
        logging.debug("There are %s tasks left", len(self._tasks))
    finally:
      self._lock.release()

    logging.debug("All workers terminated")