Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ 42a999d1

History | View | Annotate | Download (7.9 kB)

1 76094e37 Michael Hanselmann
#
2 76094e37 Michael Hanselmann
#
3 76094e37 Michael Hanselmann
4 76094e37 Michael Hanselmann
# Copyright (C) 2008 Google Inc.
5 76094e37 Michael Hanselmann
#
6 76094e37 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 76094e37 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 76094e37 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 76094e37 Michael Hanselmann
# (at your option) any later version.
10 76094e37 Michael Hanselmann
#
11 76094e37 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 76094e37 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 76094e37 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 76094e37 Michael Hanselmann
# General Public License for more details.
15 76094e37 Michael Hanselmann
#
16 76094e37 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 76094e37 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 76094e37 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 76094e37 Michael Hanselmann
# 02110-1301, USA.
20 76094e37 Michael Hanselmann
21 76094e37 Michael Hanselmann
22 76094e37 Michael Hanselmann
"""Base classes for worker pools.
23 76094e37 Michael Hanselmann

24 76094e37 Michael Hanselmann
"""
25 76094e37 Michael Hanselmann
26 76094e37 Michael Hanselmann
import collections
27 76094e37 Michael Hanselmann
import logging
28 76094e37 Michael Hanselmann
import threading
29 76094e37 Michael Hanselmann
30 76094e37 Michael Hanselmann
from ganeti import errors
31 76094e37 Michael Hanselmann
from ganeti import utils
32 76094e37 Michael Hanselmann
33 76094e37 Michael Hanselmann
34 76094e37 Michael Hanselmann
class BaseWorker(threading.Thread, object):
  """Base worker class for worker pools.

  Users of a worker pool must override RunTask in a subclass.

  """
  def __init__(self, pool, worker_id):
    """Constructor for BaseWorker thread.

    Args:
    - pool: Parent worker pool
    - worker_id: Identifier for this worker

    """
    super(BaseWorker, self).__init__()
    self.pool = pool
    self.worker_id = worker_id

    # Task currently being processed (tuple of RunTask arguments), None while
    # idle. Also used by WorkerPool
    self._current_task = None

  def ShouldTerminate(self):
    """Returns whether a worker should terminate.

    The decision is delegated to the parent pool.

    """
    return self.pool.ShouldWorkerTerminate(self)

  def run(self):
    """Main thread function.

    Waits for new tasks to show up in the queue and passes each of them to
    RunTask until the pool tells this worker to terminate. Exceptions raised
    by RunTask are logged and do not stop the worker.

    """
    pool = self.pool

    assert self._current_task is None

    while True:
      # Whether this iteration took a task from the queue. A separate flag is
      # needed because a task without arguments is an empty (false) tuple.
      task_taken = False
      try:
        # We wait on lock to be told either terminate or do a task.
        pool._lock.acquire()
        try:
          if pool._ShouldWorkerTerminateUnlocked(self):
            break

          # We only wait if there's no task for us.
          if not pool._tasks:
            # wait() releases the lock and sleeps until notified
            pool._lock.wait()

            # Were we woken up in order to terminate?
            if pool._ShouldWorkerTerminateUnlocked(self):
              break

            if not pool._tasks:
              # Spurious notification, ignore
              continue

          # Get task from queue and tell pool about it
          try:
            self._current_task = pool._tasks.popleft()
            task_taken = True
          finally:
            pool._lock.notifyAll()
        finally:
          pool._lock.release()

        # Run the actual task
        try:
          self.RunTask(*self._current_task)
        except:
          # Deliberately catch everything: a failing task must not kill the
          # worker thread
          logging.error("Worker %s: Caught unhandled exception",
                        self.worker_id, exc_info=True)
      finally:
        # Only notify the pool if a task was actually processed. Notifying on
        # spurious wakeups or on termination would wake up all other idle
        # workers, which would in turn notify again, and so on.
        if task_taken:
          pool._lock.acquire()
          try:
            self._current_task = None
            pool._lock.notifyAll()
          finally:
            pool._lock.release()

  def RunTask(self, *args):
    """Function called to start a task.

    Must be overridden in a subclass.

    Args:
    - *args: Arguments given to WorkerPool.AddTask for this task

    """
    raise NotImplementedError()
