Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ 364c350f

History | View | Annotate | Download (18.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Base classes for worker pools.
23

24
"""
25

    
26
import logging
27
import threading
28
import heapq
29
import itertools
30

    
31
from ganeti import compat
32
from ganeti import errors
33

    
34

    
35
_TERMINATE = object()
36
_DEFAULT_PRIORITY = 0
37

    
38

    
39
class DeferTask(Exception):
40
  """Special exception class to defer a task.
41

42
  This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
43
  task. Optionally, the priority of the task can be changed.
44

45
  """
46
  def __init__(self, priority=None):
47
    """Initializes this class.
48

49
    @type priority: number
50
    @param priority: New task priority (None means no change)
51

52
    """
53
    Exception.__init__(self)
54
    self.priority = priority
55

    
56

    
57
class NoSuchTask(Exception):
58
  """Exception raised when a task can't be found.
59

60
  """
61

    
62

    
63
class BaseWorker(threading.Thread, object):
64
  """Base worker class for worker pools.
65

66
  Users of a worker pool must override RunTask in a subclass.
67

68
  """
69
  # pylint: disable=W0212
70
  def __init__(self, pool, worker_id):
71
    """Constructor for BaseWorker thread.
72

73
    @param pool: the parent worker pool
74
    @param worker_id: identifier for this worker
75

76
    """
77
    super(BaseWorker, self).__init__(name=worker_id)
78
    self.pool = pool
79
    self._worker_id = worker_id
80
    self._current_task = None
81

    
82
    assert self.getName() == worker_id
83

    
84
  def ShouldTerminate(self):
85
    """Returns whether this worker should terminate.
86

87
    Should only be called from within L{RunTask}.
88

89
    """
90
    self.pool._lock.acquire()
91
    try:
92
      assert self._HasRunningTaskUnlocked()
93
      return self.pool._ShouldWorkerTerminateUnlocked(self)
94
    finally:
95
      self.pool._lock.release()
96

    
97
  def GetCurrentPriority(self):
98
    """Returns the priority of the current task.
99

100
    Should only be called from within L{RunTask}.
101

102
    """
103
    self.pool._lock.acquire()
104
    try:
105
      assert self._HasRunningTaskUnlocked()
106

    
107
      (priority, _, _, _) = self._current_task
108

    
109
      return priority
110
    finally:
111
      self.pool._lock.release()
112

    
113
  def SetTaskName(self, taskname):
114
    """Sets the name of the current task.
115

116
    Should only be called from within L{RunTask}.
117

118
    @type taskname: string
119
    @param taskname: Task's name
120

121
    """
122
    if taskname:
123
      name = "%s/%s" % (self._worker_id, taskname)
124
    else:
125
      name = self._worker_id
126

    
127
    # Set thread name
128
    self.setName(name)
129

    
130
  def _HasRunningTaskUnlocked(self):
131
    """Returns whether this worker is currently running a task.
132

133
    """
134
    return (self._current_task is not None)
135

    
136
  def _GetCurrentOrderAndTaskId(self):
137
    """Returns the order and task ID of the current task.
138

139
    Should only be called from within L{RunTask}.
140

141
    """
142
    self.pool._lock.acquire()
143
    try:
144
      assert self._HasRunningTaskUnlocked()
145

    
146
      (_, order_id, task_id, _) = self._current_task
147

    
148
      return (order_id, task_id)
149
    finally:
150
      self.pool._lock.release()
151

    
152
  def run(self):
153
    """Main thread function.
154

155
    Waits for new tasks to show up in the queue.
156

157
    """
158
    pool = self.pool
159

    
160
    while True:
161
      assert self._current_task is None
162

    
163
      defer = None
164
      try:
165
        # Wait on lock to be told either to terminate or to do a task
166
        pool._lock.acquire()
167
        try:
168
          task = pool._WaitForTaskUnlocked(self)
169

    
170
          if task is _TERMINATE:
171
            # Told to terminate
172
            break
173

    
174
          if task is None:
175
            # Spurious notification, ignore
176
            continue
177

    
178
          self._current_task = task
179

    
180
          # No longer needed, dispose of reference
181
          del task
182

    
183
          assert self._HasRunningTaskUnlocked()
184

    
185
        finally:
186
          pool._lock.release()
187

    
188
        (priority, _, _, args) = self._current_task
189
        try:
190
          # Run the actual task
191
          assert defer is None
192
          logging.debug("Starting task %r, priority %s", args, priority)
193
          assert self.getName() == self._worker_id
194
          try:
195
            self.RunTask(*args) # pylint: disable=W0142
196
          finally:
197
            self.SetTaskName(None)
198
          logging.debug("Done with task %r, priority %s", args, priority)
199
        except DeferTask, err:
200
          defer = err
201

    
202
          if defer.priority is None:
203
            # Use same priority
204
            defer.priority = priority
205

    
206
          logging.debug("Deferring task %r, new priority %s",
207
                        args, defer.priority)
208

    
209
          assert self._HasRunningTaskUnlocked()
210
        except: # pylint: disable=W0702
211
          logging.exception("Caught unhandled exception")
212

    
213
        assert self._HasRunningTaskUnlocked()
214
      finally:
215
        # Notify pool
216
        pool._lock.acquire()
217
        try:
218
          if defer:
219
            assert self._current_task
220
            # Schedule again for later run
221
            (_, _, task_id, args) = self._current_task
222
            pool._AddTaskUnlocked(args, defer.priority, task_id)
223

    
224
          if self._current_task:
225
            self._current_task = None
226
            pool._worker_to_pool.notifyAll()
227
        finally:
228
          pool._lock.release()
229

    
230
      assert not self._HasRunningTaskUnlocked()
231

    
232
    logging.debug("Terminates")
233

    
234
  def RunTask(self, *args):
235
    """Function called to start a task.
236

237
    This needs to be implemented by child classes.
238

239
    """
240
    raise NotImplementedError()
241

    
242

    
243
class WorkerPool(object):
244
  """Worker pool with a queue.
245

246
  This class is thread-safe.
247

248
  Tasks are guaranteed to be started in the order in which they're
249
  added to the pool. Due to the nature of threading, they're not
250
  guaranteed to finish in the same order.
251

252
  @type _tasks: list of tuples
253
  @ivar _tasks: Each tuple has the format (priority, order ID, task ID,
254
    arguments). Priority and order ID are numeric and essentially control the
255
    sort order. The order ID is an increasing number denoting the order in
256
    which tasks are added to the queue. The task ID is controlled by user of
257
    workerpool, see L{AddTask} for details. The task arguments are C{None} for
258
    abandoned tasks, otherwise a sequence of arguments to be passed to
259
    L{BaseWorker.RunTask}). The list must fulfill the heap property (for use by
260
    the C{heapq} module).
