Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ 21c5ad52

History | View | Annotate | Download (10 kB)

1
#
2
#
3

    
4
# Copyright (C) 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Base classes for worker pools.
23

24
"""
25

    
26
import collections
27
import logging
28
import threading
29

    
30
from ganeti import compat
31

    
32

    
33
_TERMINATE = object()
34

    
35

    
36
class BaseWorker(threading.Thread, object):
37
  """Base worker class for worker pools.
38

39
  Users of a worker pool must override RunTask in a subclass.
40

41
  """
42
  # pylint: disable-msg=W0212
43
  def __init__(self, pool, worker_id):
44
    """Constructor for BaseWorker thread.
45

46
    @param pool: the parent worker pool
47
    @param worker_id: identifier for this worker
48

49
    """
50
    super(BaseWorker, self).__init__(name=worker_id)
51
    self.pool = pool
52
    self._current_task = None
53

    
54
  def ShouldTerminate(self):
55
    """Returns whether a worker should terminate.
56

57
    """
58
    return self.pool.ShouldWorkerTerminate(self)
59

    
60
  def _HasRunningTaskUnlocked(self):
61
    """Returns whether this worker is currently running a task.
62

63
    """
64
    return (self._current_task is not None)
65

    
66
  def HasRunningTask(self):
67
    """Returns whether this worker is currently running a task.
68

69
    """
70
    self.pool._lock.acquire()
71
    try:
72
      return self._HasRunningTaskUnlocked()
73
    finally:
74
      self.pool._lock.release()
75

    
76
  def run(self):
77
    """Main thread function.
78

79
    Waits for new tasks to show up in the queue.
80

81
    """
82
    pool = self.pool
83

    
84
    assert not self.HasRunningTask()
85

    
86
    while True:
87
      try:
88
        # Wait on lock to be told either to terminate or to do a task
89
        pool._lock.acquire()
90
        try:
91
          task = pool._WaitForTaskUnlocked(self)
92

    
93
          if task is _TERMINATE:
94
            # Told to terminate
95
            break
96

    
97
          if task is None:
98
            # Spurious notification, ignore
99
            continue
100

    
101
          self._current_task = task
102

    
103
          assert self._HasRunningTaskUnlocked()
104
        finally:
105
          pool._lock.release()
106

    
107
        # Run the actual task
108
        try:
109
          logging.debug("Starting task %r", self._current_task)
110
          self.RunTask(*self._current_task)
111
          logging.debug("Done with task %r", self._current_task)
112
        except: # pylint: disable-msg=W0702
113
          logging.exception("Caught unhandled exception")
114
      finally:
115
        # Notify pool
116
        pool._lock.acquire()
117
        try:
118
          if self._current_task:
119
            self._current_task = None
120
            pool._worker_to_pool.notifyAll()
121
        finally:
122
          pool._lock.release()
123

    
124
    logging.debug("Terminates")
125

    
126
  def RunTask(self, *args):
127
    """Function called to start a task.
128

129
    This needs to be implemented by child classes.
130

131
    """
132
    raise NotImplementedError()
133

    
134

    
135
class WorkerPool(object):
136
  """Worker pool with a queue.
137

138
  This class is thread-safe.
139

140
  Tasks are guaranteed to be started in the order in which they're
141
  added to the pool. Due to the nature of threading, they're not
142
  guaranteed to finish in the same order.
143

144
  """
145
  def __init__(self, name, num_workers, worker_class):
146
    """Constructor for worker pool.
147

148
    @param num_workers: number of workers to be started
149
        (dynamic resizing is not yet implemented)
150
    @param worker_class: the class to be instantiated for workers;
151
        should derive from L{BaseWorker}
152

153
    """
154
    # Some of these variables are accessed by BaseWorker
155
    self._lock = threading.Lock()
156
    self._pool_to_pool = threading.Condition(self._lock)
157
    self._pool_to_worker = threading.Condition(self._lock)
158
    self._worker_to_pool = threading.Condition(self._lock)
159
    self._worker_class = worker_class
160
    self._name = name
161
    self._last_worker_id = 0
162
    self._workers = []
163
    self._quiescing = False
164

    
165
    # Terminating workers
166
    self._termworkers = []
167

    
168
    # Queued tasks
169
    self._tasks = collections.deque()
170

    
171
    # Start workers
172
    self.Resize(num_workers)
173

    
174
  # TODO: Implement dynamic resizing?
175

    
176
  def _WaitWhileQuiescingUnlocked(self):
177
    """Wait until the worker pool has finished quiescing.
178

179
    """
180
    while self._quiescing:
181
      self._pool_to_pool.wait()
182

    
183
  def _AddTaskUnlocked(self, args):
184
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
185

    
186
    self._tasks.append(args)
187

    
188
    # Notify a waiting worker
189
    self._pool_to_worker.notify()
190

    
191
  def AddTask(self, *args):
192
    """Adds a task to the queue.
193

194
    @param args: arguments passed to L{BaseWorker.RunTask}
195

196
    """
197
    self._lock.acquire()
198
    try:
199
      self._WaitWhileQuiescingUnlocked()
200
      self._AddTaskUnlocked(args)
201
    finally:
202
      self._lock.release()
203

    
204
  def AddManyTasks(self, tasks):
205
    """Add a list of tasks to the queue.
206

207
    @type tasks: list of tuples
208
    @param tasks: list of args passed to L{BaseWorker.RunTask}
