Revision 76094e37
b/Makefile.am | ||
---|---|---|
75 | 75 |
lib/serializer.py \ |
76 | 76 |
lib/ssconf.py \ |
77 | 77 |
lib/ssh.py \ |
78 |
lib/utils.py |
|
78 |
lib/utils.py \ |
|
79 |
lib/workerpool.py |
|
79 | 80 |
|
80 | 81 |
hypervisor_PYTHON = \ |
81 | 82 |
lib/hypervisor/__init__.py \ |
... | ... | |
172 | 173 |
test/ganeti.ssh_unittest.py \ |
173 | 174 |
test/ganeti.locking_unittest.py \ |
174 | 175 |
test/ganeti.serializer_unittest.py \ |
176 |
test/ganeti.workerpool_unittest.py \ |
|
175 | 177 |
test/ganeti.constants_unittest.py |
176 | 178 |
|
177 | 179 |
nodist_TESTS = |
b/lib/workerpool.py | ||
---|---|---|
1 |
# |
|
2 |
# |
|
3 |
|
|
4 |
# Copyright (C) 2008 Google Inc. |
|
5 |
# |
|
6 |
# This program is free software; you can redistribute it and/or modify |
|
7 |
# it under the terms of the GNU General Public License as published by |
|
8 |
# the Free Software Foundation; either version 2 of the License, or |
|
9 |
# (at your option) any later version. |
|
10 |
# |
|
11 |
# This program is distributed in the hope that it will be useful, but |
|
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of |
|
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
|
14 |
# General Public License for more details. |
|
15 |
# |
|
16 |
# You should have received a copy of the GNU General Public License |
|
17 |
# along with this program; if not, write to the Free Software |
|
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
|
19 |
# 02110-1301, USA. |
|
20 |
|
|
21 |
|
|
22 |
"""Base classes for worker pools. |
|
23 |
|
|
24 |
""" |
|
25 |
|
|
26 |
import collections |
|
27 |
import logging |
|
28 |
import threading |
|
29 |
|
|
30 |
from ganeti import errors |
|
31 |
from ganeti import utils |
|
32 |
|
|
33 |
|
|
34 |
class BaseWorker(threading.Thread, object):
  """Base worker class for worker pools.

  Users of a worker pool must override RunTask in a subclass.

  A worker is a thread that repeatedly takes one task (a tuple of arguments)
  from its parent pool's queue and passes it to RunTask. All access to the
  pool's queue and to the list of terminating workers happens while holding
  the pool's condition variable (pool._lock).

  """
  def __init__(self, pool, worker_id):
    """Constructor for BaseWorker thread.

    Args:
    - pool: Parent worker pool
    - worker_id: Identifier for this worker

    """
    super(BaseWorker, self).__init__()
    self.pool = pool
    self.worker_id = worker_id

    # Also used by WorkerPool
    # Holds the argument tuple of the task being processed, None when idle.
    self._current_task = None

  def ShouldTerminate(self):
    """Returns whether a worker should terminate.

    The question is forwarded to the parent pool, which acquires its own lock
    for the check.

    """
    return self.pool.ShouldWorkerTerminate(self)

  def run(self):
    """Main thread function.

    Waits for new tasks to show up in the queue.

    The loop ends once the pool reports that this worker should terminate.
    Exceptions raised by RunTask are logged and do not end the loop.

    """
    pool = self.pool

    assert self._current_task is None

    while True:
      # The outer "finally" below runs on every way out of this "try",
      # including the "break" and "continue" statements inside it.
      try:
        # We wait on lock to be told either terminate or do a task.
        pool._lock.acquire()
        try:
          if pool._ShouldWorkerTerminateUnlocked(self):
            break

          # We only wait if there's no task for us.
          if not pool._tasks:
            # wait() releases the lock and sleeps until notified
            pool._lock.wait()

            # Were we woken up in order to terminate?
            if pool._ShouldWorkerTerminateUnlocked(self):
              break

            if not pool._tasks:
              # Spurious notification, ignore
              continue

          # Get task from queue and tell pool about it
          try:
            self._current_task = pool._tasks.popleft()
          finally:
            # Wake up everybody waiting on the condition (the queue changed)
            pool._lock.notifyAll()
        finally:
          pool._lock.release()

        # Run the actual task
        # The lock is not held here, so other workers can fetch tasks.
        try:
          self.RunTask(*self._current_task)
        except:
          # Catch everything so that a failing task doesn't end this thread;
          # exc_info=True adds the traceback to the log record.
          logging.error("Worker %s: Caught unhandled exception",
                        self.worker_id, exc_info=True)
      finally:
        # Mark this worker as idle again
        self._current_task = None

        # Notify pool
        # The lock was released above, hence it's acquired again here.
        pool._lock.acquire()
        try:
          pool._lock.notifyAll()
        finally:
          pool._lock.release()

  def RunTask(self, *args):
    """Function called to start a task.

    Must be overridden by subclasses; this implementation always raises
    NotImplementedError.

    Args:
    - *args: The arguments which were passed to the pool's AddTask

    """
    raise NotImplementedError()