261
  @type _taskdata: dict; (task IDs as keys, tuples as values)
262
  @ivar _taskdata: Mapping from task IDs to entries in L{_tasks}
263

264
  """
265
  def __init__(self, name, num_workers, worker_class):
266
    """Constructor for worker pool.
267

268
    @param num_workers: number of workers to be started
269
        (dynamic resizing is not yet implemented)
270
    @param worker_class: the class to be instantiated for workers;
271
        should derive from L{BaseWorker}
272

273
    """
274
    # Some of these variables are accessed by BaseWorker
275
    self._lock = threading.Lock()
276
    self._pool_to_pool = threading.Condition(self._lock)
277
    self._pool_to_worker = threading.Condition(self._lock)
278
    self._worker_to_pool = threading.Condition(self._lock)
279
    self._worker_class = worker_class
280
    self._name = name
281
    self._last_worker_id = 0
282
    self._workers = []
283
    self._quiescing = False
284
    self._active = True
285

    
286
    # Terminating workers
287
    self._termworkers = []
288

    
289
    # Queued tasks
290
    self._counter = itertools.count()
291
    self._tasks = []
292
    self._taskdata = {}
293

    
294
    # Start workers
295
    self.Resize(num_workers)
296

    
297
  # TODO: Implement dynamic resizing?
298

    
299
  def _WaitWhileQuiescingUnlocked(self):
300
    """Wait until the worker pool has finished quiescing.
