Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ 89e2b4d2

History | View | Annotate | Download (9.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Base classes for worker pools.
23

24
"""
25

    
26
import collections
27
import logging
28
import threading
29

    
30

    
31
class BaseWorker(threading.Thread, object):
32
  """Base worker class for worker pools.
33

34
  Users of a worker pool must override RunTask in a subclass.
35

36
  """
37
  # pylint: disable-msg=W0212
38
  def __init__(self, pool, worker_id):
39
    """Constructor for BaseWorker thread.
40

41
    @param pool: the parent worker pool
42
    @param worker_id: identifier for this worker
43

44
    """
45
    super(BaseWorker, self).__init__()
46
    self.pool = pool
47
    self.worker_id = worker_id
48
    self._current_task = None
49

    
50
  def ShouldTerminate(self):
51
    """Returns whether a worker should terminate.
52

53
    """
54
    return self.pool.ShouldWorkerTerminate(self)
55

    
56
  def _HasRunningTaskUnlocked(self):
57
    """Returns whether this worker is currently running a task.
58

59
    """
60
    return (self._current_task is not None)
61

    
62
  def HasRunningTask(self):
63
    """Returns whether this worker is currently running a task.
64

65
    """
66
    self.pool._lock.acquire()
67
    try:
68
      return self._HasRunningTaskUnlocked()
69
    finally:
70
      self.pool._lock.release()
71

    
72
  def run(self):
73
    """Main thread function.
74

75
    Waits for new tasks to show up in the queue.
76

77
    """
78
    pool = self.pool
79

    
80
    assert not self.HasRunningTask()
81

    
82
    while True:
83
      try:
84
        # We wait on lock to be told either terminate or do a task.
85
        pool._lock.acquire()
86
        try:
87
          if pool._ShouldWorkerTerminateUnlocked(self):
88
            break
89

    
90
          # We only wait if there's no task for us.
91
          if not pool._tasks:
92
            logging.debug("Worker %s: waiting for tasks", self.worker_id)
93

    
94
            # wait() releases the lock and sleeps until notified
95
            pool._pool_to_worker.wait()
96

    
97
            logging.debug("Worker %s: notified while waiting", self.worker_id)
98

    
99
            # Were we woken up in order to terminate?
100
            if pool._ShouldWorkerTerminateUnlocked(self):
101
              break
102

    
103
            if not pool._tasks:
104
              # Spurious notification, ignore
105
              continue
106

    
107
          # Get task from queue and tell pool about it
108
          try:
109
            self._current_task = pool._tasks.popleft()
110
          finally:
111
            pool._worker_to_pool.notifyAll()
112
        finally:
113
          pool._lock.release()
114

    
115
        # Run the actual task
116
        try:
117
          logging.debug("Worker %s: starting task %r",
118
                        self.worker_id, self._current_task)
119
          self.RunTask(*self._current_task)
120
          logging.debug("Worker %s: done with task %r",
121
                        self.worker_id, self._current_task)
122
        except: # pylint: disable-msg=W0702
123
          logging.error("Worker %s: Caught unhandled exception",
124
                        self.worker_id, exc_info=True)
125
      finally:
126
        # Notify pool
127
        pool._lock.acquire()
128
        try:
129
          if self._current_task:
130
            self._current_task = None
131
            pool._worker_to_pool.notifyAll()
132
        finally:
133
          pool._lock.release()
134

    
135
    logging.debug("Worker %s: terminates", self.worker_id)
136

    
137
  def RunTask(self, *args):
138
    """Function called to start a task.
139

140
    This needs to be implemented by child classes.
141

142
    """
143
    raise NotImplementedError()
144

    
145

    
146
class WorkerPool(object):
147
  """Worker pool with a queue.
148

149
  This class is thread-safe.
150

151
  Tasks are guaranteed to be started in the order in which they're
152
  added to the pool. Due to the nature of threading, they're not
153
  guaranteed to finish in the same order.
154

155
  """
156
  def __init__(self, name, num_workers, worker_class):
157
    """Constructor for worker pool.
158

159
    @param num_workers: number of workers to be started
160
        (dynamic resizing is not yet implemented)
161
    @param worker_class: the class to be instantiated for workers;
162
        should derive from L{BaseWorker}
163

164
    """
165
    # Some of these variables are accessed by BaseWorker
166
    self._lock = threading.Lock()
167
    self._pool_to_pool = threading.Condition(self._lock)
168
    self._pool_to_worker = threading.Condition(self._lock)
169
    self._worker_to_pool = threading.Condition(self._lock)
170
    self._worker_class = worker_class
171
    self._name = name
172
    self._last_worker_id = 0
173
    self._workers = []
174
    self._quiescing = False
175

    
176
    # Terminating workers
177
    self._termworkers = []
178

    
179
    # Queued tasks
180
    self._tasks = collections.deque()
181

    
182
    # Start workers
183
    self.Resize(num_workers)
184

    
185
  # TODO: Implement dynamic resizing?
186

    
187
  def AddTask(self, *args):
188
    """Adds a task to the queue.
189

190
    @param args: arguments passed to L{BaseWorker.RunTask}
191

192
    """
193
    self._lock.acquire()
194
    try:
195
      # Don't add new tasks while we're quiescing
196
      while self._quiescing:
197
        self._pool_to_pool.wait()
198

    
199
      # Add task to internal queue
200
      self._tasks.append(args)
201

    
202
      # Wake one idling worker up
203
      self._pool_to_worker.notify()
204
    finally:
205
      self._lock.release()
206

    
207
  def _ShouldWorkerTerminateUnlocked(self, worker):
208
    """Returns whether a worker should terminate.
209

210
    """
211
    return (worker in self._termworkers)
212

    
213
  def ShouldWorkerTerminate(self, worker):
214
    """Returns whether a worker should terminate.
215

216
    """
217
    self._lock.acquire()
218
    try:
219
      return self._ShouldWorkerTerminateUnlocked(worker)
220
    finally:
221
      self._lock.release()
222

    
223
  def _HasRunningTasksUnlocked(self):
224
    """Checks whether there's a task running in a worker.
225

226
    """
227
    for worker in self._workers + self._termworkers:
228
      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
229
        return True
230
    return False
231

    
232
  def Quiesce(self):
233
    """Waits until the task queue is empty.
234

235
    """
236
    self._lock.acquire()
237
    try:
238
      self._quiescing = True
239

    
240
      # Wait while there are tasks pending or running
241
      while self._tasks or self._HasRunningTasksUnlocked():
242
        self._worker_to_pool.wait()
243

    
244
    finally:
245
      self._quiescing = False
246

    
247
      # Make sure AddTasks continues in case it was waiting
248
      self._pool_to_pool.notifyAll()
249

    
250
      self._lock.release()
251

    
252
  def _NewWorkerIdUnlocked(self):
253
    """Return an identifier for a new worker.
254

255
    """
256
    self._last_worker_id += 1
257

    
258
    return "%s%d" % (self._name, self._last_worker_id)
259

    
260
  def _ResizeUnlocked(self, num_workers):
261
    """Changes the number of workers.
262

263
    """
264
    assert num_workers >= 0, "num_workers must be >= 0"
265

    
266
    logging.debug("Resizing to %s workers", num_workers)
267

    
268
    current_count = len(self._workers)
269

    
270
    if current_count == num_workers:
271
      # Nothing to do
272
      pass
273

    
274
    elif current_count > num_workers:
275
      if num_workers == 0:
276
        # Create copy of list to iterate over while lock isn't held.
277
        termworkers = self._workers[:]
278
        del self._workers[:]
279
      else:
280
        # TODO: Implement partial downsizing
281
        raise NotImplementedError()
282
        #termworkers = ...
283

    
284
      self._termworkers += termworkers
285

    
286
      # Notify workers that something has changed
287
      self._pool_to_worker.notifyAll()
288

    
289
      # Join all terminating workers
290
      self._lock.release()
291
      try:
292
        for worker in termworkers:
293
          logging.debug("Waiting for thread %s", worker.getName())
294
          worker.join()
295
      finally:
296
        self._lock.acquire()
297

    
298
      # Remove terminated threads. This could be done in a more efficient way
299
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
300
      # don't leave zombie threads around.
301
      for worker in termworkers:
302
        assert worker in self._termworkers, ("Worker not in list of"
303
                                             " terminating workers")
304
        if not worker.isAlive():
305
          self._termworkers.remove(worker)
306

    
307
      assert not self._termworkers, "Zombie worker detected"
308

    
309
    elif current_count < num_workers:
310
      # Create (num_workers - current_count) new workers
311
      for _ in range(num_workers - current_count):
312
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
313
        self._workers.append(worker)
314
        worker.start()
315

    
316
  def Resize(self, num_workers):
317
    """Changes the number of workers in the pool.
318

319
    @param num_workers: the new number of workers
320

321
    """
322
    self._lock.acquire()
323
    try:
324
      return self._ResizeUnlocked(num_workers)
325
    finally:
326
      self._lock.release()
327

    
328
  def TerminateWorkers(self):
329
    """Terminate all worker threads.
330

331
    Unstarted tasks will be ignored.
332

333
    """
334
    logging.debug("Terminating all workers")
335

    
336
    self._lock.acquire()
337
    try:
338
      self._ResizeUnlocked(0)
339

    
340
      if self._tasks:
341
        logging.debug("There are %s tasks left", len(self._tasks))
342
    finally:
343
      self._lock.release()
344

    
345
    logging.debug("All workers terminated")