#
#

# Copyright (C) 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Base classes for worker pools.

"""

import logging
import threading
import heapq
import itertools

from ganeti import compat
from ganeti import errors


_TERMINATE = object()
_DEFAULT_PRIORITY = 0


class DeferTask(Exception):
  """Special exception class to defer a task.

  This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
  task. Optionally, the priority of the task can be changed.

  """
  def __init__(self, priority=None):
    """Initializes this class.

    @type priority: number
    @param priority: New task priority (None means no change)

    """
    Exception.__init__(self)
    self.priority = priority
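
# Illustrative sketch (not part of the original module): a RunTask
# implementation can postpone work by raising DeferTask, optionally with a
# larger (i.e. less urgent) priority number. The names "_LockWaitingWorker",
# "lock" and "fn" below are hypothetical.
#
#   class _LockWaitingWorker(BaseWorker):
#     def RunTask(self, lock, fn):
#       if not lock.acquire(False):
#         # Lock not available yet; put the task back on the queue with a
#         # less urgent priority so other work can run first
#         raise DeferTask(priority=self.GetCurrentPriority() + 1)
#       try:
#         fn()
#       finally:
#         lock.release()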


class NoSuchTask(Exception):
  """Exception raised when a task can't be found.

  """


class BaseWorker(threading.Thread, object):
  """Base worker class for worker pools.

  Users of a worker pool must override RunTask in a subclass.

  """
  # pylint: disable=W0212
  def __init__(self, pool, worker_id):
    """Constructor for BaseWorker thread.

    @param pool: the parent worker pool
    @param worker_id: identifier for this worker

    """
    super(BaseWorker, self).__init__(name=worker_id)
    self.pool = pool
    self._worker_id = worker_id
    self._current_task = None

    assert self.getName() == worker_id

  def ShouldTerminate(self):
    """Returns whether this worker should terminate.

    Should only be called from within L{RunTask}.

    """
    self.pool._lock.acquire()
    try:
      assert self._HasRunningTaskUnlocked()
      return self.pool._ShouldWorkerTerminateUnlocked(self)
    finally:
      self.pool._lock.release()

  def GetCurrentPriority(self):
    """Returns the priority of the current task.

    Should only be called from within L{RunTask}.

    """
    self.pool._lock.acquire()
    try:
      assert self._HasRunningTaskUnlocked()

      (priority, _, _, _) = self._current_task

      return priority
    finally:
      self.pool._lock.release()

  def SetTaskName(self, taskname):
    """Sets the name of the current task.

    Should only be called from within L{RunTask}.

    @type taskname: string
    @param taskname: Task's name

    """
    if taskname:
      name = "%s/%s" % (self._worker_id, taskname)
    else:
      name = self._worker_id

    # Set thread name
    self.setName(name)

  def _HasRunningTaskUnlocked(self):
    """Returns whether this worker is currently running a task.

    """
    return (self._current_task is not None)

  def run(self):
    """Main thread function.

    Waits for new tasks to show up in the queue.

    """
    pool = self.pool

    while True:
      assert self._current_task is None

      defer = None
      try:
        # Wait on lock to be told either to terminate or to do a task
        pool._lock.acquire()
        try:
          task = pool._WaitForTaskUnlocked(self)

          if task is _TERMINATE:
            # Told to terminate
            break

          if task is None:
            # Spurious notification, ignore
            continue

          self._current_task = task

          # No longer needed, dispose of reference
          del task

          assert self._HasRunningTaskUnlocked()

        finally:
          pool._lock.release()

        (priority, _, _, args) = self._current_task
        try:
          # Run the actual task
          assert defer is None
          logging.debug("Starting task %r, priority %s", args, priority)
          assert self.getName() == self._worker_id
          try:
            self.RunTask(*args) # pylint: disable=W0142
          finally:
            self.SetTaskName(None)
          logging.debug("Done with task %r, priority %s", args, priority)
        except DeferTask, err:
          defer = err

          if defer.priority is None:
            # Use same priority
            defer.priority = priority

          logging.debug("Deferring task %r, new priority %s",
                        args, defer.priority)

          assert self._HasRunningTaskUnlocked()
        except: # pylint: disable=W0702
          logging.exception("Caught unhandled exception")

        assert self._HasRunningTaskUnlocked()
      finally:
        # Notify pool
        pool._lock.acquire()
        try:
          if defer:
            assert self._current_task
            # Schedule again for later run
            (_, _, _, args) = self._current_task
            pool._AddTaskUnlocked(args, defer.priority, None)

          if self._current_task:
            self._current_task = None
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

      assert not self._HasRunningTaskUnlocked()

    logging.debug("Terminates")

  def RunTask(self, *args):
    """Function called to start a task.

    This needs to be implemented by child classes.

    """
    raise NotImplementedError()
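
# Illustrative sketch (not part of the original module): the smallest useful
# worker simply runs a callable passed in through AddTask. The names
# "_CallbackWorker", "fn" and "args" are examples, not existing classes.
#
#   class _CallbackWorker(BaseWorker):
#     def RunTask(self, fn, *args):
#       self.SetTaskName(getattr(fn, "__name__", None))
#       fn(*args)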


class WorkerPool(object):
  """Worker pool with a queue.

  This class is thread-safe.

  Tasks are guaranteed to be started in the order in which they're
  added to the pool. Due to the nature of threading, they're not
  guaranteed to finish in the same order.

  @type _tasks: list of tuples
  @ivar _tasks: Each tuple has the format (priority, order ID, task ID,
    arguments). Priority and order ID are numeric and essentially control the
    sort order. The order ID is an increasing number denoting the order in
    which tasks are added to the queue. The task ID is controlled by the user
    of the worker pool; see L{AddTask} for details. The task arguments are
    C{None} for abandoned tasks, otherwise a sequence of arguments to be
    passed to L{BaseWorker.RunTask}. The list must fulfill the heap property
    (for use by the C{heapq} module).
  @type _taskdata: dict; (task IDs as keys, tuples as values)
  @ivar _taskdata: Mapping from task IDs to entries in L{_tasks}

  """
  def __init__(self, name, num_workers, worker_class):
    """Constructor for worker pool.

    @param num_workers: number of workers to be started
        (dynamic resizing is not yet implemented)
    @param worker_class: the class to be instantiated for workers;
        should derive from L{BaseWorker}

    """
    # Some of these variables are accessed by BaseWorker
    self._lock = threading.Lock()
    self._pool_to_pool = threading.Condition(self._lock)
    self._pool_to_worker = threading.Condition(self._lock)
    self._worker_to_pool = threading.Condition(self._lock)
    self._worker_class = worker_class
    self._name = name
    self._last_worker_id = 0
    self._workers = []
    self._quiescing = False
    self._active = True

    # Terminating workers
    self._termworkers = []

    # Queued tasks
    self._counter = itertools.count()
    self._tasks = []
    self._taskdata = {}

    # Start workers
    self.Resize(num_workers)

  # TODO: Implement dynamic resizing?

  def _WaitWhileQuiescingUnlocked(self):
    """Wait until the worker pool has finished quiescing.

    """
    while self._quiescing:
      self._pool_to_pool.wait()

  def _AddTaskUnlocked(self, args, priority, task_id):
    """Adds a task to the internal queue.

    @type args: sequence
    @param args: Arguments passed to L{BaseWorker.RunTask}
    @type priority: number
    @param priority: Task priority
    @param task_id: Task ID

    """
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
    assert isinstance(priority, (int, long)), "Priority must be numeric"

    task = [priority, self._counter.next(), task_id, args]

    if task_id is not None:
      assert task_id not in self._taskdata
      # Keep a reference to change priority later if necessary
      self._taskdata[task_id] = task

    # A counter is used to ensure elements are processed in their incoming
    # order. For processing they're sorted by priority and then counter.
    heapq.heappush(self._tasks, task)

    # Notify a waiting worker
    self._pool_to_worker.notify()

  def AddTask(self, args, priority=_DEFAULT_PRIORITY, task_id=None):
    """Adds a task to the queue.

    @type args: sequence
    @param args: arguments passed to L{BaseWorker.RunTask}
    @type priority: number
    @param priority: Task priority
    @param task_id: Task ID
    @note: The task ID can be essentially anything that can be used as a
      dictionary key. Callers, however, must ensure a task ID is unique while a
      task is in the pool or while it might return to the pool due to deferring
      using L{DeferTask}.

    """
    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()
      self._AddTaskUnlocked(args, priority, task_id)
    finally:
      self._lock.release()

  def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY, task_id=None):
    """Add a list of tasks to the queue.

    @type tasks: list of tuples
    @param tasks: list of args passed to L{BaseWorker.RunTask}
    @type priority: number or list of numbers
    @param priority: Priority for all added tasks or a list with the priority
                     for each task
    @type task_id: list
    @param task_id: List with the ID for each task
    @note: See L{AddTask} for a note on task IDs.

    """
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
           "Each task must be a sequence"
    assert (isinstance(priority, (int, long)) or
            compat.all(isinstance(prio, (int, long)) for prio in priority)), \
           "Priority must be numeric or be a list of numeric values"
    assert task_id is None or isinstance(task_id, (tuple, list)), \
           "Task IDs must be in a sequence"

    if isinstance(priority, (int, long)):
      priority = [priority] * len(tasks)
    elif len(priority) != len(tasks):
      raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
                                   " number of tasks (%s)" %
                                   (len(priority), len(tasks)))

    if task_id is None:
      task_id = [None] * len(tasks)
    elif len(task_id) != len(tasks):
      raise errors.ProgrammerError("Number of task IDs (%s) doesn't match"
                                   " number of tasks (%s)" %
                                   (len(task_id), len(tasks)))

    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()

      assert compat.all(isinstance(prio, (int, long)) for prio in priority)
      assert len(tasks) == len(priority)
      assert len(tasks) == len(task_id)

      for (args, prio, tid) in zip(tasks, priority, task_id):
        self._AddTaskUnlocked(args, prio, tid)
    finally:
      self._lock.release()

  def ChangeTaskPriority(self, task_id, priority):
    """Changes a task's priority.

    @param task_id: Task ID
    @type priority: number
    @param priority: New task priority
    @raise NoSuchTask: When the task referred to by C{task_id} cannot be found
      (it may never have existed, may have already been processed, or is
      currently running)

    """
    assert isinstance(priority, (int, long)), "Priority must be numeric"

    self._lock.acquire()
    try:
      logging.debug("About to change priority of task %s to %s",
                    task_id, priority)

      # Find old task
      oldtask = self._taskdata.get(task_id, None)
      if oldtask is None:
        msg = "Task '%s' was not found" % task_id
        logging.debug(msg)
        raise NoSuchTask(msg)

      # Prepare new task
      newtask = [priority] + oldtask[1:]

      # Mark old entry as abandoned (this doesn't change the sort order and
      # therefore doesn't invalidate the heap property of L{self._tasks}).
      # See also <http://docs.python.org/library/heapq.html#priority-queue-
      # implementation-notes>.
      oldtask[-1] = None

      # Change reference to new task entry and forget the old one
      assert task_id is not None
      self._taskdata[task_id] = newtask

      # Add a new task with the old number and arguments
      heapq.heappush(self._tasks, newtask)

      # Notify a waiting worker
      self._pool_to_worker.notify()
    finally:
      self._lock.release()

    
432
  def SetActive(self, active):
433
    """Enable/disable processing of tasks.
434

435
    This is different from L{Quiesce} in the sense that this function just
436
    changes an internal flag and doesn't wait for the queue to be empty. Tasks
437
    already being processed continue normally, but no new tasks will be
438
    started. New tasks can still be added.
439

440
    @type active: bool
441
    @param active: Whether tasks should be processed
442

443
    """
444
    self._lock.acquire()
445
    try:
446
      self._active = active
447

    
448
      if active:
449
        # Tell all workers to continue processing
450
        self._pool_to_worker.notifyAll()
451
    finally:
452
      self._lock.release()
453

    
454
  def _WaitForTaskUnlocked(self, worker):
455
    """Waits for a task for a worker.
456

457
    @type worker: L{BaseWorker}
458
    @param worker: Worker thread
459

460
    """
461
    while True:
462
      if self._ShouldWorkerTerminateUnlocked(worker):
463
        return _TERMINATE
464

    
465
      # If there's a pending task, return it immediately
466
      if self._active and self._tasks:
467
        # Get task from queue and tell pool about it
468
        try:
469
          task = heapq.heappop(self._tasks)
470
        finally:
471
          self._worker_to_pool.notifyAll()
472

    
473
        (_, _, task_id, args) = task
474

    
475
        # If the priority was changed, "args" is None
476
        if args is None:
477
          # Try again
478
          logging.debug("Found abandoned task (%r)", task)
479
          continue
480

    
481
        # Delete reference
482
        if task_id is not None:
483
          del self._taskdata[task_id]
484

    
485
        return task
486

    
487
      logging.debug("Waiting for tasks")
488

    
489
      # wait() releases the lock and sleeps until notified
490
      self._pool_to_worker.wait()
491

    
492
      logging.debug("Notified while waiting")
493

    
494
  def _ShouldWorkerTerminateUnlocked(self, worker):
495
    """Returns whether a worker should terminate.
496

497
    """
498
    return (worker in self._termworkers)
499

    
500
  def _HasRunningTasksUnlocked(self):
501
    """Checks whether there's a task running in a worker.
502

503
    """
504
    for worker in self._workers + self._termworkers:
505
      if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212
506
        return True
507
    return False
508

    
509
  def HasRunningTasks(self):
510
    """Checks whether there's at least one task running.
511

512
    """
513
    self._lock.acquire()
514
    try:
515
      return self._HasRunningTasksUnlocked()
516
    finally:
517
      self._lock.release()
518

    
519
  def Quiesce(self):
520
    """Waits until the task queue is empty.
521

522
    """
523
    self._lock.acquire()
524
    try:
525
      self._quiescing = True
526

    
527
      # Wait while there are tasks pending or running
528
      while self._tasks or self._HasRunningTasksUnlocked():
529
        self._worker_to_pool.wait()
530

    
531
    finally:
532
      self._quiescing = False
533

    
534
      # Make sure AddTasks continues in case it was waiting
535
      self._pool_to_pool.notifyAll()
536

    
537
      self._lock.release()
538

    
539
  def _NewWorkerIdUnlocked(self):
540
    """Return an identifier for a new worker.
541

542
    """
543
    self._last_worker_id += 1
544

    
545
    return "%s%d" % (self._name, self._last_worker_id)
546

    
547
  def _ResizeUnlocked(self, num_workers):
548
    """Changes the number of workers.
549

550
    """
551
    assert num_workers >= 0, "num_workers must be >= 0"
552

    
553
    logging.debug("Resizing to %s workers", num_workers)
554

    
555
    current_count = len(self._workers)
556

    
557
    if current_count == num_workers:
558
      # Nothing to do
559
      pass
560

    
561
    elif current_count > num_workers:
562
      if num_workers == 0:
563
        # Create copy of list to iterate over while lock isn't held.
564
        termworkers = self._workers[:]
565
        del self._workers[:]
566
      else:
567
        # TODO: Implement partial downsizing
568
        raise NotImplementedError()
569
        #termworkers = ...
570

    
571
      self._termworkers += termworkers
572

    
573
      # Notify workers that something has changed
574
      self._pool_to_worker.notifyAll()
575

    
576
      # Join all terminating workers
577
      self._lock.release()
578
      try:
579
        for worker in termworkers:
580
          logging.debug("Waiting for thread %s", worker.getName())
581
          worker.join()
582
      finally:
583
        self._lock.acquire()
584

    
585
      # Remove terminated threads. This could be done in a more efficient way
586
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
587
      # don't leave zombie threads around.
588
      for worker in termworkers:
589
        assert worker in self._termworkers, ("Worker not in list of"
590
                                             " terminating workers")
591
        if not worker.isAlive():
592
          self._termworkers.remove(worker)
593

    
594
      assert not self._termworkers, "Zombie worker detected"
595

    
596
    elif current_count < num_workers:
597
      # Create (num_workers - current_count) new workers
598
      for _ in range(num_workers - current_count):
599
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
600
        self._workers.append(worker)
601
        worker.start()
602

    
603
  def Resize(self, num_workers):
604
    """Changes the number of workers in the pool.
605

606
    @param num_workers: the new number of workers
607

608
    """
609
    self._lock.acquire()
610
    try:
611
      return self._ResizeUnlocked(num_workers)
612
    finally:
613
      self._lock.release()
614

    
615
  def TerminateWorkers(self):
616
    """Terminate all worker threads.
617

618
    Unstarted tasks will be ignored.
619

620
    """
621
    logging.debug("Terminating all workers")
622

    
623
    self._lock.acquire()
624
    try:
625
      self._ResizeUnlocked(0)
626

    
627
      if self._tasks:
628
        logging.debug("There are %s tasks left", len(self._tasks))
629
    finally:
630
      self._lock.release()
631

    
632
    logging.debug("All workers terminated")
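
# Illustrative usage sketch (not part of the original module). The name
# "_CountingWorker" is hypothetical; the WorkerPool/BaseWorker calls are the
# ones defined above.
#
#   class _CountingWorker(BaseWorker):
#     def RunTask(self, n):
#       logging.debug("Processing %s", n)
#
#   pool = WorkerPool("Demo", 3, _CountingWorker)
#   try:
#     pool.AddManyTasks([(i, ) for i in range(10)])
#     pool.Quiesce()          # wait until the queue is empty
#   finally:
#     pool.TerminateWorkers() # stop and join all worker threads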