Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ ccedb11b

History | View | Annotate | Download (9.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Base classes for worker pools.
23

24
"""
25

    
26
import collections
27
import logging
28
import threading
29

    
30
from ganeti import compat
31

    
32

    
33
# Sentinel a worker receives in place of a task when it must leave its main
# loop. It is compared by identity, so it cannot collide with a real task.
_TERMINATE = object()


class BaseWorker(threading.Thread, object):
  """Base worker class for worker pools.

  Users of a worker pool must override RunTask in a subclass.

  """
  # pylint: disable-msg=W0212
  def __init__(self, pool, worker_id):
    """Constructor for BaseWorker thread.

    @param pool: the parent worker pool
    @param worker_id: identifier for this worker, used as the thread name

    """
    super(BaseWorker, self).__init__(name=worker_id)
    self.pool = pool
    # Argument sequence of the task being processed; None while idle
    self._current_task = None

  def ShouldTerminate(self):
    """Returns whether a worker should terminate.

    The decision is delegated to the parent pool.

    """
    return self.pool.ShouldWorkerTerminate(self)

  def _HasRunningTaskUnlocked(self):
    """Returns whether this worker is currently running a task.

    The check is done against C{None} on purpose: an empty argument
    sequence is a valid task even though it is false in boolean context.

    """
    return (self._current_task is not None)

  def run(self):
    """Main thread function.

    Waits for new tasks to show up in the queue and passes each one to
    L{RunTask}. Exceptions raised by a task are logged and do not end the
    thread. The loop is left once the pool hands out the termination
    sentinel.

    """
    pool = self.pool

    assert self._current_task is None

    while True:
      try:
        # Wait on lock to be told either to terminate or to do a task
        pool._lock.acquire()
        try:
          task = pool._WaitForTaskUnlocked(self)

          if task is _TERMINATE:
            # Told to terminate
            break

          if task is None:
            # Spurious notification, ignore
            continue

          self._current_task = task

          assert self._HasRunningTaskUnlocked()
        finally:
          pool._lock.release()

        # Run the actual task; the pool lock is not held while it runs
        try:
          logging.debug("Starting task %r", self._current_task)
          self.RunTask(*self._current_task)
          logging.debug("Done with task %r", self._current_task)
        except: # pylint: disable-msg=W0702
          # Deliberately broad: a failing task must not kill the worker
          logging.exception("Caught unhandled exception")
      finally:
        # Notify pool
        pool._lock.acquire()
        try:
          # Must not rely on the truth value of the task: an empty argument
          # tuple is false, which would leave this worker marked as busy
          # forever and never wake up anyone waiting on the pool.
          if self._HasRunningTaskUnlocked():
            self._current_task = None
            pool._worker_to_pool.notify_all()
        finally:
          pool._lock.release()

    logging.debug("Terminates")

  def RunTask(self, *args):
    """Function called to start a task.

    This needs to be implemented by child classes.

    @param args: the arguments the task was queued with
    @raise NotImplementedError: always, unless overridden

    """
    raise NotImplementedError()
123

    
124

    
125
class WorkerPool(object):
  """Worker pool with a queue.

  This class is thread-safe.

  Tasks are guaranteed to be started in the order in which they're
  added to the pool. Due to the nature of threading, they're not
  guaranteed to finish in the same order.

  """
  def __init__(self, name, num_workers, worker_class):
    """Constructor for worker pool.

    @param name: prefix used to build the identifier of each worker
    @param num_workers: number of workers to be started
    @param worker_class: the class to be instantiated for workers;
        should derive from L{BaseWorker}

    """
    # Some of these variables are accessed by BaseWorker
    self._lock = threading.Lock()
    # All three conditions share the pool lock:
    # - _pool_to_pool wakes up callers blocked while the pool is quiescing
    # - _pool_to_worker wakes up workers (new task or termination request)
    # - _worker_to_pool tells the pool a task was taken or has finished
    self._pool_to_pool = threading.Condition(self._lock)
    self._pool_to_worker = threading.Condition(self._lock)
    self._worker_to_pool = threading.Condition(self._lock)
    self._worker_class = worker_class
    self._name = name
    self._last_worker_id = 0
    self._workers = []
    self._quiescing = False

    # Terminating workers
    self._termworkers = []

    # Queued tasks
    self._tasks = collections.deque()

    # Start workers
    self.Resize(num_workers)

  # TODO: Implement partial downsizing (see _ResizeUnlocked)

  def _WaitWhileQuiescingUnlocked(self):
    """Wait until the worker pool has finished quiescing.

    Must be called with the pool lock held.

    """
    while self._quiescing:
      self._pool_to_pool.wait()

  def _AddTaskUnlocked(self, args):
    """Appends a task to the queue and wakes up one waiting worker.

    Must be called with the pool lock held.

    @type args: tuple or list
    @param args: arguments passed to L{BaseWorker.RunTask}

    """
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"

    self._tasks.append(args)

    # Notify a waiting worker
    self._pool_to_worker.notify()

  def AddTask(self, *args):
    """Adds a task to the queue.

    Blocks while the pool is quiescing.

    @param args: arguments passed to L{BaseWorker.RunTask}

    """
    with self._lock:
      self._WaitWhileQuiescingUnlocked()
      self._AddTaskUnlocked(args)

  def AddManyTasks(self, tasks):
    """Add a list of tasks to the queue.

    Blocks while the pool is quiescing. All tasks are queued while holding
    the pool lock once.

    @type tasks: list of tuples
    @param tasks: list of args passed to L{BaseWorker.RunTask}

    """
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
      "Each task must be a sequence"

    with self._lock:
      self._WaitWhileQuiescingUnlocked()

      for args in tasks:
        self._AddTaskUnlocked(args)

  def _WaitForTaskUnlocked(self, worker):
    """Waits for a task for a worker.

    Must be called with the pool lock held.

    @type worker: L{BaseWorker}
    @param worker: Worker thread
    @return: the arguments of the next task, L{_TERMINATE} if the worker
        must exit, or C{None} if it was woken up without anything to do

    """
    if self._ShouldWorkerTerminateUnlocked(worker):
      return _TERMINATE

    # We only wait if there's no task for us.
    if not self._tasks:
      logging.debug("Waiting for tasks")

      # wait() releases the lock and sleeps until notified
      self._pool_to_worker.wait()

      logging.debug("Notified while waiting")

      # Were we woken up in order to terminate?
      if self._ShouldWorkerTerminateUnlocked(worker):
        return _TERMINATE

      if not self._tasks:
        # Spurious notification, ignore
        return None

    # Get task from queue and tell pool about it
    try:
      return self._tasks.popleft()
    finally:
      self._worker_to_pool.notify_all()

  def _ShouldWorkerTerminateUnlocked(self, worker):
    """Returns whether a worker should terminate.

    Must be called with the pool lock held.

    """
    return (worker in self._termworkers)

  def ShouldWorkerTerminate(self, worker):
    """Returns whether a worker should terminate.

    """
    with self._lock:
      return self._ShouldWorkerTerminateUnlocked(worker)

  def _HasRunningTasksUnlocked(self):
    """Checks whether there's a task running in a worker.

    Workers which are about to terminate are taken into account as well.

    """
    for worker in self._workers + self._termworkers:
      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
        return True
    return False

  def Quiesce(self):
    """Waits until the task queue is empty and no task is running.

    While waiting, callers trying to add tasks are blocked.

    """
    with self._lock:
      self._quiescing = True
      try:
        # Wait while there are tasks pending or running
        while self._tasks or self._HasRunningTasksUnlocked():
          self._worker_to_pool.wait()
      finally:
        self._quiescing = False

        # Make sure AddTask continues in case it was waiting
        self._pool_to_pool.notify_all()

  def _NewWorkerIdUnlocked(self):
    """Return an identifier for a new worker.

    The identifier is the pool name followed by a running number.

    """
    self._last_worker_id += 1

    return "%s%d" % (self._name, self._last_worker_id)

  def _ResizeUnlocked(self, num_workers):
    """Changes the number of workers.

    Must be called with the pool lock held. When shrinking to zero the lock
    is released while waiting for the workers to exit and taken again
    afterwards.

    @param num_workers: the new number of workers
    @raise NotImplementedError: when asked to shrink the pool to a number
        of workers other than zero

    """
    assert num_workers >= 0, "num_workers must be >= 0"

    logging.debug("Resizing to %s workers", num_workers)

    current_count = len(self._workers)

    if current_count == num_workers:
      # Nothing to do
      pass

    elif current_count > num_workers:
      if num_workers == 0:
        # Create copy of list to iterate over while lock isn't held.
        termworkers = self._workers[:]
        del self._workers[:]
      else:
        # TODO: Implement partial downsizing
        raise NotImplementedError()
        #termworkers = ...

      self._termworkers += termworkers

      # Notify workers that something has changed
      self._pool_to_worker.notify_all()

      # Join all terminating workers; they need the lock to notice the
      # termination request, hence it must not be held while joining
      self._lock.release()
      try:
        for worker in termworkers:
          logging.debug("Waiting for thread %s", worker.name)
          worker.join()
      finally:
        self._lock.acquire()

      # Remove terminated threads. This could be done in a more efficient way
      # (del self._termworkers[:]), but checking worker.is_alive() makes sure
      # we don't leave zombie threads around.
      for worker in termworkers:
        assert worker in self._termworkers, ("Worker not in list of"
                                             " terminating workers")
        # is_alive() instead of isAlive(): the latter was removed from
        # threading.Thread in Python 3.9
        if not worker.is_alive():
          self._termworkers.remove(worker)

      assert not self._termworkers, "Zombie worker detected"

    elif current_count < num_workers:
      # Create (num_workers - current_count) new workers
      for _ in range(num_workers - current_count):
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
        self._workers.append(worker)
        worker.start()

  def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    @param num_workers: the new number of workers

    """
    with self._lock:
      return self._ResizeUnlocked(num_workers)

  def TerminateWorkers(self):
    """Terminate all worker threads.

    Unstarted tasks will be ignored.

    """
    logging.debug("Terminating all workers")

    with self._lock:
      self._ResizeUnlocked(0)

      if self._tasks:
        logging.debug("There are %s tasks left", len(self._tasks))

    logging.debug("All workers terminated")