Add and remove instance/node locks
[ganeti-local] / lib / workerpool.py
1 #
2 #
3
4 # Copyright (C) 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Base classes for worker pools.
23
24 """
25
26 import collections
27 import logging
28 import threading
29
30 from ganeti import errors
31 from ganeti import utils
32
33
34 class BaseWorker(threading.Thread, object):
35   """Base worker class for worker pools.
36
37   Users of a worker pool must override RunTask in a subclass.
38
39   """
40   def __init__(self, pool, worker_id):
41     """Constructor for BaseWorker thread.
42
43     Args:
44     - pool: Parent worker pool
45     - worker_id: Identifier for this worker
46
47     """
48     super(BaseWorker, self).__init__()
49     self.pool = pool
50     self.worker_id = worker_id
51
52     # Also used by WorkerPool
53     self._current_task = None
54
55   def ShouldTerminate(self):
56     """Returns whether a worker should terminate.
57
58     """
59     return self.pool.ShouldWorkerTerminate(self)
60
61   def run(self):
62     """Main thread function.
63
64     Waits for new tasks to show up in the queue.
65
66     """
67     pool = self.pool
68
69     assert self._current_task is None
70
71     while True:
72       try:
73         # We wait on lock to be told either terminate or do a task.
74         pool._lock.acquire()
75         try:
76           if pool._ShouldWorkerTerminateUnlocked(self):
77             break
78
79           # We only wait if there's no task for us.
80           if not pool._tasks:
81             # wait() releases the lock and sleeps until notified
82             pool._lock.wait()
83
84             # Were we woken up in order to terminate?
85             if pool._ShouldWorkerTerminateUnlocked(self):
86               break
87
88             if not pool._tasks:
89               # Spurious notification, ignore
90               continue
91
92           # Get task from queue and tell pool about it
93           try:
94             self._current_task = pool._tasks.popleft()
95           finally:
96             pool._lock.notifyAll()
97         finally:
98           pool._lock.release()
99
100         # Run the actual task
101         try:
102           self.RunTask(*self._current_task)
103         except:
104           logging.error("Worker %s: Caught unhandled exception",
105                         self.worker_id, exc_info=True)
106       finally:
107         self._current_task = None
108
109         # Notify pool
110         pool._lock.acquire()
111         try:
112           pool._lock.notifyAll()
113         finally:
114           pool._lock.release()
115
116   def RunTask(self, *args):
117     """Function called to start a task.
118
119     """
120     raise NotImplementedError()
121
122
123 class WorkerPool(object):
124   """Worker pool with a queue.
125
126   This class is thread-safe.
127
128   Tasks are guaranteed to be started in the order in which they're added to the
129   pool. Due to the nature of threading, they're not guaranteed to finish in the
130   same order.
131
132   """
133   def __init__(self, num_workers, worker_class):
134     """Constructor for worker pool.
135
136     Args:
137     - num_workers: Number of workers to be started (dynamic resizing is not
138                    yet implemented)
139     - worker_class: Class to be instantiated for workers; should derive from
140                     BaseWorker
141
142     """
143     # Some of these variables are accessed by BaseWorker
144     self._lock = threading.Condition(threading.Lock())
145     self._worker_class = worker_class
146     self._last_worker_id = 0
147     self._workers = []
148     self._quiescing = False
149
150     # Terminating workers
151     self._termworkers = []
152
153     # Queued tasks
154     self._tasks = collections.deque()
155
156     # Start workers
157     self.Resize(num_workers)
158
159   # TODO: Implement dynamic resizing?
160
161   def AddTask(self, *args):
162     """Adds a task to the queue.
163
164     Args:
165     - *args: Arguments passed to BaseWorker.RunTask
166
167     """
168     self._lock.acquire()
169     try:
170       # Don't add new tasks while we're quiescing
171       while self._quiescing:
172         self._lock.wait()
173
174       # Add task to internal queue
175       self._tasks.append(args)
176       self._lock.notify()
177     finally:
178       self._lock.release()
179
180   def _ShouldWorkerTerminateUnlocked(self, worker):
181     """Returns whether a worker should terminate.
182
183     """
184     return (worker in self._termworkers)
185
186   def ShouldWorkerTerminate(self, worker):
187     """Returns whether a worker should terminate.
188
189     """
190     self._lock.acquire()
191     try:
192       return self._ShouldWorkerTerminateUnlocked(self)
193     finally:
194       self._lock.release()
195
196   def _HasRunningTasksUnlocked(self):
197     """Checks whether there's a task running in a worker.
198
199     """
200     for worker in self._workers + self._termworkers:
201       if worker._current_task is not None:
202         return True
203     return False
204
205   def Quiesce(self):
206     """Waits until the task queue is empty.
207
208     """
209     self._lock.acquire()
210     try:
211       self._quiescing = True
212
213       # Wait while there are tasks pending or running
214       while self._tasks or self._HasRunningTasksUnlocked():
215         self._lock.wait()
216
217     finally:
218       self._quiescing = False
219
220       # Make sure AddTasks continues in case it was waiting
221       self._lock.notifyAll()
222
223       self._lock.release()
224
225   def _NewWorkerIdUnlocked(self):
226     self._last_worker_id += 1
227     return self._last_worker_id
228
229   def _ResizeUnlocked(self, num_workers):
230     """Changes the number of workers.
231
232     """
233     assert num_workers >= 0, "num_workers must be >= 0"
234
235     logging.debug("Resizing to %s workers", num_workers)
236
237     current_count = len(self._workers)
238
239     if current_count == num_workers:
240       # Nothing to do
241       pass
242
243     elif current_count > num_workers:
244       if num_workers == 0:
245         # Create copy of list to iterate over while lock isn't held.
246         termworkers = self._workers[:]
247         del self._workers[:]
248       else:
249         # TODO: Implement partial downsizing
250         raise NotImplementedError()
251         #termworkers = ...
252
253       self._termworkers += termworkers
254
255       # Notify workers that something has changed
256       self._lock.notifyAll()
257
258       # Join all terminating workers
259       self._lock.release()
260       try:
261         for worker in termworkers:
262           worker.join()
263       finally:
264         self._lock.acquire()
265
266       # Remove terminated threads. This could be done in a more efficient way
267       # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
268       # don't leave zombie threads around.
269       for worker in termworkers:
270         assert worker in self._termworkers, ("Worker not in list of"
271                                              " terminating workers")
272         if not worker.isAlive():
273           self._termworkers.remove(worker)
274
275       assert not self._termworkers, "Zombie worker detected"
276
277     elif current_count < num_workers:
278       # Create (num_workers - current_count) new workers
279       for i in xrange(num_workers - current_count):
280         worker = self._worker_class(self, self._NewWorkerIdUnlocked())
281         self._workers.append(worker)
282         worker.start()
283
284   def Resize(self, num_workers):
285     """Changes the number of workers in the pool.
286
287     Args:
288     - num_workers: New number of workers
289
290     """
291     self._lock.acquire()
292     try:
293       return self._ResizeUnlocked(num_workers)
294     finally:
295       self._lock.release()
296
297   def TerminateWorkers(self):
298     """Terminate all worker threads.
299
300     Unstarted tasks will be ignored.
301
302     """
303     logging.debug("Terminating all workers")
304
305     self._lock.acquire()
306     try:
307       self._ResizeUnlocked(0)
308
309       if self._tasks:
310         logging.debug("There are %s tasks left", len(self._tasks))
311     finally:
312       self._lock.release()
313
314     logging.debug("All workers terminated")