root / test / ganeti.workerpool_unittest.py @ bba69414
History | View | Annotate | Download (19.5 kB)
1 |
#!/usr/bin/python
|
---|---|
2 |
#
|
3 |
|
4 |
# Copyright (C) 2008, 2009, 2010 Google Inc.
|
5 |
#
|
6 |
# This program is free software; you can redistribute it and/or modify
|
7 |
# it under the terms of the GNU General Public License as published by
|
8 |
# the Free Software Foundation; either version 2 of the License, or
|
9 |
# (at your option) any later version.
|
10 |
#
|
11 |
# This program is distributed in the hope that it will be useful, but
|
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
14 |
# General Public License for more details.
|
15 |
#
|
16 |
# You should have received a copy of the GNU General Public License
|
17 |
# along with this program; if not, write to the Free Software
|
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
19 |
# 02110-1301, USA.
|
20 |
|
21 |
|
22 |
"""Script for unittesting the workerpool module"""
|
23 |
|
24 |
import unittest |
25 |
import threading |
26 |
import time |
27 |
import sys |
28 |
import zlib |
29 |
import random |
30 |
|
31 |
from ganeti import workerpool |
32 |
from ganeti import errors |
33 |
from ganeti import utils |
34 |
from ganeti import compat |
35 |
|
36 |
import testutils |
37 |
|
38 |
|
39 |
class CountingContext(object): |
40 |
def __init__(self): |
41 |
self._lock = threading.Condition(threading.Lock())
|
42 |
self.done = 0 |
43 |
|
44 |
def DoneTask(self): |
45 |
self._lock.acquire()
|
46 |
try:
|
47 |
self.done += 1 |
48 |
finally:
|
49 |
self._lock.release()
|
50 |
|
51 |
def GetDoneTasks(self): |
52 |
self._lock.acquire()
|
53 |
try:
|
54 |
return self.done |
55 |
finally:
|
56 |
self._lock.release()
|
57 |
|
58 |
@staticmethod
|
59 |
def UpdateChecksum(current, value): |
60 |
return zlib.adler32(str(value), current) |
61 |
|
62 |
|
63 |
class CountingBaseWorker(workerpool.BaseWorker): |
64 |
def RunTask(self, ctx, text): |
65 |
ctx.DoneTask() |
66 |
|
67 |
|
68 |
class ChecksumContext: |
69 |
CHECKSUM_START = zlib.adler32("")
|
70 |
|
71 |
def __init__(self): |
72 |
self.lock = threading.Condition(threading.Lock())
|
73 |
self.checksum = self.CHECKSUM_START |
74 |
|
75 |
@staticmethod
|
76 |
def UpdateChecksum(current, value): |
77 |
return zlib.adler32(str(value), current) |
78 |
|
79 |
|
80 |
class ChecksumBaseWorker(workerpool.BaseWorker): |
81 |
def RunTask(self, ctx, number): |
82 |
name = "number%s" % number
|
83 |
self.SetTaskName(name)
|
84 |
|
85 |
# This assertion needs to be checked before updating the checksum. A
|
86 |
# failing assertion will then cause the result to be wrong.
|
87 |
assert self.getName() == ("%s/%s" % (self._worker_id, name)) |
88 |
|
89 |
ctx.lock.acquire() |
90 |
try:
|
91 |
ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number) |
92 |
finally:
|
93 |
ctx.lock.release() |
94 |
|
95 |
|
96 |
class ListBuilderContext: |
97 |
def __init__(self): |
98 |
self.lock = threading.Lock()
|
99 |
self.result = []
|
100 |
self.prioresult = {}
|
101 |
|
102 |
|
103 |
class ListBuilderWorker(workerpool.BaseWorker): |
104 |
def RunTask(self, ctx, data): |
105 |
ctx.lock.acquire() |
106 |
try:
|
107 |
ctx.result.append((self.GetCurrentPriority(), data))
|
108 |
ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
|
109 |
finally:
|
110 |
ctx.lock.release() |
111 |
|
112 |
|
113 |
class DeferringTaskContext: |
114 |
def __init__(self): |
115 |
self.lock = threading.Lock()
|
116 |
self.prioresult = {}
|
117 |
self.samepriodefer = {}
|
118 |
self.num2ordertaskid = {}
|
119 |
|
120 |
|
121 |
class DeferringWorker(workerpool.BaseWorker): |
122 |
def RunTask(self, ctx, num, targetprio): |
123 |
ctx.lock.acquire() |
124 |
try:
|
125 |
otilst = ctx.num2ordertaskid.setdefault(num, []) |
126 |
otilst.append(self._GetCurrentOrderAndTaskId())
|
127 |
|
128 |
if num in ctx.samepriodefer: |
129 |
del ctx.samepriodefer[num]
|
130 |
raise workerpool.DeferTask()
|
131 |
|
132 |
if self.GetCurrentPriority() > targetprio: |
133 |
raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1) |
134 |
|
135 |
ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num) |
136 |
finally:
|
137 |
ctx.lock.release() |
138 |
|
139 |
|
140 |
class PriorityContext: |
141 |
def __init__(self): |
142 |
self.lock = threading.Lock()
|
143 |
self.result = []
|
144 |
|
145 |
|
146 |
class PriorityWorker(workerpool.BaseWorker): |
147 |
def RunTask(self, ctx, data): |
148 |
ctx.lock.acquire() |
149 |
try:
|
150 |
ctx.result.append((self.GetCurrentPriority(), data))
|
151 |
finally:
|
152 |
ctx.lock.release() |
153 |
|
154 |
|
155 |
class NotImplementedWorker(workerpool.BaseWorker): |
156 |
def RunTask(self): |
157 |
raise NotImplementedError |
158 |
|
159 |
|
160 |
class TestWorkerpool(unittest.TestCase): |
161 |
"""Workerpool tests"""
|
162 |
|
163 |
def testCounting(self): |
164 |
ctx = CountingContext() |
165 |
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker) |
166 |
try:
|
167 |
self._CheckWorkerCount(wp, 3) |
168 |
|
169 |
for i in range(10): |
170 |
wp.AddTask((ctx, "Hello world %s" % i))
|
171 |
|
172 |
wp.Quiesce() |
173 |
finally:
|
174 |
wp.TerminateWorkers() |
175 |
self._CheckWorkerCount(wp, 0) |
176 |
|
177 |
self.assertEquals(ctx.GetDoneTasks(), 10) |
178 |
|
179 |
def testNoTasks(self): |
180 |
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker) |
181 |
try:
|
182 |
self._CheckWorkerCount(wp, 3) |
183 |
self._CheckNoTasks(wp)
|
184 |
finally:
|
185 |
wp.TerminateWorkers() |
186 |
self._CheckWorkerCount(wp, 0) |
187 |
|
188 |
def testNoTasksQuiesce(self): |
189 |
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker) |
190 |
try:
|
191 |
self._CheckWorkerCount(wp, 3) |
192 |
self._CheckNoTasks(wp)
|
193 |
wp.Quiesce() |
194 |
self._CheckNoTasks(wp)
|
195 |
finally:
|
196 |
wp.TerminateWorkers() |
197 |
self._CheckWorkerCount(wp, 0) |
198 |
|
199 |
def testActive(self): |
200 |
ctx = CountingContext() |
201 |
wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker) |
202 |
try:
|
203 |
self._CheckWorkerCount(wp, 5) |
204 |
self.assertTrue(wp._active)
|
205 |
|
206 |
# Process some tasks
|
207 |
for _ in range(10): |
208 |
wp.AddTask((ctx, None))
|
209 |
|
210 |
wp.Quiesce() |
211 |
self._CheckNoTasks(wp)
|
212 |
self.assertEquals(ctx.GetDoneTasks(), 10) |
213 |
|
214 |
# Repeat a few times
|
215 |
for count in range(10): |
216 |
# Deactivate pool
|
217 |
wp.SetActive(False)
|
218 |
self._CheckNoTasks(wp)
|
219 |
|
220 |
# Queue some more tasks
|
221 |
for _ in range(10): |
222 |
wp.AddTask((ctx, None))
|
223 |
|
224 |
for _ in range(5): |
225 |
# Short delays to give other threads a chance to cause breakage
|
226 |
time.sleep(.01)
|
227 |
wp.AddTask((ctx, "Hello world %s" % 999)) |
228 |
self.assertFalse(wp._active)
|
229 |
|
230 |
self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15)) |
231 |
|
232 |
# Start processing again
|
233 |
wp.SetActive(True)
|
234 |
self.assertTrue(wp._active)
|
235 |
|
236 |
# Wait for tasks to finish
|
237 |
wp.Quiesce() |
238 |
self._CheckNoTasks(wp)
|
239 |
self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15) |
240 |
|
241 |
self._CheckWorkerCount(wp, 5) |
242 |
finally:
|
243 |
wp.TerminateWorkers() |
244 |
self._CheckWorkerCount(wp, 0) |
245 |
|
246 |
def testChecksum(self): |
247 |
# Tests whether all tasks are run and, since we're only using a single
|
248 |
# thread, whether everything is started in order.
|
249 |
wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker) |
250 |
try:
|
251 |
self._CheckWorkerCount(wp, 1) |
252 |
|
253 |
ctx = ChecksumContext() |
254 |
checksum = ChecksumContext.CHECKSUM_START |
255 |
for i in range(1, 100): |
256 |
checksum = ChecksumContext.UpdateChecksum(checksum, i) |
257 |
wp.AddTask((ctx, i)) |
258 |
|
259 |
wp.Quiesce() |
260 |
|
261 |
self._CheckNoTasks(wp)
|
262 |
|
263 |
# Check sum
|
264 |
ctx.lock.acquire() |
265 |
try:
|
266 |
self.assertEqual(checksum, ctx.checksum)
|
267 |
finally:
|
268 |
ctx.lock.release() |
269 |
finally:
|
270 |
wp.TerminateWorkers() |
271 |
self._CheckWorkerCount(wp, 0) |
272 |
|
273 |
def testAddManyTasks(self): |
274 |
ctx = CountingContext() |
275 |
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker) |
276 |
try:
|
277 |
self._CheckWorkerCount(wp, 3) |
278 |
|
279 |
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)]) |
280 |
wp.AddTask((ctx, "A separate hello"))
|
281 |
wp.AddTask((ctx, "Once more, hi!"))
|
282 |
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)]) |
283 |
|
284 |
wp.Quiesce() |
285 |
|
286 |
self._CheckNoTasks(wp)
|
287 |
finally:
|
288 |
wp.TerminateWorkers() |
289 |
self._CheckWorkerCount(wp, 0) |
290 |
|
291 |
self.assertEquals(ctx.GetDoneTasks(), 22) |
292 |
|
293 |
def testManyTasksSequence(self): |
294 |
ctx = CountingContext() |
295 |
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker) |
296 |
try:
|
297 |
self._CheckWorkerCount(wp, 3) |
298 |
self.assertRaises(AssertionError, wp.AddManyTasks, |
299 |
["Hello world %s" % i for i in range(10)]) |
300 |
self.assertRaises(AssertionError, wp.AddManyTasks, |
301 |
[i for i in range(10)]) |
302 |
self.assertRaises(AssertionError, wp.AddManyTasks, [], task_id=0) |
303 |
|
304 |
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)]) |
305 |
wp.AddTask((ctx, "A separate hello"))
|
306 |
|
307 |
wp.Quiesce() |
308 |
|
309 |
self._CheckNoTasks(wp)
|
310 |
finally:
|
311 |
wp.TerminateWorkers() |
312 |
self._CheckWorkerCount(wp, 0) |
313 |
|
314 |
self.assertEquals(ctx.GetDoneTasks(), 11) |
315 |
|
316 |
def _CheckNoTasks(self, wp): |
317 |
wp._lock.acquire() |
318 |
try:
|
319 |
# The task queue must be empty now
|
320 |
self.assertFalse(wp._tasks)
|
321 |
self.assertFalse(wp._taskdata)
|
322 |
finally:
|
323 |
wp._lock.release() |
324 |
|
325 |
def _CheckWorkerCount(self, wp, num_workers): |
326 |
wp._lock.acquire() |
327 |
try:
|
328 |
self.assertEqual(len(wp._workers), num_workers) |
329 |
finally:
|
330 |
wp._lock.release() |
331 |
|
332 |
def testPriorityChecksum(self): |
333 |
# Tests whether all tasks are run and, since we're only using a single
|
334 |
# thread, whether everything is started in order and respects the priority
|
335 |
wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker) |
336 |
try:
|
337 |
self._CheckWorkerCount(wp, 1) |
338 |
|
339 |
ctx = ChecksumContext() |
340 |
|
341 |
data = {} |
342 |
tasks = [] |
343 |
priorities = [] |
344 |
for i in range(1, 333): |
345 |
prio = i % 7
|
346 |
tasks.append((ctx, i)) |
347 |
priorities.append(prio) |
348 |
data.setdefault(prio, []).append(i) |
349 |
|
350 |
wp.AddManyTasks(tasks, priority=priorities) |
351 |
|
352 |
wp.Quiesce() |
353 |
|
354 |
self._CheckNoTasks(wp)
|
355 |
|
356 |
# Check sum
|
357 |
ctx.lock.acquire() |
358 |
try:
|
359 |
checksum = ChecksumContext.CHECKSUM_START |
360 |
for priority in sorted(data.keys()): |
361 |
for i in data[priority]: |
362 |
checksum = ChecksumContext.UpdateChecksum(checksum, i) |
363 |
|
364 |
self.assertEqual(checksum, ctx.checksum)
|
365 |
finally:
|
366 |
ctx.lock.release() |
367 |
|
368 |
self._CheckWorkerCount(wp, 1) |
369 |
finally:
|
370 |
wp.TerminateWorkers() |
371 |
self._CheckWorkerCount(wp, 0) |
372 |
|
373 |
def testPriorityListManyTasks(self): |
374 |
# Tests whether all tasks are run and, since we're only using a single
|
375 |
# thread, whether everything is started in order and respects the priority
|
376 |
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker) |
377 |
try:
|
378 |
self._CheckWorkerCount(wp, 1) |
379 |
|
380 |
ctx = ListBuilderContext() |
381 |
|
382 |
# Use static seed for this test
|
383 |
rnd = random.Random(0)
|
384 |
|
385 |
data = {} |
386 |
tasks = [] |
387 |
priorities = [] |
388 |
for i in range(1, 333): |
389 |
prio = int(rnd.random() * 10) |
390 |
tasks.append((ctx, i)) |
391 |
priorities.append(prio) |
392 |
data.setdefault(prio, []).append((prio, i)) |
393 |
|
394 |
wp.AddManyTasks(tasks, priority=priorities) |
395 |
|
396 |
self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
|
397 |
[("x", ), ("y", )], priority=[1] * 5) |
398 |
self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
|
399 |
[("x", ), ("y", )], task_id=[1] * 5) |
400 |
|
401 |
wp.Quiesce() |
402 |
|
403 |
self._CheckNoTasks(wp)
|
404 |
|
405 |
# Check result
|
406 |
ctx.lock.acquire() |
407 |
try:
|
408 |
expresult = [] |
409 |
for priority in sorted(data.keys()): |
410 |
expresult.extend(data[priority]) |
411 |
|
412 |
self.assertEqual(expresult, ctx.result)
|
413 |
finally:
|
414 |
ctx.lock.release() |
415 |
|
416 |
self._CheckWorkerCount(wp, 1) |
417 |
finally:
|
418 |
wp.TerminateWorkers() |
419 |
self._CheckWorkerCount(wp, 0) |
420 |
|
421 |
def testPriorityListSingleTasks(self): |
422 |
# Tests whether all tasks are run and, since we're only using a single
|
423 |
# thread, whether everything is started in order and respects the priority
|
424 |
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker) |
425 |
try:
|
426 |
self._CheckWorkerCount(wp, 1) |
427 |
|
428 |
ctx = ListBuilderContext() |
429 |
|
430 |
# Use static seed for this test
|
431 |
rnd = random.Random(26279)
|
432 |
|
433 |
data = {} |
434 |
for i in range(1, 333): |
435 |
prio = int(rnd.random() * 30) |
436 |
wp.AddTask((ctx, i), priority=prio) |
437 |
data.setdefault(prio, []).append(i) |
438 |
|
439 |
# Cause some distortion
|
440 |
if i % 11 == 0: |
441 |
time.sleep(.001)
|
442 |
if i % 41 == 0: |
443 |
wp.Quiesce() |
444 |
|
445 |
wp.Quiesce() |
446 |
|
447 |
self._CheckNoTasks(wp)
|
448 |
|
449 |
# Check result
|
450 |
ctx.lock.acquire() |
451 |
try:
|
452 |
self.assertEqual(data, ctx.prioresult)
|
453 |
finally:
|
454 |
ctx.lock.release() |
455 |
|
456 |
self._CheckWorkerCount(wp, 1) |
457 |
finally:
|
458 |
wp.TerminateWorkers() |
459 |
self._CheckWorkerCount(wp, 0) |
460 |
|
461 |
def testDeferTask(self): |
462 |
# Tests whether all tasks are run and, since we're only using a single
|
463 |
# thread, whether everything is started in order and respects the priority
|
464 |
wp = workerpool.WorkerPool("Test", 1, DeferringWorker) |
465 |
try:
|
466 |
self._CheckWorkerCount(wp, 1) |
467 |
|
468 |
ctx = DeferringTaskContext() |
469 |
|
470 |
# Use static seed for this test
|
471 |
rnd = random.Random(14921)
|
472 |
|
473 |
data = {} |
474 |
num2taskid = {} |
475 |
for i in range(1, 333): |
476 |
ctx.lock.acquire() |
477 |
try:
|
478 |
if i % 5 == 0: |
479 |
ctx.samepriodefer[i] = True
|
480 |
finally:
|
481 |
ctx.lock.release() |
482 |
|
483 |
prio = int(rnd.random() * 30) |
484 |
num2taskid[i] = 1000 * i
|
485 |
wp.AddTask((ctx, i, prio), priority=50,
|
486 |
task_id=num2taskid[i]) |
487 |
data.setdefault(prio, set()).add(i)
|
488 |
|
489 |
# Cause some distortion
|
490 |
if i % 24 == 0: |
491 |
time.sleep(.001)
|
492 |
if i % 31 == 0: |
493 |
wp.Quiesce() |
494 |
|
495 |
wp.Quiesce() |
496 |
|
497 |
self._CheckNoTasks(wp)
|
498 |
|
499 |
# Check result
|
500 |
ctx.lock.acquire() |
501 |
try:
|
502 |
self.assertEqual(data, ctx.prioresult)
|
503 |
|
504 |
all_order_ids = [] |
505 |
|
506 |
for (num, numordertaskid) in ctx.num2ordertaskid.items(): |
507 |
order_ids = map(compat.fst, numordertaskid)
|
508 |
self.assertFalse(utils.FindDuplicates(order_ids),
|
509 |
msg="Order ID has been reused")
|
510 |
all_order_ids.extend(order_ids) |
511 |
|
512 |
for task_id in map(compat.snd, numordertaskid): |
513 |
self.assertEqual(task_id, num2taskid[num],
|
514 |
msg=("Task %s used different task IDs" % num))
|
515 |
|
516 |
self.assertFalse(utils.FindDuplicates(all_order_ids),
|
517 |
msg="Order ID has been reused")
|
518 |
finally:
|
519 |
ctx.lock.release() |
520 |
|
521 |
self._CheckWorkerCount(wp, 1) |
522 |
finally:
|
523 |
wp.TerminateWorkers() |
524 |
self._CheckWorkerCount(wp, 0) |
525 |
|
526 |
def testChangeTaskPriority(self): |
527 |
wp = workerpool.WorkerPool("Test", 1, PriorityWorker) |
528 |
try:
|
529 |
self._CheckWorkerCount(wp, 1) |
530 |
|
531 |
ctx = PriorityContext() |
532 |
|
533 |
# Use static seed for this test
|
534 |
rnd = random.Random(4727)
|
535 |
|
536 |
# Disable processing of tasks
|
537 |
wp.SetActive(False)
|
538 |
|
539 |
# No task ID
|
540 |
self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
|
541 |
None, 0) |
542 |
|
543 |
# Pre-generate task IDs and priorities
|
544 |
count = 100
|
545 |
task_ids = range(0, count) |
546 |
priorities = range(200, 200 + count) * 2 |
547 |
|
548 |
rnd.shuffle(task_ids) |
549 |
rnd.shuffle(priorities) |
550 |
|
551 |
# Make sure there are some duplicate priorities, but not all
|
552 |
priorities[count * 2 - 10:count * 2 - 1] = \ |
553 |
priorities[count - 10: count - 1] |
554 |
|
555 |
assert len(priorities) == 2 * count |
556 |
assert priorities[0:(count - 1)] != priorities[count:(2 * count - 1)] |
557 |
|
558 |
# Add some tasks; this loop consumes the first half of all previously
|
559 |
# generated priorities
|
560 |
for (idx, task_id) in enumerate(task_ids): |
561 |
wp.AddTask((ctx, idx), |
562 |
priority=priorities.pop(), |
563 |
task_id=task_id) |
564 |
|
565 |
self.assertEqual(len(wp._tasks), len(task_ids)) |
566 |
self.assertEqual(len(wp._taskdata), len(task_ids)) |
567 |
|
568 |
# Tasks have been added, so half of the priorities should have been
|
569 |
# consumed
|
570 |
assert len(priorities) == len(task_ids) |
571 |
|
572 |
# Change task priority
|
573 |
expected = [] |
574 |
for ((idx, task_id), prio) in zip(enumerate(task_ids), priorities): |
575 |
wp.ChangeTaskPriority(task_id, prio) |
576 |
expected.append((prio, idx)) |
577 |
|
578 |
self.assertEqual(len(wp._taskdata), len(task_ids)) |
579 |
|
580 |
# Half the entries are now abandoned tasks
|
581 |
self.assertEqual(len(wp._tasks), len(task_ids) * 2) |
582 |
|
583 |
assert len(priorities) == count |
584 |
assert len(task_ids) == count |
585 |
|
586 |
# Start processing
|
587 |
wp.SetActive(True)
|
588 |
|
589 |
# Wait for tasks to finish
|
590 |
wp.Quiesce() |
591 |
|
592 |
self._CheckNoTasks(wp)
|
593 |
|
594 |
for task_id in task_ids: |
595 |
# All tasks are done
|
596 |
self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
|
597 |
task_id, 0)
|
598 |
|
599 |
# Check result
|
600 |
ctx.lock.acquire() |
601 |
try:
|
602 |
self.assertEqual(ctx.result, sorted(expected)) |
603 |
finally:
|
604 |
ctx.lock.release() |
605 |
|
606 |
self._CheckWorkerCount(wp, 1) |
607 |
finally:
|
608 |
wp.TerminateWorkers() |
609 |
self._CheckWorkerCount(wp, 0) |
610 |
|
611 |
def testChangeTaskPriorityInteralStructures(self): |
612 |
wp = workerpool.WorkerPool("Test", 1, NotImplementedWorker) |
613 |
try:
|
614 |
self._CheckWorkerCount(wp, 1) |
615 |
|
616 |
# Use static seed for this test
|
617 |
rnd = random.Random(643)
|
618 |
|
619 |
(num1, num2) = rnd.sample(range(1000), 2) |
620 |
|
621 |
# Disable processing of tasks
|
622 |
wp.SetActive(False)
|
623 |
|
624 |
self.assertFalse(wp._tasks)
|
625 |
self.assertFalse(wp._taskdata)
|
626 |
|
627 |
# No priority or task ID
|
628 |
wp.AddTask(()) |
629 |
self.assertEqual(wp._tasks, [
|
630 |
[workerpool._DEFAULT_PRIORITY, 0, None, ()], |
631 |
]) |
632 |
self.assertFalse(wp._taskdata)
|
633 |
|
634 |
# No task ID
|
635 |
wp.AddTask((), priority=7413)
|
636 |
self.assertEqual(wp._tasks, [
|
637 |
[workerpool._DEFAULT_PRIORITY, 0, None, ()], |
638 |
[7413, 1, None, ()], |
639 |
]) |
640 |
self.assertFalse(wp._taskdata)
|
641 |
|
642 |
# Start adding real tasks
|
643 |
wp.AddTask((), priority=10267659, task_id=num1)
|
644 |
self.assertEqual(wp._tasks, [
|
645 |
[workerpool._DEFAULT_PRIORITY, 0, None, ()], |
646 |
[7413, 1, None, ()], |
647 |
[10267659, 2, num1, ()], |
648 |
]) |
649 |
self.assertEqual(wp._taskdata, {
|
650 |
num1: [10267659, 2, num1, ()], |
651 |
}) |
652 |
|
653 |
wp.AddTask((), priority=123, task_id=num2)
|
654 |
self.assertEqual(sorted(wp._tasks), [ |
655 |
[workerpool._DEFAULT_PRIORITY, 0, None, ()], |
656 |
[123, 3, num2, ()], |
657 |
[7413, 1, None, ()], |
658 |
[10267659, 2, num1, ()], |
659 |
]) |
660 |
self.assertEqual(wp._taskdata, {
|
661 |
num1: [10267659, 2, num1, ()], |
662 |
num2: [123, 3, num2, ()], |
663 |
}) |
664 |
|
665 |
wp.ChangeTaskPriority(num1, 100)
|
666 |
self.assertEqual(sorted(wp._tasks), [ |
667 |
[workerpool._DEFAULT_PRIORITY, 0, None, ()], |
668 |
[100, 2, num1, ()], |
669 |
[123, 3, num2, ()], |
670 |
[7413, 1, None, ()], |
671 |
[10267659, 2, num1, None], |
672 |
]) |
673 |
self.assertEqual(wp._taskdata, {
|
674 |
num1: [100, 2, num1, ()], |
675 |
num2: [123, 3, num2, ()], |
676 |
}) |
677 |
|
678 |
wp.ChangeTaskPriority(num2, 91337)
|
679 |
self.assertEqual(sorted(wp._tasks), [ |
680 |
[workerpool._DEFAULT_PRIORITY, 0, None, ()], |
681 |
[100, 2, num1, ()], |
682 |
[123, 3, num2, None], |
683 |
[7413, 1, None, ()], |
684 |
[91337, 3, num2, ()], |
685 |
[10267659, 2, num1, None], |
686 |
]) |
687 |
self.assertEqual(wp._taskdata, {
|
688 |
num1: [100, 2, num1, ()], |
689 |
num2: [91337, 3, num2, ()], |
690 |
}) |
691 |
|
692 |
wp.ChangeTaskPriority(num1, 10139)
|
693 |
self.assertEqual(sorted(wp._tasks), [ |
694 |
[workerpool._DEFAULT_PRIORITY, 0, None, ()], |
695 |
[100, 2, num1, None], |
696 |
[123, 3, num2, None], |
697 |
[7413, 1, None, ()], |
698 |
[10139, 2, num1, ()], |
699 |
[91337, 3, num2, ()], |
700 |
[10267659, 2, num1, None], |
701 |
]) |
702 |
self.assertEqual(wp._taskdata, {
|
703 |
num1: [10139, 2, num1, ()], |
704 |
num2: [91337, 3, num2, ()], |
705 |
}) |
706 |
|
707 |
# Change to the same priority once again
|
708 |
wp.ChangeTaskPriority(num1, 10139)
|
709 |
self.assertEqual(sorted(wp._tasks), [ |
710 |
[workerpool._DEFAULT_PRIORITY, 0, None, ()], |
711 |
[100, 2, num1, None], |
712 |
[123, 3, num2, None], |
713 |
[7413, 1, None, ()], |
714 |
[10139, 2, num1, None], |
715 |
[10139, 2, num1, ()], |
716 |
[91337, 3, num2, ()], |
717 |
[10267659, 2, num1, None], |
718 |
]) |
719 |
self.assertEqual(wp._taskdata, {
|
720 |
num1: [10139, 2, num1, ()], |
721 |
num2: [91337, 3, num2, ()], |
722 |
}) |
723 |
|
724 |
self._CheckWorkerCount(wp, 1) |
725 |
finally:
|
726 |
wp.TerminateWorkers() |
727 |
self._CheckWorkerCount(wp, 0) |
728 |
|
729 |
|
730 |
if __name__ == "__main__": |
731 |
testutils.GanetiTestProgram() |