lib/workerpool.py
#
#

# Copyright (C) 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Base classes for worker pools.

"""

import logging
import threading
import heapq

from ganeti import compat
from ganeti import errors


_TERMINATE = object()
_DEFAULT_PRIORITY = 0


class DeferTask(Exception):
  """Special exception class to defer a task.

  This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
  task. Optionally, the priority of the task can be changed.

  """
  def __init__(self, priority=None):
    """Initializes this class.

    @type priority: number
    @param priority: New task priority (None means no change)

    """
    Exception.__init__(self)
    self.priority = priority

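
# Example (illustrative sketch, not part of the original module): a worker
# implementation may raise DeferTask from RunTask() to put its task back on
# the queue, optionally at a different priority (a smaller number means the
# task is picked up earlier).  The class and helpers below are hypothetical.
#
#   class _RetryWorker(BaseWorker):
#     def RunTask(self, resource):
#       self.SetTaskName("process-%s" % resource)
#       if not _IsReady(resource):
#         # Re-queue this task with a lower priority (larger number)
#         raise DeferTask(priority=self.GetCurrentPriority() + 1)
#       _Process(resource)
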
class BaseWorker(threading.Thread, object):
  """Base worker class for worker pools.

  Users of a worker pool must override RunTask in a subclass.

  """
  # pylint: disable=W0212
  def __init__(self, pool, worker_id):
    """Constructor for BaseWorker thread.

    @param pool: the parent worker pool
    @param worker_id: identifier for this worker

    """
    super(BaseWorker, self).__init__(name=worker_id)
    self.pool = pool
    self._worker_id = worker_id
    self._current_task = None

    assert self.getName() == worker_id

  def ShouldTerminate(self):
    """Returns whether this worker should terminate.

    Should only be called from within L{RunTask}.

    """
    self.pool._lock.acquire()
    try:
      assert self._HasRunningTaskUnlocked()
      return self.pool._ShouldWorkerTerminateUnlocked(self)
    finally:
      self.pool._lock.release()

  def GetCurrentPriority(self):
    """Returns the priority of the current task.

    Should only be called from within L{RunTask}.

    """
    self.pool._lock.acquire()
    try:
      assert self._HasRunningTaskUnlocked()

      (priority, _, _) = self._current_task

      return priority
    finally:
      self.pool._lock.release()

  def SetTaskName(self, taskname):
    """Sets the name of the current task.

    Should only be called from within L{RunTask}.

    @type taskname: string
    @param taskname: Task's name

    """
    if taskname:
      name = "%s/%s" % (self._worker_id, taskname)
    else:
      name = self._worker_id

    # Set thread name
    self.setName(name)

  def _HasRunningTaskUnlocked(self):
    """Returns whether this worker is currently running a task.

    """
    return (self._current_task is not None)

  def run(self):
    """Main thread function.

    Waits for new tasks to show up in the queue.

    """
    pool = self.pool

    while True:
      assert self._current_task is None

      defer = None
      try:
        # Wait on lock to be told either to terminate or to do a task
        pool._lock.acquire()
        try:
          task = pool._WaitForTaskUnlocked(self)

          if task is _TERMINATE:
            # Told to terminate
            break

          if task is None:
            # Spurious notification, ignore
            continue

          self._current_task = task

          # No longer needed, dispose of reference
          del task

          assert self._HasRunningTaskUnlocked()

        finally:
          pool._lock.release()

        (priority, _, args) = self._current_task
        try:
          # Run the actual task
          assert defer is None
          logging.debug("Starting task %r, priority %s", args, priority)
          assert self.getName() == self._worker_id
          try:
            self.RunTask(*args) # pylint: disable=W0142
          finally:
            self.SetTaskName(None)
          logging.debug("Done with task %r, priority %s", args, priority)
        except DeferTask, err:
          defer = err

          if defer.priority is None:
            # Use same priority
            defer.priority = priority

          logging.debug("Deferring task %r, new priority %s",
                        args, defer.priority)

          assert self._HasRunningTaskUnlocked()
        except: # pylint: disable=W0702
          logging.exception("Caught unhandled exception")

        assert self._HasRunningTaskUnlocked()
      finally:
        # Notify pool
        pool._lock.acquire()
        try:
          if defer:
            assert self._current_task
            # Schedule again for later run
            (_, _, args) = self._current_task
            pool._AddTaskUnlocked(args, defer.priority)

          if self._current_task:
            self._current_task = None
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

      assert not self._HasRunningTaskUnlocked()

    logging.debug("Terminates")

  def RunTask(self, *args):
    """Function called to start a task.

    This needs to be implemented by child classes.

    """
    raise NotImplementedError()


class WorkerPool(object):
  """Worker pool with a queue.

  This class is thread-safe.

  Tasks are guaranteed to be started in the order in which they're
  added to the pool. Due to the nature of threading, they're not
  guaranteed to finish in the same order.

  """
  def __init__(self, name, num_workers, worker_class):
    """Constructor for worker pool.

    @param num_workers: number of workers to be started
        (dynamic resizing is not yet implemented)
    @param worker_class: the class to be instantiated for workers;
        should derive from L{BaseWorker}

    """
    # Some of these variables are accessed by BaseWorker
    self._lock = threading.Lock()
    self._pool_to_pool = threading.Condition(self._lock)
    self._pool_to_worker = threading.Condition(self._lock)
    self._worker_to_pool = threading.Condition(self._lock)
    self._worker_class = worker_class
    self._name = name
    self._last_worker_id = 0
    self._workers = []
    self._quiescing = False
    self._active = True

    # Terminating workers
    self._termworkers = []

    # Queued tasks
    self._counter = 0
    self._tasks = []

    # Start workers
    self.Resize(num_workers)

  # TODO: Implement dynamic resizing?

  def _WaitWhileQuiescingUnlocked(self):
    """Wait until the worker pool has finished quiescing.

    """
    while self._quiescing:
      self._pool_to_pool.wait()

  def _AddTaskUnlocked(self, args, priority):
    """Adds a task to the internal queue.

    @type args: sequence
    @param args: Arguments passed to L{BaseWorker.RunTask}
    @type priority: number
    @param priority: Task priority

    """
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
    assert isinstance(priority, (int, long)), "Priority must be numeric"

    # This counter is used to ensure elements are processed in their
    # incoming order. For processing they're sorted by priority and then
    # counter.
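    # For example, two tasks added at the same priority 0 are stored as
    # (0, 1, args1) and (0, 2, args2), so heapq pops them in insertion order.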
    self._counter += 1

    heapq.heappush(self._tasks, (priority, self._counter, args))

    # Notify a waiting worker
    self._pool_to_worker.notify()

  def AddTask(self, args, priority=_DEFAULT_PRIORITY):
    """Adds a task to the queue.

    @type args: sequence
    @param args: arguments passed to L{BaseWorker.RunTask}
    @type priority: number
    @param priority: Task priority

    """
    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()
      self._AddTaskUnlocked(args, priority)
    finally:
      self._lock.release()

  def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
    """Add a list of tasks to the queue.

    @type tasks: list of tuples
    @param tasks: list of args passed to L{BaseWorker.RunTask}
    @type priority: number or list of numbers
    @param priority: Priority for all added tasks or a list with the priority
                     for each task

    """
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
      "Each task must be a sequence"

    assert (isinstance(priority, (int, long)) or
            compat.all(isinstance(prio, (int, long)) for prio in priority)), \
           "Priority must be numeric or be a list of numeric values"

    if isinstance(priority, (int, long)):
      priority = [priority] * len(tasks)
    elif len(priority) != len(tasks):
      raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
                                   " number of tasks (%s)" %
                                   (len(priority), len(tasks)))

    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()

      assert compat.all(isinstance(prio, (int, long)) for prio in priority)
      assert len(tasks) == len(priority)

      for args, priority in zip(tasks, priority):
        self._AddTaskUnlocked(args, priority)
    finally:
      self._lock.release()

  def SetActive(self, active):
    """Enable/disable processing of tasks.

    This is different from L{Quiesce} in the sense that this function just
    changes an internal flag and doesn't wait for the queue to be empty. Tasks
    already being processed continue normally, but no new tasks will be
    started. New tasks can still be added.

    @type active: bool
    @param active: Whether tasks should be processed

    """
    self._lock.acquire()
    try:
      self._active = active

      if active:
        # Tell all workers to continue processing
        self._pool_to_worker.notifyAll()
    finally:
      self._lock.release()

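  # Example (illustrative sketch): processing can be paused while a batch of
  # tasks is queued, then resumed; "pool" and "batch" are hypothetical names.
  #
  #   pool.SetActive(False)
  #   for args in batch:
  #     pool.AddTask(args)
  #   pool.SetActive(True)
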
  def _WaitForTaskUnlocked(self, worker):
    """Waits for a task for a worker.

    @type worker: L{BaseWorker}
    @param worker: Worker thread

    """
    if self._ShouldWorkerTerminateUnlocked(worker):
      return _TERMINATE

    # We only wait if there's no task for us.
    if not (self._active and self._tasks):
      logging.debug("Waiting for tasks")

      while True:
        # wait() releases the lock and sleeps until notified
        self._pool_to_worker.wait()

        logging.debug("Notified while waiting")

        # Were we woken up in order to terminate?
        if self._ShouldWorkerTerminateUnlocked(worker):
          return _TERMINATE

        # Just loop if pool is not processing tasks at this time
        if self._active and self._tasks:
          break

    # Get task from queue and tell pool about it
    try:
      return heapq.heappop(self._tasks)
    finally:
      self._worker_to_pool.notifyAll()

  def _ShouldWorkerTerminateUnlocked(self, worker):
    """Returns whether a worker should terminate.

    """
    return (worker in self._termworkers)

  def _HasRunningTasksUnlocked(self):
    """Checks whether there's a task running in a worker.

    """
    for worker in self._workers + self._termworkers:
      if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212
        return True
    return False

  def HasRunningTasks(self):
    """Checks whether there's at least one task running.

    """
    self._lock.acquire()
    try:
      return self._HasRunningTasksUnlocked()
    finally:
      self._lock.release()

  def Quiesce(self):
    """Waits until the task queue is empty.

    """
    self._lock.acquire()
    try:
      self._quiescing = True

      # Wait while there are tasks pending or running
      while self._tasks or self._HasRunningTasksUnlocked():
        self._worker_to_pool.wait()

    finally:
      self._quiescing = False

      # Make sure AddTasks continues in case it was waiting
      self._pool_to_pool.notifyAll()

      self._lock.release()

  def _NewWorkerIdUnlocked(self):
    """Return an identifier for a new worker.

    """
    self._last_worker_id += 1

    return "%s%d" % (self._name, self._last_worker_id)

  def _ResizeUnlocked(self, num_workers):
    """Changes the number of workers.

    """
    assert num_workers >= 0, "num_workers must be >= 0"

    logging.debug("Resizing to %s workers", num_workers)

    current_count = len(self._workers)

    if current_count == num_workers:
      # Nothing to do
      pass

    elif current_count > num_workers:
      if num_workers == 0:
        # Create copy of list to iterate over while lock isn't held.
        termworkers = self._workers[:]
        del self._workers[:]
      else:
        # TODO: Implement partial downsizing
        raise NotImplementedError()
        #termworkers = ...

      self._termworkers += termworkers

      # Notify workers that something has changed
      self._pool_to_worker.notifyAll()

      # Join all terminating workers
      self._lock.release()
      try:
        for worker in termworkers:
          logging.debug("Waiting for thread %s", worker.getName())
          worker.join()
      finally:
        self._lock.acquire()

      # Remove terminated threads. This could be done in a more efficient way
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
      # don't leave zombie threads around.
      for worker in termworkers:
        assert worker in self._termworkers, ("Worker not in list of"
                                             " terminating workers")
        if not worker.isAlive():
          self._termworkers.remove(worker)

      assert not self._termworkers, "Zombie worker detected"

    elif current_count < num_workers:
      # Create (num_workers - current_count) new workers
      for _ in range(num_workers - current_count):
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
        self._workers.append(worker)
        worker.start()

  def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    @param num_workers: the new number of workers

    """
    self._lock.acquire()
    try:
      return self._ResizeUnlocked(num_workers)
    finally:
      self._lock.release()

  def TerminateWorkers(self):
    """Terminate all worker threads.

    Unstarted tasks will be ignored.

    """
    logging.debug("Terminating all workers")

    self._lock.acquire()
    try:
      self._ResizeUnlocked(0)

      if self._tasks:
        logging.debug("There are %s tasks left", len(self._tasks))
    finally:
      self._lock.release()

    logging.debug("All workers terminated")
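

# Example usage (illustrative sketch; the worker class and task values below
# are hypothetical and not part of the original module):
#
#   class _LogWorker(BaseWorker):
#     def RunTask(self, msg):
#       logging.info("%s: %s", self.getName(), msg)
#
#   pool = WorkerPool("Log", 3, _LogWorker)
#   pool.AddTask(("hello", ))
#   pool.AddManyTasks([("a", ), ("b", )], priority=[1, 2])
#   pool.Quiesce()            # wait until the queue is empty
#   pool.TerminateWorkers()   # shut down all worker threads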