KVM: Check instances for actual liveness
[ganeti-local] / lib / workerpool.py
1 #
2 #
3
4 # Copyright (C) 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Base classes for worker pools.
23
24 """
25
26 import collections
27 import logging
28 import threading
29
30
class BaseWorker(threading.Thread, object):
  """Base worker class for worker pools.

  Users of a worker pool must override RunTask in a subclass.

  Each worker is a thread that repeatedly takes the oldest queued task
  from its pool (C{pool._tasks}) and runs it through L{RunTask}, until the
  pool tells it to terminate. Shared state is only touched while holding
  C{pool._lock}.

  """
  # pylint: disable-msg=W0212
  def __init__(self, pool, worker_id):
    """Constructor for BaseWorker thread.

    @param pool: the parent worker pool
    @param worker_id: identifier for this worker (also used as thread name)

    """
    super(BaseWorker, self).__init__(name=worker_id)
    self.pool = pool
    # Argument tuple of the task currently being run, or None while idle;
    # read and written with pool._lock held
    self._current_task = None

  def ShouldTerminate(self):
    """Returns whether a worker should terminate.

    """
    return self.pool.ShouldWorkerTerminate(self)

  def _HasRunningTaskUnlocked(self):
    """Returns whether this worker is currently running a task.

    Caller must hold the pool's lock.

    """
    return (self._current_task is not None)

  def HasRunningTask(self):
    """Returns whether this worker is currently running a task.

    """
    with self.pool._lock:
      return self._HasRunningTaskUnlocked()

  def run(self):
    """Main thread function.

    Waits for new tasks to show up in the queue and runs them one at a
    time, until the pool asks this worker to terminate.

    """
    pool = self.pool

    assert not self.HasRunningTask()

    while True:
      try:
        # We wait on lock to be told either terminate or do a task.
        with pool._lock:
          if pool._ShouldWorkerTerminateUnlocked(self):
            break

          # We only wait if there's no task for us.
          if not pool._tasks:
            logging.debug("Waiting for tasks")

            # wait() releases the lock and sleeps until notified
            pool._pool_to_worker.wait()

            logging.debug("Notified while waiting")

            # Were we woken up in order to terminate?
            if pool._ShouldWorkerTerminateUnlocked(self):
              break

            if not pool._tasks:
              # Spurious notification, ignore
              continue

          # Get task from queue and tell pool about it
          try:
            self._current_task = pool._tasks.popleft()
          finally:
            pool._worker_to_pool.notify_all()

        # Run the actual task (without holding the lock)
        try:
          logging.debug("Starting task %r", self._current_task)
          self.RunTask(*self._current_task)
          logging.debug("Done with task %r", self._current_task)
        except: # pylint: disable-msg=W0702
          # Deliberately catch everything: a failing task is logged and the
          # worker keeps serving the queue
          logging.exception("Caught unhandled exception")
      finally:
        # Notify pool
        with pool._lock:
          # Compare against None explicitly: a task added without arguments
          # is the empty tuple, which is falsy but still a real task that
          # must be cleared, otherwise the worker looks busy forever
          if self._current_task is not None:
            self._current_task = None
            pool._worker_to_pool.notify_all()

    logging.debug("Terminates")

  def RunTask(self, *args):
    """Function called to start a task.

    This needs to be implemented by child classes.

    @param args: the arguments the task was queued with

    """
    raise NotImplementedError()
140
141
class WorkerPool(object):
  """Worker pool with a queue.

  This class is thread-safe.

  Tasks are guaranteed to be started in the order in which they're
  added to the pool. Due to the nature of threading, they're not
  guaranteed to finish in the same order.

  """
  def __init__(self, name, num_workers, worker_class):
    """Constructor for worker pool.

    @param name: prefix for the worker thread names
    @param num_workers: number of workers to be started; can be changed
        later using L{Resize}
    @param worker_class: the class to be instantiated for workers;
        should derive from L{BaseWorker}

    """
    # Some of these variables are accessed by BaseWorker
    self._lock = threading.Lock()
    # Notified when quiescing ends, to wake blocked AddTask callers
    self._pool_to_pool = threading.Condition(self._lock)
    # Notified when a task is queued or workers must check for termination
    self._pool_to_worker = threading.Condition(self._lock)
    # Waited on by Quiesce until the queue is empty and no task is running
    self._worker_to_pool = threading.Condition(self._lock)
    self._worker_class = worker_class
    self._name = name
    self._last_worker_id = 0
    self._workers = []
    self._quiescing = False

    # Terminating workers
    self._termworkers = []

    # Queued tasks
    self._tasks = collections.deque()

    # Start workers
    self.Resize(num_workers)

  def AddTask(self, *args):
    """Adds a task to the queue.

    Blocks while the pool is quiescing.

    @param args: arguments passed to L{BaseWorker.RunTask}

    """
    with self._lock:
      # Don't add new tasks while we're quiescing
      while self._quiescing:
        self._pool_to_pool.wait()

      # Add task to internal queue
      self._tasks.append(args)

      # Wake one idling worker up
      self._pool_to_worker.notify()

  def _ShouldWorkerTerminateUnlocked(self, worker):
    """Returns whether a worker should terminate.

    Caller must hold C{self._lock}.

    """
    return (worker in self._termworkers)

  def ShouldWorkerTerminate(self, worker):
    """Returns whether a worker should terminate.

    """
    with self._lock:
      return self._ShouldWorkerTerminateUnlocked(worker)

  def _HasRunningTasksUnlocked(self):
    """Checks whether there's a task running in a worker.

    Caller must hold C{self._lock}.

    """
    return any(worker._HasRunningTaskUnlocked() # pylint: disable-msg=W0212
               for worker in self._workers + self._termworkers)

  def Quiesce(self):
    """Waits until the task queue is empty and no task is running.

    L{AddTask} callers block until this returns.

    """
    with self._lock:
      # NOTE(review): concurrent Quiesce calls share this one flag; the
      # first to return re-enables AddTask for everybody
      self._quiescing = True
      try:
        # Wait while there are tasks pending or running
        while self._tasks or self._HasRunningTasksUnlocked():
          self._worker_to_pool.wait()
      finally:
        self._quiescing = False

        # Make sure AddTasks continues in case it was waiting
        self._pool_to_pool.notify_all()

  def _NewWorkerIdUnlocked(self):
    """Return an identifier for a new worker.

    """
    self._last_worker_id += 1

    return "%s%d" % (self._name, self._last_worker_id)

  def _ResizeUnlocked(self, num_workers):
    """Changes the number of workers.

    Caller must hold C{self._lock}. When shrinking, the lock is released
    while waiting for the surplus workers to exit, so this blocks until
    any task they are running has finished.

    @param num_workers: the new number of workers (must be >= 0)

    """
    assert num_workers >= 0, "num_workers must be >= 0"

    logging.debug("Resizing to %s workers", num_workers)

    current_count = len(self._workers)

    if current_count == num_workers:
      # Nothing to do
      pass

    elif current_count > num_workers:
      # Detach the most recently created workers. Keep a copy to iterate
      # over while the lock isn't held.
      termworkers = self._workers[num_workers:]
      del self._workers[num_workers:]

      self._termworkers += termworkers

      # Notify workers that something has changed
      self._pool_to_worker.notify_all()

      # Join all terminating workers
      self._lock.release()
      try:
        for worker in termworkers:
          logging.debug("Waiting for thread %s", worker.name)
          worker.join()
      finally:
        self._lock.acquire()

      # Remove terminated threads. This could be done in a more efficient way
      # (del self._termworkers[:]), but checking worker.is_alive() makes sure
      # we don't leave zombie threads around. Only this call's workers are
      # checked: while the lock was released, a concurrent resize may have
      # added its own terminating workers.
      zombies = []
      for worker in termworkers:
        assert worker in self._termworkers, ("Worker not in list of"
                                             " terminating workers")
        if worker.is_alive():
          zombies.append(worker)
        else:
          self._termworkers.remove(worker)

      assert not zombies, "Zombie worker detected"

    else:
      # Create (num_workers - current_count) new workers
      for _ in range(num_workers - current_count):
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
        self._workers.append(worker)
        worker.start()

  def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    @param num_workers: the new number of workers

    """
    with self._lock:
      return self._ResizeUnlocked(num_workers)

  def TerminateWorkers(self):
    """Terminate all worker threads.

    Unstarted tasks will be ignored; they stay in the queue.

    """
    logging.debug("Terminating all workers")

    with self._lock:
      self._ResizeUnlocked(0)

      if self._tasks:
        logging.debug("There are %s tasks left", len(self._tasks))

    logging.debug("All workers terminated")