Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ daba67c7

History | View | Annotate | Download (10.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Base classes for worker pools.
23

24
"""
25

    
26
import collections
27
import logging
28
import threading
29

    
30
from ganeti import compat
31

    
32

    
33
_TERMINATE = object()
34

    
35

    
36
class BaseWorker(threading.Thread, object):
37
  """Base worker class for worker pools.
38

39
  Users of a worker pool must override RunTask in a subclass.
40

41
  """
42
  # pylint: disable-msg=W0212
43
  def __init__(self, pool, worker_id):
44
    """Constructor for BaseWorker thread.
45

46
    @param pool: the parent worker pool
47
    @param worker_id: identifier for this worker
48

49
    """
50
    super(BaseWorker, self).__init__(name=worker_id)
51
    self.pool = pool
52
    self._worker_id = worker_id
53
    self._current_task = None
54

    
55
    assert self.getName() == worker_id
56

    
57
  def ShouldTerminate(self):
58
    """Returns whether this worker should terminate.
59

60
    Should only be called from within L{RunTask}.
61

62
    """
63
    self.pool._lock.acquire()
64
    try:
65
      assert self._HasRunningTaskUnlocked()
66
      return self.pool._ShouldWorkerTerminateUnlocked(self)
67
    finally:
68
      self.pool._lock.release()
69

    
70
  def SetTaskName(self, taskname):
71
    """Sets the name of the current task.
72

73
    Should only be called from within L{RunTask}.
74

75
    @type taskname: string
76
    @param taskname: Task's name
77

78
    """
79
    if taskname:
80
      name = "%s/%s" % (self._worker_id, taskname)
81
    else:
82
      name = self._worker_id
83

    
84
    # Set thread name
85
    self.setName(name)
86

    
87
  def _HasRunningTaskUnlocked(self):
88
    """Returns whether this worker is currently running a task.
89

90
    """
91
    return (self._current_task is not None)
92

    
93
  def run(self):
94
    """Main thread function.
95

96
    Waits for new tasks to show up in the queue.
97

98
    """
99
    pool = self.pool
100

    
101
    while True:
102
      assert self._current_task is None
103
      try:
104
        # Wait on lock to be told either to terminate or to do a task
105
        pool._lock.acquire()
106
        try:
107
          task = pool._WaitForTaskUnlocked(self)
108

    
109
          if task is _TERMINATE:
110
            # Told to terminate
111
            break
112

    
113
          if task is None:
114
            # Spurious notification, ignore
115
            continue
116

    
117
          self._current_task = task
118

    
119
          # No longer needed, dispose of reference
120
          del task
121

    
122
          assert self._HasRunningTaskUnlocked()
123

    
124
        finally:
125
          pool._lock.release()
126

    
127
        # Run the actual task
128
        try:
129
          logging.debug("Starting task %r", self._current_task)
130
          assert self.getName() == self._worker_id
131
          try:
132
            self.RunTask(*self._current_task)
133
          finally:
134
            self.SetTaskName(None)
135
          logging.debug("Done with task %r", self._current_task)
136
        except: # pylint: disable-msg=W0702
137
          logging.exception("Caught unhandled exception")
138

    
139
        assert self._HasRunningTaskUnlocked()
140
      finally:
141
        # Notify pool
142
        pool._lock.acquire()
143
        try:
144
          if self._current_task:
145
            self._current_task = None
146
            pool._worker_to_pool.notifyAll()
147
        finally:
148
          pool._lock.release()
149

    
150
      assert not self._HasRunningTaskUnlocked()
151

    
152
    logging.debug("Terminates")
153

    
154
  def RunTask(self, *args):
155
    """Function called to start a task.
156

157
    This needs to be implemented by child classes.
158

159
    """
160
    raise NotImplementedError()
161

    
162

    
163
class WorkerPool(object):
164
  """Worker pool with a queue.
165

166
  This class is thread-safe.
167

168
  Tasks are guaranteed to be started in the order in which they're
169
  added to the pool. Due to the nature of threading, they're not
170
  guaranteed to finish in the same order.
171

172
  """
173
  def __init__(self, name, num_workers, worker_class):
174
    """Constructor for worker pool.
175

176
    @param num_workers: number of workers to be started
177
        (dynamic resizing is not yet implemented)
178
    @param worker_class: the class to be instantiated for workers;
179
        should derive from L{BaseWorker}
180

181
    """
182
    # Some of these variables are accessed by BaseWorker
183
    self._lock = threading.Lock()
184
    self._pool_to_pool = threading.Condition(self._lock)
185
    self._pool_to_worker = threading.Condition(self._lock)
186
    self._worker_to_pool = threading.Condition(self._lock)
187
    self._worker_class = worker_class
188
    self._name = name
189
    self._last_worker_id = 0
190
    self._workers = []
191
    self._quiescing = False
192

    
193
    # Terminating workers
194
    self._termworkers = []
195

    
196
    # Queued tasks
197
    self._tasks = collections.deque()
198

    
199
    # Start workers
200
    self.Resize(num_workers)
201

    
202
  # TODO: Implement dynamic resizing?
203

    
204
  def _WaitWhileQuiescingUnlocked(self):
205
    """Wait until the worker pool has finished quiescing.
206

207
    """
208
    while self._quiescing:
209
      self._pool_to_pool.wait()
210

    
211
  def _AddTaskUnlocked(self, args):
212
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
213

    
214
    self._tasks.append(args)
215

    
216
    # Notify a waiting worker
217
    self._pool_to_worker.notify()
218

    
219
  def AddTask(self, args):
220
    """Adds a task to the queue.
221

222
    @type args: sequence
223
    @param args: arguments passed to L{BaseWorker.RunTask}
224

225
    """
226
    self._lock.acquire()
227
    try:
228
      self._WaitWhileQuiescingUnlocked()
229
      self._AddTaskUnlocked(args)
230
    finally:
231
      self._lock.release()
232

    
233
  def AddManyTasks(self, tasks):
234
    """Add a list of tasks to the queue.
235

236
    @type tasks: list of tuples
237
    @param tasks: list of args passed to L{BaseWorker.RunTask}
238

239
    """
240
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
241
      "Each task must be a sequence"
242

    
243
    self._lock.acquire()
244
    try:
245
      self._WaitWhileQuiescingUnlocked()
246

    
247
      for args in tasks:
248
        self._AddTaskUnlocked(args)
249
    finally:
250
      self._lock.release()
251

    
252
  def _WaitForTaskUnlocked(self, worker):
253
    """Waits for a task for a worker.
254

255
    @type worker: L{BaseWorker}
256
    @param worker: Worker thread
257

258
    """
259
    if self._ShouldWorkerTerminateUnlocked(worker):
260
      return _TERMINATE
261

    
262
    # We only wait if there's no task for us.
263
    if not self._tasks:
264
      logging.debug("Waiting for tasks")
265

    
266
      # wait() releases the lock and sleeps until notified
267
      self._pool_to_worker.wait()
268

    
269
      logging.debug("Notified while waiting")
270

    
271
      # Were we woken up in order to terminate?
272
      if self._ShouldWorkerTerminateUnlocked(worker):
273
        return _TERMINATE
274

    
275
      if not self._tasks:
276
        # Spurious notification, ignore
277
        return None
278

    
279
    # Get task from queue and tell pool about it
280
    try:
281
      return self._tasks.popleft()
282
    finally:
283
      self._worker_to_pool.notifyAll()
284

    
285
  def _ShouldWorkerTerminateUnlocked(self, worker):
286
    """Returns whether a worker should terminate.
287

288
    """
289
    return (worker in self._termworkers)
290

    
291
  def _HasRunningTasksUnlocked(self):
292
    """Checks whether there's a task running in a worker.
293

294
    """
295
    for worker in self._workers + self._termworkers:
296
      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
297
        return True
298
    return False
299

    
300
  def Quiesce(self):
301
    """Waits until the task queue is empty.
302

303
    """
304
    self._lock.acquire()
305
    try:
306
      self._quiescing = True
307

    
308
      # Wait while there are tasks pending or running
309
      while self._tasks or self._HasRunningTasksUnlocked():
310
        self._worker_to_pool.wait()
311

    
312
    finally:
313
      self._quiescing = False
314

    
315
      # Make sure AddTasks continues in case it was waiting
316
      self._pool_to_pool.notifyAll()
317

    
318
      self._lock.release()
319

    
320
  def _NewWorkerIdUnlocked(self):
321
    """Return an identifier for a new worker.
322

323
    """
324
    self._last_worker_id += 1
325

    
326
    return "%s%d" % (self._name, self._last_worker_id)
327

    
328
  def _ResizeUnlocked(self, num_workers):
329
    """Changes the number of workers.
330

331
    """
332
    assert num_workers >= 0, "num_workers must be >= 0"
333

    
334
    logging.debug("Resizing to %s workers", num_workers)
335

    
336
    current_count = len(self._workers)
337

    
338
    if current_count == num_workers:
339
      # Nothing to do
340
      pass
341

    
342
    elif current_count > num_workers:
343
      if num_workers == 0:
344
        # Create copy of list to iterate over while lock isn't held.
345
        termworkers = self._workers[:]
346
        del self._workers[:]
347
      else:
348
        # TODO: Implement partial downsizing
349
        raise NotImplementedError()
350
        #termworkers = ...
351

    
352
      self._termworkers += termworkers
353

    
354
      # Notify workers that something has changed
355
      self._pool_to_worker.notifyAll()
356

    
357
      # Join all terminating workers
358
      self._lock.release()
359
      try:
360
        for worker in termworkers:
361
          logging.debug("Waiting for thread %s", worker.getName())
362
          worker.join()
363
      finally:
364
        self._lock.acquire()
365

    
366
      # Remove terminated threads. This could be done in a more efficient way
367
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
368
      # don't leave zombie threads around.
369
      for worker in termworkers:
370
        assert worker in self._termworkers, ("Worker not in list of"
371
                                             " terminating workers")
372
        if not worker.isAlive():
373
          self._termworkers.remove(worker)
374

    
375
      assert not self._termworkers, "Zombie worker detected"
376

    
377
    elif current_count < num_workers:
378
      # Create (num_workers - current_count) new workers
379
      for _ in range(num_workers - current_count):
380
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
381
        self._workers.append(worker)
382
        worker.start()
383

    
384
  def Resize(self, num_workers):
385
    """Changes the number of workers in the pool.
386

387
    @param num_workers: the new number of workers
388

389
    """
390
    self._lock.acquire()
391
    try:
392
      return self._ResizeUnlocked(num_workers)
393
    finally:
394
      self._lock.release()
395

    
396
  def TerminateWorkers(self):
397
    """Terminate all worker threads.
398

399
    Unstarted tasks will be ignored.
400

401
    """
402
    logging.debug("Terminating all workers")
403

    
404
    self._lock.acquire()
405
    try:
406
      self._ResizeUnlocked(0)
407

    
408
      if self._tasks:
409
        logging.debug("There are %s tasks left", len(self._tasks))
410
    finally:
411
      self._lock.release()
412

    
413
    logging.debug("All workers terminated")