301

302
    """
303
    while self._quiescing:
304
      self._pool_to_pool.wait()
305

    
306
  def _AddTaskUnlocked(self, args, priority, task_id):
307
    """Adds a task to the internal queue.
308

309
    @type args: sequence
310
    @param args: Arguments passed to L{BaseWorker.RunTask}
311
    @type priority: number
312
    @param priority: Task priority
313
    @param task_id: Task ID
314

315
    """
316
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
317
    assert isinstance(priority, (int, long)), "Priority must be numeric"
318
    assert task_id is None or isinstance(task_id, (int, long)), \
319
      "Task ID must be numeric or None"
320

    
321
    task = [priority, self._counter.next(), task_id, args]
322

    
323
    if task_id is not None:
324
      assert task_id not in self._taskdata
325
      # Keep a reference to change priority later if necessary
326
      self._taskdata[task_id] = task
327

    
328
    # A counter is used to ensure elements are processed in their incoming
329
    # order. For processing they're sorted by priority and then counter.
330
    heapq.heappush(self._tasks, task)
331

    
332
    # Notify a waiting worker
333
    self._pool_to_worker.notify()
334

    
335
  def AddTask(self, args, priority=_DEFAULT_PRIORITY, task_id=None):
336
    """Adds a task to the queue.
337

338
    @type args: sequence
339
    @param args: arguments passed to L{BaseWorker.RunTask}
340
    @type priority: number
341
    @param priority: Task priority
342
    @param task_id: Task ID
343
    @note: The task ID can be essentially anything that can be used as a
344
      dictionary key. Callers, however, must ensure a task ID is unique while a
345
      task is in the pool or while it might return to the pool due to deferring
346
      using L{DeferTask}.
347

348
    """
349
    self._lock.acquire()
350
    try:
351
      self._WaitWhileQuiescingUnlocked()
352
      self._AddTaskUnlocked(args, priority, task_id)
353
    finally:
354
      self._lock.release()
355

    
356
  def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY, task_id=None):
357
    """Add a list of tasks to the queue.
358

359
    @type tasks: list of tuples
360
    @param tasks: list of args passed to L{BaseWorker.RunTask}
361
    @type priority: number or list of numbers
362
    @param priority: Priority for all added tasks or a list with the priority
363
                     for each task
364
    @type task_id: list
365
    @param task_id: List with the ID for each task
366
    @note: See L{AddTask} for a note on task IDs.
367

368
    """
369
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
370
           "Each task must be a sequence"
371
    assert (isinstance(priority, (int, long)) or
372
            compat.all(isinstance(prio, (int, long)) for prio in priority)), \
373
           "Priority must be numeric or be a list of numeric values"
374
    assert task_id is None or isinstance(task_id, (tuple, list)), \
375
           "Task IDs must be in a sequence"
376

    
377
    if isinstance(priority, (int, long)):
378
      priority = [priority] * len(tasks)
379
    elif len(priority) != len(tasks):
380
      raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
381
                                   " number of tasks (%s)" %
382
                                   (len(priority), len(tasks)))
383

    
384
    if task_id is None:
385
      task_id = [None] * len(tasks)
386
    elif len(task_id) != len(tasks):
387
      raise errors.ProgrammerError("Number of task IDs (%s) doesn't match"
388
                                   " number of tasks (%s)" %
389
                                   (len(task_id), len(tasks)))
390

    
391
    self._lock.acquire()
392
    try:
393
      self._WaitWhileQuiescingUnlocked()
394

    
395
      assert compat.all(isinstance(prio, (int, long)) for prio in priority)
396
      assert len(tasks) == len(priority)
397
      assert len(tasks) == len(task_id)
398

    
399
      for (args, prio, tid) in zip(tasks, priority, task_id):
400
        self._AddTaskUnlocked(args, prio, tid)
401
    finally:
402
      self._lock.release()
403

    
404
  def ChangeTaskPriority(self, task_id, priority):
405
    """Changes a task's priority.
