Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ 189d2714

History | View | Annotate | Download (9.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Base classes for worker pools.
23

24
"""
25

    
26
import collections
27
import logging
28
import threading
29

    
30
from ganeti import compat
31

    
32

    
33
class BaseWorker(threading.Thread, object):
  """Base worker class for worker pools.

  Users of a worker pool must override RunTask in a subclass.

  A worker repeatedly takes the oldest entry from its pool's task queue and
  passes the entry's items as positional arguments to L{RunTask}. It keeps
  doing so until the pool reports that the worker should terminate.

  """
  # pylint: disable-msg=W0212
  def __init__(self, pool, worker_id):
    """Constructor for BaseWorker thread.

    @param pool: the parent worker pool
    @param worker_id: identifier for this worker; used as the thread name

    """
    super(BaseWorker, self).__init__(name=worker_id)
    self.pool = pool

    # Arguments of the task currently being processed, or None while idle.
    # Only modified while holding the pool's lock.
    self._current_task = None

  def ShouldTerminate(self):
    """Returns whether a worker should terminate.

    The decision is delegated to the parent pool.

    """
    return self.pool.ShouldWorkerTerminate(self)

  def _HasRunningTaskUnlocked(self):
    """Returns whether this worker is currently running a task.

    The caller must hold the pool's lock.

    """
    return (self._current_task is not None)

  def HasRunningTask(self):
    """Returns whether this worker is currently running a task.

    Acquires the pool's lock for the duration of the check.

    """
    self.pool._lock.acquire()
    try:
      return self._HasRunningTaskUnlocked()
    finally:
      self.pool._lock.release()

  def run(self):
    """Main thread function.

    Waits for new tasks to show up in the queue.

    Each loop iteration fetches one task while holding the pool's lock, runs
    it with the lock released and finally marks the worker as idle again.
    Exceptions raised by L{RunTask} are logged and do not end the loop.

    """
    pool = self.pool

    assert not self.HasRunningTask()

    while True:
      try:
        # We wait on lock to be told either terminate or do a task.
        pool._lock.acquire()
        try:
          if pool._ShouldWorkerTerminateUnlocked(self):
            break

          # We only wait if there's no task for us.
          if not pool._tasks:
            logging.debug("Waiting for tasks")

            # wait() releases the lock and sleeps until notified
            pool._pool_to_worker.wait()

            logging.debug("Notified while waiting")

            # Were we woken up in order to terminate?
            if pool._ShouldWorkerTerminateUnlocked(self):
              break

            if not pool._tasks:
              # Spurious notification, ignore
              continue

          # Get task from queue and tell pool about it
          try:
            self._current_task = pool._tasks.popleft()
          finally:
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

        # Run the actual task
        try:
          logging.debug("Starting task %r", self._current_task)
          self.RunTask(*self._current_task)
          logging.debug("Done with task %r", self._current_task)
        except: # pylint: disable-msg=W0702
          # Deliberately catch everything: a failing task must not end the
          # worker thread, so the error is only logged.
          logging.exception("Caught unhandled exception")
      finally:
        # Notify pool
        pool._lock.acquire()
        try:
          # Compare against None instead of relying on truthiness: a task
          # queued without arguments is an empty (falsy) sequence and would
          # otherwise leave this worker marked as busy forever.
          if self._current_task is not None:
            self._current_task = None
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

    logging.debug("Terminates")

  def RunTask(self, *args):
    """Function called to start a task.

    This needs to be implemented by child classes.

    @param args: the items of the queued task, passed as positional arguments
    @raise NotImplementedError: always, unless overridden by a subclass

    """
    raise NotImplementedError()
142

    
143

    
144
class WorkerPool(object):
  """Worker pool with a queue.

  This class is thread-safe.

  Tasks are guaranteed to be started in the order in which they're
  added to the pool. Due to the nature of threading, they're not
  guaranteed to finish in the same order.

  """
  def __init__(self, name, num_workers, worker_class):
    """Constructor for worker pool.

    @param name: prefix for worker identifiers; a sequence number is
        appended to it for every worker created
    @param num_workers: number of workers to be started (can be changed
        later using L{Resize}; shrinking is only implemented down to zero)
    @param worker_class: the class to be instantiated for workers;
        should derive from L{BaseWorker}

    """
    # Some of these variables are accessed by BaseWorker
    self._lock = threading.Lock()
    # All three conditions share the same lock
    self._pool_to_pool = threading.Condition(self._lock)
    self._pool_to_worker = threading.Condition(self._lock)
    self._worker_to_pool = threading.Condition(self._lock)
    self._worker_class = worker_class
    self._name = name
    self._last_worker_id = 0
    self._workers = []
    self._quiescing = False

    # Terminating workers
    self._termworkers = []

    # Queued tasks
    self._tasks = collections.deque()

    # Start workers
    self.Resize(num_workers)

  # TODO: Implement dynamic resizing?

  def _WaitWhileQuiescingUnlocked(self):
    """Wait until the worker pool has finished quiescing.

    The caller must hold the pool's lock.

    """
    while self._quiescing:
      self._pool_to_pool.wait()

  def _AddTaskUnlocked(self, args):
    """Appends a single task to the queue and wakes up one waiting worker.

    The caller must hold the pool's lock.

    @param args: sequence of arguments for L{BaseWorker.RunTask}

    """
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"

    self._tasks.append(args)

    # Notify a waiting worker
    self._pool_to_worker.notify()

  def AddTask(self, *args):
    """Adds a task to the queue.

    Blocks while the pool is quiescing.

    @param args: arguments passed to L{BaseWorker.RunTask}

    """
    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()
      self._AddTaskUnlocked(args)
    finally:
      self._lock.release()

  def AddManyTasks(self, tasks):
    """Add a list of tasks to the queue.

    Blocks while the pool is quiescing.

    @type tasks: list of tuples
    @param tasks: list of args passed to L{BaseWorker.RunTask}

    """
    # Materialize once: the check below iterates over the tasks and would
    # otherwise exhaust a one-shot iterable before anything gets queued.
    tasks = list(tasks)

    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
      "Each task must be a sequence"

    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()

      for args in tasks:
        self._AddTaskUnlocked(args)
    finally:
      self._lock.release()

  def _ShouldWorkerTerminateUnlocked(self, worker):
    """Returns whether a worker should terminate.

    The caller must hold the pool's lock.

    """
    return (worker in self._termworkers)

  def ShouldWorkerTerminate(self, worker):
    """Returns whether a worker should terminate.

    Acquires the pool's lock for the duration of the check.

    """
    self._lock.acquire()
    try:
      return self._ShouldWorkerTerminateUnlocked(worker)
    finally:
      self._lock.release()

  def _HasRunningTasksUnlocked(self):
    """Checks whether there's a task running in a worker.

    Both active and terminating workers are taken into account. The caller
    must hold the pool's lock.

    """
    for worker in self._workers + self._termworkers:
      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
        return True
    return False

  def Quiesce(self):
    """Waits until the task queue is empty.

    Also waits for tasks which are still running. While waiting, callers of
    L{AddTask} and L{AddManyTasks} are blocked.

    """
    self._lock.acquire()
    try:
      self._quiescing = True

      # Wait while there are tasks pending or running
      while self._tasks or self._HasRunningTasksUnlocked():
        self._worker_to_pool.wait()

    finally:
      self._quiescing = False

      # Make sure AddTasks continues in case it was waiting
      self._pool_to_pool.notifyAll()

      self._lock.release()

  def _NewWorkerIdUnlocked(self):
    """Return an identifier for a new worker.

    The identifier is the pool name followed by an increasing number.

    """
    self._last_worker_id += 1

    return "%s%d" % (self._name, self._last_worker_id)

  @staticmethod
  def _IsWorkerAlive(worker):
    """Returns whether a worker thread is still alive.

    Uses C{is_alive} where the thread object provides it and falls back to
    the older C{isAlive} spelling otherwise.

    """
    is_alive = getattr(worker, "is_alive", None)
    if is_alive is None:
      is_alive = worker.isAlive
    return is_alive()

  def _ResizeUnlocked(self, num_workers):
    """Changes the number of workers.

    The caller must hold the pool's lock. When workers are terminated the
    lock is released while waiting for them and acquired again afterwards.

    @param num_workers: the new number of workers
    @raise NotImplementedError: when asked to reduce the number of workers to
        anything other than zero

    """
    assert num_workers >= 0, "num_workers must be >= 0"

    logging.debug("Resizing to %s workers", num_workers)

    current_count = len(self._workers)

    if current_count == num_workers:
      # Nothing to do
      pass

    elif current_count > num_workers:
      if num_workers == 0:
        # Create copy of list to iterate over while lock isn't held.
        termworkers = self._workers[:]
        del self._workers[:]
      else:
        # TODO: Implement partial downsizing
        raise NotImplementedError()
        #termworkers = ...

      self._termworkers += termworkers

      # Notify workers that something has changed
      self._pool_to_worker.notifyAll()

      # Join all terminating workers
      self._lock.release()
      try:
        for worker in termworkers:
          logging.debug("Waiting for thread %s", worker.getName())
          worker.join()
      finally:
        self._lock.acquire()

      # Remove terminated threads. This could be done in a more efficient way
      # (del self._termworkers[:]), but checking whether the worker is alive
      # makes sure we don't leave zombie threads around.
      for worker in termworkers:
        assert worker in self._termworkers, ("Worker not in list of"
                                             " terminating workers")
        if not self._IsWorkerAlive(worker):
          self._termworkers.remove(worker)

      assert not self._termworkers, "Zombie worker detected"

    elif current_count < num_workers:
      # Create (num_workers - current_count) new workers
      for _ in range(num_workers - current_count):
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
        self._workers.append(worker)
        worker.start()

  def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    @param num_workers: the new number of workers

    """
    self._lock.acquire()
    try:
      return self._ResizeUnlocked(num_workers)
    finally:
      self._lock.release()

  def TerminateWorkers(self):
    """Terminate all worker threads.

    Unstarted tasks will be ignored.

    """
    logging.debug("Terminating all workers")

    self._lock.acquire()
    try:
      self._ResizeUnlocked(0)

      if self._tasks:
        logging.debug("There are %s tasks left", len(self._tasks))
    finally:
      self._lock.release()

    logging.debug("All workers terminated")