workerpool: Remove unused worker method
[ganeti-local] / lib / workerpool.py
1 #
2 #
3
4 # Copyright (C) 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Base classes for worker pools.
23
24 """
25
26 import collections
27 import logging
28 import threading
29
30 from ganeti import compat
31
32
33 _TERMINATE = object()
34
35
36 class BaseWorker(threading.Thread, object):
37   """Base worker class for worker pools.
38
39   Users of a worker pool must override RunTask in a subclass.
40
41   """
42   # pylint: disable-msg=W0212
43   def __init__(self, pool, worker_id):
44     """Constructor for BaseWorker thread.
45
46     @param pool: the parent worker pool
47     @param worker_id: identifier for this worker
48
49     """
50     super(BaseWorker, self).__init__(name=worker_id)
51     self.pool = pool
52     self._current_task = None
53
54   def ShouldTerminate(self):
55     """Returns whether a worker should terminate.
56
57     """
58     return self.pool.ShouldWorkerTerminate(self)
59
60   def _HasRunningTaskUnlocked(self):
61     """Returns whether this worker is currently running a task.
62
63     """
64     return (self._current_task is not None)
65
66   def run(self):
67     """Main thread function.
68
69     Waits for new tasks to show up in the queue.
70
71     """
72     pool = self.pool
73
74     assert self._current_task is None
75
76     while True:
77       try:
78         # Wait on lock to be told either to terminate or to do a task
79         pool._lock.acquire()
80         try:
81           task = pool._WaitForTaskUnlocked(self)
82
83           if task is _TERMINATE:
84             # Told to terminate
85             break
86
87           if task is None:
88             # Spurious notification, ignore
89             continue
90
91           self._current_task = task
92
93           assert self._HasRunningTaskUnlocked()
94         finally:
95           pool._lock.release()
96
97         # Run the actual task
98         try:
99           logging.debug("Starting task %r", self._current_task)
100           self.RunTask(*self._current_task)
101           logging.debug("Done with task %r", self._current_task)
102         except: # pylint: disable-msg=W0702
103           logging.exception("Caught unhandled exception")
104       finally:
105         # Notify pool
106         pool._lock.acquire()
107         try:
108           if self._current_task:
109             self._current_task = None
110             pool._worker_to_pool.notifyAll()
111         finally:
112           pool._lock.release()
113
114     logging.debug("Terminates")
115
116   def RunTask(self, *args):
117     """Function called to start a task.
118
119     This needs to be implemented by child classes.
120
121     """
122     raise NotImplementedError()
123
124
125 class WorkerPool(object):
126   """Worker pool with a queue.
127
128   This class is thread-safe.
129
130   Tasks are guaranteed to be started in the order in which they're
131   added to the pool. Due to the nature of threading, they're not
132   guaranteed to finish in the same order.
133
134   """
135   def __init__(self, name, num_workers, worker_class):
136     """Constructor for worker pool.
137
138     @param num_workers: number of workers to be started
139         (dynamic resizing is not yet implemented)
140     @param worker_class: the class to be instantiated for workers;
141         should derive from L{BaseWorker}
142
143     """
144     # Some of these variables are accessed by BaseWorker
145     self._lock = threading.Lock()
146     self._pool_to_pool = threading.Condition(self._lock)
147     self._pool_to_worker = threading.Condition(self._lock)
148     self._worker_to_pool = threading.Condition(self._lock)
149     self._worker_class = worker_class
150     self._name = name
151     self._last_worker_id = 0
152     self._workers = []
153     self._quiescing = False
154
155     # Terminating workers
156     self._termworkers = []
157
158     # Queued tasks
159     self._tasks = collections.deque()
160
161     # Start workers
162     self.Resize(num_workers)
163
164   # TODO: Implement dynamic resizing?
165
166   def _WaitWhileQuiescingUnlocked(self):
167     """Wait until the worker pool has finished quiescing.
168
169     """
170     while self._quiescing:
171       self._pool_to_pool.wait()
172
173   def _AddTaskUnlocked(self, args):
174     assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
175
176     self._tasks.append(args)
177
178     # Notify a waiting worker
179     self._pool_to_worker.notify()
180
181   def AddTask(self, *args):
182     """Adds a task to the queue.
183
184     @param args: arguments passed to L{BaseWorker.RunTask}
185
186     """
187     self._lock.acquire()
188     try:
189       self._WaitWhileQuiescingUnlocked()
190       self._AddTaskUnlocked(args)
191     finally:
192       self._lock.release()
193
194   def AddManyTasks(self, tasks):
195     """Add a list of tasks to the queue.
196
197     @type tasks: list of tuples
198     @param tasks: list of args passed to L{BaseWorker.RunTask}
199
200     """
201     assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
202       "Each task must be a sequence"
203
204     self._lock.acquire()
205     try:
206       self._WaitWhileQuiescingUnlocked()
207
208       for args in tasks:
209         self._AddTaskUnlocked(args)
210     finally:
211       self._lock.release()
212
213   def _WaitForTaskUnlocked(self, worker):
214     """Waits for a task for a worker.
215
216     @type worker: L{BaseWorker}
217     @param worker: Worker thread
218
219     """
220     if self._ShouldWorkerTerminateUnlocked(worker):
221       return _TERMINATE
222
223     # We only wait if there's no task for us.
224     if not self._tasks:
225       logging.debug("Waiting for tasks")
226
227       # wait() releases the lock and sleeps until notified
228       self._pool_to_worker.wait()
229
230       logging.debug("Notified while waiting")
231
232       # Were we woken up in order to terminate?
233       if self._ShouldWorkerTerminateUnlocked(worker):
234         return _TERMINATE
235
236       if not self._tasks:
237         # Spurious notification, ignore
238         return None
239
240     # Get task from queue and tell pool about it
241     try:
242       return self._tasks.popleft()
243     finally:
244       self._worker_to_pool.notifyAll()
245
246   def _ShouldWorkerTerminateUnlocked(self, worker):
247     """Returns whether a worker should terminate.
248
249     """
250     return (worker in self._termworkers)
251
252   def ShouldWorkerTerminate(self, worker):
253     """Returns whether a worker should terminate.
254
255     """
256     self._lock.acquire()
257     try:
258       return self._ShouldWorkerTerminateUnlocked(worker)
259     finally:
260       self._lock.release()
261
262   def _HasRunningTasksUnlocked(self):
263     """Checks whether there's a task running in a worker.
264
265     """
266     for worker in self._workers + self._termworkers:
267       if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
268         return True
269     return False
270
271   def Quiesce(self):
272     """Waits until the task queue is empty.
273
274     """
275     self._lock.acquire()
276     try:
277       self._quiescing = True
278
279       # Wait while there are tasks pending or running
280       while self._tasks or self._HasRunningTasksUnlocked():
281         self._worker_to_pool.wait()
282
283     finally:
284       self._quiescing = False
285
286       # Make sure AddTasks continues in case it was waiting
287       self._pool_to_pool.notifyAll()
288
289       self._lock.release()
290
291   def _NewWorkerIdUnlocked(self):
292     """Return an identifier for a new worker.
293
294     """
295     self._last_worker_id += 1
296
297     return "%s%d" % (self._name, self._last_worker_id)
298
299   def _ResizeUnlocked(self, num_workers):
300     """Changes the number of workers.
301
302     """
303     assert num_workers >= 0, "num_workers must be >= 0"
304
305     logging.debug("Resizing to %s workers", num_workers)
306
307     current_count = len(self._workers)
308
309     if current_count == num_workers:
310       # Nothing to do
311       pass
312
313     elif current_count > num_workers:
314       if num_workers == 0:
315         # Create copy of list to iterate over while lock isn't held.
316         termworkers = self._workers[:]
317         del self._workers[:]
318       else:
319         # TODO: Implement partial downsizing
320         raise NotImplementedError()
321         #termworkers = ...
322
323       self._termworkers += termworkers
324
325       # Notify workers that something has changed
326       self._pool_to_worker.notifyAll()
327
328       # Join all terminating workers
329       self._lock.release()
330       try:
331         for worker in termworkers:
332           logging.debug("Waiting for thread %s", worker.getName())
333           worker.join()
334       finally:
335         self._lock.acquire()
336
337       # Remove terminated threads. This could be done in a more efficient way
338       # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
339       # don't leave zombie threads around.
340       for worker in termworkers:
341         assert worker in self._termworkers, ("Worker not in list of"
342                                              " terminating workers")
343         if not worker.isAlive():
344           self._termworkers.remove(worker)
345
346       assert not self._termworkers, "Zombie worker detected"
347
348     elif current_count < num_workers:
349       # Create (num_workers - current_count) new workers
350       for _ in range(num_workers - current_count):
351         worker = self._worker_class(self, self._NewWorkerIdUnlocked())
352         self._workers.append(worker)
353         worker.start()
354
355   def Resize(self, num_workers):
356     """Changes the number of workers in the pool.
357
358     @param num_workers: the new number of workers
359
360     """
361     self._lock.acquire()
362     try:
363       return self._ResizeUnlocked(num_workers)
364     finally:
365       self._lock.release()
366
367   def TerminateWorkers(self):
368     """Terminate all worker threads.
369
370     Unstarted tasks will be ignored.
371
372     """
373     logging.debug("Terminating all workers")
374
375     self._lock.acquire()
376     try:
377       self._ResizeUnlocked(0)
378
379       if self._tasks:
380         logging.debug("There are %s tasks left", len(self._tasks))
381     finally:
382       self._lock.release()
383
384     logging.debug("All workers terminated")