Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ 46d0a3d0

History | View | Annotate | Download (9.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Base classes for worker pools.
23

24
"""
25

    
26
import collections
27
import logging
28
import threading
29

    
30
from ganeti import compat
31

    
32

    
33
_TERMINATE = object()
34

    
35

    
36
class BaseWorker(threading.Thread, object):
37
  """Base worker class for worker pools.
38

39
  Users of a worker pool must override RunTask in a subclass.
40

41
  """
42
  # pylint: disable-msg=W0212
43
  def __init__(self, pool, worker_id):
44
    """Constructor for BaseWorker thread.
45

46
    @param pool: the parent worker pool
47
    @param worker_id: identifier for this worker
48

49
    """
50
    super(BaseWorker, self).__init__(name=worker_id)
51
    self.pool = pool
52
    self._current_task = None
53

    
54
  def ShouldTerminate(self):
55
    """Returns whether this worker should terminate.
56

57
    Should only be called from within L{RunTask}.
58

59
    """
60
    self.pool._lock.acquire()
61
    try:
62
      assert self._HasRunningTaskUnlocked()
63
      return self.pool._ShouldWorkerTerminateUnlocked(self)
64
    finally:
65
      self.pool._lock.release()
66

    
67
  def _HasRunningTaskUnlocked(self):
68
    """Returns whether this worker is currently running a task.
69

70
    """
71
    return (self._current_task is not None)
72

    
73
  def run(self):
74
    """Main thread function.
75

76
    Waits for new tasks to show up in the queue.
77

78
    """
79
    pool = self.pool
80

    
81
    while True:
82
      assert self._current_task is None
83
      try:
84
        # Wait on lock to be told either to terminate or to do a task
85
        pool._lock.acquire()
86
        try:
87
          task = pool._WaitForTaskUnlocked(self)
88

    
89
          if task is _TERMINATE:
90
            # Told to terminate
91
            break
92

    
93
          if task is None:
94
            # Spurious notification, ignore
95
            continue
96

    
97
          self._current_task = task
98

    
99
          # No longer needed, dispose of reference
100
          del task
101

    
102
          assert self._HasRunningTaskUnlocked()
103

    
104
        finally:
105
          pool._lock.release()
106

    
107
        # Run the actual task
108
        try:
109
          logging.debug("Starting task %r", self._current_task)
110
          self.RunTask(*self._current_task)
111
          logging.debug("Done with task %r", self._current_task)
112
        except: # pylint: disable-msg=W0702
113
          logging.exception("Caught unhandled exception")
114

    
115
        assert self._HasRunningTaskUnlocked()
116
      finally:
117
        # Notify pool
118
        pool._lock.acquire()
119
        try:
120
          if self._current_task:
121
            self._current_task = None
122
            pool._worker_to_pool.notifyAll()
123
        finally:
124
          pool._lock.release()
125

    
126
      assert not self._HasRunningTaskUnlocked()
127

    
128
    logging.debug("Terminates")
129

    
130
  def RunTask(self, *args):
131
    """Function called to start a task.
132

133
    This needs to be implemented by child classes.
134

135
    """
136
    raise NotImplementedError()
137

    
138

    
139
class WorkerPool(object):
140
  """Worker pool with a queue.
141

142
  This class is thread-safe.
143

144
  Tasks are guaranteed to be started in the order in which they're
145
  added to the pool. Due to the nature of threading, they're not
146
  guaranteed to finish in the same order.
147

148
  """
149
  def __init__(self, name, num_workers, worker_class):
150
    """Constructor for worker pool.
151

152
    @param num_workers: number of workers to be started
153
        (dynamic resizing is not yet implemented)
154
    @param worker_class: the class to be instantiated for workers;
155
        should derive from L{BaseWorker}
156

157
    """
158
    # Some of these variables are accessed by BaseWorker
159
    self._lock = threading.Lock()
160
    self._pool_to_pool = threading.Condition(self._lock)
161
    self._pool_to_worker = threading.Condition(self._lock)
162
    self._worker_to_pool = threading.Condition(self._lock)
163
    self._worker_class = worker_class
164
    self._name = name
165
    self._last_worker_id = 0
166
    self._workers = []
167
    self._quiescing = False
168

    
169
    # Terminating workers
170
    self._termworkers = []
171

    
172
    # Queued tasks
173
    self._tasks = collections.deque()
174

    
175
    # Start workers
176
    self.Resize(num_workers)
177

    
178
  # TODO: Implement dynamic resizing?
179

    
180
  def _WaitWhileQuiescingUnlocked(self):
181
    """Wait until the worker pool has finished quiescing.
182

183
    """
184
    while self._quiescing:
185
      self._pool_to_pool.wait()
186

    
187
  def _AddTaskUnlocked(self, args):
188
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
189

    
190
    self._tasks.append(args)
191

    
192
    # Notify a waiting worker
193
    self._pool_to_worker.notify()
194

    
195
  def AddTask(self, args):
196
    """Adds a task to the queue.
197

198
    @type args: sequence
199
    @param args: arguments passed to L{BaseWorker.RunTask}
200

201
    """
202
    self._lock.acquire()
203
    try:
204
      self._WaitWhileQuiescingUnlocked()
205
      self._AddTaskUnlocked(args)
206
    finally:
207
      self._lock.release()
208

    
209
  def AddManyTasks(self, tasks):
210
    """Add a list of tasks to the queue.
211

212
    @type tasks: list of tuples
213
    @param tasks: list of args passed to L{BaseWorker.RunTask}
214

215
    """
216
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
217
      "Each task must be a sequence"
218

    
219
    self._lock.acquire()
220
    try:
221
      self._WaitWhileQuiescingUnlocked()
222

    
223
      for args in tasks:
224
        self._AddTaskUnlocked(args)
225
    finally:
226
      self._lock.release()
227

    
228
  def _WaitForTaskUnlocked(self, worker):
229
    """Waits for a task for a worker.
230

231
    @type worker: L{BaseWorker}
232
    @param worker: Worker thread
233

234
    """
235
    if self._ShouldWorkerTerminateUnlocked(worker):
236
      return _TERMINATE
237

    
238
    # We only wait if there's no task for us.
239
    if not self._tasks:
240
      logging.debug("Waiting for tasks")
241

    
242
      # wait() releases the lock and sleeps until notified
243
      self._pool_to_worker.wait()
244

    
245
      logging.debug("Notified while waiting")
246

    
247
      # Were we woken up in order to terminate?
248
      if self._ShouldWorkerTerminateUnlocked(worker):
249
        return _TERMINATE
250

    
251
      if not self._tasks:
252
        # Spurious notification, ignore
253
        return None
254

    
255
    # Get task from queue and tell pool about it
256
    try:
257
      return self._tasks.popleft()
258
    finally:
259
      self._worker_to_pool.notifyAll()
260

    
261
  def _ShouldWorkerTerminateUnlocked(self, worker):
262
    """Returns whether a worker should terminate.
263

264
    """
265
    return (worker in self._termworkers)
266

    
267
  def _HasRunningTasksUnlocked(self):
268
    """Checks whether there's a task running in a worker.
269

270
    """
271
    for worker in self._workers + self._termworkers:
272
      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
273
        return True
274
    return False
275

    
276
  def Quiesce(self):
277
    """Waits until the task queue is empty.
278

279
    """
280
    self._lock.acquire()
281
    try:
282
      self._quiescing = True
283

    
284
      # Wait while there are tasks pending or running
285
      while self._tasks or self._HasRunningTasksUnlocked():
286
        self._worker_to_pool.wait()
287

    
288
    finally:
289
      self._quiescing = False
290

    
291
      # Make sure AddTasks continues in case it was waiting
292
      self._pool_to_pool.notifyAll()
293

    
294
      self._lock.release()
295

    
296
  def _NewWorkerIdUnlocked(self):
297
    """Return an identifier for a new worker.
298

299
    """
300
    self._last_worker_id += 1
301

    
302
    return "%s%d" % (self._name, self._last_worker_id)
303

    
304
  def _ResizeUnlocked(self, num_workers):
305
    """Changes the number of workers.
306

307
    """
308
    assert num_workers >= 0, "num_workers must be >= 0"
309

    
310
    logging.debug("Resizing to %s workers", num_workers)
311

    
312
    current_count = len(self._workers)
313

    
314
    if current_count == num_workers:
315
      # Nothing to do
316
      pass
317

    
318
    elif current_count > num_workers:
319
      if num_workers == 0:
320
        # Create copy of list to iterate over while lock isn't held.
321
        termworkers = self._workers[:]
322
        del self._workers[:]
323
      else:
324
        # TODO: Implement partial downsizing
325
        raise NotImplementedError()
326
        #termworkers = ...
327

    
328
      self._termworkers += termworkers
329

    
330
      # Notify workers that something has changed
331
      self._pool_to_worker.notifyAll()
332

    
333
      # Join all terminating workers
334
      self._lock.release()
335
      try:
336
        for worker in termworkers:
337
          logging.debug("Waiting for thread %s", worker.getName())
338
          worker.join()
339
      finally:
340
        self._lock.acquire()
341

    
342
      # Remove terminated threads. This could be done in a more efficient way
343
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
344
      # don't leave zombie threads around.
345
      for worker in termworkers:
346
        assert worker in self._termworkers, ("Worker not in list of"
347
                                             " terminating workers")
348
        if not worker.isAlive():
349
          self._termworkers.remove(worker)
350

    
351
      assert not self._termworkers, "Zombie worker detected"
352

    
353
    elif current_count < num_workers:
354
      # Create (num_workers - current_count) new workers
355
      for _ in range(num_workers - current_count):
356
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
357
        self._workers.append(worker)
358
        worker.start()
359

    
360
  def Resize(self, num_workers):
361
    """Changes the number of workers in the pool.
362

363
    @param num_workers: the new number of workers
364

365
    """
366
    self._lock.acquire()
367
    try:
368
      return self._ResizeUnlocked(num_workers)
369
    finally:
370
      self._lock.release()
371

    
372
  def TerminateWorkers(self):
373
    """Terminate all worker threads.
374

375
    Unstarted tasks will be ignored.
376

377
    """
378
    logging.debug("Terminating all workers")
379

    
380
    self._lock.acquire()
381
    try:
382
      self._ResizeUnlocked(0)
383

    
384
      if self._tasks:
385
        logging.debug("There are %s tasks left", len(self._tasks))
386
    finally:
387
      self._lock.release()
388

    
389
    logging.debug("All workers terminated")