Remove mcpu's ReportLocks callback
[ganeti-local] / lib / workerpool.py
1 #
2 #
3
4 # Copyright (C) 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Base classes for worker pools.
23
24 """
25
26 import collections
27 import logging
28 import threading
29
30 from ganeti import compat
31
32
33 _TERMINATE = object()
34
35
36 class BaseWorker(threading.Thread, object):
37   """Base worker class for worker pools.
38
39   Users of a worker pool must override RunTask in a subclass.
40
41   """
42   # pylint: disable-msg=W0212
43   def __init__(self, pool, worker_id):
44     """Constructor for BaseWorker thread.
45
46     @param pool: the parent worker pool
47     @param worker_id: identifier for this worker
48
49     """
50     super(BaseWorker, self).__init__(name=worker_id)
51     self.pool = pool
52     self._worker_id = worker_id
53     self._current_task = None
54
55     assert self.getName() == worker_id
56
57   def ShouldTerminate(self):
58     """Returns whether this worker should terminate.
59
60     Should only be called from within L{RunTask}.
61
62     """
63     self.pool._lock.acquire()
64     try:
65       assert self._HasRunningTaskUnlocked()
66       return self.pool._ShouldWorkerTerminateUnlocked(self)
67     finally:
68       self.pool._lock.release()
69
70   def SetTaskName(self, taskname):
71     """Sets the name of the current task.
72
73     Should only be called from within L{RunTask}.
74
75     @type taskname: string
76     @param taskname: Task's name
77
78     """
79     if taskname:
80       name = "%s/%s" % (self._worker_id, taskname)
81     else:
82       name = self._worker_id
83
84     # Set thread name
85     self.setName(name)
86
87   def _HasRunningTaskUnlocked(self):
88     """Returns whether this worker is currently running a task.
89
90     """
91     return (self._current_task is not None)
92
93   def run(self):
94     """Main thread function.
95
96     Waits for new tasks to show up in the queue.
97
98     """
99     pool = self.pool
100
101     while True:
102       assert self._current_task is None
103       try:
104         # Wait on lock to be told either to terminate or to do a task
105         pool._lock.acquire()
106         try:
107           task = pool._WaitForTaskUnlocked(self)
108
109           if task is _TERMINATE:
110             # Told to terminate
111             break
112
113           if task is None:
114             # Spurious notification, ignore
115             continue
116
117           self._current_task = task
118
119           # No longer needed, dispose of reference
120           del task
121
122           assert self._HasRunningTaskUnlocked()
123
124         finally:
125           pool._lock.release()
126
127         # Run the actual task
128         try:
129           logging.debug("Starting task %r", self._current_task)
130           assert self.getName() == self._worker_id
131           try:
132             self.RunTask(*self._current_task)
133           finally:
134             self.SetTaskName(None)
135           logging.debug("Done with task %r", self._current_task)
136         except: # pylint: disable-msg=W0702
137           logging.exception("Caught unhandled exception")
138
139         assert self._HasRunningTaskUnlocked()
140       finally:
141         # Notify pool
142         pool._lock.acquire()
143         try:
144           if self._current_task:
145             self._current_task = None
146             pool._worker_to_pool.notifyAll()
147         finally:
148           pool._lock.release()
149
150       assert not self._HasRunningTaskUnlocked()
151
152     logging.debug("Terminates")
153
154   def RunTask(self, *args):
155     """Function called to start a task.
156
157     This needs to be implemented by child classes.
158
159     """
160     raise NotImplementedError()
161
162
163 class WorkerPool(object):
164   """Worker pool with a queue.
165
166   This class is thread-safe.
167
168   Tasks are guaranteed to be started in the order in which they're
169   added to the pool. Due to the nature of threading, they're not
170   guaranteed to finish in the same order.
171
172   """
173   def __init__(self, name, num_workers, worker_class):
174     """Constructor for worker pool.
175
176     @param num_workers: number of workers to be started
177         (dynamic resizing is not yet implemented)
178     @param worker_class: the class to be instantiated for workers;
179         should derive from L{BaseWorker}
180
181     """
182     # Some of these variables are accessed by BaseWorker
183     self._lock = threading.Lock()
184     self._pool_to_pool = threading.Condition(self._lock)
185     self._pool_to_worker = threading.Condition(self._lock)
186     self._worker_to_pool = threading.Condition(self._lock)
187     self._worker_class = worker_class
188     self._name = name
189     self._last_worker_id = 0
190     self._workers = []
191     self._quiescing = False
192
193     # Terminating workers
194     self._termworkers = []
195
196     # Queued tasks
197     self._tasks = collections.deque()
198
199     # Start workers
200     self.Resize(num_workers)
201
202   # TODO: Implement dynamic resizing?
203
204   def _WaitWhileQuiescingUnlocked(self):
205     """Wait until the worker pool has finished quiescing.
206
207     """
208     while self._quiescing:
209       self._pool_to_pool.wait()
210
211   def _AddTaskUnlocked(self, args):
212     assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
213
214     self._tasks.append(args)
215
216     # Notify a waiting worker
217     self._pool_to_worker.notify()
218
219   def AddTask(self, args):
220     """Adds a task to the queue.
221
222     @type args: sequence
223     @param args: arguments passed to L{BaseWorker.RunTask}
224
225     """
226     self._lock.acquire()
227     try:
228       self._WaitWhileQuiescingUnlocked()
229       self._AddTaskUnlocked(args)
230     finally:
231       self._lock.release()
232
233   def AddManyTasks(self, tasks):
234     """Add a list of tasks to the queue.
235
236     @type tasks: list of tuples
237     @param tasks: list of args passed to L{BaseWorker.RunTask}
238
239     """
240     assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
241       "Each task must be a sequence"
242
243     self._lock.acquire()
244     try:
245       self._WaitWhileQuiescingUnlocked()
246
247       for args in tasks:
248         self._AddTaskUnlocked(args)
249     finally:
250       self._lock.release()
251
252   def _WaitForTaskUnlocked(self, worker):
253     """Waits for a task for a worker.
254
255     @type worker: L{BaseWorker}
256     @param worker: Worker thread
257
258     """
259     if self._ShouldWorkerTerminateUnlocked(worker):
260       return _TERMINATE
261
262     # We only wait if there's no task for us.
263     if not self._tasks:
264       logging.debug("Waiting for tasks")
265
266       # wait() releases the lock and sleeps until notified
267       self._pool_to_worker.wait()
268
269       logging.debug("Notified while waiting")
270
271       # Were we woken up in order to terminate?
272       if self._ShouldWorkerTerminateUnlocked(worker):
273         return _TERMINATE
274
275       if not self._tasks:
276         # Spurious notification, ignore
277         return None
278
279     # Get task from queue and tell pool about it
280     try:
281       return self._tasks.popleft()
282     finally:
283       self._worker_to_pool.notifyAll()
284
285   def _ShouldWorkerTerminateUnlocked(self, worker):
286     """Returns whether a worker should terminate.
287
288     """
289     return (worker in self._termworkers)
290
291   def _HasRunningTasksUnlocked(self):
292     """Checks whether there's a task running in a worker.
293
294     """
295     for worker in self._workers + self._termworkers:
296       if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
297         return True
298     return False
299
300   def Quiesce(self):
301     """Waits until the task queue is empty.
302
303     """
304     self._lock.acquire()
305     try:
306       self._quiescing = True
307
308       # Wait while there are tasks pending or running
309       while self._tasks or self._HasRunningTasksUnlocked():
310         self._worker_to_pool.wait()
311
312     finally:
313       self._quiescing = False
314
315       # Make sure AddTasks continues in case it was waiting
316       self._pool_to_pool.notifyAll()
317
318       self._lock.release()
319
320   def _NewWorkerIdUnlocked(self):
321     """Return an identifier for a new worker.
322
323     """
324     self._last_worker_id += 1
325
326     return "%s%d" % (self._name, self._last_worker_id)
327
328   def _ResizeUnlocked(self, num_workers):
329     """Changes the number of workers.
330
331     """
332     assert num_workers >= 0, "num_workers must be >= 0"
333
334     logging.debug("Resizing to %s workers", num_workers)
335
336     current_count = len(self._workers)
337
338     if current_count == num_workers:
339       # Nothing to do
340       pass
341
342     elif current_count > num_workers:
343       if num_workers == 0:
344         # Create copy of list to iterate over while lock isn't held.
345         termworkers = self._workers[:]
346         del self._workers[:]
347       else:
348         # TODO: Implement partial downsizing
349         raise NotImplementedError()
350         #termworkers = ...
351
352       self._termworkers += termworkers
353
354       # Notify workers that something has changed
355       self._pool_to_worker.notifyAll()
356
357       # Join all terminating workers
358       self._lock.release()
359       try:
360         for worker in termworkers:
361           logging.debug("Waiting for thread %s", worker.getName())
362           worker.join()
363       finally:
364         self._lock.acquire()
365
366       # Remove terminated threads. This could be done in a more efficient way
367       # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
368       # don't leave zombie threads around.
369       for worker in termworkers:
370         assert worker in self._termworkers, ("Worker not in list of"
371                                              " terminating workers")
372         if not worker.isAlive():
373           self._termworkers.remove(worker)
374
375       assert not self._termworkers, "Zombie worker detected"
376
377     elif current_count < num_workers:
378       # Create (num_workers - current_count) new workers
379       for _ in range(num_workers - current_count):
380         worker = self._worker_class(self, self._NewWorkerIdUnlocked())
381         self._workers.append(worker)
382         worker.start()
383
384   def Resize(self, num_workers):
385     """Changes the number of workers in the pool.
386
387     @param num_workers: the new number of workers
388
389     """
390     self._lock.acquire()
391     try:
392       return self._ResizeUnlocked(num_workers)
393     finally:
394       self._lock.release()
395
396   def TerminateWorkers(self):
397     """Terminate all worker threads.
398
399     Unstarted tasks will be ignored.
400
401     """
402     logging.debug("Terminating all workers")
403
404     self._lock.acquire()
405     try:
406       self._ResizeUnlocked(0)
407
408       if self._tasks:
409         logging.debug("There are %s tasks left", len(self._tasks))
410     finally:
411       self._lock.release()
412
413     logging.debug("All workers terminated")