Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ c4a2fee1

History | View | Annotate | Download (9 kB)

1 76094e37 Michael Hanselmann
#
2 76094e37 Michael Hanselmann
#
3 76094e37 Michael Hanselmann
4 76094e37 Michael Hanselmann
# Copyright (C) 2008 Google Inc.
5 76094e37 Michael Hanselmann
#
6 76094e37 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 76094e37 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 76094e37 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 76094e37 Michael Hanselmann
# (at your option) any later version.
10 76094e37 Michael Hanselmann
#
11 76094e37 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 76094e37 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 76094e37 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 76094e37 Michael Hanselmann
# General Public License for more details.
15 76094e37 Michael Hanselmann
#
16 76094e37 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 76094e37 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 76094e37 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 76094e37 Michael Hanselmann
# 02110-1301, USA.
20 76094e37 Michael Hanselmann
21 76094e37 Michael Hanselmann
22 76094e37 Michael Hanselmann
"""Base classes for worker pools.
23 76094e37 Michael Hanselmann

24 76094e37 Michael Hanselmann
"""
25 76094e37 Michael Hanselmann
26 76094e37 Michael Hanselmann
import collections
27 76094e37 Michael Hanselmann
import logging
28 76094e37 Michael Hanselmann
import threading
29 76094e37 Michael Hanselmann
30 76094e37 Michael Hanselmann
from ganeti import errors
31 76094e37 Michael Hanselmann
from ganeti import utils
32 76094e37 Michael Hanselmann
33 76094e37 Michael Hanselmann
34 76094e37 Michael Hanselmann
class BaseWorker(threading.Thread, object):
35 76094e37 Michael Hanselmann
  """Base worker class for worker pools.
36 76094e37 Michael Hanselmann

37 76094e37 Michael Hanselmann
  Users of a worker pool must override RunTask in a subclass.
38 76094e37 Michael Hanselmann

39 76094e37 Michael Hanselmann
  """
40 76094e37 Michael Hanselmann
  def __init__(self, pool, worker_id):
41 76094e37 Michael Hanselmann
    """Constructor for BaseWorker thread.
42 76094e37 Michael Hanselmann

43 76094e37 Michael Hanselmann
    Args:
44 76094e37 Michael Hanselmann
    - pool: Parent worker pool
45 76094e37 Michael Hanselmann
    - worker_id: Identifier for this worker
46 76094e37 Michael Hanselmann

47 76094e37 Michael Hanselmann
    """
48 76094e37 Michael Hanselmann
    super(BaseWorker, self).__init__()
49 76094e37 Michael Hanselmann
    self.pool = pool
50 76094e37 Michael Hanselmann
    self.worker_id = worker_id
51 76094e37 Michael Hanselmann
    self._current_task = None
52 76094e37 Michael Hanselmann
53 76094e37 Michael Hanselmann
  def ShouldTerminate(self):
54 76094e37 Michael Hanselmann
    """Returns whether a worker should terminate.
55 76094e37 Michael Hanselmann

56 76094e37 Michael Hanselmann
    """
57 76094e37 Michael Hanselmann
    return self.pool.ShouldWorkerTerminate(self)
58 76094e37 Michael Hanselmann
59 b3558df1 Michael Hanselmann
  def _HasRunningTaskUnlocked(self):
60 b3558df1 Michael Hanselmann
    """Returns whether this worker is currently running a task.
61 b3558df1 Michael Hanselmann

62 b3558df1 Michael Hanselmann
    """
63 b3558df1 Michael Hanselmann
    return (self._current_task is not None)
64 b3558df1 Michael Hanselmann
65 b3558df1 Michael Hanselmann
  def HasRunningTask(self):
66 b3558df1 Michael Hanselmann
    """Returns whether this worker is currently running a task.
67 b3558df1 Michael Hanselmann

68 b3558df1 Michael Hanselmann
    """
69 b3558df1 Michael Hanselmann
    self.pool._lock.acquire()
70 b3558df1 Michael Hanselmann
    try:
71 b3558df1 Michael Hanselmann
      return self._HasRunningTaskUnlocked()
72 b3558df1 Michael Hanselmann
    finally:
73 b3558df1 Michael Hanselmann
      self.pool._lock.release()
74 b3558df1 Michael Hanselmann
75 76094e37 Michael Hanselmann
  def run(self):
76 76094e37 Michael Hanselmann
    """Main thread function.
77 76094e37 Michael Hanselmann

78 76094e37 Michael Hanselmann
    Waits for new tasks to show up in the queue.
79 76094e37 Michael Hanselmann

80 76094e37 Michael Hanselmann
    """