|
121 |
|
|
122 |
|
|
123 |
class WorkerPool(object):
  """Worker pool with a queue.

  This class is thread-safe.

  Tasks are guaranteed to be started in the order in which they're added to the
  pool. Due to the nature of threading, they're not guaranteed to finish in the
  same order.

  All internal state is protected by a single condition variable (self._lock).
  Methods with the "Unlocked" suffix expect the caller to hold that lock.

  """
  def __init__(self, num_workers, worker_class):
    """Constructor for worker pool.

    Args:
    - num_workers: Number of workers to be started (dynamic resizing is not
                   yet implemented)
    - worker_class: Class to be instantiated for workers; should derive from
                    BaseWorker

    """
    # Some of these variables are accessed by BaseWorker
    self._lock = threading.Condition(threading.Lock())
    self._worker_class = worker_class
    self._last_worker_id = 0
    self._workers = []
    self._quiescing = False

    # Terminating workers
    self._termworkers = []

    # Queued tasks
    self._tasks = collections.deque()

    # Start workers
    self.Resize(num_workers)

  # TODO: Implement dynamic resizing?

  def AddTask(self, *args):
    """Adds a task to the queue.

    Blocks while Quiesce is in progress.

    Args:
    - *args: Arguments passed to BaseWorker.RunTask

    """
    self._lock.acquire()
    try:
      # Don't add new tasks while we're quiescing
      while self._quiescing:
        self._lock.wait()

      # Add task to internal queue
      self._tasks.append(args)
      # One new task, hence waking up a single waiter is enough
      self._lock.notify()
    finally:
      self._lock.release()

  def _ShouldWorkerTerminateUnlocked(self, worker):
    """Returns whether a worker should terminate.

    The caller must hold self._lock.

    Args:
    - worker: Worker object to be looked up in the list of terminating workers

    """
    return (worker in self._termworkers)

  def ShouldWorkerTerminate(self, worker):
    """Returns whether a worker should terminate.

    Acquires self._lock for the check; the lock is not reentrant, so this must
    not be called while already holding it.

    Args:
    - worker: Worker object to be checked

    """
    self._lock.acquire()
    try:
      # The worker must be passed here, not the pool itself: the pool is never
      # in the list of terminating workers, so checking "self" always
      # returned False.
      return self._ShouldWorkerTerminateUnlocked(worker)
    finally:
      self._lock.release()

  def _HasRunningTasksUnlocked(self):
    """Checks whether there's a task running in a worker.

    The caller must hold self._lock. Both active and terminating workers are
    taken into account.

    """
    for worker in self._workers + self._termworkers:
      if worker._current_task is not None:
        return True
    return False

  def Quiesce(self):
    """Waits until the task queue is empty.

    Returns once there are neither queued nor running tasks. While waiting,
    AddTask is blocked.

    """
    self._lock.acquire()
    try:
      self._quiescing = True

      # Wait while there are tasks pending or running
      while self._tasks or self._HasRunningTasksUnlocked():
        self._lock.wait()

    finally:
      self._quiescing = False

      # Make sure AddTasks continues in case it was waiting
      self._lock.notifyAll()

      self._lock.release()

  def _NewWorkerIdUnlocked(self):
    """Returns a new worker ID by incrementing the last one handed out.

    The caller must hold self._lock.

    """
    self._last_worker_id += 1
    return self._last_worker_id

  def _ResizeUnlocked(self, num_workers):
    """Changes the number of workers.

    The caller must hold self._lock. When workers are terminated, the lock is
    released while joining their threads and acquired again afterwards, so
    other threads can modify the pool in the meantime.

    Args:
    - num_workers: New number of workers; must be >= 0

    Raises:
    - NotImplementedError: When reducing the pool to a size other than zero

    """
    assert num_workers >= 0, "num_workers must be >= 0"

    logging.debug("Resizing to %s workers", num_workers)

    current_count = len(self._workers)

    if current_count == num_workers:
      # Nothing to do
      pass

    elif current_count > num_workers:
      if num_workers == 0:
        # Create copy of list to iterate over while lock isn't held.
        termworkers = self._workers[:]
        del self._workers[:]
      else:
        # TODO: Implement partial downsizing
        raise NotImplementedError()
        #termworkers = ...

      self._termworkers += termworkers

      # Notify workers that something has changed
      self._lock.notifyAll()

      # Join all terminating workers
      # The workers need the lock to notice they should exit, hence it must
      # not be held while waiting for them.
      self._lock.release()
      try:
        for worker in termworkers:
          worker.join()
      finally:
        self._lock.acquire()

      # Remove terminated threads. This could be done in a more efficient way
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
      # don't leave zombie threads around.
      for worker in termworkers:
        assert worker in self._termworkers, ("Worker not in list of"
                                             " terminating workers")
        if not worker.isAlive():
          self._termworkers.remove(worker)

      assert not self._termworkers, "Zombie worker detected"

    elif current_count < num_workers:
      # Create (num_workers - current_count) new workers
      for _ in xrange(num_workers - current_count):
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
        self._workers.append(worker)
        worker.start()

  def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    Args:
    - num_workers: New number of workers

    """
    self._lock.acquire()
    try:
      return self._ResizeUnlocked(num_workers)
    finally:
      self._lock.release()

  def TerminateWorkers(self):
    """Terminate all worker threads.

    Unstarted tasks will be ignored.

    """
    logging.debug("Terminating all workers")

    self._lock.acquire()
    try:
      self._ResizeUnlocked(0)

      # Tasks still queued stay in the queue; they're only reported
      if self._tasks:
        logging.debug("There are %s tasks left", len(self._tasks))
    finally:
      self._lock.release()

    logging.debug("All workers terminated")
b/test/ganeti.workerpool_unittest.py | ||
---|---|---|
1 |
#!/usr/bin/python |
|
2 |
# |
|
3 |
|
|
4 |
# Copyright (C) 2008 Google Inc. |
|
5 |
# |
|
6 |
# This program is free software; you can redistribute it and/or modify |
|
7 |
# it under the terms of the GNU General Public License as published by |
|
8 |
# the Free Software Foundation; either version 2 of the License, or |
|
9 |
# (at your option) any later version. |
|
10 |
# |
|
11 |
# This program is distributed in the hope that it will be useful, but |
|
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of |
|
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
|
14 |
# General Public License for more details. |
|
15 |
# |
|
16 |
# You should have received a copy of the GNU General Public License |
|
17 |
# along with this program; if not, write to the Free Software |
|
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
|
19 |
# 02110-1301, USA. |
|
20 |
|
|
21 |
|
|
22 |
"""Script for unittesting the workerpool module""" |
|
23 |
|
|
24 |
import unittest |
|
25 |
import threading |
|
26 |
import time |
|
27 |
import sys |
|
28 |
import zlib |
|
29 |
|
|
30 |
from ganeti import workerpool |
|
31 |
|
|
32 |
|
|
33 |
class DummyBaseWorker(workerpool.BaseWorker):
  """Worker whose tasks do nothing."""

  def RunTask(self, text):
    """Accepts the task's single argument and ignores it."""
    return None
