bash_completion: Enable extglob while parsing file
[ganeti-local] / test / ganeti.workerpool_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the workerpool module"""
23
24 import unittest
25 import threading
26 import time
27 import sys
28 import zlib
29 import random
30
31 from ganeti import workerpool
32 from ganeti import errors
33
34 import testutils
35
36
37 class CountingContext(object):
38   def __init__(self):
39     self._lock = threading.Condition(threading.Lock())
40     self.done = 0
41
42   def DoneTask(self):
43     self._lock.acquire()
44     try:
45       self.done += 1
46     finally:
47       self._lock.release()
48
49   def GetDoneTasks(self):
50     self._lock.acquire()
51     try:
52       return self.done
53     finally:
54       self._lock.release()
55
56   @staticmethod
57   def UpdateChecksum(current, value):
58     return zlib.adler32(str(value), current)
59
60
61 class CountingBaseWorker(workerpool.BaseWorker):
62   def RunTask(self, ctx, text):
63     ctx.DoneTask()
64
65
66 class ChecksumContext:
67   CHECKSUM_START = zlib.adler32("")
68
69   def __init__(self):
70     self.lock = threading.Condition(threading.Lock())
71     self.checksum = self.CHECKSUM_START
72
73   @staticmethod
74   def UpdateChecksum(current, value):
75     return zlib.adler32(str(value), current)
76
77
78 class ChecksumBaseWorker(workerpool.BaseWorker):
79   def RunTask(self, ctx, number):
80     name = "number%s" % number
81     self.SetTaskName(name)
82
83     # This assertion needs to be checked before updating the checksum. A
84     # failing assertion will then cause the result to be wrong.
85     assert self.getName() == ("%s/%s" % (self._worker_id, name))
86
87     ctx.lock.acquire()
88     try:
89       ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
90     finally:
91       ctx.lock.release()
92
93
94 class ListBuilderContext:
95   def __init__(self):
96     self.lock = threading.Lock()
97     self.result = []
98     self.prioresult = {}
99
100
101 class ListBuilderWorker(workerpool.BaseWorker):
102   def RunTask(self, ctx, data):
103     ctx.lock.acquire()
104     try:
105       ctx.result.append((self.GetCurrentPriority(), data))
106       ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
107     finally:
108       ctx.lock.release()
109
110
111 class DeferringTaskContext:
112   def __init__(self):
113     self.lock = threading.Lock()
114     self.prioresult = {}
115     self.samepriodefer = {}
116
117
118 class DeferringWorker(workerpool.BaseWorker):
119   def RunTask(self, ctx, num, targetprio):
120     ctx.lock.acquire()
121     try:
122       if num in ctx.samepriodefer:
123         del ctx.samepriodefer[num]
124         raise workerpool.DeferTask()
125
126       if self.GetCurrentPriority() > targetprio:
127         raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
128
129       ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
130     finally:
131       ctx.lock.release()
132
133
134 class TestWorkerpool(unittest.TestCase):
135   """Workerpool tests"""
136
137   def testCounting(self):
138     ctx = CountingContext()
139     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
140     try:
141       self._CheckWorkerCount(wp, 3)
142
143       for i in range(10):
144         wp.AddTask((ctx, "Hello world %s" % i))
145
146       wp.Quiesce()
147     finally:
148       wp.TerminateWorkers()
149       self._CheckWorkerCount(wp, 0)
150
151     self.assertEquals(ctx.GetDoneTasks(), 10)
152
153   def testNoTasks(self):
154     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
155     try:
156       self._CheckWorkerCount(wp, 3)
157       self._CheckNoTasks(wp)
158     finally:
159       wp.TerminateWorkers()
160       self._CheckWorkerCount(wp, 0)
161
162   def testNoTasksQuiesce(self):
163     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
164     try:
165       self._CheckWorkerCount(wp, 3)
166       self._CheckNoTasks(wp)
167       wp.Quiesce()
168       self._CheckNoTasks(wp)
169     finally:
170       wp.TerminateWorkers()
171       self._CheckWorkerCount(wp, 0)
172
173   def testActive(self):
174     ctx = CountingContext()
175     wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker)
176     try:
177       self._CheckWorkerCount(wp, 5)
178       self.assertTrue(wp._active)
179
180       # Process some tasks
181       for _ in range(10):
182         wp.AddTask((ctx, None))
183
184       wp.Quiesce()
185       self._CheckNoTasks(wp)
186       self.assertEquals(ctx.GetDoneTasks(), 10)
187
188       # Repeat a few times
189       for count in range(10):
190         # Deactivate pool
191         wp.SetActive(False)
192         self._CheckNoTasks(wp)
193
194         # Queue some more tasks
195         for _ in range(10):
196           wp.AddTask((ctx, None))
197
198         for _ in range(5):
199           # Short delays to give other threads a chance to cause breakage
200           time.sleep(.01)
201           wp.AddTask((ctx, "Hello world %s" % 999))
202           self.assertFalse(wp._active)
203
204         self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15))
205
206         # Start processing again
207         wp.SetActive(True)
208         self.assertTrue(wp._active)
209
210         # Wait for tasks to finish
211         wp.Quiesce()
212         self._CheckNoTasks(wp)
213         self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15)
214
215         self._CheckWorkerCount(wp, 5)
216     finally:
217       wp.TerminateWorkers()
218       self._CheckWorkerCount(wp, 0)
219
220   def testChecksum(self):
221     # Tests whether all tasks are run and, since we're only using a single
222     # thread, whether everything is started in order.
223     wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
224     try:
225       self._CheckWorkerCount(wp, 1)
226
227       ctx = ChecksumContext()
228       checksum = ChecksumContext.CHECKSUM_START
229       for i in range(1, 100):
230         checksum = ChecksumContext.UpdateChecksum(checksum, i)
231         wp.AddTask((ctx, i))
232
233       wp.Quiesce()
234
235       self._CheckNoTasks(wp)
236
237       # Check sum
238       ctx.lock.acquire()
239       try:
240         self.assertEqual(checksum, ctx.checksum)
241       finally:
242         ctx.lock.release()
243     finally:
244       wp.TerminateWorkers()
245       self._CheckWorkerCount(wp, 0)
246
247   def testAddManyTasks(self):
248     ctx = CountingContext()
249     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
250     try:
251       self._CheckWorkerCount(wp, 3)
252
253       wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
254       wp.AddTask((ctx, "A separate hello"))
255       wp.AddTask((ctx, "Once more, hi!"))
256       wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
257
258       wp.Quiesce()
259
260       self._CheckNoTasks(wp)
261     finally:
262       wp.TerminateWorkers()
263       self._CheckWorkerCount(wp, 0)
264
265     self.assertEquals(ctx.GetDoneTasks(), 22)
266
267   def testManyTasksSequence(self):
268     ctx = CountingContext()
269     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
270     try:
271       self._CheckWorkerCount(wp, 3)
272       self.assertRaises(AssertionError, wp.AddManyTasks,
273                         ["Hello world %s" % i for i in range(10)])
274       self.assertRaises(AssertionError, wp.AddManyTasks,
275                         [i for i in range(10)])
276
277       wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
278       wp.AddTask((ctx, "A separate hello"))
279
280       wp.Quiesce()
281
282       self._CheckNoTasks(wp)
283     finally:
284       wp.TerminateWorkers()
285       self._CheckWorkerCount(wp, 0)
286
287     self.assertEquals(ctx.GetDoneTasks(), 11)
288
289   def _CheckNoTasks(self, wp):
290     wp._lock.acquire()
291     try:
292       # The task queue must be empty now
293       self.failUnless(not wp._tasks)
294     finally:
295       wp._lock.release()
296
297   def _CheckWorkerCount(self, wp, num_workers):
298     wp._lock.acquire()
299     try:
300       self.assertEqual(len(wp._workers), num_workers)
301     finally:
302       wp._lock.release()
303
304   def testPriorityChecksum(self):
305     # Tests whether all tasks are run and, since we're only using a single
306     # thread, whether everything is started in order and respects the priority
307     wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
308     try:
309       self._CheckWorkerCount(wp, 1)
310
311       ctx = ChecksumContext()
312
313       data = {}
314       tasks = []
315       priorities = []
316       for i in range(1, 333):
317         prio = i % 7
318         tasks.append((ctx, i))
319         priorities.append(prio)
320         data.setdefault(prio, []).append(i)
321
322       wp.AddManyTasks(tasks, priority=priorities)
323
324       wp.Quiesce()
325
326       self._CheckNoTasks(wp)
327
328       # Check sum
329       ctx.lock.acquire()
330       try:
331         checksum = ChecksumContext.CHECKSUM_START
332         for priority in sorted(data.keys()):
333           for i in data[priority]:
334             checksum = ChecksumContext.UpdateChecksum(checksum, i)
335
336         self.assertEqual(checksum, ctx.checksum)
337       finally:
338         ctx.lock.release()
339
340       self._CheckWorkerCount(wp, 1)
341     finally:
342       wp.TerminateWorkers()
343       self._CheckWorkerCount(wp, 0)
344
345   def testPriorityListManyTasks(self):
346     # Tests whether all tasks are run and, since we're only using a single
347     # thread, whether everything is started in order and respects the priority
348     wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
349     try:
350       self._CheckWorkerCount(wp, 1)
351
352       ctx = ListBuilderContext()
353
354       # Use static seed for this test
355       rnd = random.Random(0)
356
357       data = {}
358       tasks = []
359       priorities = []
360       for i in range(1, 333):
361         prio = int(rnd.random() * 10)
362         tasks.append((ctx, i))
363         priorities.append(prio)
364         data.setdefault(prio, []).append((prio, i))
365
366       wp.AddManyTasks(tasks, priority=priorities)
367
368       self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
369                         [("x", ), ("y", )], priority=[1] * 5)
370
371       wp.Quiesce()
372
373       self._CheckNoTasks(wp)
374
375       # Check result
376       ctx.lock.acquire()
377       try:
378         expresult = []
379         for priority in sorted(data.keys()):
380           expresult.extend(data[priority])
381
382         self.assertEqual(expresult, ctx.result)
383       finally:
384         ctx.lock.release()
385
386       self._CheckWorkerCount(wp, 1)
387     finally:
388       wp.TerminateWorkers()
389       self._CheckWorkerCount(wp, 0)
390
391   def testPriorityListSingleTasks(self):
392     # Tests whether all tasks are run and, since we're only using a single
393     # thread, whether everything is started in order and respects the priority
394     wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
395     try:
396       self._CheckWorkerCount(wp, 1)
397
398       ctx = ListBuilderContext()
399
400       # Use static seed for this test
401       rnd = random.Random(26279)
402
403       data = {}
404       for i in range(1, 333):
405         prio = int(rnd.random() * 30)
406         wp.AddTask((ctx, i), priority=prio)
407         data.setdefault(prio, []).append(i)
408
409         # Cause some distortion
410         if i % 11 == 0:
411           time.sleep(.001)
412         if i % 41 == 0:
413           wp.Quiesce()
414
415       wp.Quiesce()
416
417       self._CheckNoTasks(wp)
418
419       # Check result
420       ctx.lock.acquire()
421       try:
422         self.assertEqual(data, ctx.prioresult)
423       finally:
424         ctx.lock.release()
425
426       self._CheckWorkerCount(wp, 1)
427     finally:
428       wp.TerminateWorkers()
429       self._CheckWorkerCount(wp, 0)
430
431   def testPriorityListSingleTasks(self):
432     # Tests whether all tasks are run and, since we're only using a single
433     # thread, whether everything is started in order and respects the priority
434     wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
435     try:
436       self._CheckWorkerCount(wp, 1)
437
438       ctx = ListBuilderContext()
439
440       # Use static seed for this test
441       rnd = random.Random(26279)
442
443       data = {}
444       for i in range(1, 333):
445         prio = int(rnd.random() * 30)
446         wp.AddTask((ctx, i), priority=prio)
447         data.setdefault(prio, []).append(i)
448
449         # Cause some distortion
450         if i % 11 == 0:
451           time.sleep(.001)
452         if i % 41 == 0:
453           wp.Quiesce()
454
455       wp.Quiesce()
456
457       self._CheckNoTasks(wp)
458
459       # Check result
460       ctx.lock.acquire()
461       try:
462         self.assertEqual(data, ctx.prioresult)
463       finally:
464         ctx.lock.release()
465
466       self._CheckWorkerCount(wp, 1)
467     finally:
468       wp.TerminateWorkers()
469       self._CheckWorkerCount(wp, 0)
470
471   def testDeferTask(self):
472     # Tests whether all tasks are run and, since we're only using a single
473     # thread, whether everything is started in order and respects the priority
474     wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
475     try:
476       self._CheckWorkerCount(wp, 1)
477
478       ctx = DeferringTaskContext()
479
480       # Use static seed for this test
481       rnd = random.Random(14921)
482
483       data = {}
484       for i in range(1, 333):
485         ctx.lock.acquire()
486         try:
487           if i % 5 == 0:
488             ctx.samepriodefer[i] = True
489         finally:
490           ctx.lock.release()
491
492         prio = int(rnd.random() * 30)
493         wp.AddTask((ctx, i, prio), priority=50)
494         data.setdefault(prio, set()).add(i)
495
496         # Cause some distortion
497         if i % 24 == 0:
498           time.sleep(.001)
499         if i % 31 == 0:
500           wp.Quiesce()
501
502       wp.Quiesce()
503
504       self._CheckNoTasks(wp)
505
506       # Check result
507       ctx.lock.acquire()
508       try:
509         self.assertEqual(data, ctx.prioresult)
510       finally:
511         ctx.lock.release()
512
513       self._CheckWorkerCount(wp, 1)
514     finally:
515       wp.TerminateWorkers()
516       self._CheckWorkerCount(wp, 0)
517
518
519 if __name__ == '__main__':
520   testutils.GanetiTestProgram()