81 76094e37 Michael Hanselmann
    pool = self.pool
82 76094e37 Michael Hanselmann
83 b3558df1 Michael Hanselmann
    assert not self.HasRunningTask()
84 76094e37 Michael Hanselmann
85 76094e37 Michael Hanselmann
    while True:
86 76094e37 Michael Hanselmann
      try:
87 76094e37 Michael Hanselmann
        # We wait on lock to be told either terminate or do a task.
88 76094e37 Michael Hanselmann
        pool._lock.acquire()
89 76094e37 Michael Hanselmann
        try:
90 76094e37 Michael Hanselmann
          if pool._ShouldWorkerTerminateUnlocked(self):
91 76094e37 Michael Hanselmann
            break
92 76094e37 Michael Hanselmann
93 76094e37 Michael Hanselmann
          # We only wait if there's no task for us.
94 76094e37 Michael Hanselmann
          if not pool._tasks:
95 b3558df1 Michael Hanselmann
            logging.debug("Worker %s: waiting for tasks", self.worker_id)
96 b3558df1 Michael Hanselmann
97 76094e37 Michael Hanselmann
            # wait() releases the lock and sleeps until notified
98 53b1d12b Michael Hanselmann
            pool._pool_to_worker.wait()
99 76094e37 Michael Hanselmann
100 b3558df1 Michael Hanselmann
            logging.debug("Worker %s: notified while waiting", self.worker_id)
101 b3558df1 Michael Hanselmann
102 76094e37 Michael Hanselmann
            # Were we woken up in order to terminate?
103 76094e37 Michael Hanselmann
            if pool._ShouldWorkerTerminateUnlocked(self):
104 76094e37 Michael Hanselmann
              break
105 76094e37 Michael Hanselmann
106 76094e37 Michael Hanselmann
            if not pool._tasks:
107 76094e37 Michael Hanselmann
              # Spurious notification, ignore
108 76094e37 Michael Hanselmann
              continue
109 76094e37 Michael Hanselmann
110 76094e37 Michael Hanselmann
          # Get task from queue and tell pool about it
111 76094e37 Michael Hanselmann
          try:
112 76094e37 Michael Hanselmann
            self._current_task = pool._tasks.popleft()
113 76094e37 Michael Hanselmann
          finally:
114 53b1d12b Michael Hanselmann
            pool._worker_to_pool.notifyAll()
115 76094e37 Michael Hanselmann
        finally:
116 76094e37 Michael Hanselmann
          pool._lock.release()
117 76094e37 Michael Hanselmann
118 76094e37 Michael Hanselmann
        # Run the actual task
119 76094e37 Michael Hanselmann
        try:
120 b3558df1 Michael Hanselmann
          logging.debug("Worker %s: starting task %r",
121 b3558df1 Michael Hanselmann
                        self.worker_id, self._current_task)
122 76094e37 Michael Hanselmann
          self.RunTask(*self._current_task)
123 b3558df1 Michael Hanselmann
          logging.debug("Worker %s: done with task %r",
124 b3558df1 Michael Hanselmann
                        self.worker_id, self._current_task)
125 76094e37 Michael Hanselmann
        except:
126 76094e37 Michael Hanselmann
          logging.error("Worker %s: Caught unhandled exception",
127 76094e37 Michael Hanselmann
                        self.worker_id, exc_info=True)
128 76094e37 Michael Hanselmann
      finally:
129 76094e37 Michael Hanselmann
        # Notify pool
130 76094e37 Michael Hanselmann
        pool._lock.acquire()
131 76094e37 Michael Hanselmann
        try:
132 b3558df1 Michael Hanselmann
          if self._current_task:
133 b3558df1 Michael Hanselmann
            self._current_task = None
134 53b1d12b Michael Hanselmann
            pool._worker_to_pool.notifyAll()
135 76094e37 Michael Hanselmann
        finally:
136 76094e37 Michael Hanselmann
          pool._lock.release()
137 76094e37 Michael Hanselmann
138 b3558df1 Michael Hanselmann
    logging.debug("Worker %s: terminates", self.worker_id)
139 b3558df1 Michael Hanselmann
140 76094e37 Michael Hanselmann
  def RunTask(self, *args):
141 76094e37 Michael Hanselmann
    """Function called to start a task.
142 76094e37 Michael Hanselmann

143 76094e37 Michael Hanselmann
    """
