Add Ganeti 2.9 design document
[ganeti-local] / lib / workerpool.py
1 #
2 #
3
4 # Copyright (C) 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Base classes for worker pools.
23
24 """
25
26 import logging
27 import threading
28 import heapq
29 import itertools
30
31 from ganeti import compat
32 from ganeti import errors
33
34
35 _TERMINATE = object()
36 _DEFAULT_PRIORITY = 0
37
38
39 class DeferTask(Exception):
40   """Special exception class to defer a task.
41
42   This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
43   task. Optionally, the priority of the task can be changed.
44
45   """
46   def __init__(self, priority=None):
47     """Initializes this class.
48
49     @type priority: number
50     @param priority: New task priority (None means no change)
51
52     """
53     Exception.__init__(self)
54     self.priority = priority
55
56
57 class NoSuchTask(Exception):
58   """Exception raised when a task can't be found.
59
60   """
61
62
63 class BaseWorker(threading.Thread, object):
64   """Base worker class for worker pools.
65
66   Users of a worker pool must override RunTask in a subclass.
67
68   """
69   # pylint: disable=W0212
70   def __init__(self, pool, worker_id):
71     """Constructor for BaseWorker thread.
72
73     @param pool: the parent worker pool
74     @param worker_id: identifier for this worker
75
76     """
77     super(BaseWorker, self).__init__(name=worker_id)
78     self.pool = pool
79     self._worker_id = worker_id
80     self._current_task = None
81
82     assert self.getName() == worker_id
83
84   def ShouldTerminate(self):
85     """Returns whether this worker should terminate.
86
87     Should only be called from within L{RunTask}.
88
89     """
90     self.pool._lock.acquire()
91     try:
92       assert self._HasRunningTaskUnlocked()
93       return self.pool._ShouldWorkerTerminateUnlocked(self)
94     finally:
95       self.pool._lock.release()
96
97   def GetCurrentPriority(self):
98     """Returns the priority of the current task.
99
100     Should only be called from within L{RunTask}.
101
102     """
103     self.pool._lock.acquire()
104     try:
105       assert self._HasRunningTaskUnlocked()
106
107       (priority, _, _, _) = self._current_task
108
109       return priority
110     finally:
111       self.pool._lock.release()
112
113   def SetTaskName(self, taskname):
114     """Sets the name of the current task.
115
116     Should only be called from within L{RunTask}.
117
118     @type taskname: string
119     @param taskname: Task's name
120
121     """
122     if taskname:
123       name = "%s/%s" % (self._worker_id, taskname)
124     else:
125       name = self._worker_id
126
127     # Set thread name
128     self.setName(name)
129
130   def _HasRunningTaskUnlocked(self):
131     """Returns whether this worker is currently running a task.
132
133     """
134     return (self._current_task is not None)
135
136   def _GetCurrentOrderAndTaskId(self):
137     """Returns the order and task ID of the current task.
138
139     Should only be called from within L{RunTask}.
140
141     """
142     self.pool._lock.acquire()
143     try:
144       assert self._HasRunningTaskUnlocked()
145
146       (_, order_id, task_id, _) = self._current_task
147
148       return (order_id, task_id)
149     finally:
150       self.pool._lock.release()
151
152   def run(self):
153     """Main thread function.
154
155     Waits for new tasks to show up in the queue.
156
157     """
158     pool = self.pool
159
160     while True:
161       assert self._current_task is None
162
163       defer = None
164       try:
165         # Wait on lock to be told either to terminate or to do a task
166         pool._lock.acquire()
167         try:
168           task = pool._WaitForTaskUnlocked(self)
169
170           if task is _TERMINATE:
171             # Told to terminate
172             break
173
174           if task is None:
175             # Spurious notification, ignore
176             continue
177
178           self._current_task = task
179
180           # No longer needed, dispose of reference
181           del task
182
183           assert self._HasRunningTaskUnlocked()
184
185         finally:
186           pool._lock.release()
187
188         (priority, _, _, args) = self._current_task
189         try:
190           # Run the actual task
191           assert defer is None
192           logging.debug("Starting task %r, priority %s", args, priority)
193           assert self.getName() == self._worker_id
194           try:
195             self.RunTask(*args) # pylint: disable=W0142
196           finally:
197             self.SetTaskName(None)
198           logging.debug("Done with task %r, priority %s", args, priority)
199         except DeferTask, err:
200           defer = err
201
202           if defer.priority is None:
203             # Use same priority
204             defer.priority = priority
205
206           logging.debug("Deferring task %r, new priority %s",
207                         args, defer.priority)
208
209           assert self._HasRunningTaskUnlocked()
210         except: # pylint: disable=W0702
211           logging.exception("Caught unhandled exception")
212
213         assert self._HasRunningTaskUnlocked()
214       finally:
215         # Notify pool
216         pool._lock.acquire()
217         try:
218           if defer:
219             assert self._current_task
220             # Schedule again for later run
221             (_, _, task_id, args) = self._current_task
222             pool._AddTaskUnlocked(args, defer.priority, task_id)
223
224           if self._current_task:
225             self._current_task = None
226             pool._worker_to_pool.notifyAll()
227         finally:
228           pool._lock.release()
229
230       assert not self._HasRunningTaskUnlocked()
231
232     logging.debug("Terminates")
233
234   def RunTask(self, *args):
235     """Function called to start a task.
236
237     This needs to be implemented by child classes.
238
239     """
240     raise NotImplementedError()
241
242
243 class WorkerPool(object):
244   """Worker pool with a queue.
245
246   This class is thread-safe.
247
248   Tasks are guaranteed to be started in the order in which they're
249   added to the pool. Due to the nature of threading, they're not
250   guaranteed to finish in the same order.
251
252   @type _tasks: list of tuples
253   @ivar _tasks: Each tuple has the format (priority, order ID, task ID,
254     arguments). Priority and order ID are numeric and essentially control the
255     sort order. The order ID is an increasing number denoting the order in
256     which tasks are added to the queue. The task ID is controlled by user of
257     workerpool, see L{AddTask} for details. The task arguments are C{None} for
258     abandoned tasks, otherwise a sequence of arguments to be passed to
259     L{BaseWorker.RunTask}). The list must fulfill the heap property (for use by
260     the C{heapq} module).
261   @type _taskdata: dict; (task IDs as keys, tuples as values)
262   @ivar _taskdata: Mapping from task IDs to entries in L{_tasks}
263
264   """
265   def __init__(self, name, num_workers, worker_class):
266     """Constructor for worker pool.
267
268     @param num_workers: number of workers to be started
269         (dynamic resizing is not yet implemented)
270     @param worker_class: the class to be instantiated for workers;
271         should derive from L{BaseWorker}
272
273     """
274     # Some of these variables are accessed by BaseWorker
275     self._lock = threading.Lock()
276     self._pool_to_pool = threading.Condition(self._lock)
277     self._pool_to_worker = threading.Condition(self._lock)
278     self._worker_to_pool = threading.Condition(self._lock)
279     self._worker_class = worker_class
280     self._name = name
281     self._last_worker_id = 0
282     self._workers = []
283     self._quiescing = False
284     self._active = True
285
286     # Terminating workers
287     self._termworkers = []
288
289     # Queued tasks
290     self._counter = itertools.count()
291     self._tasks = []
292     self._taskdata = {}
293
294     # Start workers
295     self.Resize(num_workers)
296
297   # TODO: Implement dynamic resizing?
298
299   def _WaitWhileQuiescingUnlocked(self):
300     """Wait until the worker pool has finished quiescing.
301
302     """
303     while self._quiescing:
304       self._pool_to_pool.wait()
305
306   def _AddTaskUnlocked(self, args, priority, task_id):
307     """Adds a task to the internal queue.
308
309     @type args: sequence
310     @param args: Arguments passed to L{BaseWorker.RunTask}
311     @type priority: number
312     @param priority: Task priority
313     @param task_id: Task ID
314
315     """
316     assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
317     assert isinstance(priority, (int, long)), "Priority must be numeric"
318     assert task_id is None or isinstance(task_id, (int, long)), \
319       "Task ID must be numeric or None"
320
321     task = [priority, self._counter.next(), task_id, args]
322
323     if task_id is not None:
324       assert task_id not in self._taskdata
325       # Keep a reference to change priority later if necessary
326       self._taskdata[task_id] = task
327
328     # A counter is used to ensure elements are processed in their incoming
329     # order. For processing they're sorted by priority and then counter.
330     heapq.heappush(self._tasks, task)
331
332     # Notify a waiting worker
333     self._pool_to_worker.notify()
334
335   def AddTask(self, args, priority=_DEFAULT_PRIORITY, task_id=None):
336     """Adds a task to the queue.
337
338     @type args: sequence
339     @param args: arguments passed to L{BaseWorker.RunTask}
340     @type priority: number
341     @param priority: Task priority
342     @param task_id: Task ID
343     @note: The task ID can be essentially anything that can be used as a
344       dictionary key. Callers, however, must ensure a task ID is unique while a
345       task is in the pool or while it might return to the pool due to deferring
346       using L{DeferTask}.
347
348     """
349     self._lock.acquire()
350     try:
351       self._WaitWhileQuiescingUnlocked()
352       self._AddTaskUnlocked(args, priority, task_id)
353     finally:
354       self._lock.release()
355
356   def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY, task_id=None):
357     """Add a list of tasks to the queue.
358
359     @type tasks: list of tuples
360     @param tasks: list of args passed to L{BaseWorker.RunTask}
361     @type priority: number or list of numbers
362     @param priority: Priority for all added tasks or a list with the priority
363                      for each task
364     @type task_id: list
365     @param task_id: List with the ID for each task
366     @note: See L{AddTask} for a note on task IDs.
367
368     """
369     assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
370            "Each task must be a sequence"
371     assert (isinstance(priority, (int, long)) or
372             compat.all(isinstance(prio, (int, long)) for prio in priority)), \
373            "Priority must be numeric or be a list of numeric values"
374     assert task_id is None or isinstance(task_id, (tuple, list)), \
375            "Task IDs must be in a sequence"
376
377     if isinstance(priority, (int, long)):
378       priority = [priority] * len(tasks)
379     elif len(priority) != len(tasks):
380       raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
381                                    " number of tasks (%s)" %
382                                    (len(priority), len(tasks)))
383
384     if task_id is None:
385       task_id = [None] * len(tasks)
386     elif len(task_id) != len(tasks):
387       raise errors.ProgrammerError("Number of task IDs (%s) doesn't match"
388                                    " number of tasks (%s)" %
389                                    (len(task_id), len(tasks)))
390
391     self._lock.acquire()
392     try:
393       self._WaitWhileQuiescingUnlocked()
394
395       assert compat.all(isinstance(prio, (int, long)) for prio in priority)
396       assert len(tasks) == len(priority)
397       assert len(tasks) == len(task_id)
398
399       for (args, prio, tid) in zip(tasks, priority, task_id):
400         self._AddTaskUnlocked(args, prio, tid)
401     finally:
402       self._lock.release()
403
404   def ChangeTaskPriority(self, task_id, priority):
405     """Changes a task's priority.
406
407     @param task_id: Task ID
408     @type priority: number
409     @param priority: New task priority
410     @raise NoSuchTask: When the task referred by C{task_id} can not be found
411       (it may never have existed, may have already been processed, or is
412       currently running)
413
414     """
415     assert isinstance(priority, (int, long)), "Priority must be numeric"
416
417     self._lock.acquire()
418     try:
419       logging.debug("About to change priority of task %s to %s",
420                     task_id, priority)
421
422       # Find old task
423       oldtask = self._taskdata.get(task_id, None)
424       if oldtask is None:
425         msg = "Task '%s' was not found" % task_id
426         logging.debug(msg)
427         raise NoSuchTask(msg)
428
429       # Prepare new task
430       newtask = [priority] + oldtask[1:]
431
432       # Mark old entry as abandoned (this doesn't change the sort order and
433       # therefore doesn't invalidate the heap property of L{self._tasks}).
434       # See also <http://docs.python.org/library/heapq.html#priority-queue-
435       # implementation-notes>.
436       oldtask[-1] = None
437
438       # Change reference to new task entry and forget the old one
439       assert task_id is not None
440       self._taskdata[task_id] = newtask
441
442       # Add a new task with the old number and arguments
443       heapq.heappush(self._tasks, newtask)
444
445       # Notify a waiting worker
446       self._pool_to_worker.notify()
447     finally:
448       self._lock.release()
449
450   def SetActive(self, active):
451     """Enable/disable processing of tasks.
452
453     This is different from L{Quiesce} in the sense that this function just
454     changes an internal flag and doesn't wait for the queue to be empty. Tasks
455     already being processed continue normally, but no new tasks will be
456     started. New tasks can still be added.
457
458     @type active: bool
459     @param active: Whether tasks should be processed
460
461     """
462     self._lock.acquire()
463     try:
464       self._active = active
465
466       if active:
467         # Tell all workers to continue processing
468         self._pool_to_worker.notifyAll()
469     finally:
470       self._lock.release()
471
472   def _WaitForTaskUnlocked(self, worker):
473     """Waits for a task for a worker.
474
475     @type worker: L{BaseWorker}
476     @param worker: Worker thread
477
478     """
479     while True:
480       if self._ShouldWorkerTerminateUnlocked(worker):
481         return _TERMINATE
482
483       # If there's a pending task, return it immediately
484       if self._active and self._tasks:
485         # Get task from queue and tell pool about it
486         try:
487           task = heapq.heappop(self._tasks)
488         finally:
489           self._worker_to_pool.notifyAll()
490
491         (_, _, task_id, args) = task
492
493         # If the priority was changed, "args" is None
494         if args is None:
495           # Try again
496           logging.debug("Found abandoned task (%r)", task)
497           continue
498
499         # Delete reference
500         if task_id is not None:
501           del self._taskdata[task_id]
502
503         return task
504
505       logging.debug("Waiting for tasks")
506
507       # wait() releases the lock and sleeps until notified
508       self._pool_to_worker.wait()
509
510       logging.debug("Notified while waiting")
511
512   def _ShouldWorkerTerminateUnlocked(self, worker):
513     """Returns whether a worker should terminate.
514
515     """
516     return (worker in self._termworkers)
517
518   def _HasRunningTasksUnlocked(self):
519     """Checks whether there's a task running in a worker.
520
521     """
522     for worker in self._workers + self._termworkers:
523       if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212
524         return True
525     return False
526
527   def HasRunningTasks(self):
528     """Checks whether there's at least one task running.
529
530     """
531     self._lock.acquire()
532     try:
533       return self._HasRunningTasksUnlocked()
534     finally:
535       self._lock.release()
536
537   def Quiesce(self):
538     """Waits until the task queue is empty.
539
540     """
541     self._lock.acquire()
542     try:
543       self._quiescing = True
544
545       # Wait while there are tasks pending or running
546       while self._tasks or self._HasRunningTasksUnlocked():
547         self._worker_to_pool.wait()
548
549     finally:
550       self._quiescing = False
551
552       # Make sure AddTasks continues in case it was waiting
553       self._pool_to_pool.notifyAll()
554
555       self._lock.release()
556
557   def _NewWorkerIdUnlocked(self):
558     """Return an identifier for a new worker.
559
560     """
561     self._last_worker_id += 1
562
563     return "%s%d" % (self._name, self._last_worker_id)
564
565   def _ResizeUnlocked(self, num_workers):
566     """Changes the number of workers.
567
568     """
569     assert num_workers >= 0, "num_workers must be >= 0"
570
571     logging.debug("Resizing to %s workers", num_workers)
572
573     current_count = len(self._workers)
574
575     if current_count == num_workers:
576       # Nothing to do
577       pass
578
579     elif current_count > num_workers:
580       if num_workers == 0:
581         # Create copy of list to iterate over while lock isn't held.
582         termworkers = self._workers[:]
583         del self._workers[:]
584       else:
585         # TODO: Implement partial downsizing
586         raise NotImplementedError()
587         #termworkers = ...
588
589       self._termworkers += termworkers
590
591       # Notify workers that something has changed
592       self._pool_to_worker.notifyAll()
593
594       # Join all terminating workers
595       self._lock.release()
596       try:
597         for worker in termworkers:
598           logging.debug("Waiting for thread %s", worker.getName())
599           worker.join()
600       finally:
601         self._lock.acquire()
602
603       # Remove terminated threads. This could be done in a more efficient way
604       # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
605       # don't leave zombie threads around.
606       for worker in termworkers:
607         assert worker in self._termworkers, ("Worker not in list of"
608                                              " terminating workers")
609         if not worker.isAlive():
610           self._termworkers.remove(worker)
611
612       assert not self._termworkers, "Zombie worker detected"
613
614     elif current_count < num_workers:
615       # Create (num_workers - current_count) new workers
616       for _ in range(num_workers - current_count):
617         worker = self._worker_class(self, self._NewWorkerIdUnlocked())
618         self._workers.append(worker)
619         worker.start()
620
621   def Resize(self, num_workers):
622     """Changes the number of workers in the pool.
623
624     @param num_workers: the new number of workers
625
626     """
627     self._lock.acquire()
628     try:
629       return self._ResizeUnlocked(num_workers)
630     finally:
631       self._lock.release()
632
633   def TerminateWorkers(self):
634     """Terminate all worker threads.
635
636     Unstarted tasks will be ignored.
637
638     """
639     logging.debug("Terminating all workers")
640
641     self._lock.acquire()
642     try:
643       self._ResizeUnlocked(0)
644
645       if self._tasks:
646         logging.debug("There are %s tasks left", len(self._tasks))
647     finally:
648       self._lock.release()
649
650     logging.debug("All workers terminated")