Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ b2e8a4d9

History | View | Annotate | Download (9.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Base classes for worker pools.
23

24
"""
25

    
26
import collections
27
import logging
28
import threading
29

    
30
from ganeti import compat
31

    
32

    
33
# Sentinel returned by WorkerPool._WaitForTaskUnlocked to tell a worker thread
# to leave its main loop; BaseWorker.run compares against it by identity
_TERMINATE = object()
34

    
35

    
36
class BaseWorker(threading.Thread, object):
  """Base worker class for worker pools.

  Users of a worker pool must override RunTask in a subclass.

  """
  # pylint: disable-msg=W0212
  def __init__(self, pool, worker_id):
    """Constructor for BaseWorker thread.

    @param pool: the parent worker pool
    @param worker_id: identifier for this worker; used as the thread name

    """
    super(BaseWorker, self).__init__(name=worker_id)
    self.pool = pool
    # Argument sequence of the task being processed, None while idle
    self._current_task = None

  def ShouldTerminate(self):
    """Returns whether this worker should terminate.

    Should only be called from within L{RunTask}.

    @return: whether the pool has marked this worker for termination

    """
    self.pool._lock.acquire()
    try:
      assert self._HasRunningTaskUnlocked()
      return self.pool._ShouldWorkerTerminateUnlocked(self)
    finally:
      self.pool._lock.release()

  def _HasRunningTaskUnlocked(self):
    """Returns whether this worker is currently running a task.

    The check is done against None on purpose: a task with an empty
    argument sequence is a valid task, even though it is falsy.

    """
    return (self._current_task is not None)

  def run(self):
    """Main thread function.

    Waits for new tasks to show up in the queue, runs them through
    L{RunTask} and tells the pool after each one has finished. Exceptions
    raised by L{RunTask} are logged and do not stop the worker.

    """
    pool = self.pool

    assert self._current_task is None

    while True:
      try:
        # Wait on lock to be told either to terminate or to do a task
        pool._lock.acquire()
        try:
          task = pool._WaitForTaskUnlocked(self)

          if task is _TERMINATE:
            # Told to terminate
            break

          if task is None:
            # Spurious notification, ignore
            continue

          # Must be set while the lock is still held, so the pool never sees
          # a task that is neither queued nor marked as running
          self._current_task = task

          assert self._HasRunningTaskUnlocked()
        finally:
          pool._lock.release()

        # Run the actual task
        try:
          logging.debug("Starting task %r", self._current_task)
          self.RunTask(*self._current_task)
          logging.debug("Done with task %r", self._current_task)
        except: # pylint: disable-msg=W0702
          logging.exception("Caught unhandled exception")

        assert self._HasRunningTaskUnlocked()
      finally:
        # Notify pool
        pool._lock.acquire()
        try:
          # Compare against None (not truthiness): a task with empty
          # arguments, e.g. "()", must be cleared and reported as well
          if self._HasRunningTaskUnlocked():
            self._current_task = None
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

      assert not self._HasRunningTaskUnlocked()

    logging.debug("Terminates")

  def RunTask(self, *args):
    """Function called to start a task.

    This needs to be implemented by child classes.

    @param args: the argument sequence which was added to the pool for
        this task, unpacked

    """
    raise NotImplementedError()
134

    
135

    
136
class WorkerPool(object):
  """Worker pool with a queue.

  This class is thread-safe.

  Tasks are guaranteed to be started in the order in which they're
  added to the pool. Due to the nature of threading, they're not
  guaranteed to finish in the same order.

  """
  def __init__(self, name, num_workers, worker_class):
    """Constructor for worker pool.

    @param name: prefix used when generating worker identifiers
    @param num_workers: number of workers to be started
        (the pool can later be grown or shrunk to zero; partial
        downsizing is not yet implemented)
    @param worker_class: the class to be instantiated for workers;
        should derive from L{BaseWorker}

    """
    # Some of these variables are accessed by BaseWorker
    self._lock = threading.Lock()
    # All three conditions share the pool lock; they only differ in who is
    # notifying whom
    self._pool_to_pool = threading.Condition(self._lock)
    self._pool_to_worker = threading.Condition(self._lock)
    self._worker_to_pool = threading.Condition(self._lock)
    self._worker_class = worker_class
    self._name = name
    self._last_worker_id = 0
    self._workers = []
    self._quiescing = False

    # Terminating workers
    self._termworkers = []

    # Queued tasks
    self._tasks = collections.deque()

    # Start workers
    self.Resize(num_workers)

  # TODO: Implement dynamic resizing?

  def _WaitWhileQuiescingUnlocked(self):
    """Wait until the worker pool has finished quiescing.

    Must be called with the pool lock held.

    """
    while self._quiescing:
      self._pool_to_pool.wait()

  def _AddTaskUnlocked(self, args):
    """Appends a task to the queue and wakes up one waiting worker.

    Must be called with the pool lock held.

    @type args: sequence
    @param args: arguments passed to L{BaseWorker.RunTask}

    """
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"

    self._tasks.append(args)

    # Notify a waiting worker
    self._pool_to_worker.notify()

  def AddTask(self, args):
    """Adds a task to the queue.

    Blocks while L{Quiesce} is in progress.

    @type args: sequence
    @param args: arguments passed to L{BaseWorker.RunTask}

    """
    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()
      self._AddTaskUnlocked(args)
    finally:
      self._lock.release()

  def AddManyTasks(self, tasks):
    """Add a list of tasks to the queue.

    Blocks while L{Quiesce} is in progress. All tasks are queued while the
    pool lock is held once.

    @type tasks: list of tuples
    @param tasks: list of args passed to L{BaseWorker.RunTask}

    """
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
      "Each task must be a sequence"

    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()

      for args in tasks:
        self._AddTaskUnlocked(args)
    finally:
      self._lock.release()

  def _WaitForTaskUnlocked(self, worker):
    """Waits for a task for a worker.

    Must be called with the pool lock held.

    @type worker: L{BaseWorker}
    @param worker: Worker thread
    @return: the arguments of the next task, C{_TERMINATE} if the worker
        has to terminate, or None if the worker was woken up without there
        being anything to do

    """
    if self._ShouldWorkerTerminateUnlocked(worker):
      return _TERMINATE

    # We only wait if there's no task for us.
    if not self._tasks:
      logging.debug("Waiting for tasks")

      # wait() releases the lock and sleeps until notified
      self._pool_to_worker.wait()

      logging.debug("Notified while waiting")

      # Were we woken up in order to terminate?
      if self._ShouldWorkerTerminateUnlocked(worker):
        return _TERMINATE

      if not self._tasks:
        # Spurious notification, ignore
        return None

    # Get task from queue and tell pool about it
    try:
      return self._tasks.popleft()
    finally:
      self._worker_to_pool.notifyAll()

  def _ShouldWorkerTerminateUnlocked(self, worker):
    """Returns whether a worker should terminate.

    """
    return (worker in self._termworkers)

  def _HasRunningTasksUnlocked(self):
    """Checks whether there's a task running in a worker.

    Workers which are being terminated are taken into account as well.

    """
    for worker in self._workers + self._termworkers:
      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
        return True
    return False

  def Quiesce(self):
    """Waits until the task queue is empty and no task is running.

    While waiting, L{AddTask} and L{AddManyTasks} block.

    """
    self._lock.acquire()
    try:
      self._quiescing = True

      # Wait while there are tasks pending or running
      while self._tasks or self._HasRunningTasksUnlocked():
        self._worker_to_pool.wait()

    finally:
      self._quiescing = False

      # Make sure AddTasks continues in case it was waiting
      self._pool_to_pool.notifyAll()

      self._lock.release()

  def _NewWorkerIdUnlocked(self):
    """Return an identifier for a new worker.

    The identifier is the pool name followed by a running number.

    """
    self._last_worker_id += 1

    return "%s%d" % (self._name, self._last_worker_id)

  @staticmethod
  def _IsWorkerAlive(worker):
    """Returns whether a worker thread is still alive.

    C{Thread.isAlive} no longer exists in Python 3.9 and newer, hence
    C{is_alive} is preferred and the old spelling is only a fallback.

    """
    is_alive = getattr(worker, "is_alive", None)
    if is_alive is None:
      is_alive = worker.isAlive
    return is_alive()

  def _ResizeUnlocked(self, num_workers):
    """Changes the number of workers.

    Must be called with the pool lock held. The lock is released
    temporarily while waiting for terminating workers.

    @param num_workers: the new number of workers
    @raise NotImplementedError: when asked to reduce the number of workers
        to anything but zero

    """
    assert num_workers >= 0, "num_workers must be >= 0"

    logging.debug("Resizing to %s workers", num_workers)

    current_count = len(self._workers)

    if current_count == num_workers:
      # Nothing to do
      pass

    elif current_count > num_workers:
      if num_workers == 0:
        # Create copy of list to iterate over while lock isn't held.
        termworkers = self._workers[:]
        del self._workers[:]
      else:
        # TODO: Implement partial downsizing
        raise NotImplementedError()
        #termworkers = ...

      self._termworkers += termworkers

      # Notify workers that something has changed
      self._pool_to_worker.notifyAll()

      # Join all terminating workers; they need the lock to notice that they
      # have to terminate, hence it must not be held here
      self._lock.release()
      try:
        for worker in termworkers:
          logging.debug("Waiting for thread %s", worker.getName())
          worker.join()
      finally:
        self._lock.acquire()

      # Remove terminated threads. This could be done in a more efficient way
      # (del self._termworkers[:]), but checking whether the thread is alive
      # makes sure we don't leave zombie threads around.
      for worker in termworkers:
        assert worker in self._termworkers, ("Worker not in list of"
                                             " terminating workers")
        if not self._IsWorkerAlive(worker):
          self._termworkers.remove(worker)

      assert not self._termworkers, "Zombie worker detected"

    elif current_count < num_workers:
      # Create (num_workers - current_count) new workers
      for _ in range(num_workers - current_count):
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
        self._workers.append(worker)
        worker.start()

  def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    @param num_workers: the new number of workers
    @raise NotImplementedError: when asked to reduce the number of workers
        to anything but zero

    """
    self._lock.acquire()
    try:
      return self._ResizeUnlocked(num_workers)
    finally:
      self._lock.release()

  def TerminateWorkers(self):
    """Terminate all worker threads.

    Unstarted tasks will be ignored.

    """
    logging.debug("Terminating all workers")

    self._lock.acquire()
    try:
      self._ResizeUnlocked(0)

      if self._tasks:
        logging.debug("There are %s tasks left", len(self._tasks))
    finally:
      self._lock.release()

    logging.debug("All workers terminated")