144 76094e37 Michael Hanselmann
    raise NotImplementedError()
145 76094e37 Michael Hanselmann
146 76094e37 Michael Hanselmann
147 76094e37 Michael Hanselmann
class WorkerPool(object):
148 76094e37 Michael Hanselmann
  """Worker pool with a queue.
149 76094e37 Michael Hanselmann

150 76094e37 Michael Hanselmann
  This class is thread-safe.
151 76094e37 Michael Hanselmann

152 76094e37 Michael Hanselmann
  Tasks are guaranteed to be started in the order in which they're added to the
153 76094e37 Michael Hanselmann
  pool. Due to the nature of threading, they're not guaranteed to finish in the
154 76094e37 Michael Hanselmann
  same order.
155 76094e37 Michael Hanselmann

156 76094e37 Michael Hanselmann
  """
157 76094e37 Michael Hanselmann
  def __init__(self, num_workers, worker_class):
158 76094e37 Michael Hanselmann
    """Constructor for worker pool.
159 76094e37 Michael Hanselmann

160 76094e37 Michael Hanselmann
    Args:
161 76094e37 Michael Hanselmann
    - num_workers: Number of workers to be started (dynamic resizing is not
162 76094e37 Michael Hanselmann
                   yet implemented)
163 76094e37 Michael Hanselmann
    - worker_class: Class to be instantiated for workers; should derive from
164 76094e37 Michael Hanselmann
                    BaseWorker
165 76094e37 Michael Hanselmann

166 76094e37 Michael Hanselmann
    """
167 76094e37 Michael Hanselmann
    # Some of these variables are accessed by BaseWorker
168 53b1d12b Michael Hanselmann
    self._lock = threading.Lock()
169 53b1d12b Michael Hanselmann
    self._pool_to_pool = threading.Condition(self._lock)
170 53b1d12b Michael Hanselmann
    self._pool_to_worker = threading.Condition(self._lock)
171 53b1d12b Michael Hanselmann
    self._worker_to_pool = threading.Condition(self._lock)
172 76094e37 Michael Hanselmann
    self._worker_class = worker_class
173 76094e37 Michael Hanselmann
    self._last_worker_id = 0
174 76094e37 Michael Hanselmann
    self._workers = []
175 76094e37 Michael Hanselmann
    self._quiescing = False
176 76094e37 Michael Hanselmann
177 76094e37 Michael Hanselmann
    # Terminating workers
178 76094e37 Michael Hanselmann
    self._termworkers = []
179 76094e37 Michael Hanselmann
180 76094e37 Michael Hanselmann
    # Queued tasks
181 76094e37 Michael Hanselmann
    self._tasks = collections.deque()
182 76094e37 Michael Hanselmann
183 76094e37 Michael Hanselmann
    # Start workers
184 76094e37 Michael Hanselmann
    self.Resize(num_workers)
185 76094e37 Michael Hanselmann
186 76094e37 Michael Hanselmann
  # TODO: Implement dynamic resizing?
187 76094e37 Michael Hanselmann
188 76094e37 Michael Hanselmann
  def AddTask(self, *args):
189 76094e37 Michael Hanselmann
    """Adds a task to the queue.
190 76094e37 Michael Hanselmann

191 76094e37 Michael Hanselmann
    Args:
192 76094e37 Michael Hanselmann
    - *args: Arguments passed to BaseWorker.RunTask
193 76094e37 Michael Hanselmann

194 76094e37 Michael Hanselmann
    """
195 76094e37 Michael Hanselmann
    self._lock.acquire()
196 76094e37 Michael Hanselmann
    try:
197 76094e37 Michael Hanselmann
      # Don't add new tasks while we're quiescing
198 76094e37 Michael Hanselmann
      while self._quiescing:
199 53b1d12b Michael Hanselmann
        self._pool_to_pool.wait()
200 76094e37 Michael Hanselmann
201 76094e37 Michael Hanselmann
      # Add task to internal queue
202 76094e37 Michael Hanselmann
      self._tasks.append(args)
203 53b1d12b Michael Hanselmann
204 53b1d12b Michael Hanselmann
      # Wake one idling worker up
205 53b1d12b Michael Hanselmann
      self._pool_to_worker.notify()
206 76094e37 Michael Hanselmann
    finally:
207 76094e37 Michael Hanselmann
      self._lock.release()
208 76094e37 Michael Hanselmann
209 76094e37 Michael Hanselmann
  def _ShouldWorkerTerminateUnlocked(self, worker):
