root / test / ganeti.workerpool_unittest.py @ 9fa567b3
History | View | Annotate | Download (13.3 kB)
1 |
#!/usr/bin/python
|
---|---|
2 |
#
|
3 |
|
4 |
# Copyright (C) 2008, 2009, 2010 Google Inc.
|
5 |
#
|
6 |
# This program is free software; you can redistribute it and/or modify
|
7 |
# it under the terms of the GNU General Public License as published by
|
8 |
# the Free Software Foundation; either version 2 of the License, or
|
9 |
# (at your option) any later version.
|
10 |
#
|
11 |
# This program is distributed in the hope that it will be useful, but
|
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
14 |
# General Public License for more details.
|
15 |
#
|
16 |
# You should have received a copy of the GNU General Public License
|
17 |
# along with this program; if not, write to the Free Software
|
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
19 |
# 02110-1301, USA.
|
20 |
|
21 |
|
22 |
"""Script for unittesting the workerpool module"""
|
23 |
|
24 |
import unittest |
25 |
import threading |
26 |
import time |
27 |
import sys |
28 |
import zlib |
29 |
import random |
30 |
|
31 |
from ganeti import workerpool |
32 |
from ganeti import errors |
33 |
|
34 |
import testutils |
35 |
|
36 |
|
37 |
class CountingContext(object): |
38 |
def __init__(self): |
39 |
self._lock = threading.Condition(threading.Lock())
|
40 |
self.done = 0 |
41 |
|
42 |
def DoneTask(self): |
43 |
self._lock.acquire()
|
44 |
try:
|
45 |
self.done += 1 |
46 |
finally:
|
47 |
self._lock.release()
|
48 |
|
49 |
def GetDoneTasks(self): |
50 |
self._lock.acquire()
|
51 |
try:
|
52 |
return self.done |
53 |
finally:
|
54 |
self._lock.release()
|
55 |
|
56 |
@staticmethod
|
57 |
def UpdateChecksum(current, value): |
58 |
return zlib.adler32(str(value), current) |
59 |
|
60 |
|
61 |
class CountingBaseWorker(workerpool.BaseWorker): |
62 |
def RunTask(self, ctx, text): |
63 |
ctx.DoneTask() |
64 |
|
65 |
|
66 |
class ChecksumContext: |
67 |
CHECKSUM_START = zlib.adler32("")
|
68 |
|
69 |
def __init__(self): |
70 |
self.lock = threading.Condition(threading.Lock())
|
71 |
self.checksum = self.CHECKSUM_START |
72 |
|
73 |
@staticmethod
|
74 |
def UpdateChecksum(current, value): |
75 |
return zlib.adler32(str(value), current) |
76 |
|
77 |
|
78 |
class ChecksumBaseWorker(workerpool.BaseWorker): |
79 |
def RunTask(self, ctx, number): |
80 |
name = "number%s" % number
|
81 |
self.SetTaskName(name)
|
82 |
|
83 |
# This assertion needs to be checked before updating the checksum. A
|
84 |
# failing assertion will then cause the result to be wrong.
|
85 |
assert self.getName() == ("%s/%s" % (self._worker_id, name)) |
86 |
|
87 |
ctx.lock.acquire() |
88 |
try:
|
89 |
ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number) |
90 |
finally:
|
91 |
ctx.lock.release() |
92 |
|
93 |
|
94 |
class ListBuilderContext: |
95 |
def __init__(self): |
96 |
self.lock = threading.Lock()
|
97 |
self.result = []
|
98 |
self.prioresult = {}
|
99 |
|
100 |
|
101 |
class ListBuilderWorker(workerpool.BaseWorker): |
102 |
def RunTask(self, ctx, data): |
103 |
ctx.lock.acquire() |
104 |
try:
|
105 |
ctx.result.append((self.GetCurrentPriority(), data))
|
106 |
ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
|
107 |
finally:
|
108 |
ctx.lock.release() |
109 |
|
110 |
|
111 |
class DeferringTaskContext: |
112 |
def __init__(self): |
113 |
self.lock = threading.Lock()
|
114 |
self.prioresult = {}
|
115 |
self.samepriodefer = {}
|
116 |
|
117 |
|
118 |
class DeferringWorker(workerpool.BaseWorker): |
119 |
def RunTask(self, ctx, num, targetprio): |
120 |
ctx.lock.acquire() |
121 |
try:
|
122 |
if num in ctx.samepriodefer: |
123 |
del ctx.samepriodefer[num]
|
124 |
raise workerpool.DeferTask()
|
125 |
|
126 |
if self.GetCurrentPriority() > targetprio: |
127 |
raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1) |
128 |
|
129 |
ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num) |
130 |
finally:
|
131 |
ctx.lock.release() |
132 |
|
133 |
|
134 |
class TestWorkerpool(unittest.TestCase): |
135 |
"""Workerpool tests"""
|
136 |
|
137 |
def testCounting(self): |
138 |
ctx = CountingContext() |
139 |
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker) |
140 |
try:
|
141 |
self._CheckWorkerCount(wp, 3) |
142 |
|
143 |
for i in range(10): |
144 |
wp.AddTask((ctx, "Hello world %s" % i))
|
145 |
|
146 |
wp.Quiesce() |
147 |
finally:
|
148 |
wp.TerminateWorkers() |
149 |
self._CheckWorkerCount(wp, 0) |
150 |
|
151 |
self.assertEquals(ctx.GetDoneTasks(), 10) |
152 |
|
153 |
def testNoTasks(self): |
154 |
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker) |
155 |
try:
|
156 |
self._CheckWorkerCount(wp, 3) |
157 |
self._CheckNoTasks(wp)
|
158 |
finally:
|
159 |
wp.TerminateWorkers() |
160 |
self._CheckWorkerCount(wp, 0) |
161 |
|
162 |
def testNoTasksQuiesce(self): |
163 |
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker) |
164 |
try:
|
165 |
self._CheckWorkerCount(wp, 3) |
166 |
self._CheckNoTasks(wp)
|
167 |
wp.Quiesce() |
168 |
self._CheckNoTasks(wp)
|
169 |
finally:
|
170 |
wp.TerminateWorkers() |
171 |
self._CheckWorkerCount(wp, 0) |
172 |
|
173 |
def testActive(self): |
174 |
ctx = CountingContext() |
175 |
wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker) |
176 |
try:
|
177 |
self._CheckWorkerCount(wp, 5) |
178 |
self.assertTrue(wp._active)
|
179 |
|
180 |
# Process some tasks
|
181 |
for _ in range(10): |
182 |
wp.AddTask((ctx, None))
|
183 |
|
184 |
wp.Quiesce() |
185 |
self._CheckNoTasks(wp)
|
186 |
self.assertEquals(ctx.GetDoneTasks(), 10) |
187 |
|
188 |
# Repeat a few times
|
189 |
for count in range(10): |
190 |
# Deactivate pool
|
191 |
wp.SetActive(False)
|
192 |
self._CheckNoTasks(wp)
|
193 |
|
194 |
# Queue some more tasks
|
195 |
for _ in range(10): |
196 |
wp.AddTask((ctx, None))
|
197 |
|
198 |
for _ in range(5): |
199 |
# Short delays to give other threads a chance to cause breakage
|
200 |
time.sleep(.01)
|
201 |
wp.AddTask((ctx, "Hello world %s" % 999)) |
202 |
self.assertFalse(wp._active)
|
203 |
|
204 |
self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15)) |
205 |
|
206 |
# Start processing again
|
207 |
wp.SetActive(True)
|
208 |
self.assertTrue(wp._active)
|
209 |
|
210 |
# Wait for tasks to finish
|
211 |
wp.Quiesce() |
212 |
self._CheckNoTasks(wp)
|
213 |
self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15) |
214 |
|
215 |
self._CheckWorkerCount(wp, 5) |
216 |
finally:
|
217 |
wp.TerminateWorkers() |
218 |
self._CheckWorkerCount(wp, 0) |
219 |
|
220 |
def testChecksum(self): |
221 |
# Tests whether all tasks are run and, since we're only using a single
|
222 |
# thread, whether everything is started in order.
|
223 |
wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker) |
224 |
try:
|
225 |
self._CheckWorkerCount(wp, 1) |
226 |
|
227 |
ctx = ChecksumContext() |
228 |
checksum = ChecksumContext.CHECKSUM_START |
229 |
for i in range(1, 100): |
230 |
checksum = ChecksumContext.UpdateChecksum(checksum, i) |
231 |
wp.AddTask((ctx, i)) |
232 |
|
233 |
wp.Quiesce() |
234 |
|
235 |
self._CheckNoTasks(wp)
|
236 |
|
237 |
# Check sum
|
238 |
ctx.lock.acquire() |
239 |
try:
|
240 |
self.assertEqual(checksum, ctx.checksum)
|
241 |
finally:
|
242 |
ctx.lock.release() |
243 |
finally:
|
244 |
wp.TerminateWorkers() |
245 |
self._CheckWorkerCount(wp, 0) |
246 |
|
247 |
def testAddManyTasks(self): |
248 |
ctx = CountingContext() |
249 |
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker) |
250 |
try:
|
251 |
self._CheckWorkerCount(wp, 3) |
252 |
|
253 |
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)]) |
254 |
wp.AddTask((ctx, "A separate hello"))
|
255 |
wp.AddTask((ctx, "Once more, hi!"))
|
256 |
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)]) |
257 |
|
258 |
wp.Quiesce() |
259 |
|
260 |
self._CheckNoTasks(wp)
|
261 |
finally:
|
262 |
wp.TerminateWorkers() |
263 |
self._CheckWorkerCount(wp, 0) |
264 |
|
265 |
self.assertEquals(ctx.GetDoneTasks(), 22) |
266 |
|
267 |
def testManyTasksSequence(self): |
268 |
ctx = CountingContext() |
269 |
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker) |
270 |
try:
|
271 |
self._CheckWorkerCount(wp, 3) |
272 |
self.assertRaises(AssertionError, wp.AddManyTasks, |
273 |
["Hello world %s" % i for i in range(10)]) |
274 |
self.assertRaises(AssertionError, wp.AddManyTasks, |
275 |
[i for i in range(10)]) |
276 |
|
277 |
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)]) |
278 |
wp.AddTask((ctx, "A separate hello"))
|
279 |
|
280 |
wp.Quiesce() |
281 |
|
282 |
self._CheckNoTasks(wp)
|
283 |
finally:
|
284 |
wp.TerminateWorkers() |
285 |
self._CheckWorkerCount(wp, 0) |
286 |
|
287 |
self.assertEquals(ctx.GetDoneTasks(), 11) |
288 |
|
289 |
def _CheckNoTasks(self, wp): |
290 |
wp._lock.acquire() |
291 |
try:
|
292 |
# The task queue must be empty now
|
293 |
self.failUnless(not wp._tasks) |
294 |
finally:
|
295 |
wp._lock.release() |
296 |
|
297 |
def _CheckWorkerCount(self, wp, num_workers): |
298 |
wp._lock.acquire() |
299 |
try:
|
300 |
self.assertEqual(len(wp._workers), num_workers) |
301 |
finally:
|
302 |
wp._lock.release() |
303 |
|
304 |
def testPriorityChecksum(self): |
305 |
# Tests whether all tasks are run and, since we're only using a single
|
306 |
# thread, whether everything is started in order and respects the priority
|
307 |
wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker) |
308 |
try:
|
309 |
self._CheckWorkerCount(wp, 1) |
310 |
|
311 |
ctx = ChecksumContext() |
312 |
|
313 |
data = {} |
314 |
tasks = [] |
315 |
priorities = [] |
316 |
for i in range(1, 333): |
317 |
prio = i % 7
|
318 |
tasks.append((ctx, i)) |
319 |
priorities.append(prio) |
320 |
data.setdefault(prio, []).append(i) |
321 |
|
322 |
wp.AddManyTasks(tasks, priority=priorities) |
323 |
|
324 |
wp.Quiesce() |
325 |
|
326 |
self._CheckNoTasks(wp)
|
327 |
|
328 |
# Check sum
|
329 |
ctx.lock.acquire() |
330 |
try:
|
331 |
checksum = ChecksumContext.CHECKSUM_START |
332 |
for priority in sorted(data.keys()): |
333 |
for i in data[priority]: |
334 |
checksum = ChecksumContext.UpdateChecksum(checksum, i) |
335 |
|
336 |
self.assertEqual(checksum, ctx.checksum)
|
337 |
finally:
|
338 |
ctx.lock.release() |
339 |
|
340 |
self._CheckWorkerCount(wp, 1) |
341 |
finally:
|
342 |
wp.TerminateWorkers() |
343 |
self._CheckWorkerCount(wp, 0) |
344 |
|
345 |
def testPriorityListManyTasks(self): |
346 |
# Tests whether all tasks are run and, since we're only using a single
|
347 |
# thread, whether everything is started in order and respects the priority
|
348 |
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker) |
349 |
try:
|
350 |
self._CheckWorkerCount(wp, 1) |
351 |
|
352 |
ctx = ListBuilderContext() |
353 |
|
354 |
# Use static seed for this test
|
355 |
rnd = random.Random(0)
|
356 |
|
357 |
data = {} |
358 |
tasks = [] |
359 |
priorities = [] |
360 |
for i in range(1, 333): |
361 |
prio = int(rnd.random() * 10) |
362 |
tasks.append((ctx, i)) |
363 |
priorities.append(prio) |
364 |
data.setdefault(prio, []).append((prio, i)) |
365 |
|
366 |
wp.AddManyTasks(tasks, priority=priorities) |
367 |
|
368 |
self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
|
369 |
[("x", ), ("y", )], priority=[1] * 5) |
370 |
|
371 |
wp.Quiesce() |
372 |
|
373 |
self._CheckNoTasks(wp)
|
374 |
|
375 |
# Check result
|
376 |
ctx.lock.acquire() |
377 |
try:
|
378 |
expresult = [] |
379 |
for priority in sorted(data.keys()): |
380 |
expresult.extend(data[priority]) |
381 |
|
382 |
self.assertEqual(expresult, ctx.result)
|
383 |
finally:
|
384 |
ctx.lock.release() |
385 |
|
386 |
self._CheckWorkerCount(wp, 1) |
387 |
finally:
|
388 |
wp.TerminateWorkers() |
389 |
self._CheckWorkerCount(wp, 0) |
390 |
|
391 |
def testPriorityListSingleTasks(self): |
392 |
# Tests whether all tasks are run and, since we're only using a single
|
393 |
# thread, whether everything is started in order and respects the priority
|
394 |
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker) |
395 |
try:
|
396 |
self._CheckWorkerCount(wp, 1) |
397 |
|
398 |
ctx = ListBuilderContext() |
399 |
|
400 |
# Use static seed for this test
|
401 |
rnd = random.Random(26279)
|
402 |
|
403 |
data = {} |
404 |
for i in range(1, 333): |
405 |
prio = int(rnd.random() * 30) |
406 |
wp.AddTask((ctx, i), priority=prio) |
407 |
data.setdefault(prio, []).append(i) |
408 |
|
409 |
# Cause some distortion
|
410 |
if i % 11 == 0: |
411 |
time.sleep(.001)
|
412 |
if i % 41 == 0: |
413 |
wp.Quiesce() |
414 |
|
415 |
wp.Quiesce() |
416 |
|
417 |
self._CheckNoTasks(wp)
|
418 |
|
419 |
# Check result
|
420 |
ctx.lock.acquire() |
421 |
try:
|
422 |
self.assertEqual(data, ctx.prioresult)
|
423 |
finally:
|
424 |
ctx.lock.release() |
425 |
|
426 |
self._CheckWorkerCount(wp, 1) |
427 |
finally:
|
428 |
wp.TerminateWorkers() |
429 |
self._CheckWorkerCount(wp, 0) |
430 |
|
431 |
def testPriorityListSingleTasks(self): |
432 |
# Tests whether all tasks are run and, since we're only using a single
|
433 |
# thread, whether everything is started in order and respects the priority
|
434 |
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker) |
435 |
try:
|
436 |
self._CheckWorkerCount(wp, 1) |
437 |
|
438 |
ctx = ListBuilderContext() |
439 |
|
440 |
# Use static seed for this test
|
441 |
rnd = random.Random(26279)
|
442 |
|
443 |
data = {} |
444 |
for i in range(1, 333): |
445 |
prio = int(rnd.random() * 30) |
446 |
wp.AddTask((ctx, i), priority=prio) |
447 |
data.setdefault(prio, []).append(i) |
448 |
|
449 |
# Cause some distortion
|
450 |
if i % 11 == 0: |
451 |
time.sleep(.001)
|
452 |
if i % 41 == 0: |
453 |
wp.Quiesce() |
454 |
|
455 |
wp.Quiesce() |
456 |
|
457 |
self._CheckNoTasks(wp)
|
458 |
|
459 |
# Check result
|
460 |
ctx.lock.acquire() |
461 |
try:
|
462 |
self.assertEqual(data, ctx.prioresult)
|
463 |
finally:
|
464 |
ctx.lock.release() |
465 |
|
466 |
self._CheckWorkerCount(wp, 1) |
467 |
finally:
|
468 |
wp.TerminateWorkers() |
469 |
self._CheckWorkerCount(wp, 0) |
470 |
|
471 |
def testDeferTask(self): |
472 |
# Tests whether all tasks are run and, since we're only using a single
|
473 |
# thread, whether everything is started in order and respects the priority
|
474 |
wp = workerpool.WorkerPool("Test", 1, DeferringWorker) |
475 |
try:
|
476 |
self._CheckWorkerCount(wp, 1) |
477 |
|
478 |
ctx = DeferringTaskContext() |
479 |
|
480 |
# Use static seed for this test
|
481 |
rnd = random.Random(14921)
|
482 |
|
483 |
data = {} |
484 |
for i in range(1, 333): |
485 |
ctx.lock.acquire() |
486 |
try:
|
487 |
if i % 5 == 0: |
488 |
ctx.samepriodefer[i] = True
|
489 |
finally:
|
490 |
ctx.lock.release() |
491 |
|
492 |
prio = int(rnd.random() * 30) |
493 |
wp.AddTask((ctx, i, prio), priority=50)
|
494 |
data.setdefault(prio, set()).add(i)
|
495 |
|
496 |
# Cause some distortion
|
497 |
if i % 24 == 0: |
498 |
time.sleep(.001)
|
499 |
if i % 31 == 0: |
500 |
wp.Quiesce() |
501 |
|
502 |
wp.Quiesce() |
503 |
|
504 |
self._CheckNoTasks(wp)
|
505 |
|
506 |
# Check result
|
507 |
ctx.lock.acquire() |
508 |
try:
|
509 |
self.assertEqual(data, ctx.prioresult)
|
510 |
finally:
|
511 |
ctx.lock.release() |
512 |
|
513 |
self._CheckWorkerCount(wp, 1) |
514 |
finally:
|
515 |
wp.TerminateWorkers() |
516 |
self._CheckWorkerCount(wp, 0) |
517 |
|
518 |
|
519 |
if __name__ == '__main__': |
520 |
testutils.GanetiTestProgram() |