root / test / ganeti.workerpool_unittest.py @ 6e7f0cd9
History | View | Annotate | Download (12 kB)
1 |
#!/usr/bin/python
|
---|---|
2 |
#
|
3 |
|
4 |
# Copyright (C) 2008, 2009, 2010 Google Inc.
|
5 |
#
|
6 |
# This program is free software; you can redistribute it and/or modify
|
7 |
# it under the terms of the GNU General Public License as published by
|
8 |
# the Free Software Foundation; either version 2 of the License, or
|
9 |
# (at your option) any later version.
|
10 |
#
|
11 |
# This program is distributed in the hope that it will be useful, but
|
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
14 |
# General Public License for more details.
|
15 |
#
|
16 |
# You should have received a copy of the GNU General Public License
|
17 |
# along with this program; if not, write to the Free Software
|
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
19 |
# 02110-1301, USA.
|
20 |
|
21 |
|
22 |
"""Script for unittesting the workerpool module"""
|
23 |
|
24 |
import unittest |
25 |
import threading |
26 |
import time |
27 |
import sys |
28 |
import zlib |
29 |
import random |
30 |
|
31 |
from ganeti import workerpool |
32 |
from ganeti import errors |
33 |
|
34 |
import testutils |
35 |
|
36 |
|
37 |
class CountingContext(object): |
38 |
def __init__(self): |
39 |
self._lock = threading.Condition(threading.Lock())
|
40 |
self.done = 0 |
41 |
|
42 |
def DoneTask(self): |
43 |
self._lock.acquire()
|
44 |
try:
|
45 |
self.done += 1 |
46 |
finally:
|
47 |
self._lock.release()
|
48 |
|
49 |
def GetDoneTasks(self): |
50 |
self._lock.acquire()
|
51 |
try:
|
52 |
return self.done |
53 |
finally:
|
54 |
self._lock.release()
|
55 |
|
56 |
@staticmethod
|
57 |
def UpdateChecksum(current, value): |
58 |
return zlib.adler32(str(value), current) |
59 |
|
60 |
|
61 |
class CountingBaseWorker(workerpool.BaseWorker): |
62 |
def RunTask(self, ctx, text): |
63 |
ctx.DoneTask() |
64 |
|
65 |
|
66 |
class ChecksumContext: |
67 |
CHECKSUM_START = zlib.adler32("")
|
68 |
|
69 |
def __init__(self): |
70 |
self.lock = threading.Condition(threading.Lock())
|
71 |
self.checksum = self.CHECKSUM_START |
72 |
|
73 |
@staticmethod
|
74 |
def UpdateChecksum(current, value): |
75 |
return zlib.adler32(str(value), current) |
76 |
|
77 |
|
78 |
class ChecksumBaseWorker(workerpool.BaseWorker): |
79 |
def RunTask(self, ctx, number): |
80 |
name = "number%s" % number
|
81 |
self.SetTaskName(name)
|
82 |
|
83 |
# This assertion needs to be checked before updating the checksum. A
|
84 |
# failing assertion will then cause the result to be wrong.
|
85 |
assert self.getName() == ("%s/%s" % (self._worker_id, name)) |
86 |
|
87 |
ctx.lock.acquire() |
88 |
try:
|
89 |
ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number) |
90 |
finally:
|
91 |
ctx.lock.release() |
92 |
|
93 |
|
94 |
class ListBuilderContext: |
95 |
def __init__(self): |
96 |
self.lock = threading.Lock()
|
97 |
self.result = []
|
98 |
self.prioresult = {}
|
99 |
|
100 |
|
101 |
class ListBuilderWorker(workerpool.BaseWorker): |
102 |
def RunTask(self, ctx, data): |
103 |
ctx.lock.acquire() |
104 |
try:
|
105 |
ctx.result.append((self.GetCurrentPriority(), data))
|
106 |
ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
|
107 |
finally:
|
108 |
ctx.lock.release() |
109 |
|
110 |
|
111 |
class DeferringTaskContext: |
112 |
def __init__(self): |
113 |
self.lock = threading.Lock()
|
114 |
self.prioresult = {}
|
115 |
self.samepriodefer = {}
|
116 |
|
117 |
|
118 |
class DeferringWorker(workerpool.BaseWorker): |
119 |
def RunTask(self, ctx, num, targetprio): |
120 |
ctx.lock.acquire() |
121 |
try:
|
122 |
if num in ctx.samepriodefer: |
123 |
del ctx.samepriodefer[num]
|
124 |
raise workerpool.DeferTask()
|
125 |
|
126 |
if self.GetCurrentPriority() > targetprio: |
127 |
raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1) |
128 |
|
129 |
ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num) |
130 |
finally:
|
131 |
ctx.lock.release() |
132 |
|
133 |
|
134 |
class TestWorkerpool(unittest.TestCase): |
135 |
"""Workerpool tests"""
|
136 |
|
137 |
def testCounting(self): |
138 |
ctx = CountingContext() |
139 |
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker) |
140 |
try:
|
141 |
self._CheckWorkerCount(wp, 3) |
142 |
|
143 |
for i in range(10): |
144 |
wp.AddTask((ctx, "Hello world %s" % i))
|
145 |
|
146 |
wp.Quiesce() |
147 |
finally:
|
148 |
wp.TerminateWorkers() |
149 |
self._CheckWorkerCount(wp, 0) |
150 |
|
151 |
self.assertEquals(ctx.GetDoneTasks(), 10) |
152 |
|
153 |
def testNoTasks(self): |
154 |
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker) |
155 |
try:
|
156 |
self._CheckWorkerCount(wp, 3) |
157 |
self._CheckNoTasks(wp)
|
158 |
finally:
|
159 |
wp.TerminateWorkers() |
160 |
self._CheckWorkerCount(wp, 0) |
161 |
|
162 |
def testNoTasksQuiesce(self): |
163 |
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker) |
164 |
try:
|
165 |
self._CheckWorkerCount(wp, 3) |
166 |
self._CheckNoTasks(wp)
|
167 |
wp.Quiesce() |
168 |
self._CheckNoTasks(wp)
|
169 |
finally:
|
170 |
wp.TerminateWorkers() |
171 |
self._CheckWorkerCount(wp, 0) |
172 |
|
173 |
def testChecksum(self): |
174 |
# Tests whether all tasks are run and, since we're only using a single
|
175 |
# thread, whether everything is started in order.
|
176 |
wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker) |
177 |
try:
|
178 |
self._CheckWorkerCount(wp, 1) |
179 |
|
180 |
ctx = ChecksumContext() |
181 |
checksum = ChecksumContext.CHECKSUM_START |
182 |
for i in range(1, 100): |
183 |
checksum = ChecksumContext.UpdateChecksum(checksum, i) |
184 |
wp.AddTask((ctx, i)) |
185 |
|
186 |
wp.Quiesce() |
187 |
|
188 |
self._CheckNoTasks(wp)
|
189 |
|
190 |
# Check sum
|
191 |
ctx.lock.acquire() |
192 |
try:
|
193 |
self.assertEqual(checksum, ctx.checksum)
|
194 |
finally:
|
195 |
ctx.lock.release() |
196 |
finally:
|
197 |
wp.TerminateWorkers() |
198 |
self._CheckWorkerCount(wp, 0) |
199 |
|
200 |
def testAddManyTasks(self): |
201 |
ctx = CountingContext() |
202 |
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker) |
203 |
try:
|
204 |
self._CheckWorkerCount(wp, 3) |
205 |
|
206 |
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)]) |
207 |
wp.AddTask((ctx, "A separate hello"))
|
208 |
wp.AddTask((ctx, "Once more, hi!"))
|
209 |
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)]) |
210 |
|
211 |
wp.Quiesce() |
212 |
|
213 |
self._CheckNoTasks(wp)
|
214 |
finally:
|
215 |
wp.TerminateWorkers() |
216 |
self._CheckWorkerCount(wp, 0) |
217 |
|
218 |
self.assertEquals(ctx.GetDoneTasks(), 22) |
219 |
|
220 |
def testManyTasksSequence(self): |
221 |
ctx = CountingContext() |
222 |
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker) |
223 |
try:
|
224 |
self._CheckWorkerCount(wp, 3) |
225 |
self.assertRaises(AssertionError, wp.AddManyTasks, |
226 |
["Hello world %s" % i for i in range(10)]) |
227 |
self.assertRaises(AssertionError, wp.AddManyTasks, |
228 |
[i for i in range(10)]) |
229 |
|
230 |
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)]) |
231 |
wp.AddTask((ctx, "A separate hello"))
|
232 |
|
233 |
wp.Quiesce() |
234 |
|
235 |
self._CheckNoTasks(wp)
|
236 |
finally:
|
237 |
wp.TerminateWorkers() |
238 |
self._CheckWorkerCount(wp, 0) |
239 |
|
240 |
self.assertEquals(ctx.GetDoneTasks(), 11) |
241 |
|
242 |
def _CheckNoTasks(self, wp): |
243 |
wp._lock.acquire() |
244 |
try:
|
245 |
# The task queue must be empty now
|
246 |
self.failUnless(not wp._tasks) |
247 |
finally:
|
248 |
wp._lock.release() |
249 |
|
250 |
def _CheckWorkerCount(self, wp, num_workers): |
251 |
wp._lock.acquire() |
252 |
try:
|
253 |
self.assertEqual(len(wp._workers), num_workers) |
254 |
finally:
|
255 |
wp._lock.release() |
256 |
|
257 |
def testPriorityChecksum(self): |
258 |
# Tests whether all tasks are run and, since we're only using a single
|
259 |
# thread, whether everything is started in order and respects the priority
|
260 |
wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker) |
261 |
try:
|
262 |
self._CheckWorkerCount(wp, 1) |
263 |
|
264 |
ctx = ChecksumContext() |
265 |
|
266 |
data = {} |
267 |
tasks = [] |
268 |
priorities = [] |
269 |
for i in range(1, 333): |
270 |
prio = i % 7
|
271 |
tasks.append((ctx, i)) |
272 |
priorities.append(prio) |
273 |
data.setdefault(prio, []).append(i) |
274 |
|
275 |
wp.AddManyTasks(tasks, priority=priorities) |
276 |
|
277 |
wp.Quiesce() |
278 |
|
279 |
self._CheckNoTasks(wp)
|
280 |
|
281 |
# Check sum
|
282 |
ctx.lock.acquire() |
283 |
try:
|
284 |
checksum = ChecksumContext.CHECKSUM_START |
285 |
for priority in sorted(data.keys()): |
286 |
for i in data[priority]: |
287 |
checksum = ChecksumContext.UpdateChecksum(checksum, i) |
288 |
|
289 |
self.assertEqual(checksum, ctx.checksum)
|
290 |
finally:
|
291 |
ctx.lock.release() |
292 |
|
293 |
self._CheckWorkerCount(wp, 1) |
294 |
finally:
|
295 |
wp.TerminateWorkers() |
296 |
self._CheckWorkerCount(wp, 0) |
297 |
|
298 |
def testPriorityListManyTasks(self): |
299 |
# Tests whether all tasks are run and, since we're only using a single
|
300 |
# thread, whether everything is started in order and respects the priority
|
301 |
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker) |
302 |
try:
|
303 |
self._CheckWorkerCount(wp, 1) |
304 |
|
305 |
ctx = ListBuilderContext() |
306 |
|
307 |
# Use static seed for this test
|
308 |
rnd = random.Random(0)
|
309 |
|
310 |
data = {} |
311 |
tasks = [] |
312 |
priorities = [] |
313 |
for i in range(1, 333): |
314 |
prio = int(rnd.random() * 10) |
315 |
tasks.append((ctx, i)) |
316 |
priorities.append(prio) |
317 |
data.setdefault(prio, []).append((prio, i)) |
318 |
|
319 |
wp.AddManyTasks(tasks, priority=priorities) |
320 |
|
321 |
self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
|
322 |
[("x", ), ("y", )], priority=[1] * 5) |
323 |
|
324 |
wp.Quiesce() |
325 |
|
326 |
self._CheckNoTasks(wp)
|
327 |
|
328 |
# Check result
|
329 |
ctx.lock.acquire() |
330 |
try:
|
331 |
expresult = [] |
332 |
for priority in sorted(data.keys()): |
333 |
expresult.extend(data[priority]) |
334 |
|
335 |
self.assertEqual(expresult, ctx.result)
|
336 |
finally:
|
337 |
ctx.lock.release() |
338 |
|
339 |
self._CheckWorkerCount(wp, 1) |
340 |
finally:
|
341 |
wp.TerminateWorkers() |
342 |
self._CheckWorkerCount(wp, 0) |
343 |
|
344 |
def testPriorityListSingleTasks(self): |
345 |
# Tests whether all tasks are run and, since we're only using a single
|
346 |
# thread, whether everything is started in order and respects the priority
|
347 |
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker) |
348 |
try:
|
349 |
self._CheckWorkerCount(wp, 1) |
350 |
|
351 |
ctx = ListBuilderContext() |
352 |
|
353 |
# Use static seed for this test
|
354 |
rnd = random.Random(26279)
|
355 |
|
356 |
data = {} |
357 |
for i in range(1, 333): |
358 |
prio = int(rnd.random() * 30) |
359 |
wp.AddTask((ctx, i), priority=prio) |
360 |
data.setdefault(prio, []).append(i) |
361 |
|
362 |
# Cause some distortion
|
363 |
if i % 11 == 0: |
364 |
time.sleep(.001)
|
365 |
if i % 41 == 0: |
366 |
wp.Quiesce() |
367 |
|
368 |
wp.Quiesce() |
369 |
|
370 |
self._CheckNoTasks(wp)
|
371 |
|
372 |
# Check result
|
373 |
ctx.lock.acquire() |
374 |
try:
|
375 |
self.assertEqual(data, ctx.prioresult)
|
376 |
finally:
|
377 |
ctx.lock.release() |
378 |
|
379 |
self._CheckWorkerCount(wp, 1) |
380 |
finally:
|
381 |
wp.TerminateWorkers() |
382 |
self._CheckWorkerCount(wp, 0) |
383 |
|
384 |
def testPriorityListSingleTasks(self): |
385 |
# Tests whether all tasks are run and, since we're only using a single
|
386 |
# thread, whether everything is started in order and respects the priority
|
387 |
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker) |
388 |
try:
|
389 |
self._CheckWorkerCount(wp, 1) |
390 |
|
391 |
ctx = ListBuilderContext() |
392 |
|
393 |
# Use static seed for this test
|
394 |
rnd = random.Random(26279)
|
395 |
|
396 |
data = {} |
397 |
for i in range(1, 333): |
398 |
prio = int(rnd.random() * 30) |
399 |
wp.AddTask((ctx, i), priority=prio) |
400 |
data.setdefault(prio, []).append(i) |
401 |
|
402 |
# Cause some distortion
|
403 |
if i % 11 == 0: |
404 |
time.sleep(.001)
|
405 |
if i % 41 == 0: |
406 |
wp.Quiesce() |
407 |
|
408 |
wp.Quiesce() |
409 |
|
410 |
self._CheckNoTasks(wp)
|
411 |
|
412 |
# Check result
|
413 |
ctx.lock.acquire() |
414 |
try:
|
415 |
self.assertEqual(data, ctx.prioresult)
|
416 |
finally:
|
417 |
ctx.lock.release() |
418 |
|
419 |
self._CheckWorkerCount(wp, 1) |
420 |
finally:
|
421 |
wp.TerminateWorkers() |
422 |
self._CheckWorkerCount(wp, 0) |
423 |
|
424 |
def testDeferTask(self): |
425 |
# Tests whether all tasks are run and, since we're only using a single
|
426 |
# thread, whether everything is started in order and respects the priority
|
427 |
wp = workerpool.WorkerPool("Test", 1, DeferringWorker) |
428 |
try:
|
429 |
self._CheckWorkerCount(wp, 1) |
430 |
|
431 |
ctx = DeferringTaskContext() |
432 |
|
433 |
# Use static seed for this test
|
434 |
rnd = random.Random(14921)
|
435 |
|
436 |
data = {} |
437 |
for i in range(1, 333): |
438 |
ctx.lock.acquire() |
439 |
try:
|
440 |
if i % 5 == 0: |
441 |
ctx.samepriodefer[i] = True
|
442 |
finally:
|
443 |
ctx.lock.release() |
444 |
|
445 |
prio = int(rnd.random() * 30) |
446 |
wp.AddTask((ctx, i, prio), priority=50)
|
447 |
data.setdefault(prio, set()).add(i)
|
448 |
|
449 |
# Cause some distortion
|
450 |
if i % 24 == 0: |
451 |
time.sleep(.001)
|
452 |
if i % 31 == 0: |
453 |
wp.Quiesce() |
454 |
|
455 |
wp.Quiesce() |
456 |
|
457 |
self._CheckNoTasks(wp)
|
458 |
|
459 |
# Check result
|
460 |
ctx.lock.acquire() |
461 |
try:
|
462 |
self.assertEqual(data, ctx.prioresult)
|
463 |
finally:
|
464 |
ctx.lock.release() |
465 |
|
466 |
self._CheckWorkerCount(wp, 1) |
467 |
finally:
|
468 |
wp.TerminateWorkers() |
469 |
self._CheckWorkerCount(wp, 0) |
470 |
|
471 |
|
472 |
if __name__ == '__main__': |
473 |
testutils.GanetiTestProgram() |