210 76094e37 Michael Hanselmann
    """Returns whether a worker should terminate.
211 76094e37 Michael Hanselmann

212 76094e37 Michael Hanselmann
    """
213 76094e37 Michael Hanselmann
    return (worker in self._termworkers)
214 76094e37 Michael Hanselmann
215 76094e37 Michael Hanselmann
  def ShouldWorkerTerminate(self, worker):
216 76094e37 Michael Hanselmann
    """Returns whether a worker should terminate.
217 76094e37 Michael Hanselmann

218 76094e37 Michael Hanselmann
    """
219 76094e37 Michael Hanselmann
    self._lock.acquire()
220 76094e37 Michael Hanselmann
    try:
221 76094e37 Michael Hanselmann
      return self._ShouldWorkerTerminateUnlocked(self)
222 76094e37 Michael Hanselmann
    finally:
223 76094e37 Michael Hanselmann
      self._lock.release()
224 76094e37 Michael Hanselmann
225 76094e37 Michael Hanselmann
  def _HasRunningTasksUnlocked(self):
226 76094e37 Michael Hanselmann
    """Checks whether there's a task running in a worker.
227 76094e37 Michael Hanselmann

228 76094e37 Michael Hanselmann
    """
229 76094e37 Michael Hanselmann
    for worker in self._workers + self._termworkers:
230 b3558df1 Michael Hanselmann
      if worker._HasRunningTaskUnlocked():
231 76094e37 Michael Hanselmann
        return True
232 76094e37 Michael Hanselmann
    return False
233 76094e37 Michael Hanselmann
234 76094e37 Michael Hanselmann
  def Quiesce(self):
235 76094e37 Michael Hanselmann
    """Waits until the task queue is empty.
236 76094e37 Michael Hanselmann

237 76094e37 Michael Hanselmann
    """
238 76094e37 Michael Hanselmann
    self._lock.acquire()
239 76094e37 Michael Hanselmann
    try:
240 76094e37 Michael Hanselmann
      self._quiescing = True
241 76094e37 Michael Hanselmann
242 76094e37 Michael Hanselmann
      # Wait while there are tasks pending or running
243 76094e37 Michael Hanselmann
      while self._tasks or self._HasRunningTasksUnlocked():
244 53b1d12b Michael Hanselmann
        self._worker_to_pool.wait()
245 76094e37 Michael Hanselmann
246 76094e37 Michael Hanselmann
    finally:
247 76094e37 Michael Hanselmann
      self._quiescing = False
248 76094e37 Michael Hanselmann
249 76094e37 Michael Hanselmann
      # Make sure AddTasks continues in case it was waiting
250 53b1d12b Michael Hanselmann
      self._pool_to_pool.notifyAll()
251 76094e37 Michael Hanselmann
252 76094e37 Michael Hanselmann
      self._lock.release()
253 76094e37 Michael Hanselmann
254 76094e37 Michael Hanselmann
  def _NewWorkerIdUnlocked(self):
255 76094e37 Michael Hanselmann
    self._last_worker_id += 1
256 76094e37 Michael Hanselmann
    return self._last_worker_id
257 76094e37 Michael Hanselmann
258 76094e37 Michael Hanselmann
  def _ResizeUnlocked(self, num_workers):
259 76094e37 Michael Hanselmann
    """Changes the number of workers.
260 76094e37 Michael Hanselmann

261 76094e37 Michael Hanselmann
    """
262 76094e37 Michael Hanselmann
    assert num_workers >= 0, "num_workers must be >= 0"
263 76094e37 Michael Hanselmann
264 76094e37 Michael Hanselmann
    logging.debug("Resizing to %s workers", num_workers)
265 76094e37 Michael Hanselmann
266 76094e37 Michael Hanselmann
    current_count = len(self._workers)
267 76094e37 Michael Hanselmann
268 76094e37 Michael Hanselmann
    if current_count == num_workers:
269 76094e37 Michael Hanselmann
      # Nothing to do
270 76094e37 Michael Hanselmann
      pass
271 76094e37 Michael Hanselmann
272 76094e37 Michael Hanselmann
    elif current_count > num_workers:
273 76094e37 Michael Hanselmann
      if num_workers == 0:
274 76094e37 Michael Hanselmann
        # Create copy of list to iterate over while lock isn't held.
275 76094e37 Michael Hanselmann
        termworkers = self._workers[:]
276 76094e37 Michael Hanselmann
        del self._workers[:]
