Update the 2.2 design doc with OS parameters
[ganeti-local] / lib / workerpool.py
1 #
2 #
3
4 # Copyright (C) 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Base classes for worker pools.
23
24 """
25
26 import collections
27 import logging
28 import threading
29
30 from ganeti import compat
31
32
class BaseWorker(threading.Thread, object):
  """Base worker class for worker pools.

  Users of a worker pool must override RunTask in a subclass.

  """
  # pylint: disable-msg=W0212
  def __init__(self, pool, worker_id):
    """Constructor for BaseWorker thread.

    @param pool: the parent worker pool
    @param worker_id: identifier for this worker; also used as the
        thread name

    """
    super(BaseWorker, self).__init__(name=worker_id)
    self.pool = pool
    # Argument tuple of the task currently being run, or None when idle.
    # An empty tuple is a valid task, so this must always be compared
    # against None rather than tested for truthiness.
    self._current_task = None

  def ShouldTerminate(self):
    """Returns whether a worker should terminate.

    Delegates to the pool's C{ShouldWorkerTerminate}.

    """
    return self.pool.ShouldWorkerTerminate(self)

  def _HasRunningTaskUnlocked(self):
    """Returns whether this worker is currently running a task.

    The caller must hold the pool lock.

    """
    return (self._current_task is not None)

  def HasRunningTask(self):
    """Returns whether this worker is currently running a task.

    Acquires the pool lock for the duration of the check.

    """
    self.pool._lock.acquire()
    try:
      return self._HasRunningTaskUnlocked()
    finally:
      self.pool._lock.release()

  def run(self):
    """Main thread function.

    Waits for new tasks to show up in the queue and runs them one at a
    time until the pool asks this worker to terminate. Exceptions raised
    by L{RunTask} are logged and otherwise ignored, so a failing task
    does not kill the worker.

    """
    pool = self.pool

    assert not self.HasRunningTask()

    while True:
      try:
        # We wait on lock to be told either terminate or do a task.
        pool._lock.acquire()
        try:
          if pool._ShouldWorkerTerminateUnlocked(self):
            break

          # We only wait if there's no task for us.
          if not pool._tasks:
            logging.debug("Waiting for tasks")

            # wait() releases the lock and sleeps until notified
            pool._pool_to_worker.wait()

            logging.debug("Notified while waiting")

            # Were we woken up in order to terminate?
            if pool._ShouldWorkerTerminateUnlocked(self):
              break

            if not pool._tasks:
              # Spurious notification, ignore
              continue

          # Get task from queue and tell pool about it
          try:
            self._current_task = pool._tasks.popleft()
          finally:
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

        # Run the actual task
        try:
          logging.debug("Starting task %r", self._current_task)
          self.RunTask(*self._current_task)
          logging.debug("Done with task %r", self._current_task)
        except: # pylint: disable-msg=W0702
          logging.exception("Caught unhandled exception")
      finally:
        # Notify pool
        pool._lock.acquire()
        try:
          # Compare with None: an empty argument tuple is a valid but falsy
          # task. If it were not cleared, this worker would look busy
          # forever and WorkerPool.Quiesce would never return.
          if self._current_task is not None:
            self._current_task = None
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

    logging.debug("Terminates")

  def RunTask(self, *args):
    """Function called to start a task.

    This needs to be implemented by child classes.

    @param args: the arguments the task was queued with

    """
    raise NotImplementedError()
142
143
class WorkerPool(object):
  """Worker pool with a queue.

  This class is thread-safe.

  Tasks are guaranteed to be started in the order in which they're
  added to the pool. Due to the nature of threading, they're not
  guaranteed to finish in the same order.

  """
  def __init__(self, name, num_workers, worker_class):
    """Constructor for worker pool.

    @param name: prefix for the worker thread names; workers are named
        C{name} followed by a counter starting at 1
    @param num_workers: number of workers to be started (see L{Resize}
        for the supported resizing operations)
    @param worker_class: the class to be instantiated for workers;
        should derive from L{BaseWorker}

    """
    # Some of these variables are accessed by BaseWorker
    self._lock = threading.Lock()
    # Notified when quiescing ends; AddTask/AddManyTasks wait on it
    self._pool_to_pool = threading.Condition(self._lock)
    # Notified when tasks are queued or when workers should terminate
    self._pool_to_worker = threading.Condition(self._lock)
    # Notified by workers when they take or finish a task; Quiesce waits on it
    self._worker_to_pool = threading.Condition(self._lock)
    self._worker_class = worker_class
    self._name = name
    self._last_worker_id = 0
    self._workers = []
    self._quiescing = False

    # Terminating workers
    self._termworkers = []

    # Queued tasks
    self._tasks = collections.deque()

    # Start workers
    self.Resize(num_workers)

  # TODO: Implement partial downsizing (see _ResizeUnlocked)

  def _WaitWhileQuiescingUnlocked(self):
    """Wait until the worker pool has finished quiescing.

    The caller must hold the pool lock; it is released while waiting.

    """
    while self._quiescing:
      self._pool_to_pool.wait()

  def AddTask(self, *args):
    """Adds a task to the queue.

    Blocks while the pool is quiescing.

    @param args: arguments passed to L{BaseWorker.RunTask}

    """
    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()

      self._tasks.append(args)

      # Wake one idling worker up
      self._pool_to_worker.notify()
    finally:
      self._lock.release()

  def AddManyTasks(self, tasks):
    """Add a list of tasks to the queue.

    Blocks while the pool is quiescing.

    @type tasks: list of tuples
    @param tasks: list of args passed to L{BaseWorker.RunTask}; any
        iterable is accepted and is consumed exactly once

    """
    # Materialize the input first: it is iterated more than once below
    # (validation, queueing, notification), which would silently drop
    # tasks or skip waking workers if an iterator/generator were passed.
    tasks = list(tasks)

    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
      "Each task must be a sequence"

    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()

      self._tasks.extend(tasks)

      # Wake up one idling worker per new task
      for _ in tasks:
        self._pool_to_worker.notify()
    finally:
      self._lock.release()

  def _ShouldWorkerTerminateUnlocked(self, worker):
    """Returns whether a worker should terminate.

    The caller must hold the pool lock.

    """
    return (worker in self._termworkers)

  def ShouldWorkerTerminate(self, worker):
    """Returns whether a worker should terminate.

    Acquires the pool lock for the duration of the check.

    """
    self._lock.acquire()
    try:
      return self._ShouldWorkerTerminateUnlocked(worker)
    finally:
      self._lock.release()

  def _HasRunningTasksUnlocked(self):
    """Checks whether there's a task running in a worker.

    Both active and terminating workers are considered. The caller must
    hold the pool lock.

    """
    for worker in self._workers + self._termworkers:
      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
        return True
    return False

  def Quiesce(self):
    """Waits until the task queue is empty and no task is running.

    While quiescing, L{AddTask} and L{AddManyTasks} block until this
    method returns.

    """
    self._lock.acquire()
    try:
      self._quiescing = True

      # Wait while there are tasks pending or running
      while self._tasks or self._HasRunningTasksUnlocked():
        self._worker_to_pool.wait()

    finally:
      self._quiescing = False

      # Make sure AddTasks continues in case it was waiting
      self._pool_to_pool.notifyAll()

      self._lock.release()

  def _NewWorkerIdUnlocked(self):
    """Return an identifier for a new worker.

    The caller must hold the pool lock.

    """
    self._last_worker_id += 1

    return "%s%d" % (self._name, self._last_worker_id)

  @staticmethod
  def _IsWorkerAlive(worker):
    """Returns whether a worker thread is still alive.

    C{Thread.is_alive} only exists since Python 2.6, while
    C{Thread.isAlive} was removed in Python 3.9; use whichever spelling
    is available.

    """
    is_alive = getattr(worker, "is_alive", None)
    if is_alive is None:
      is_alive = worker.isAlive
    return is_alive()

  def _ResizeUnlocked(self, num_workers):
    """Changes the number of workers.

    The caller must hold the pool lock. When shrinking, the lock is
    temporarily released while waiting for the terminating workers to
    exit.

    @param num_workers: the new number of workers
    @raise NotImplementedError: when shrinking to a non-zero number of
        workers

    """
    assert num_workers >= 0, "num_workers must be >= 0"

    logging.debug("Resizing to %s workers", num_workers)

    current_count = len(self._workers)

    if current_count == num_workers:
      # Nothing to do
      pass

    elif current_count > num_workers:
      if num_workers == 0:
        # Create copy of list to iterate over while lock isn't held.
        termworkers = self._workers[:]
        del self._workers[:]
      else:
        # TODO: Implement partial downsizing
        raise NotImplementedError()
        #termworkers = ...

      self._termworkers += termworkers

      # Notify workers that something has changed
      self._pool_to_worker.notifyAll()

      # Join all terminating workers
      self._lock.release()
      try:
        for worker in termworkers:
          logging.debug("Waiting for thread %s", worker.getName())
          worker.join()
      finally:
        self._lock.acquire()

      # Remove terminated threads. This could be done in a more efficient way
      # (del self._termworkers[:]), but checking whether each worker is still
      # alive makes sure we don't leave zombie threads around.
      for worker in termworkers:
        assert worker in self._termworkers, ("Worker not in list of"
                                             " terminating workers")
        if not self._IsWorkerAlive(worker):
          self._termworkers.remove(worker)

      assert not self._termworkers, "Zombie worker detected"

    elif current_count < num_workers:
      # Create (num_workers - current_count) new workers
      for _ in range(num_workers - current_count):
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
        self._workers.append(worker)
        worker.start()

  def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    Growing is always supported; shrinking is only supported down to
    zero workers.

    @param num_workers: the new number of workers

    """
    self._lock.acquire()
    try:
      return self._ResizeUnlocked(num_workers)
    finally:
      self._lock.release()

  def TerminateWorkers(self):
    """Terminate all worker threads.

    Unstarted tasks will be ignored; they are left in the queue.

    """
    logging.debug("Terminating all workers")

    self._lock.acquire()
    try:
      self._ResizeUnlocked(0)

      if self._tasks:
        logging.debug("There are %s tasks left", len(self._tasks))
    finally:
      self._lock.release()

    logging.debug("All workers terminated")