Revision 52c47e4e test/ganeti.workerpool_unittest.py
b/test/ganeti.workerpool_unittest.py | ||
---|---|---|
1 | 1 |
#!/usr/bin/python |
2 | 2 |
# |
3 | 3 |
|
4 |
# Copyright (C) 2008 Google Inc. |
|
4 |
# Copyright (C) 2008, 2009, 2010 Google Inc.
|
|
5 | 5 |
# |
6 | 6 |
# This program is free software; you can redistribute it and/or modify |
7 | 7 |
# it under the terms of the GNU General Public License as published by |
... | ... | |
26 | 26 |
import time |
27 | 27 |
import sys |
28 | 28 |
import zlib |
29 |
import random |
|
29 | 30 |
|
30 | 31 |
from ganeti import workerpool |
32 |
from ganeti import errors |
|
31 | 33 |
|
32 | 34 |
import testutils |
33 | 35 |
|
34 |
class CountingContext(object): |
|
35 | 36 |
|
37 |
class CountingContext(object): |
|
36 | 38 |
def __init__(self): |
37 | 39 |
self._lock = threading.Condition(threading.Lock()) |
38 | 40 |
self.done = 0 |
... | ... | |
57 | 59 |
|
58 | 60 |
|
59 | 61 |
class CountingBaseWorker(workerpool.BaseWorker): |
60 |
|
|
61 | 62 |
def RunTask(self, ctx, text): |
62 | 63 |
ctx.DoneTask() |
63 | 64 |
|
... | ... | |
83 | 84 |
ctx.lock.release() |
84 | 85 |
|
85 | 86 |
|
87 |
class ListBuilderContext: |
|
88 |
def __init__(self): |
|
89 |
self.lock = threading.Lock() |
|
90 |
self.result = [] |
|
91 |
self.prioresult = {} |
|
92 |
|
|
93 |
|
|
94 |
class ListBuilderWorker(workerpool.BaseWorker): |
|
95 |
def RunTask(self, ctx, data): |
|
96 |
ctx.lock.acquire() |
|
97 |
try: |
|
98 |
ctx.result.append((self.GetCurrentPriority(), data)) |
|
99 |
ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data) |
|
100 |
finally: |
|
101 |
ctx.lock.release() |
|
102 |
|
|
103 |
|
|
104 |
class DeferringTaskContext: |
|
105 |
def __init__(self): |
|
106 |
self.lock = threading.Lock() |
|
107 |
self.prioresult = {} |
|
108 |
self.samepriodefer = {} |
|
109 |
|
|
110 |
|
|
111 |
class DeferringWorker(workerpool.BaseWorker): |
|
112 |
def RunTask(self, ctx, num, targetprio): |
|
113 |
ctx.lock.acquire() |
|
114 |
try: |
|
115 |
if num in ctx.samepriodefer: |
|
116 |
del ctx.samepriodefer[num] |
|
117 |
raise workerpool.DeferTask() |
|
118 |
|
|
119 |
if self.GetCurrentPriority() > targetprio: |
|
120 |
raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1) |
|
121 |
|
|
122 |
ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num) |
|
123 |
finally: |
|
124 |
ctx.lock.release() |
|
125 |
|
|
126 |
|
|
86 | 127 |
class TestWorkerpool(unittest.TestCase): |
87 | 128 |
"""Workerpool tests""" |
88 | 129 |
|
... | ... | |
206 | 247 |
finally: |
207 | 248 |
wp._lock.release() |
208 | 249 |
|
250 |
def testPriorityChecksum(self): |
|
251 |
# Tests whether all tasks are run and, since we're only using a single |
|
252 |
# thread, whether everything is started in order and respects the priority |
|
253 |
wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker) |
|
254 |
try: |
|
255 |
self._CheckWorkerCount(wp, 1) |
|
256 |
|
|
257 |
ctx = ChecksumContext() |
|
258 |
|
|
259 |
data = {} |
|
260 |
tasks = [] |
|
261 |
priorities = [] |
|
262 |
for i in range(1, 333): |
|
263 |
prio = i % 7 |
|
264 |
tasks.append((ctx, i)) |
|
265 |
priorities.append(prio) |
|
266 |
data.setdefault(prio, []).append(i) |
|
267 |
|
|
268 |
wp.AddManyTasks(tasks, priority=priorities) |
|
269 |
|
|
270 |
wp.Quiesce() |
|
271 |
|
|
272 |
self._CheckNoTasks(wp) |
|
273 |
|
|
274 |
# Check sum |
|
275 |
ctx.lock.acquire() |
|
276 |
try: |
|
277 |
checksum = ChecksumContext.CHECKSUM_START |
|
278 |
for priority in sorted(data.keys()): |
|
279 |
for i in data[priority]: |
|
280 |
checksum = ChecksumContext.UpdateChecksum(checksum, i) |
|
281 |
|
|
282 |
self.assertEqual(checksum, ctx.checksum) |
|
283 |
finally: |
|
284 |
ctx.lock.release() |
|
285 |
|
|
286 |
self._CheckWorkerCount(wp, 1) |
|
287 |
finally: |
|
288 |
wp.TerminateWorkers() |
|
289 |
self._CheckWorkerCount(wp, 0) |
|
290 |
|
|
291 |
def testPriorityListManyTasks(self): |
|
292 |
# Tests whether all tasks are run and, since we're only using a single |
|
293 |
# thread, whether everything is started in order and respects the priority |
|
294 |
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker) |
|
295 |
try: |
|
296 |
self._CheckWorkerCount(wp, 1) |
|
297 |
|
|
298 |
ctx = ListBuilderContext() |
|
299 |
|
|
300 |
# Use static seed for this test |
|
301 |
rnd = random.Random(0) |
|
302 |
|
|
303 |
data = {} |
|
304 |
tasks = [] |
|
305 |
priorities = [] |
|
306 |
for i in range(1, 333): |
|
307 |
prio = int(rnd.random() * 10) |
|
308 |
tasks.append((ctx, i)) |
|
309 |
priorities.append(prio) |
|
310 |
data.setdefault(prio, []).append((prio, i)) |
|
311 |
|
|
312 |
wp.AddManyTasks(tasks, priority=priorities) |
|
313 |
|
|
314 |
self.assertRaises(errors.ProgrammerError, wp.AddManyTasks, |
|
315 |
[("x", ), ("y", )], priority=[1] * 5) |
|
316 |
|
|
317 |
wp.Quiesce() |
|
318 |
|
|
319 |
self._CheckNoTasks(wp) |
|
320 |
|
|
321 |
# Check result |
|
322 |
ctx.lock.acquire() |
|
323 |
try: |
|
324 |
expresult = [] |
|
325 |
for priority in sorted(data.keys()): |
|
326 |
expresult.extend(data[priority]) |
|
327 |
|
|
328 |
self.assertEqual(expresult, ctx.result) |
|
329 |
finally: |
|
330 |
ctx.lock.release() |
|
331 |
|
|
332 |
self._CheckWorkerCount(wp, 1) |
|
333 |
finally: |
|
334 |
wp.TerminateWorkers() |
|
335 |
self._CheckWorkerCount(wp, 0) |
|
336 |
|
|
337 |
def testPriorityListSingleTasks(self): |
|
338 |
# Tests whether all tasks are run and, since we're only using a single |
|
339 |
# thread, whether everything is started in order and respects the priority |
|
340 |
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker) |
|
341 |
try: |
|
342 |
self._CheckWorkerCount(wp, 1) |
|
343 |
|
|
344 |
ctx = ListBuilderContext() |
|
345 |
|
|
346 |
# Use static seed for this test |
|
347 |
rnd = random.Random(26279) |
|
348 |
|
|
349 |
data = {} |
|
350 |
for i in range(1, 333): |
|
351 |
prio = int(rnd.random() * 30) |
|
352 |
wp.AddTask((ctx, i), priority=prio) |
|
353 |
data.setdefault(prio, []).append(i) |
|
354 |
|
|
355 |
# Cause some distortion |
|
356 |
if i % 11 == 0: |
|
357 |
time.sleep(.001) |
|
358 |
if i % 41 == 0: |
|
359 |
wp.Quiesce() |
|
360 |
|
|
361 |
wp.Quiesce() |
|
362 |
|
|
363 |
self._CheckNoTasks(wp) |
|
364 |
|
|
365 |
# Check result |
|
366 |
ctx.lock.acquire() |
|
367 |
try: |
|
368 |
self.assertEqual(data, ctx.prioresult) |
|
369 |
finally: |
|
370 |
ctx.lock.release() |
|
371 |
|
|
372 |
self._CheckWorkerCount(wp, 1) |
|
373 |
finally: |
|
374 |
wp.TerminateWorkers() |
|
375 |
self._CheckWorkerCount(wp, 0) |
|
376 |
|
|
377 |
def testPriorityListSingleTasks(self): |
|
378 |
# Tests whether all tasks are run and, since we're only using a single |
|
379 |
# thread, whether everything is started in order and respects the priority |
|
380 |
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker) |
|
381 |
try: |
|
382 |
self._CheckWorkerCount(wp, 1) |
|
383 |
|
|
384 |
ctx = ListBuilderContext() |
|
385 |
|
|
386 |
# Use static seed for this test |
|
387 |
rnd = random.Random(26279) |
|
388 |
|
|
389 |
data = {} |
|
390 |
for i in range(1, 333): |
|
391 |
prio = int(rnd.random() * 30) |
|
392 |
wp.AddTask((ctx, i), priority=prio) |
|
393 |
data.setdefault(prio, []).append(i) |
|
394 |
|
|
395 |
# Cause some distortion |
|
396 |
if i % 11 == 0: |
|
397 |
time.sleep(.001) |
|
398 |
if i % 41 == 0: |
|
399 |
wp.Quiesce() |
|
400 |
|
|
401 |
wp.Quiesce() |
|
402 |
|
|
403 |
self._CheckNoTasks(wp) |
|
404 |
|
|
405 |
# Check result |
|
406 |
ctx.lock.acquire() |
|
407 |
try: |
|
408 |
self.assertEqual(data, ctx.prioresult) |
|
409 |
finally: |
|
410 |
ctx.lock.release() |
|
411 |
|
|
412 |
self._CheckWorkerCount(wp, 1) |
|
413 |
finally: |
|
414 |
wp.TerminateWorkers() |
|
415 |
self._CheckWorkerCount(wp, 0) |
|
416 |
|
|
417 |
def testDeferTask(self): |
|
418 |
# Tests whether all tasks are run and, since we're only using a single |
|
419 |
# thread, whether everything is started in order and respects the priority |
|
420 |
wp = workerpool.WorkerPool("Test", 1, DeferringWorker) |
|
421 |
try: |
|
422 |
self._CheckWorkerCount(wp, 1) |
|
423 |
|
|
424 |
ctx = DeferringTaskContext() |
|
425 |
|
|
426 |
# Use static seed for this test |
|
427 |
rnd = random.Random(14921) |
|
428 |
|
|
429 |
data = {} |
|
430 |
for i in range(1, 333): |
|
431 |
ctx.lock.acquire() |
|
432 |
try: |
|
433 |
if i % 5 == 0: |
|
434 |
ctx.samepriodefer[i] = True |
|
435 |
finally: |
|
436 |
ctx.lock.release() |
|
437 |
|
|
438 |
prio = int(rnd.random() * 30) |
|
439 |
wp.AddTask((ctx, i, prio), priority=50) |
|
440 |
data.setdefault(prio, set()).add(i) |
|
441 |
|
|
442 |
# Cause some distortion |
|
443 |
if i % 24 == 0: |
|
444 |
time.sleep(.001) |
|
445 |
if i % 31 == 0: |
|
446 |
wp.Quiesce() |
|
447 |
|
|
448 |
wp.Quiesce() |
|
449 |
|
|
450 |
self._CheckNoTasks(wp) |
|
451 |
|
|
452 |
# Check result |
|
453 |
ctx.lock.acquire() |
|
454 |
try: |
|
455 |
self.assertEqual(data, ctx.prioresult) |
|
456 |
finally: |
|
457 |
ctx.lock.release() |
|
458 |
|
|
459 |
self._CheckWorkerCount(wp, 1) |
|
460 |
finally: |
|
461 |
wp.TerminateWorkers() |
|
462 |
self._CheckWorkerCount(wp, 0) |
|
463 |
|
|
209 | 464 |
|
210 | 465 |
if __name__ == '__main__': |
211 | 466 |
testutils.GanetiTestProgram() |
Also available in: Unified diff