277 76094e37 Michael Hanselmann
      else:
278 76094e37 Michael Hanselmann
        # TODO: Implement partial downsizing
279 76094e37 Michael Hanselmann
        raise NotImplementedError()
280 76094e37 Michael Hanselmann
        #termworkers = ...
281 76094e37 Michael Hanselmann
282 76094e37 Michael Hanselmann
      self._termworkers += termworkers
283 76094e37 Michael Hanselmann
284 76094e37 Michael Hanselmann
      # Notify workers that something has changed
285 53b1d12b Michael Hanselmann
      self._pool_to_worker.notifyAll()
286 76094e37 Michael Hanselmann
287 76094e37 Michael Hanselmann
      # Join all terminating workers
288 76094e37 Michael Hanselmann
      self._lock.release()
289 76094e37 Michael Hanselmann
      try:
290 76094e37 Michael Hanselmann
        for worker in termworkers:
291 c0a8eb9e Michael Hanselmann
          logging.debug("Waiting for thread %s", worker.getName())
292 76094e37 Michael Hanselmann
          worker.join()
293 76094e37 Michael Hanselmann
      finally:
294 76094e37 Michael Hanselmann
        self._lock.acquire()
295 76094e37 Michael Hanselmann
296 76094e37 Michael Hanselmann
      # Remove terminated threads. This could be done in a more efficient way
297 76094e37 Michael Hanselmann
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
298 76094e37 Michael Hanselmann
      # don't leave zombie threads around.
299 76094e37 Michael Hanselmann
      for worker in termworkers:
300 76094e37 Michael Hanselmann
        assert worker in self._termworkers, ("Worker not in list of"
301 76094e37 Michael Hanselmann
                                             " terminating workers")
302 76094e37 Michael Hanselmann
        if not worker.isAlive():
303 76094e37 Michael Hanselmann
          self._termworkers.remove(worker)
304 76094e37 Michael Hanselmann
305 76094e37 Michael Hanselmann
      assert not self._termworkers, "Zombie worker detected"
306 76094e37 Michael Hanselmann
307 76094e37 Michael Hanselmann
    elif current_count < num_workers:
308 76094e37 Michael Hanselmann
      # Create (num_workers - current_count) new workers
309 76094e37 Michael Hanselmann
      for i in xrange(num_workers - current_count):
310 76094e37 Michael Hanselmann
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
311 76094e37 Michael Hanselmann
        self._workers.append(worker)
312 76094e37 Michael Hanselmann
        worker.start()
313 76094e37 Michael Hanselmann
314 76094e37 Michael Hanselmann
  def Resize(self, num_workers):
315 76094e37 Michael Hanselmann
    """Changes the number of workers in the pool.
316 76094e37 Michael Hanselmann

317 76094e37 Michael Hanselmann
    Args:
318 76094e37 Michael Hanselmann
    - num_workers: New number of workers
319 76094e37 Michael Hanselmann

320 76094e37 Michael Hanselmann
    """
321 76094e37 Michael Hanselmann
    self._lock.acquire()
322 76094e37 Michael Hanselmann
    try:
323 76094e37 Michael Hanselmann
      return self._ResizeUnlocked(num_workers)
324 76094e37 Michael Hanselmann
    finally:
325 76094e37 Michael Hanselmann
      self._lock.release()
326 76094e37 Michael Hanselmann
327 76094e37 Michael Hanselmann
  def TerminateWorkers(self):
328 76094e37 Michael Hanselmann
    """Terminate all worker threads.
329 76094e37 Michael Hanselmann

330 76094e37 Michael Hanselmann
    Unstarted tasks will be ignored.
331 76094e37 Michael Hanselmann

332 76094e37 Michael Hanselmann
    """
333 76094e37 Michael Hanselmann
    logging.debug("Terminating all workers")
334 76094e37 Michael Hanselmann
335 76094e37 Michael Hanselmann
    self._lock.acquire()
336 76094e37 Michael Hanselmann
    try:
337 76094e37 Michael Hanselmann
      self._ResizeUnlocked(0)
338 76094e37 Michael Hanselmann
339 76094e37 Michael Hanselmann
      if self._tasks:
340 76094e37 Michael Hanselmann
        logging.debug("There are %s tasks left", len(self._tasks))
341 76094e37 Michael Hanselmann
    finally:
342 76094e37 Michael Hanselmann
      self._lock.release()
343 76094e37 Michael Hanselmann
344 76094e37 Michael Hanselmann
    logging.debug("All workers terminated")