121 76094e37 Michael Hanselmann
122 76094e37 Michael Hanselmann
123 76094e37 Michael Hanselmann
class WorkerPool(object):
  """Worker pool with a queue.

  This class is thread-safe.

  Tasks are guaranteed to be started in the order in which they're added to the
  pool. Due to the nature of threading, they're not guaranteed to finish in the
  same order.

  """
  def __init__(self, num_workers, worker_class):
    """Constructor for worker pool.

    Args:
    - num_workers: Number of workers to be started
    - worker_class: Class to be instantiated for workers; should derive from
                    BaseWorker

    """
    # Some of these variables are accessed by BaseWorker
    # Condition protecting all state below; also used for all notifications
    self._lock = threading.Condition(threading.Lock())
    self._worker_class = worker_class
    self._last_worker_id = 0
    self._workers = []
    # True while Quiesce is waiting; AddTask blocks during that time
    self._quiescing = False

    # Terminating workers
    self._termworkers = []

    # Queued tasks
    self._tasks = collections.deque()

    # Start workers
    self.Resize(num_workers)

  # TODO: Implement dynamic resizing?

  def AddTask(self, *args):
    """Adds a task to the queue.

    Blocks while the pool is quiescing.

    Args:
    - *args: Arguments passed to BaseWorker.RunTask

    """
    self._lock.acquire()
    try:
      # Don't add new tasks while we're quiescing
      while self._quiescing:
        self._lock.wait()

      # Add task to internal queue
      self._tasks.append(args)
      self._lock.notify()
    finally:
      self._lock.release()

  def _ShouldWorkerTerminateUnlocked(self, worker):
    """Returns whether a worker should terminate.

    The caller must hold the pool lock.

    Args:
    - worker: Worker to be checked

    """
    return (worker in self._termworkers)

  def ShouldWorkerTerminate(self, worker):
    """Returns whether a worker should terminate.

    Args:
    - worker: Worker to be checked

    """
    self._lock.acquire()
    try:
      # The worker must be checked, not the pool itself
      return self._ShouldWorkerTerminateUnlocked(worker)
    finally:
      self._lock.release()

  def _HasRunningTasksUnlocked(self):
    """Checks whether there's a task running in a worker.

    Both active and terminating workers are taken into account. The caller
    must hold the pool lock.

    """
    for worker in self._workers + self._termworkers:
      if worker._current_task is not None:
        return True
    return False

  def Quiesce(self):
    """Waits until the task queue is empty and no task is running.

    While waiting, AddTask is blocked from adding new tasks.

    """
    self._lock.acquire()
    try:
      self._quiescing = True

      # Wait while there are tasks pending or running
      while self._tasks or self._HasRunningTasksUnlocked():
        self._lock.wait()

    finally:
      self._quiescing = False

      # Make sure AddTasks continues in case it was waiting
      self._lock.notifyAll()

      self._lock.release()

  def _NewWorkerIdUnlocked(self):
    """Returns a new worker ID.

    The caller must hold the pool lock.

    """
    self._last_worker_id += 1
    return self._last_worker_id

  def _ResizeUnlocked(self, num_workers):
    """Changes the number of workers.

    The caller must hold the pool lock. The lock is temporarily released
    while waiting for terminating workers to finish.

    Args:
    - num_workers: New number of workers; only growing the pool or shrinking
                   it to zero is implemented

    Raises NotImplementedError when asked to shrink to a non-zero size.

    """
    assert num_workers >= 0, "num_workers must be >= 0"

    logging.debug("Resizing to %s workers", num_workers)

    current_count = len(self._workers)

    if current_count == num_workers:
      # Nothing to do
      pass

    elif current_count > num_workers:
      if num_workers == 0:
        # Create copy of list to iterate over while lock isn't held.
        termworkers = self._workers[:]
        del self._workers[:]
      else:
        # TODO: Implement partial downsizing
        raise NotImplementedError()
        #termworkers = ...

      self._termworkers += termworkers

      # Notify workers that something has changed
      self._lock.notifyAll()

      # Join all terminating workers. The lock must be released while doing
      # so, as workers need it to notice that they should terminate.
      self._lock.release()
      try:
        for worker in termworkers:
          worker.join()
      finally:
        self._lock.acquire()

      # Remove terminated threads. This could be done in a more efficient way
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
      # don't leave zombie threads around.
      for worker in termworkers:
        assert worker in self._termworkers, ("Worker not in list of"
                                             " terminating workers")
        if not worker.isAlive():
          self._termworkers.remove(worker)

      assert not self._termworkers, "Zombie worker detected"

    elif current_count < num_workers:
      # Create (num_workers - current_count) new workers
      for i in xrange(num_workers - current_count):
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
        self._workers.append(worker)
        worker.start()

  def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    Args:
    - num_workers: New number of workers

    """
    self._lock.acquire()
    try:
      return self._ResizeUnlocked(num_workers)
    finally:
      self._lock.release()

  def TerminateWorkers(self):
    """Terminate all worker threads.

    Unstarted tasks will be ignored.

    """
    logging.debug("Terminating all workers")

    self._lock.acquire()
    try:
      self._ResizeUnlocked(0)

      if self._tasks:
        logging.debug("There are %s tasks left", len(self._tasks))
    finally:
      self._lock.release()

    logging.debug("All workers terminated")