Remove duplicate workerpool test
[ganeti-local] / test / ganeti.workerpool_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the workerpool module"""
23
24 import unittest
25 import threading
26 import time
27 import sys
28 import zlib
29 import random
30
31 from ganeti import workerpool
32 from ganeti import errors
33
34 import testutils
35
36
37 class CountingContext(object):
38   def __init__(self):
39     self._lock = threading.Condition(threading.Lock())
40     self.done = 0
41
42   def DoneTask(self):
43     self._lock.acquire()
44     try:
45       self.done += 1
46     finally:
47       self._lock.release()
48
49   def GetDoneTasks(self):
50     self._lock.acquire()
51     try:
52       return self.done
53     finally:
54       self._lock.release()
55
56   @staticmethod
57   def UpdateChecksum(current, value):
58     return zlib.adler32(str(value), current)
59
60
61 class CountingBaseWorker(workerpool.BaseWorker):
62   def RunTask(self, ctx, text):
63     ctx.DoneTask()
64
65
66 class ChecksumContext:
67   CHECKSUM_START = zlib.adler32("")
68
69   def __init__(self):
70     self.lock = threading.Condition(threading.Lock())
71     self.checksum = self.CHECKSUM_START
72
73   @staticmethod
74   def UpdateChecksum(current, value):
75     return zlib.adler32(str(value), current)
76
77
78 class ChecksumBaseWorker(workerpool.BaseWorker):
79   def RunTask(self, ctx, number):
80     name = "number%s" % number
81     self.SetTaskName(name)
82
83     # This assertion needs to be checked before updating the checksum. A
84     # failing assertion will then cause the result to be wrong.
85     assert self.getName() == ("%s/%s" % (self._worker_id, name))
86
87     ctx.lock.acquire()
88     try:
89       ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
90     finally:
91       ctx.lock.release()
92
93
94 class ListBuilderContext:
95   def __init__(self):
96     self.lock = threading.Lock()
97     self.result = []
98     self.prioresult = {}
99
100
101 class ListBuilderWorker(workerpool.BaseWorker):
102   def RunTask(self, ctx, data):
103     ctx.lock.acquire()
104     try:
105       ctx.result.append((self.GetCurrentPriority(), data))
106       ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
107     finally:
108       ctx.lock.release()
109
110
111 class DeferringTaskContext:
112   def __init__(self):
113     self.lock = threading.Lock()
114     self.prioresult = {}
115     self.samepriodefer = {}
116
117
118 class DeferringWorker(workerpool.BaseWorker):
119   def RunTask(self, ctx, num, targetprio):
120     ctx.lock.acquire()
121     try:
122       if num in ctx.samepriodefer:
123         del ctx.samepriodefer[num]
124         raise workerpool.DeferTask()
125
126       if self.GetCurrentPriority() > targetprio:
127         raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
128
129       ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
130     finally:
131       ctx.lock.release()
132
133
134 class TestWorkerpool(unittest.TestCase):
135   """Workerpool tests"""
136
137   def testCounting(self):
138     ctx = CountingContext()
139     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
140     try:
141       self._CheckWorkerCount(wp, 3)
142
143       for i in range(10):
144         wp.AddTask((ctx, "Hello world %s" % i))
145
146       wp.Quiesce()
147     finally:
148       wp.TerminateWorkers()
149       self._CheckWorkerCount(wp, 0)
150
151     self.assertEquals(ctx.GetDoneTasks(), 10)
152
153   def testNoTasks(self):
154     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
155     try:
156       self._CheckWorkerCount(wp, 3)
157       self._CheckNoTasks(wp)
158     finally:
159       wp.TerminateWorkers()
160       self._CheckWorkerCount(wp, 0)
161
162   def testNoTasksQuiesce(self):
163     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
164     try:
165       self._CheckWorkerCount(wp, 3)
166       self._CheckNoTasks(wp)
167       wp.Quiesce()
168       self._CheckNoTasks(wp)
169     finally:
170       wp.TerminateWorkers()
171       self._CheckWorkerCount(wp, 0)
172
173   def testActive(self):
174     ctx = CountingContext()
175     wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker)
176     try:
177       self._CheckWorkerCount(wp, 5)
178       self.assertTrue(wp._active)
179
180       # Process some tasks
181       for _ in range(10):
182         wp.AddTask((ctx, None))
183
184       wp.Quiesce()
185       self._CheckNoTasks(wp)
186       self.assertEquals(ctx.GetDoneTasks(), 10)
187
188       # Repeat a few times
189       for count in range(10):
190         # Deactivate pool
191         wp.SetActive(False)
192         self._CheckNoTasks(wp)
193
194         # Queue some more tasks
195         for _ in range(10):
196           wp.AddTask((ctx, None))
197
198         for _ in range(5):
199           # Short delays to give other threads a chance to cause breakage
200           time.sleep(.01)
201           wp.AddTask((ctx, "Hello world %s" % 999))
202           self.assertFalse(wp._active)
203
204         self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15))
205
206         # Start processing again
207         wp.SetActive(True)
208         self.assertTrue(wp._active)
209
210         # Wait for tasks to finish
211         wp.Quiesce()
212         self._CheckNoTasks(wp)
213         self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15)
214
215         self._CheckWorkerCount(wp, 5)
216     finally:
217       wp.TerminateWorkers()
218       self._CheckWorkerCount(wp, 0)
219
220   def testChecksum(self):
221     # Tests whether all tasks are run and, since we're only using a single
222     # thread, whether everything is started in order.
223     wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
224     try:
225       self._CheckWorkerCount(wp, 1)
226
227       ctx = ChecksumContext()
228       checksum = ChecksumContext.CHECKSUM_START
229       for i in range(1, 100):
230         checksum = ChecksumContext.UpdateChecksum(checksum, i)
231         wp.AddTask((ctx, i))
232
233       wp.Quiesce()
234
235       self._CheckNoTasks(wp)
236
237       # Check sum
238       ctx.lock.acquire()
239       try:
240         self.assertEqual(checksum, ctx.checksum)
241       finally:
242         ctx.lock.release()
243     finally:
244       wp.TerminateWorkers()
245       self._CheckWorkerCount(wp, 0)
246
247   def testAddManyTasks(self):
248     ctx = CountingContext()
249     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
250     try:
251       self._CheckWorkerCount(wp, 3)
252
253       wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
254       wp.AddTask((ctx, "A separate hello"))
255       wp.AddTask((ctx, "Once more, hi!"))
256       wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
257
258       wp.Quiesce()
259
260       self._CheckNoTasks(wp)
261     finally:
262       wp.TerminateWorkers()
263       self._CheckWorkerCount(wp, 0)
264
265     self.assertEquals(ctx.GetDoneTasks(), 22)
266
267   def testManyTasksSequence(self):
268     ctx = CountingContext()
269     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
270     try:
271       self._CheckWorkerCount(wp, 3)
272       self.assertRaises(AssertionError, wp.AddManyTasks,
273                         ["Hello world %s" % i for i in range(10)])
274       self.assertRaises(AssertionError, wp.AddManyTasks,
275                         [i for i in range(10)])
276
277       wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
278       wp.AddTask((ctx, "A separate hello"))
279
280       wp.Quiesce()
281
282       self._CheckNoTasks(wp)
283     finally:
284       wp.TerminateWorkers()
285       self._CheckWorkerCount(wp, 0)
286
287     self.assertEquals(ctx.GetDoneTasks(), 11)
288
289   def _CheckNoTasks(self, wp):
290     wp._lock.acquire()
291     try:
292       # The task queue must be empty now
293       self.failUnless(not wp._tasks)
294     finally:
295       wp._lock.release()
296
297   def _CheckWorkerCount(self, wp, num_workers):
298     wp._lock.acquire()
299     try:
300       self.assertEqual(len(wp._workers), num_workers)
301     finally:
302       wp._lock.release()
303
304   def testPriorityChecksum(self):
305     # Tests whether all tasks are run and, since we're only using a single
306     # thread, whether everything is started in order and respects the priority
307     wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
308     try:
309       self._CheckWorkerCount(wp, 1)
310
311       ctx = ChecksumContext()
312
313       data = {}
314       tasks = []
315       priorities = []
316       for i in range(1, 333):
317         prio = i % 7
318         tasks.append((ctx, i))
319         priorities.append(prio)
320         data.setdefault(prio, []).append(i)
321
322       wp.AddManyTasks(tasks, priority=priorities)
323
324       wp.Quiesce()
325
326       self._CheckNoTasks(wp)
327
328       # Check sum
329       ctx.lock.acquire()
330       try:
331         checksum = ChecksumContext.CHECKSUM_START
332         for priority in sorted(data.keys()):
333           for i in data[priority]:
334             checksum = ChecksumContext.UpdateChecksum(checksum, i)
335
336         self.assertEqual(checksum, ctx.checksum)
337       finally:
338         ctx.lock.release()
339
340       self._CheckWorkerCount(wp, 1)
341     finally:
342       wp.TerminateWorkers()
343       self._CheckWorkerCount(wp, 0)
344
345   def testPriorityListManyTasks(self):
346     # Tests whether all tasks are run and, since we're only using a single
347     # thread, whether everything is started in order and respects the priority
348     wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
349     try:
350       self._CheckWorkerCount(wp, 1)
351
352       ctx = ListBuilderContext()
353
354       # Use static seed for this test
355       rnd = random.Random(0)
356
357       data = {}
358       tasks = []
359       priorities = []
360       for i in range(1, 333):
361         prio = int(rnd.random() * 10)
362         tasks.append((ctx, i))
363         priorities.append(prio)
364         data.setdefault(prio, []).append((prio, i))
365
366       wp.AddManyTasks(tasks, priority=priorities)
367
368       self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
369                         [("x", ), ("y", )], priority=[1] * 5)
370
371       wp.Quiesce()
372
373       self._CheckNoTasks(wp)
374
375       # Check result
376       ctx.lock.acquire()
377       try:
378         expresult = []
379         for priority in sorted(data.keys()):
380           expresult.extend(data[priority])
381
382         self.assertEqual(expresult, ctx.result)
383       finally:
384         ctx.lock.release()
385
386       self._CheckWorkerCount(wp, 1)
387     finally:
388       wp.TerminateWorkers()
389       self._CheckWorkerCount(wp, 0)
390
391   def testPriorityListSingleTasks(self):
392     # Tests whether all tasks are run and, since we're only using a single
393     # thread, whether everything is started in order and respects the priority
394     wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
395     try:
396       self._CheckWorkerCount(wp, 1)
397
398       ctx = ListBuilderContext()
399
400       # Use static seed for this test
401       rnd = random.Random(26279)
402
403       data = {}
404       for i in range(1, 333):
405         prio = int(rnd.random() * 30)
406         wp.AddTask((ctx, i), priority=prio)
407         data.setdefault(prio, []).append(i)
408
409         # Cause some distortion
410         if i % 11 == 0:
411           time.sleep(.001)
412         if i % 41 == 0:
413           wp.Quiesce()
414
415       wp.Quiesce()
416
417       self._CheckNoTasks(wp)
418
419       # Check result
420       ctx.lock.acquire()
421       try:
422         self.assertEqual(data, ctx.prioresult)
423       finally:
424         ctx.lock.release()
425
426       self._CheckWorkerCount(wp, 1)
427     finally:
428       wp.TerminateWorkers()
429       self._CheckWorkerCount(wp, 0)
430
431   def testDeferTask(self):
432     # Tests whether all tasks are run and, since we're only using a single
433     # thread, whether everything is started in order and respects the priority
434     wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
435     try:
436       self._CheckWorkerCount(wp, 1)
437
438       ctx = DeferringTaskContext()
439
440       # Use static seed for this test
441       rnd = random.Random(14921)
442
443       data = {}
444       for i in range(1, 333):
445         ctx.lock.acquire()
446         try:
447           if i % 5 == 0:
448             ctx.samepriodefer[i] = True
449         finally:
450           ctx.lock.release()
451
452         prio = int(rnd.random() * 30)
453         wp.AddTask((ctx, i, prio), priority=50)
454         data.setdefault(prio, set()).add(i)
455
456         # Cause some distortion
457         if i % 24 == 0:
458           time.sleep(.001)
459         if i % 31 == 0:
460           wp.Quiesce()
461
462       wp.Quiesce()
463
464       self._CheckNoTasks(wp)
465
466       # Check result
467       ctx.lock.acquire()
468       try:
469         self.assertEqual(data, ctx.prioresult)
470       finally:
471         ctx.lock.release()
472
473       self._CheckWorkerCount(wp, 1)
474     finally:
475       wp.TerminateWorkers()
476       self._CheckWorkerCount(wp, 0)
477
478
479 if __name__ == "__main__":
480   testutils.GanetiTestProgram()