|
36 |
|
|
37 |
|
|
38 |
class ChecksumContext: |
|
39 |
CHECKSUM_START = zlib.adler32("") |
|
40 |
|
|
41 |
def __init__(self): |
|
42 |
self.lock = threading.Condition(threading.Lock()) |
|
43 |
self.checksum = self.CHECKSUM_START |
|
44 |
|
|
45 |
@staticmethod |
|
46 |
def UpdateChecksum(current, value): |
|
47 |
return zlib.adler32(str(value), current) |
|
48 |
|
|
49 |
|
|
50 |
class ChecksumBaseWorker(workerpool.BaseWorker):
  """Worker folding each task's number into a shared checksum context."""

  def RunTask(self, ctx, number):
    """Updates ctx.checksum with "number" while holding ctx.lock."""
    guard = ctx.lock
    guard.acquire()
    try:
      updated = ctx.UpdateChecksum(ctx.checksum, number)
      ctx.checksum = updated
    finally:
      guard.release()
|
57 |
|
|
58 |
|
|
59 |
class TestWorkerpool(unittest.TestCase):
  """Workerpool tests"""

  def testDummy(self):
    """Queues ten no-op tasks on three workers and waits for them."""
    pool = workerpool.WorkerPool(3, DummyBaseWorker)
    try:
      self._CheckWorkerCount(pool, 3)

      for idx in xrange(10):
        pool.AddTask("Hello world %s" % idx)

      pool.Quiesce()
    finally:
      pool.TerminateWorkers()
      self._CheckWorkerCount(pool, 0)

  def testNoTasks(self):
    """Starts and stops a pool without ever adding a task."""
    pool = workerpool.WorkerPool(3, DummyBaseWorker)
    try:
      self._CheckWorkerCount(pool, 3)
      self._CheckNoTasks(pool)
    finally:
      pool.TerminateWorkers()
      self._CheckWorkerCount(pool, 0)

  def testNoTasksQuiesce(self):
    """Quiesces a pool which never had any task."""
    pool = workerpool.WorkerPool(3, DummyBaseWorker)
    try:
      self._CheckWorkerCount(pool, 3)
      self._CheckNoTasks(pool)
      pool.Quiesce()
      self._CheckNoTasks(pool)
    finally:
      pool.TerminateWorkers()
      self._CheckWorkerCount(pool, 0)

  def testChecksum(self):
    """Compares a checksum built by the worker with one built locally."""
    # Tests whether all tasks are run and, since we're only using a single
    # thread, whether everything is started in order.
    pool = workerpool.WorkerPool(1, ChecksumBaseWorker)
    try:
      self._CheckWorkerCount(pool, 1)

      ctx = ChecksumContext()
      expected = ChecksumContext.CHECKSUM_START
      for number in xrange(1, 100):
        expected = ChecksumContext.UpdateChecksum(expected, number)
        pool.AddTask(ctx, number)

      pool.Quiesce()

      self._CheckNoTasks(pool)

      # Check sum
      ctx.lock.acquire()
      try:
        self.assertEqual(expected, ctx.checksum)
      finally:
        ctx.lock.release()
    finally:
      pool.TerminateWorkers()
      self._CheckWorkerCount(pool, 0)

  def _CheckNoTasks(self, wp):
    """Fails unless the pool's task queue is empty."""
    wp._lock.acquire()
    try:
      # The task queue must be empty now
      self.failIf(wp._tasks)
    finally:
      wp._lock.release()

  def _CheckWorkerCount(self, wp, num_workers):
    """Fails unless the pool has exactly "num_workers" active workers."""
    wp._lock.acquire()
    try:
      active = len(wp._workers)
      self.assertEqual(active, num_workers)
    finally:
      wp._lock.release()
|
136 |
|
|
137 |
|
|
138 |
# Run all tests of this module when executed as a script
if __name__ == "__main__":
  unittest.main()
Also available in: Unified diff