406

407
    @param task_id: Task ID
408
    @type priority: number
409
    @param priority: New task priority
410
    @raise NoSuchTask: When the task referred by C{task_id} can not be found
411
      (it may never have existed, may have already been processed, or is
412
      currently running)
413

414
    """
415
    assert isinstance(priority, (int, long)), "Priority must be numeric"
416

    
417
    self._lock.acquire()
418
    try:
419
      logging.debug("About to change priority of task %s to %s",
420
                    task_id, priority)
421

    
422
      # Find old task
423
      oldtask = self._taskdata.get(task_id, None)
424
      if oldtask is None:
425
        msg = "Task '%s' was not found" % task_id
426
        logging.debug(msg)
427
        raise NoSuchTask(msg)
428

    
429
      # Prepare new task
430
      newtask = [priority] + oldtask[1:]
431

    
432
      # Mark old entry as abandoned (this doesn't change the sort order and
433
      # therefore doesn't invalidate the heap property of L{self._tasks}).
434
      # See also <http://docs.python.org/library/heapq.html#priority-queue-
435
      # implementation-notes>.
436
      oldtask[-1] = None
437

    
438
      # Change reference to new task entry and forget the old one
439
      assert task_id is not None
440
      self._taskdata[task_id] = newtask
441

    
442
      # Add a new task with the old number and arguments
443
      heapq.heappush(self._tasks, newtask)
444

    
445
      # Notify a waiting worker
446
      self._pool_to_worker.notify()
447
    finally:
448
      self._lock.release()
449

    
450
  def SetActive(self, active):
451
    """Enable/disable processing of tasks.
452

453
    This is different from L{Quiesce} in the sense that this function just
454
    changes an internal flag and doesn't wait for the queue to be empty. Tasks
455
    already being processed continue normally, but no new tasks will be
456
    started. New tasks can still be added.
457

458
    @type active: bool
459
    @param active: Whether tasks should be processed
460

461
    """
462
    self._lock.acquire()
463
    try:
464
      self._active = active
465

    
466
      if active:
467
        # Tell all workers to continue processing
468
        self._pool_to_worker.notifyAll()
469
    finally:
470
      self._lock.release()
471

    
472
  def _WaitForTaskUnlocked(self, worker):
473
    """Waits for a task for a worker.
474

475
    @type worker: L{BaseWorker}
476
    @param worker: Worker thread
477

478
    """
479
    while True:
480
      if self._ShouldWorkerTerminateUnlocked(worker):
481
        return _TERMINATE
482

    
483
      # If there's a pending task, return it immediately
484
      if self._active and self._tasks:
485
        # Get task from queue and tell pool about it
486
        try:
487
          task = heapq.heappop(self._tasks)
488
        finally:
489
          self._worker_to_pool.notifyAll()
490

    
491
        (_, _, task_id, args) = task
492

    
493
        # If the priority was changed, "args" is None
494
        if args is None:
495
          # Try again
496
          logging.debug("Found abandoned task (%r)", task)
497
          continue
498

    
499
        # Delete reference
500
        if task_id is not None:
501
          del self._taskdata[task_id]
502

    
503
        return task
504

    
505
      logging.debug("Waiting for tasks")
506

    
507
      # wait() releases the lock and sleeps until notified
508
      self._pool_to_worker.wait()
509

    
510
      logging.debug("Notified while waiting")
511

    
512
  def _ShouldWorkerTerminateUnlocked(self, worker):
513
    """Returns whether a worker should terminate.