209

210
    """
211
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
212
      "Each task must be a sequence"
213

    
214
    self._lock.acquire()
215
    try:
216
      self._WaitWhileQuiescingUnlocked()
217

    
218
      for args in tasks:
219
        self._AddTaskUnlocked(args)
220
    finally:
221
      self._lock.release()
222

    
223
  def _WaitForTaskUnlocked(self, worker):
224
    """Waits for a task for a worker.
225

226
    @type worker: L{BaseWorker}
227
    @param worker: Worker thread
228

229
    """
230
    if self._ShouldWorkerTerminateUnlocked(worker):
231
      return _TERMINATE
232

    
233
    # We only wait if there's no task for us.
234
    if not self._tasks:
235
      logging.debug("Waiting for tasks")
236

    
237
      # wait() releases the lock and sleeps until notified
238
      self._pool_to_worker.wait()
239

    
240
      logging.debug("Notified while waiting")
241

    
242
      # Were we woken up in order to terminate?
243
      if self._ShouldWorkerTerminateUnlocked(worker):
244
        return _TERMINATE
245

    
246
      if not self._tasks:
247
        # Spurious notification, ignore
248
        return None
249

    
250
    # Get task from queue and tell pool about it
251
    try:
252
      return self._tasks.popleft()
253
    finally:
254
      self._worker_to_pool.notifyAll()
255

    
256
  def _ShouldWorkerTerminateUnlocked(self, worker):
257
    """Returns whether a worker should terminate.
258

259
    """
260
    return (worker in self._termworkers)
261

    
262
  def ShouldWorkerTerminate(self, worker):
263
    """Returns whether a worker should terminate.
264

265
    """
266
    self._lock.acquire()
267
    try:
268
      return self._ShouldWorkerTerminateUnlocked(worker)
269
    finally:
270
      self._lock.release()
271

    
272
  def _HasRunningTasksUnlocked(self):
273
    """Checks whether there's a task running in a worker.
274

275
    """
276
    for worker in self._workers + self._termworkers:
277
      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
278
        return True
279
    return False
280

    
281
  def Quiesce(self):
282
    """Waits until the task queue is empty.
283

284
    """
285
    self._lock.acquire()
286
    try:
287
      self._quiescing = True
288

    
289
      # Wait while there are tasks pending or running
290
      while self._tasks or self._HasRunningTasksUnlocked():
291
        self._worker_to_pool.wait()
292

    
293
    finally:
294
      self._quiescing = False
295

    
296
      # Make sure AddTasks continues in case it was waiting
297
      self._pool_to_pool.notifyAll()
298

    
299
      self._lock.release()
300

    
301
  def _NewWorkerIdUnlocked(self):
302
    """Return an identifier for a new worker.
303

304
    """
305
    self._last_worker_id += 1
306

    
307
    return "%s%d" % (self._name, self._last_worker_id)
308

    
309
  def _ResizeUnlocked(self, num_workers):
310
    """Changes the number of workers.
311

312
    """
313
    assert num_workers >= 0, "num_workers must be >= 0"
314

    
315
    logging.debug("Resizing to %s workers", num_workers)
316

    
317
    current_count = len(self._workers)
318

    
319
    if current_count == num_workers:
320
      # Nothing to do
321
      pass
322

    
323
    elif current_count > num_workers:
324
      if num_workers == 0:
325
        # Create copy of list to iterate over while lock isn't held.
326
        termworkers = self._workers[:]
327
        del self._workers[:]
328
      else:
329
        # TODO: Implement partial downsizing
330
        raise NotImplementedError()
331
        #termworkers = ...
332

    
333
      self._termworkers += termworkers
334

    
335
      # Notify workers that something has changed
336
      self._pool_to_worker.notifyAll()
337

    
338
      # Join all terminating workers
339
      self._lock.release()
340
      try:
341
        for worker in termworkers:
342
          logging.debug("Waiting for thread %s", worker.getName())
343
          worker.join()
344
      finally:
345
        self._lock.acquire()
346

    
347
      # Remove terminated threads. This could be done in a more efficient way
348
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
349
      # don't leave zombie threads around.
350
      for worker in termworkers:
351
        assert worker in self._termworkers, ("Worker not in list of"
352
                                             " terminating workers")
353
        if not worker.isAlive():
354
          self._termworkers.remove(worker)
355

    
356
      assert not self._termworkers, "Zombie worker detected"
357

    
358
    elif current_count < num_workers:
359
      # Create (num_workers - current_count) new workers
360
      for _ in range(num_workers - current_count):
361
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
362
        self._workers.append(worker)
363
        worker.start()
364

    
365
  def Resize(self, num_workers):
366
    """Changes the number of workers in the pool.
367

368
    @param num_workers: the new number of workers
369

370
    """
371
    self._lock.acquire()
372
    try:
373
      return self._ResizeUnlocked(num_workers)
374
    finally:
375
      self._lock.release()
376

    
377
  def TerminateWorkers(self):
378
    """Terminate all worker threads.
379

380
    Unstarted tasks will be ignored.
381

382
    """
383
    logging.debug("Terminating all workers")
384

    
385
    self._lock.acquire()
386
    try:
387
      self._ResizeUnlocked(0)
388

    
389
      if self._tasks:
390
        logging.debug("There are %s tasks left", len(self._tasks))
391
    finally:
392
      self._lock.release()
393

    
394
    logging.debug("All workers terminated")