Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ 89b70f39

History | View | Annotate | Download (8.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Base classes for worker pools.
23

24
"""
25

    
26
import collections
27
import logging
28
import threading
29

    
30

    
31
class BaseWorker(threading.Thread, object):
32
  """Base worker class for worker pools.
33

34
  Users of a worker pool must override RunTask in a subclass.
35

36
  """
37
  # pylint: disable-msg=W0212
38
  def __init__(self, pool, worker_id):
39
    """Constructor for BaseWorker thread.
40

41
    @param pool: the parent worker pool
42
    @param worker_id: identifier for this worker
43

44
    """
45
    super(BaseWorker, self).__init__(name=worker_id)
46
    self.pool = pool
47
    self._current_task = None
48

    
49
  def ShouldTerminate(self):
50
    """Returns whether a worker should terminate.
51

52
    """
53
    return self.pool.ShouldWorkerTerminate(self)
54

    
55
  def _HasRunningTaskUnlocked(self):
56
    """Returns whether this worker is currently running a task.
57

58
    """
59
    return (self._current_task is not None)
60

    
61
  def HasRunningTask(self):
62
    """Returns whether this worker is currently running a task.
63

64
    """
65
    self.pool._lock.acquire()
66
    try:
67
      return self._HasRunningTaskUnlocked()
68
    finally:
69
      self.pool._lock.release()
70

    
71
  def run(self):
72
    """Main thread function.
73

74
    Waits for new tasks to show up in the queue.
75

76
    """
77
    pool = self.pool
78

    
79
    assert not self.HasRunningTask()
80

    
81
    while True:
82
      try:
83
        # We wait on lock to be told either terminate or do a task.
84
        pool._lock.acquire()
85
        try:
86
          if pool._ShouldWorkerTerminateUnlocked(self):
87
            break
88

    
89
          # We only wait if there's no task for us.
90
          if not pool._tasks:
91
            logging.debug("Waiting for tasks")
92

    
93
            # wait() releases the lock and sleeps until notified
94
            pool._pool_to_worker.wait()
95

    
96
            logging.debug("Notified while waiting")
97

    
98
            # Were we woken up in order to terminate?
99
            if pool._ShouldWorkerTerminateUnlocked(self):
100
              break
101

    
102
            if not pool._tasks:
103
              # Spurious notification, ignore
104
              continue
105

    
106
          # Get task from queue and tell pool about it
107
          try:
108
            self._current_task = pool._tasks.popleft()
109
          finally:
110
            pool._worker_to_pool.notifyAll()
111
        finally:
112
          pool._lock.release()
113

    
114
        # Run the actual task
115
        try:
116
          logging.debug("Starting task %r", self._current_task)
117
          self.RunTask(*self._current_task)
118
          logging.debug("Done with task %r", self._current_task)
119
        except: # pylint: disable-msg=W0702
120
          logging.exception("Caught unhandled exception")
121
      finally:
122
        # Notify pool
123
        pool._lock.acquire()
124
        try:
125
          if self._current_task:
126
            self._current_task = None
127
            pool._worker_to_pool.notifyAll()
128
        finally:
129
          pool._lock.release()
130

    
131
    logging.debug("Terminates")
132

    
133
  def RunTask(self, *args):
134
    """Function called to start a task.
135

136
    This needs to be implemented by child classes.
137

138
    """
139
    raise NotImplementedError()
140

    
141

    
142
class WorkerPool(object):
143
  """Worker pool with a queue.
144

145
  This class is thread-safe.
146

147
  Tasks are guaranteed to be started in the order in which they're
148
  added to the pool. Due to the nature of threading, they're not
149
  guaranteed to finish in the same order.
150

151
  """
152
  def __init__(self, name, num_workers, worker_class):
153
    """Constructor for worker pool.
154

155
    @param num_workers: number of workers to be started
156
        (dynamic resizing is not yet implemented)
157
    @param worker_class: the class to be instantiated for workers;
158
        should derive from L{BaseWorker}
159

160
    """
161
    # Some of these variables are accessed by BaseWorker
162
    self._lock = threading.Lock()
163
    self._pool_to_pool = threading.Condition(self._lock)
164
    self._pool_to_worker = threading.Condition(self._lock)
165
    self._worker_to_pool = threading.Condition(self._lock)
166
    self._worker_class = worker_class
167
    self._name = name
168
    self._last_worker_id = 0
169
    self._workers = []
170
    self._quiescing = False
171

    
172
    # Terminating workers
173
    self._termworkers = []
174

    
175
    # Queued tasks
176
    self._tasks = collections.deque()
177

    
178
    # Start workers
179
    self.Resize(num_workers)
180

    
181
  # TODO: Implement dynamic resizing?
182

    
183
  def AddTask(self, *args):
184
    """Adds a task to the queue.
185

186
    @param args: arguments passed to L{BaseWorker.RunTask}
187

188
    """
189
    self._lock.acquire()
190
    try:
191
      # Don't add new tasks while we're quiescing
192
      while self._quiescing:
193
        self._pool_to_pool.wait()
194

    
195
      # Add task to internal queue
196
      self._tasks.append(args)
197

    
198
      # Wake one idling worker up
199
      self._pool_to_worker.notify()
200
    finally:
201
      self._lock.release()
202

    
203
  def _ShouldWorkerTerminateUnlocked(self, worker):
204
    """Returns whether a worker should terminate.
205

206
    """
207
    return (worker in self._termworkers)
208

    
209
  def ShouldWorkerTerminate(self, worker):
210
    """Returns whether a worker should terminate.
211

212
    """
213
    self._lock.acquire()
214
    try:
215
      return self._ShouldWorkerTerminateUnlocked(worker)
216
    finally:
217
      self._lock.release()
218

    
219
  def _HasRunningTasksUnlocked(self):
220
    """Checks whether there's a task running in a worker.
221

222
    """
223
    for worker in self._workers + self._termworkers:
224
      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
225
        return True
226
    return False
227

    
228
  def Quiesce(self):
229
    """Waits until the task queue is empty.
230

231
    """
232
    self._lock.acquire()
233
    try:
234
      self._quiescing = True
235

    
236
      # Wait while there are tasks pending or running
237
      while self._tasks or self._HasRunningTasksUnlocked():
238
        self._worker_to_pool.wait()
239

    
240
    finally:
241
      self._quiescing = False
242

    
243
      # Make sure AddTasks continues in case it was waiting
244
      self._pool_to_pool.notifyAll()
245

    
246
      self._lock.release()
247

    
248
  def _NewWorkerIdUnlocked(self):
249
    """Return an identifier for a new worker.
250

251
    """
252
    self._last_worker_id += 1
253

    
254
    return "%s%d" % (self._name, self._last_worker_id)
255

    
256
  def _ResizeUnlocked(self, num_workers):
257
    """Changes the number of workers.
258

259
    """
260
    assert num_workers >= 0, "num_workers must be >= 0"
261

    
262
    logging.debug("Resizing to %s workers", num_workers)
263

    
264
    current_count = len(self._workers)
265

    
266
    if current_count == num_workers:
267
      # Nothing to do
268
      pass
269

    
270
    elif current_count > num_workers:
271
      if num_workers == 0:
272
        # Create copy of list to iterate over while lock isn't held.
273
        termworkers = self._workers[:]
274
        del self._workers[:]
275
      else:
276
        # TODO: Implement partial downsizing
277
        raise NotImplementedError()
278
        #termworkers = ...
279

    
280
      self._termworkers += termworkers
281

    
282
      # Notify workers that something has changed
283
      self._pool_to_worker.notifyAll()
284

    
285
      # Join all terminating workers
286
      self._lock.release()
287
      try:
288
        for worker in termworkers:
289
          logging.debug("Waiting for thread %s", worker.getName())
290
          worker.join()
291
      finally:
292
        self._lock.acquire()
293

    
294
      # Remove terminated threads. This could be done in a more efficient way
295
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
296
      # don't leave zombie threads around.
297
      for worker in termworkers:
298
        assert worker in self._termworkers, ("Worker not in list of"
299
                                             " terminating workers")
300
        if not worker.isAlive():
301
          self._termworkers.remove(worker)
302

    
303
      assert not self._termworkers, "Zombie worker detected"
304

    
305
    elif current_count < num_workers:
306
      # Create (num_workers - current_count) new workers
307
      for _ in range(num_workers - current_count):
308
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
309
        self._workers.append(worker)
310
        worker.start()
311

    
312
  def Resize(self, num_workers):
313
    """Changes the number of workers in the pool.
314

315
    @param num_workers: the new number of workers
316

317
    """
318
    self._lock.acquire()
319
    try:
320
      return self._ResizeUnlocked(num_workers)
321
    finally:
322
      self._lock.release()
323

    
324
  def TerminateWorkers(self):
325
    """Terminate all worker threads.
326

327
    Unstarted tasks will be ignored.
328

329
    """
330
    logging.debug("Terminating all workers")
331

    
332
    self._lock.acquire()
333
    try:
334
      self._ResizeUnlocked(0)
335

    
336
      if self._tasks:
337
        logging.debug("There are %s tasks left", len(self._tasks))
338
    finally:
339
      self._lock.release()
340

    
341
    logging.debug("All workers terminated")