514

515
    """
516
    return (worker in self._termworkers)
517

    
518
  def _HasRunningTasksUnlocked(self):
519
    """Checks whether there's a task running in a worker.
520

521
    """
522
    for worker in self._workers + self._termworkers:
523
      if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212
524
        return True
525
    return False
526

    
527
  def HasRunningTasks(self):
528
    """Checks whether there's at least one task running.
529

530
    """
531
    self._lock.acquire()
532
    try:
533
      return self._HasRunningTasksUnlocked()
534
    finally:
535
      self._lock.release()
536

    
537
  def Quiesce(self):
538
    """Waits until the task queue is empty.
539

540
    """
541
    self._lock.acquire()
542
    try:
543
      self._quiescing = True
544

    
545
      # Wait while there are tasks pending or running
546
      while self._tasks or self._HasRunningTasksUnlocked():
547
        self._worker_to_pool.wait()
548

    
549
    finally:
550
      self._quiescing = False
551

    
552
      # Make sure AddTasks continues in case it was waiting
553
      self._pool_to_pool.notifyAll()
554

    
555
      self._lock.release()
556

    
557
  def _NewWorkerIdUnlocked(self):
558
    """Return an identifier for a new worker.
559

560
    """
561
    self._last_worker_id += 1
562

    
563
    return "%s%d" % (self._name, self._last_worker_id)
564

    
565
  def _ResizeUnlocked(self, num_workers):
566
    """Changes the number of workers.
567

568
    """
569
    assert num_workers >= 0, "num_workers must be >= 0"
570

    
571
    logging.debug("Resizing to %s workers", num_workers)
572

    
573
    current_count = len(self._workers)
574

    
575
    if current_count == num_workers:
576
      # Nothing to do
577
      pass
578

    
579
    elif current_count > num_workers:
580
      if num_workers == 0:
581
        # Create copy of list to iterate over while lock isn't held.
582
        termworkers = self._workers[:]
583
        del self._workers[:]
584
      else:
585
        # TODO: Implement partial downsizing
586
        raise NotImplementedError()
587
        #termworkers = ...
588

    
589
      self._termworkers += termworkers
590

    
591
      # Notify workers that something has changed
592
      self._pool_to_worker.notifyAll()
593

    
594
      # Join all terminating workers
595
      self._lock.release()
596
      try:
597
        for worker in termworkers:
598
          logging.debug("Waiting for thread %s", worker.getName())
599
          worker.join()
600
      finally:
601
        self._lock.acquire()
602

    
603
      # Remove terminated threads. This could be done in a more efficient way
604
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
605
      # don't leave zombie threads around.
606
      for worker in termworkers:
607
        assert worker in self._termworkers, ("Worker not in list of"
608
                                             " terminating workers")
609
        if not worker.isAlive():
610
          self._termworkers.remove(worker)
611

    
612
      assert not self._termworkers, "Zombie worker detected"
613

    
614
    elif current_count < num_workers:
615
      # Create (num_workers - current_count) new workers
616
      for _ in range(num_workers - current_count):
617
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
618
        self._workers.append(worker)
619
        worker.start()
620

    
621
  def Resize(self, num_workers):
622
    """Changes the number of workers in the pool.
623

624
    @param num_workers: the new number of workers
625

626
    """
627
    self._lock.acquire()
628
    try:
629
      return self._ResizeUnlocked(num_workers)
630
    finally:
631
      self._lock.release()
632

    
633
  def TerminateWorkers(self):
634
    """Terminate all worker threads.
635

636
    Unstarted tasks will be ignored.
637

638
    """
639
    logging.debug("Terminating all workers")
640

    
641
    self._lock.acquire()
642
    try:
643
      self._ResizeUnlocked(0)
644

    
645
      if self._tasks:
646
        logging.debug("There are %s tasks left", len(self._tasks))
647
    finally:
648
      self._lock.release()
649

    
650
    logging.debug("All workers terminated")