Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ 9f7b4967

History | View | Annotate | Download (9.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Base classes for worker pools.
23

24
"""
25

    
26
import collections
27
import logging
28
import threading
29

    
30
from ganeti import compat
31

    
32

    
33
class BaseWorker(threading.Thread, object):
  """Base worker class for worker pools.

  Users of a worker pool must override RunTask in a subclass.

  A worker is a thread which repeatedly takes the oldest queued task from
  its parent pool and passes the task's arguments to L{RunTask}, until the
  pool reports that the worker should terminate.

  """
  # pylint: disable-msg=W0212
  def __init__(self, pool, worker_id):
    """Constructor for BaseWorker thread.

    @param pool: the parent worker pool
    @param worker_id: identifier for this worker; used as the thread name

    """
    super(BaseWorker, self).__init__(name=worker_id)
    self.pool = pool
    # Argument tuple of the task being processed, None while idle
    self._current_task = None

  def ShouldTerminate(self):
    """Returns whether a worker should terminate.

    The question is delegated to the parent pool, which takes its lock.

    """
    return self.pool.ShouldWorkerTerminate(self)

  def _HasRunningTaskUnlocked(self):
    """Returns whether this worker is currently running a task.

    The caller is expected to hold the pool lock.

    """
    return (self._current_task is not None)

  def HasRunningTask(self):
    """Returns whether this worker is currently running a task.

    Acquires the pool lock for the duration of the check.

    """
    self.pool._lock.acquire()
    try:
      return self._HasRunningTaskUnlocked()
    finally:
      self.pool._lock.release()

  def run(self):
    """Main thread function.

    Waits for new tasks to show up in the queue.

    Exceptions raised by L{RunTask} are logged and do not stop the worker.

    """
    pool = self.pool

    assert not self.HasRunningTask()

    while True:
      try:
        # We wait on lock to be told either terminate or do a task.
        pool._lock.acquire()
        try:
          if pool._ShouldWorkerTerminateUnlocked(self):
            break

          # We only wait if there's no task for us.
          if not pool._tasks:
            logging.debug("Waiting for tasks")

            # wait() releases the lock and sleeps until notified
            pool._pool_to_worker.wait()

            logging.debug("Notified while waiting")

            # Were we woken up in order to terminate?
            if pool._ShouldWorkerTerminateUnlocked(self):
              break

            if not pool._tasks:
              # Spurious notification, ignore
              continue

          # Get task from queue and tell pool about it
          try:
            self._current_task = pool._tasks.popleft()
          finally:
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

        # Run the actual task
        try:
          logging.debug("Starting task %r", self._current_task)
          self.RunTask(*self._current_task)
          logging.debug("Done with task %r", self._current_task)
        except: # pylint: disable-msg=W0702
          logging.exception("Caught unhandled exception")
      finally:
        # Notify pool
        pool._lock.acquire()
        try:
          # Compare against None instead of relying on truthiness: a task
          # queued without arguments is the empty tuple, which is false but
          # must still be cleared, otherwise this worker would look busy
          # forever.
          if self._HasRunningTaskUnlocked():
            self._current_task = None
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

    logging.debug("Terminates")

  def RunTask(self, *args):
    """Function called to start a task.

    This needs to be implemented by child classes.

    @param args: the arguments the task was queued with

    """
    raise NotImplementedError()
142

    
143

    
144
class WorkerPool(object):
  """Worker pool with a queue.

  This class is thread-safe.

  Tasks are guaranteed to be started in the order in which they're
  added to the pool. Due to the nature of threading, they're not
  guaranteed to finish in the same order.

  """
  def __init__(self, name, num_workers, worker_class):
    """Constructor for worker pool.

    @param name: prefix used to build the identifiers of the workers
    @param num_workers: number of workers to be started (can be changed
        later using L{Resize})
    @param worker_class: the class to be instantiated for workers;
        should derive from L{BaseWorker}

    """
    # Some of these variables are accessed by BaseWorker
    self._lock = threading.Lock()
    # All three conditions share the pool lock; they only differ in who
    # notifies whom
    self._pool_to_pool = threading.Condition(self._lock)
    self._pool_to_worker = threading.Condition(self._lock)
    self._worker_to_pool = threading.Condition(self._lock)
    self._worker_class = worker_class
    self._name = name
    self._last_worker_id = 0
    self._workers = []
    self._quiescing = False

    # Terminating workers
    self._termworkers = []

    # Queued tasks
    self._tasks = collections.deque()

    # Start workers
    self.Resize(num_workers)

  # TODO: Implement dynamic resizing?

  def _WaitWhileQuiescingUnlocked(self):
    """Wait until the worker pool has finished quiescing.

    The caller must hold the pool lock; it is released while waiting.

    """
    while self._quiescing:
      self._pool_to_pool.wait()

  def AddTask(self, *args):
    """Adds a task to the queue.

    Blocks while the pool is quiescing.

    @param args: arguments passed to L{BaseWorker.RunTask}

    """
    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()

      self._tasks.append(args)

      # Wake one idling worker up
      self._pool_to_worker.notify()
    finally:
      self._lock.release()

  def AddManyTasks(self, tasks):
    """Add a list of tasks to the queue.

    Blocks while the pool is quiescing.

    @type tasks: list of tuples
    @param tasks: list of args passed to L{BaseWorker.RunTask}; other
        iterables are accepted as well

    """
    # "tasks" is walked several times below (check, queueing, notification).
    # Take a snapshot so a one-shot iterator isn't used up by the first pass,
    # which would lose the tasks or leave them queued without waking anybody.
    tasks = list(tasks)

    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
      "Each task must be a sequence"

    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()

      self._tasks.extend(tasks)

      # One notification per queued task
      for _ in tasks:
        self._pool_to_worker.notify()
    finally:
      self._lock.release()

  def _ShouldWorkerTerminateUnlocked(self, worker):
    """Returns whether a worker should terminate.

    The caller is expected to hold the pool lock.

    """
    return (worker in self._termworkers)

  def ShouldWorkerTerminate(self, worker):
    """Returns whether a worker should terminate.

    Acquires the pool lock for the duration of the check.

    """
    self._lock.acquire()
    try:
      return self._ShouldWorkerTerminateUnlocked(worker)
    finally:
      self._lock.release()

  def _HasRunningTasksUnlocked(self):
    """Checks whether there's a task running in a worker.

    Terminating workers are taken into account, too. The caller is expected
    to hold the pool lock.

    """
    for worker in self._workers + self._termworkers:
      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
        return True
    return False

  def Quiesce(self):
    """Waits until the task queue is empty and no task is running.

    While waiting, L{AddTask} and L{AddManyTasks} block.

    """
    self._lock.acquire()
    try:
      self._quiescing = True

      # Wait while there are tasks pending or running
      while self._tasks or self._HasRunningTasksUnlocked():
        self._worker_to_pool.wait()

    finally:
      self._quiescing = False

      # Make sure AddTasks continues in case it was waiting
      self._pool_to_pool.notifyAll()

      self._lock.release()

  def _NewWorkerIdUnlocked(self):
    """Return an identifier for a new worker.

    The identifier is the pool name followed by a running number.

    """
    self._last_worker_id += 1

    return "%s%d" % (self._name, self._last_worker_id)

  def _ResizeUnlocked(self, num_workers):
    """Changes the number of workers.

    Must be called with the pool lock held. When workers are removed, the
    lock is released while waiting for them to finish and acquired again
    afterwards.

    @param num_workers: the new number of workers
    @raise NotImplementedError: when asked to reduce the number of workers
        to anything but zero

    """
    assert num_workers >= 0, "num_workers must be >= 0"

    logging.debug("Resizing to %s workers", num_workers)

    current_count = len(self._workers)

    if current_count == num_workers:
      # Nothing to do
      pass

    elif current_count > num_workers:
      if num_workers == 0:
        # Create copy of list to iterate over while lock isn't held.
        termworkers = self._workers[:]
        del self._workers[:]
      else:
        # TODO: Implement partial downsizing
        raise NotImplementedError()

      self._termworkers += termworkers

      # Notify workers that something has changed
      self._pool_to_worker.notifyAll()

      # Join all terminating workers
      self._lock.release()
      try:
        for worker in termworkers:
          logging.debug("Waiting for thread %s", worker.getName())
          worker.join()
      finally:
        self._lock.acquire()

      # Remove terminated threads. This could be done in a more efficient way
      # (del self._termworkers[:]), but checking whether the thread is still
      # alive makes sure we don't leave zombie threads around.
      for worker in termworkers:
        assert worker in self._termworkers, ("Worker not in list of"
                                             " terminating workers")
        # Thread.isAlive() was removed in Python 3.9. Prefer is_alive() and
        # only fall back to the old spelling on interpreters lacking it.
        is_alive = getattr(worker, "is_alive", None)
        if is_alive is None:
          is_alive = worker.isAlive
        if not is_alive():
          self._termworkers.remove(worker)

      assert not self._termworkers, "Zombie worker detected"

    elif current_count < num_workers:
      # Create (num_workers - current_count) new workers
      for _ in range(num_workers - current_count):
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
        self._workers.append(worker)
        worker.start()

  def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    @param num_workers: the new number of workers

    """
    self._lock.acquire()
    try:
      return self._ResizeUnlocked(num_workers)
    finally:
      self._lock.release()

  def TerminateWorkers(self):
    """Terminate all worker threads.

    Unstarted tasks will be ignored.

    """
    logging.debug("Terminating all workers")

    self._lock.acquire()
    try:
      self._ResizeUnlocked(0)

      if self._tasks:
        logging.debug("There are %s tasks left", len(self._tasks))
    finally:
      self._lock.release()

    logging.debug("All workers terminated")