#
#

# Copyright (C) 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Base classes for worker pools.

"""

import logging
import threading
import heapq

from ganeti import compat
from ganeti import errors


_TERMINATE = object()
_DEFAULT_PRIORITY = 0


class DeferTask(Exception):
  """Special exception class to defer a task.

  This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
  task. Optionally, the priority of the task can be changed.

  """
  def __init__(self, priority=None):
    """Initializes this class.

    @type priority: number
    @param priority: New task priority (None means no change)

    """
    Exception.__init__(self)
    self.priority = priority


class BaseWorker(threading.Thread, object):
  """Base worker class for worker pools.

  Users of a worker pool must override RunTask in a subclass.

  """
  # pylint: disable=W0212
  def __init__(self, pool, worker_id):
    """Constructor for BaseWorker thread.

    @param pool: the parent worker pool
    @param worker_id: identifier for this worker

    """
    super(BaseWorker, self).__init__(name=worker_id)
    self.pool = pool
    self._worker_id = worker_id
    self._current_task = None

    assert self.getName() == worker_id

  def ShouldTerminate(self):
    """Returns whether this worker should terminate.

    Should only be called from within L{RunTask}.

    """
    self.pool._lock.acquire()
    try:
      assert self._HasRunningTaskUnlocked()
      return self.pool._ShouldWorkerTerminateUnlocked(self)
    finally:
      self.pool._lock.release()

  def GetCurrentPriority(self):
    """Returns the priority of the current task.

    Should only be called from within L{RunTask}.

    """
    self.pool._lock.acquire()
    try:
      assert self._HasRunningTaskUnlocked()

      (priority, _, _) = self._current_task

      return priority
    finally:
      self.pool._lock.release()

  def SetTaskName(self, taskname):
    """Sets the name of the current task.

    Should only be called from within L{RunTask}.

    @type taskname: string
    @param taskname: Task's name

    """
    if taskname:
      name = "%s/%s" % (self._worker_id, taskname)
    else:
      name = self._worker_id

    # Set thread name
    self.setName(name)

  def _HasRunningTaskUnlocked(self):
    """Returns whether this worker is currently running a task.

    """
    return (self._current_task is not None)

  def run(self):
    """Main thread function.

    Waits for new tasks to show up in the queue.

    """
    pool = self.pool

    while True:
      assert self._current_task is None

      defer = None
      try:
        # Wait on lock to be told either to terminate or to do a task
        pool._lock.acquire()
        try:
          task = pool._WaitForTaskUnlocked(self)

          if task is _TERMINATE:
            # Told to terminate
            break

          if task is None:
            # Spurious notification, ignore
            continue

          self._current_task = task

          # No longer needed, dispose of reference
          del task

          assert self._HasRunningTaskUnlocked()

        finally:
          pool._lock.release()

        (priority, _, args) = self._current_task
        try:
          # Run the actual task
          assert defer is None
          logging.debug("Starting task %r, priority %s", args, priority)
          assert self.getName() == self._worker_id
          try:
            self.RunTask(*args) # pylint: disable=W0142
          finally:
            self.SetTaskName(None)
          logging.debug("Done with task %r, priority %s", args, priority)
        except DeferTask, err:
          defer = err

          if defer.priority is None:
            # Use same priority
            defer.priority = priority

          logging.debug("Deferring task %r, new priority %s",
                        args, defer.priority)

          assert self._HasRunningTaskUnlocked()
        except: # pylint: disable=W0702
          logging.exception("Caught unhandled exception")

        assert self._HasRunningTaskUnlocked()
      finally:
        # Notify pool
        pool._lock.acquire()
        try:
          if defer:
            assert self._current_task
            # Schedule again for later run
            (_, _, args) = self._current_task
            pool._AddTaskUnlocked(args, defer.priority)

          if self._current_task:
            self._current_task = None
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

      assert not self._HasRunningTaskUnlocked()

    logging.debug("Terminates")

  def RunTask(self, *args):
    """Function called to start a task.

    This needs to be implemented by child classes.

    """
    raise NotImplementedError()
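

# The class below is an illustrative sketch, not part of the original module:
# it shows how a subclass might implement L{BaseWorker.RunTask} using
# L{BaseWorker.SetTaskName}, L{BaseWorker.ShouldTerminate} and L{DeferTask}.
# The name "_ExampleWorker" and the callable-based task format are assumptions
# made for this example only.
class _ExampleWorker(BaseWorker):
  """Example worker running callables passed as task arguments.

  """
  def RunTask(self, fn, *args):
    # Give the thread a descriptive name while the task runs; the pool resets
    # it again once RunTask has finished
    self.SetTaskName(getattr(fn, "__name__", None))

    # Long-running tasks should poll ShouldTerminate; deferring puts the task
    # back into the queue instead of silently dropping it
    if self.ShouldTerminate():
      raise DeferTask()

    try:
      fn(*args)
    except EnvironmentError:
      # Retry transient failures later at a lower priority (a higher number
      # means a lower priority, see the heap ordering in the worker pool)
      raise DeferTask(priority=self.GetCurrentPriority() + 1)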


class WorkerPool(object):
  """Worker pool with a queue.

  This class is thread-safe.

  Tasks are guaranteed to be started in the order in which they're
  added to the pool. Due to the nature of threading, they're not
  guaranteed to finish in the same order.

  """
  def __init__(self, name, num_workers, worker_class):
    """Constructor for worker pool.

    @param num_workers: number of workers to be started
        (dynamic resizing is not yet implemented)
    @param worker_class: the class to be instantiated for workers;
        should derive from L{BaseWorker}

    """
    # Some of these variables are accessed by BaseWorker
    self._lock = threading.Lock()
    self._pool_to_pool = threading.Condition(self._lock)
    self._pool_to_worker = threading.Condition(self._lock)
    self._worker_to_pool = threading.Condition(self._lock)
    self._worker_class = worker_class
    self._name = name
    self._last_worker_id = 0
    self._workers = []
    self._quiescing = False
    self._active = True

    # Terminating workers
    self._termworkers = []

    # Queued tasks
    self._counter = 0
    self._tasks = []

    # Start workers
    self.Resize(num_workers)

  # TODO: Implement dynamic resizing?

  def _WaitWhileQuiescingUnlocked(self):
    """Wait until the worker pool has finished quiescing.

    """
    while self._quiescing:
      self._pool_to_pool.wait()

  def _AddTaskUnlocked(self, args, priority):
    """Adds a task to the internal queue.

    @type args: sequence
    @param args: Arguments passed to L{BaseWorker.RunTask}
    @type priority: number
    @param priority: Task priority

    """
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
    assert isinstance(priority, (int, long)), "Priority must be numeric"

    # This counter is used to ensure elements are processed in their
    # incoming order. For processing they're sorted by priority and then
    # counter.
    self._counter += 1

    heapq.heappush(self._tasks, (priority, self._counter, args))

    # Notify a waiting worker
    self._pool_to_worker.notify()

  def AddTask(self, args, priority=_DEFAULT_PRIORITY):
    """Adds a task to the queue.

    @type args: sequence
    @param args: arguments passed to L{BaseWorker.RunTask}
    @type priority: number
    @param priority: Task priority

    """
    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()
      self._AddTaskUnlocked(args, priority)
    finally:
      self._lock.release()

  def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
    """Add a list of tasks to the queue.

    @type tasks: list of tuples
    @param tasks: list of args passed to L{BaseWorker.RunTask}
    @type priority: number or list of numbers
    @param priority: Priority for all added tasks or a list with the priority
                     for each task

    """
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
      "Each task must be a sequence"

    assert (isinstance(priority, (int, long)) or
            compat.all(isinstance(prio, (int, long)) for prio in priority)), \
           "Priority must be numeric or be a list of numeric values"

    if isinstance(priority, (int, long)):
      priority = [priority] * len(tasks)
    elif len(priority) != len(tasks):
      raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
                                   " number of tasks (%s)" %
                                   (len(priority), len(tasks)))

    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()

      assert compat.all(isinstance(prio, (int, long)) for prio in priority)
      assert len(tasks) == len(priority)

      for args, priority in zip(tasks, priority):
        self._AddTaskUnlocked(args, priority)
    finally:
      self._lock.release()

  def SetActive(self, active):
    """Enable/disable processing of tasks.

    This is different from L{Quiesce} in the sense that this function just
    changes an internal flag and doesn't wait for the queue to be empty. Tasks
    already being processed continue normally, but no new tasks will be
    started. New tasks can still be added.

    @type active: bool
    @param active: Whether tasks should be processed

    """
    self._lock.acquire()
    try:
      self._active = active

      if active:
        # Tell all workers to continue processing
        self._pool_to_worker.notifyAll()
    finally:
      self._lock.release()

  def _WaitForTaskUnlocked(self, worker):
    """Waits for a task for a worker.

    @type worker: L{BaseWorker}
    @param worker: Worker thread

    """
    if self._ShouldWorkerTerminateUnlocked(worker):
      return _TERMINATE

    # We only wait if there's no task for us.
    if not (self._active and self._tasks):
      logging.debug("Waiting for tasks")

      while True:
        # wait() releases the lock and sleeps until notified
        self._pool_to_worker.wait()

        logging.debug("Notified while waiting")

        # Were we woken up in order to terminate?
        if self._ShouldWorkerTerminateUnlocked(worker):
          return _TERMINATE

        # Just loop if pool is not processing tasks at this time
        if self._active and self._tasks:
          break

    # Get task from queue and tell pool about it
    try:
      return heapq.heappop(self._tasks)
    finally:
      self._worker_to_pool.notifyAll()

  def _ShouldWorkerTerminateUnlocked(self, worker):
    """Returns whether a worker should terminate.

    """
    return (worker in self._termworkers)

  def _HasRunningTasksUnlocked(self):
    """Checks whether there's a task running in a worker.

    """
    for worker in self._workers + self._termworkers:
      if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212
        return True
    return False

  def HasRunningTasks(self):
    """Checks whether there's at least one task running.

    """
    self._lock.acquire()
    try:
      return self._HasRunningTasksUnlocked()
    finally:
      self._lock.release()

  def Quiesce(self):
    """Waits until the task queue is empty.

    """
    self._lock.acquire()
    try:
      self._quiescing = True

      # Wait while there are tasks pending or running
      while self._tasks or self._HasRunningTasksUnlocked():
        self._worker_to_pool.wait()

    finally:
      self._quiescing = False

      # Make sure AddTasks continues in case it was waiting
      self._pool_to_pool.notifyAll()

      self._lock.release()

  def _NewWorkerIdUnlocked(self):
    """Return an identifier for a new worker.

    """
    self._last_worker_id += 1

    return "%s%d" % (self._name, self._last_worker_id)

  def _ResizeUnlocked(self, num_workers):
    """Changes the number of workers.

    """
    assert num_workers >= 0, "num_workers must be >= 0"

    logging.debug("Resizing to %s workers", num_workers)

    current_count = len(self._workers)

    if current_count == num_workers:
      # Nothing to do
      pass

    elif current_count > num_workers:
      if num_workers == 0:
        # Create copy of list to iterate over while lock isn't held.
        termworkers = self._workers[:]
        del self._workers[:]
      else:
        # TODO: Implement partial downsizing
        raise NotImplementedError()
        #termworkers = ...

      self._termworkers += termworkers

      # Notify workers that something has changed
      self._pool_to_worker.notifyAll()

      # Join all terminating workers
      self._lock.release()
      try:
        for worker in termworkers:
          logging.debug("Waiting for thread %s", worker.getName())
          worker.join()
      finally:
        self._lock.acquire()

      # Remove terminated threads. This could be done in a more efficient way
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
      # don't leave zombie threads around.
      for worker in termworkers:
        assert worker in self._termworkers, ("Worker not in list of"
                                             " terminating workers")
        if not worker.isAlive():
          self._termworkers.remove(worker)

      assert not self._termworkers, "Zombie worker detected"

    elif current_count < num_workers:
      # Create (num_workers - current_count) new workers
      for _ in range(num_workers - current_count):
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
        self._workers.append(worker)
        worker.start()

  def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    @param num_workers: the new number of workers

    """
    self._lock.acquire()
    try:
      return self._ResizeUnlocked(num_workers)
    finally:
      self._lock.release()

  def TerminateWorkers(self):
    """Terminate all worker threads.

    Unstarted tasks will be ignored.

    """
    logging.debug("Terminating all workers")

    self._lock.acquire()
    try:
      self._ResizeUnlocked(0)

      if self._tasks:
        logging.debug("There are %s tasks left", len(self._tasks))
    finally:
      self._lock.release()

    logging.debug("All workers terminated")