Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ 8b3fd458

History | View | Annotate | Download (9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Base classes for worker pools.
23

24
"""
25

    
26
import collections
27
import logging
28
import threading
29

    
30

    
31
class BaseWorker(threading.Thread, object):
32
  """Base worker class for worker pools.
33

34
  Users of a worker pool must override RunTask in a subclass.
35

36
  """
37
  def __init__(self, pool, worker_id):
38
    """Constructor for BaseWorker thread.
39

40
    Args:
41
    - pool: Parent worker pool
42
    - worker_id: Identifier for this worker
43

44
    """
45
    super(BaseWorker, self).__init__()
46
    self.pool = pool
47
    self.worker_id = worker_id
48
    self._current_task = None
49

    
50
  def ShouldTerminate(self):
51
    """Returns whether a worker should terminate.
52

53
    """
54
    return self.pool.ShouldWorkerTerminate(self)
55

    
56
  def _HasRunningTaskUnlocked(self):
57
    """Returns whether this worker is currently running a task.
58

59
    """
60
    return (self._current_task is not None)
61

    
62
  def HasRunningTask(self):
63
    """Returns whether this worker is currently running a task.
64

65
    """
66
    self.pool._lock.acquire()
67
    try:
68
      return self._HasRunningTaskUnlocked()
69
    finally:
70
      self.pool._lock.release()
71

    
72
  def run(self):
73
    """Main thread function.
74

75
    Waits for new tasks to show up in the queue.
76

77
    """
78
    pool = self.pool
79

    
80
    assert not self.HasRunningTask()
81

    
82
    while True:
83
      try:
84
        # We wait on lock to be told either terminate or do a task.
85
        pool._lock.acquire()
86
        try:
87
          if pool._ShouldWorkerTerminateUnlocked(self):
88
            break
89

    
90
          # We only wait if there's no task for us.
91
          if not pool._tasks:
92
            logging.debug("Worker %s: waiting for tasks", self.worker_id)
93

    
94
            # wait() releases the lock and sleeps until notified
95
            pool._pool_to_worker.wait()
96

    
97
            logging.debug("Worker %s: notified while waiting", self.worker_id)
98

    
99
            # Were we woken up in order to terminate?
100
            if pool._ShouldWorkerTerminateUnlocked(self):
101
              break
102

    
103
            if not pool._tasks:
104
              # Spurious notification, ignore
105
              continue
106

    
107
          # Get task from queue and tell pool about it
108
          try:
109
            self._current_task = pool._tasks.popleft()
110
          finally:
111
            pool._worker_to_pool.notifyAll()
112
        finally:
113
          pool._lock.release()
114

    
115
        # Run the actual task
116
        try:
117
          logging.debug("Worker %s: starting task %r",
118
                        self.worker_id, self._current_task)
119
          self.RunTask(*self._current_task)
120
          logging.debug("Worker %s: done with task %r",
121
                        self.worker_id, self._current_task)
122
        except:
123
          logging.error("Worker %s: Caught unhandled exception",
124
                        self.worker_id, exc_info=True)
125
      finally:
126
        # Notify pool
127
        pool._lock.acquire()
128
        try:
129
          if self._current_task:
130
            self._current_task = None
131
            pool._worker_to_pool.notifyAll()
132
        finally:
133
          pool._lock.release()
134

    
135
    logging.debug("Worker %s: terminates", self.worker_id)
136

    
137
  def RunTask(self, *args):
138
    """Function called to start a task.
139

140
    """
141
    raise NotImplementedError()
142

    
143

    
144
class WorkerPool(object):
145
  """Worker pool with a queue.
146

147
  This class is thread-safe.
148

149
  Tasks are guaranteed to be started in the order in which they're added to the
150
  pool. Due to the nature of threading, they're not guaranteed to finish in the
151
  same order.
152

153
  """
154
  def __init__(self, num_workers, worker_class):
155
    """Constructor for worker pool.
156

157
    Args:
158
    - num_workers: Number of workers to be started (dynamic resizing is not
159
                   yet implemented)
160
    - worker_class: Class to be instantiated for workers; should derive from
161
                    BaseWorker
162

163
    """
164
    # Some of these variables are accessed by BaseWorker
165
    self._lock = threading.Lock()
166
    self._pool_to_pool = threading.Condition(self._lock)
167
    self._pool_to_worker = threading.Condition(self._lock)
168
    self._worker_to_pool = threading.Condition(self._lock)
169
    self._worker_class = worker_class
170
    self._last_worker_id = 0
171
    self._workers = []
172
    self._quiescing = False
173

    
174
    # Terminating workers
175
    self._termworkers = []
176

    
177
    # Queued tasks
178
    self._tasks = collections.deque()
179

    
180
    # Start workers
181
    self.Resize(num_workers)
182

    
183
  # TODO: Implement dynamic resizing?
184

    
185
  def AddTask(self, *args):
186
    """Adds a task to the queue.
187

188
    Args:
189
    - *args: Arguments passed to BaseWorker.RunTask
190

191
    """
192
    self._lock.acquire()
193
    try:
194
      # Don't add new tasks while we're quiescing
195
      while self._quiescing:
196
        self._pool_to_pool.wait()
197

    
198
      # Add task to internal queue
199
      self._tasks.append(args)
200

    
201
      # Wake one idling worker up
202
      self._pool_to_worker.notify()
203
    finally:
204
      self._lock.release()
205

    
206
  def _ShouldWorkerTerminateUnlocked(self, worker):
207
    """Returns whether a worker should terminate.
208

209
    """
210
    return (worker in self._termworkers)
211

    
212
  def ShouldWorkerTerminate(self, worker):
213
    """Returns whether a worker should terminate.
214

215
    """
216
    self._lock.acquire()
217
    try:
218
      return self._ShouldWorkerTerminateUnlocked(self)
219
    finally:
220
      self._lock.release()
221

    
222
  def _HasRunningTasksUnlocked(self):
223
    """Checks whether there's a task running in a worker.
224

225
    """
226
    for worker in self._workers + self._termworkers:
227
      if worker._HasRunningTaskUnlocked():
228
        return True
229
    return False
230

    
231
  def Quiesce(self):
232
    """Waits until the task queue is empty.
233

234
    """
235
    self._lock.acquire()
236
    try:
237
      self._quiescing = True
238

    
239
      # Wait while there are tasks pending or running
240
      while self._tasks or self._HasRunningTasksUnlocked():
241
        self._worker_to_pool.wait()
242

    
243
    finally:
244
      self._quiescing = False
245

    
246
      # Make sure AddTasks continues in case it was waiting
247
      self._pool_to_pool.notifyAll()
248

    
249
      self._lock.release()
250

    
251
  def _NewWorkerIdUnlocked(self):
252
    self._last_worker_id += 1
253
    return self._last_worker_id
254

    
255
  def _ResizeUnlocked(self, num_workers):
256
    """Changes the number of workers.
257

258
    """
259
    assert num_workers >= 0, "num_workers must be >= 0"
260

    
261
    logging.debug("Resizing to %s workers", num_workers)
262

    
263
    current_count = len(self._workers)
264

    
265
    if current_count == num_workers:
266
      # Nothing to do
267
      pass
268

    
269
    elif current_count > num_workers:
270
      if num_workers == 0:
271
        # Create copy of list to iterate over while lock isn't held.
272
        termworkers = self._workers[:]
273
        del self._workers[:]
274
      else:
275
        # TODO: Implement partial downsizing
276
        raise NotImplementedError()
277
        #termworkers = ...
278

    
279
      self._termworkers += termworkers
280

    
281
      # Notify workers that something has changed
282
      self._pool_to_worker.notifyAll()
283

    
284
      # Join all terminating workers
285
      self._lock.release()
286
      try:
287
        for worker in termworkers:
288
          logging.debug("Waiting for thread %s", worker.getName())
289
          worker.join()
290
      finally:
291
        self._lock.acquire()
292

    
293
      # Remove terminated threads. This could be done in a more efficient way
294
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
295
      # don't leave zombie threads around.
296
      for worker in termworkers:
297
        assert worker in self._termworkers, ("Worker not in list of"
298
                                             " terminating workers")
299
        if not worker.isAlive():
300
          self._termworkers.remove(worker)
301

    
302
      assert not self._termworkers, "Zombie worker detected"
303

    
304
    elif current_count < num_workers:
305
      # Create (num_workers - current_count) new workers
306
      for i in xrange(num_workers - current_count):
307
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
308
        self._workers.append(worker)
309
        worker.start()
310

    
311
  def Resize(self, num_workers):
312
    """Changes the number of workers in the pool.
313

314
    Args:
315
    - num_workers: New number of workers
316

317
    """
318
    self._lock.acquire()
319
    try:
320
      return self._ResizeUnlocked(num_workers)
321
    finally:
322
      self._lock.release()
323

    
324
  def TerminateWorkers(self):
325
    """Terminate all worker threads.
326

327
    Unstarted tasks will be ignored.
328

329
    """
330
    logging.debug("Terminating all workers")
331

    
332
    self._lock.acquire()
333
    try:
334
      self._ResizeUnlocked(0)
335

    
336
      if self._tasks:
337
        logging.debug("There are %s tasks left", len(self._tasks))
338
    finally:
339
      self._lock.release()
340

    
341
    logging.debug("All workers terminated")