Rename OpRenameInstance and LURenameInstance
[ganeti-local] / test / ganeti.workerpool_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the workerpool module"""
23
24 import unittest
25 import threading
26 import time
27 import sys
28 import zlib
29 import random
30
31 from ganeti import workerpool
32 from ganeti import errors
33
34 import testutils
35
36
37 class CountingContext(object):
38   def __init__(self):
39     self._lock = threading.Condition(threading.Lock())
40     self.done = 0
41
42   def DoneTask(self):
43     self._lock.acquire()
44     try:
45       self.done += 1
46     finally:
47       self._lock.release()
48
49   def GetDoneTasks(self):
50     self._lock.acquire()
51     try:
52       return self.done
53     finally:
54       self._lock.release()
55
56   @staticmethod
57   def UpdateChecksum(current, value):
58     return zlib.adler32(str(value), current)
59
60
61 class CountingBaseWorker(workerpool.BaseWorker):
62   def RunTask(self, ctx, text):
63     ctx.DoneTask()
64
65
66 class ChecksumContext:
67   CHECKSUM_START = zlib.adler32("")
68
69   def __init__(self):
70     self.lock = threading.Condition(threading.Lock())
71     self.checksum = self.CHECKSUM_START
72
73   @staticmethod
74   def UpdateChecksum(current, value):
75     return zlib.adler32(str(value), current)
76
77
78 class ChecksumBaseWorker(workerpool.BaseWorker):
79   def RunTask(self, ctx, number):
80     name = "number%s" % number
81     self.SetTaskName(name)
82
83     # This assertion needs to be checked before updating the checksum. A
84     # failing assertion will then cause the result to be wrong.
85     assert self.getName() == ("%s/%s" % (self._worker_id, name))
86
87     ctx.lock.acquire()
88     try:
89       ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
90     finally:
91       ctx.lock.release()
92
93
94 class ListBuilderContext:
95   def __init__(self):
96     self.lock = threading.Lock()
97     self.result = []
98     self.prioresult = {}
99
100
101 class ListBuilderWorker(workerpool.BaseWorker):
102   def RunTask(self, ctx, data):
103     ctx.lock.acquire()
104     try:
105       ctx.result.append((self.GetCurrentPriority(), data))
106       ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
107     finally:
108       ctx.lock.release()
109
110
111 class DeferringTaskContext:
112   def __init__(self):
113     self.lock = threading.Lock()
114     self.prioresult = {}
115     self.samepriodefer = {}
116
117
118 class DeferringWorker(workerpool.BaseWorker):
119   def RunTask(self, ctx, num, targetprio):
120     ctx.lock.acquire()
121     try:
122       if num in ctx.samepriodefer:
123         del ctx.samepriodefer[num]
124         raise workerpool.DeferTask()
125
126       if self.GetCurrentPriority() > targetprio:
127         raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
128
129       ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
130     finally:
131       ctx.lock.release()
132
133
134 class TestWorkerpool(unittest.TestCase):
135   """Workerpool tests"""
136
137   def testCounting(self):
138     ctx = CountingContext()
139     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
140     try:
141       self._CheckWorkerCount(wp, 3)
142
143       for i in range(10):
144         wp.AddTask((ctx, "Hello world %s" % i))
145
146       wp.Quiesce()
147     finally:
148       wp.TerminateWorkers()
149       self._CheckWorkerCount(wp, 0)
150
151     self.assertEquals(ctx.GetDoneTasks(), 10)
152
153   def testNoTasks(self):
154     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
155     try:
156       self._CheckWorkerCount(wp, 3)
157       self._CheckNoTasks(wp)
158     finally:
159       wp.TerminateWorkers()
160       self._CheckWorkerCount(wp, 0)
161
162   def testNoTasksQuiesce(self):
163     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
164     try:
165       self._CheckWorkerCount(wp, 3)
166       self._CheckNoTasks(wp)
167       wp.Quiesce()
168       self._CheckNoTasks(wp)
169     finally:
170       wp.TerminateWorkers()
171       self._CheckWorkerCount(wp, 0)
172
173   def testChecksum(self):
174     # Tests whether all tasks are run and, since we're only using a single
175     # thread, whether everything is started in order.
176     wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
177     try:
178       self._CheckWorkerCount(wp, 1)
179
180       ctx = ChecksumContext()
181       checksum = ChecksumContext.CHECKSUM_START
182       for i in range(1, 100):
183         checksum = ChecksumContext.UpdateChecksum(checksum, i)
184         wp.AddTask((ctx, i))
185
186       wp.Quiesce()
187
188       self._CheckNoTasks(wp)
189
190       # Check sum
191       ctx.lock.acquire()
192       try:
193         self.assertEqual(checksum, ctx.checksum)
194       finally:
195         ctx.lock.release()
196     finally:
197       wp.TerminateWorkers()
198       self._CheckWorkerCount(wp, 0)
199
200   def testAddManyTasks(self):
201     ctx = CountingContext()
202     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
203     try:
204       self._CheckWorkerCount(wp, 3)
205
206       wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
207       wp.AddTask((ctx, "A separate hello"))
208       wp.AddTask((ctx, "Once more, hi!"))
209       wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
210
211       wp.Quiesce()
212
213       self._CheckNoTasks(wp)
214     finally:
215       wp.TerminateWorkers()
216       self._CheckWorkerCount(wp, 0)
217
218     self.assertEquals(ctx.GetDoneTasks(), 22)
219
220   def testManyTasksSequence(self):
221     ctx = CountingContext()
222     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
223     try:
224       self._CheckWorkerCount(wp, 3)
225       self.assertRaises(AssertionError, wp.AddManyTasks,
226                         ["Hello world %s" % i for i in range(10)])
227       self.assertRaises(AssertionError, wp.AddManyTasks,
228                         [i for i in range(10)])
229
230       wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
231       wp.AddTask((ctx, "A separate hello"))
232
233       wp.Quiesce()
234
235       self._CheckNoTasks(wp)
236     finally:
237       wp.TerminateWorkers()
238       self._CheckWorkerCount(wp, 0)
239
240     self.assertEquals(ctx.GetDoneTasks(), 11)
241
242   def _CheckNoTasks(self, wp):
243     wp._lock.acquire()
244     try:
245       # The task queue must be empty now
246       self.failUnless(not wp._tasks)
247     finally:
248       wp._lock.release()
249
250   def _CheckWorkerCount(self, wp, num_workers):
251     wp._lock.acquire()
252     try:
253       self.assertEqual(len(wp._workers), num_workers)
254     finally:
255       wp._lock.release()
256
257   def testPriorityChecksum(self):
258     # Tests whether all tasks are run and, since we're only using a single
259     # thread, whether everything is started in order and respects the priority
260     wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
261     try:
262       self._CheckWorkerCount(wp, 1)
263
264       ctx = ChecksumContext()
265
266       data = {}
267       tasks = []
268       priorities = []
269       for i in range(1, 333):
270         prio = i % 7
271         tasks.append((ctx, i))
272         priorities.append(prio)
273         data.setdefault(prio, []).append(i)
274
275       wp.AddManyTasks(tasks, priority=priorities)
276
277       wp.Quiesce()
278
279       self._CheckNoTasks(wp)
280
281       # Check sum
282       ctx.lock.acquire()
283       try:
284         checksum = ChecksumContext.CHECKSUM_START
285         for priority in sorted(data.keys()):
286           for i in data[priority]:
287             checksum = ChecksumContext.UpdateChecksum(checksum, i)
288
289         self.assertEqual(checksum, ctx.checksum)
290       finally:
291         ctx.lock.release()
292
293       self._CheckWorkerCount(wp, 1)
294     finally:
295       wp.TerminateWorkers()
296       self._CheckWorkerCount(wp, 0)
297
298   def testPriorityListManyTasks(self):
299     # Tests whether all tasks are run and, since we're only using a single
300     # thread, whether everything is started in order and respects the priority
301     wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
302     try:
303       self._CheckWorkerCount(wp, 1)
304
305       ctx = ListBuilderContext()
306
307       # Use static seed for this test
308       rnd = random.Random(0)
309
310       data = {}
311       tasks = []
312       priorities = []
313       for i in range(1, 333):
314         prio = int(rnd.random() * 10)
315         tasks.append((ctx, i))
316         priorities.append(prio)
317         data.setdefault(prio, []).append((prio, i))
318
319       wp.AddManyTasks(tasks, priority=priorities)
320
321       self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
322                         [("x", ), ("y", )], priority=[1] * 5)
323
324       wp.Quiesce()
325
326       self._CheckNoTasks(wp)
327
328       # Check result
329       ctx.lock.acquire()
330       try:
331         expresult = []
332         for priority in sorted(data.keys()):
333           expresult.extend(data[priority])
334
335         self.assertEqual(expresult, ctx.result)
336       finally:
337         ctx.lock.release()
338
339       self._CheckWorkerCount(wp, 1)
340     finally:
341       wp.TerminateWorkers()
342       self._CheckWorkerCount(wp, 0)
343
344   def testPriorityListSingleTasks(self):
345     # Tests whether all tasks are run and, since we're only using a single
346     # thread, whether everything is started in order and respects the priority
347     wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
348     try:
349       self._CheckWorkerCount(wp, 1)
350
351       ctx = ListBuilderContext()
352
353       # Use static seed for this test
354       rnd = random.Random(26279)
355
356       data = {}
357       for i in range(1, 333):
358         prio = int(rnd.random() * 30)
359         wp.AddTask((ctx, i), priority=prio)
360         data.setdefault(prio, []).append(i)
361
362         # Cause some distortion
363         if i % 11 == 0:
364           time.sleep(.001)
365         if i % 41 == 0:
366           wp.Quiesce()
367
368       wp.Quiesce()
369
370       self._CheckNoTasks(wp)
371
372       # Check result
373       ctx.lock.acquire()
374       try:
375         self.assertEqual(data, ctx.prioresult)
376       finally:
377         ctx.lock.release()
378
379       self._CheckWorkerCount(wp, 1)
380     finally:
381       wp.TerminateWorkers()
382       self._CheckWorkerCount(wp, 0)
383
384   def testPriorityListSingleTasks(self):
385     # Tests whether all tasks are run and, since we're only using a single
386     # thread, whether everything is started in order and respects the priority
387     wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
388     try:
389       self._CheckWorkerCount(wp, 1)
390
391       ctx = ListBuilderContext()
392
393       # Use static seed for this test
394       rnd = random.Random(26279)
395
396       data = {}
397       for i in range(1, 333):
398         prio = int(rnd.random() * 30)
399         wp.AddTask((ctx, i), priority=prio)
400         data.setdefault(prio, []).append(i)
401
402         # Cause some distortion
403         if i % 11 == 0:
404           time.sleep(.001)
405         if i % 41 == 0:
406           wp.Quiesce()
407
408       wp.Quiesce()
409
410       self._CheckNoTasks(wp)
411
412       # Check result
413       ctx.lock.acquire()
414       try:
415         self.assertEqual(data, ctx.prioresult)
416       finally:
417         ctx.lock.release()
418
419       self._CheckWorkerCount(wp, 1)
420     finally:
421       wp.TerminateWorkers()
422       self._CheckWorkerCount(wp, 0)
423
424   def testDeferTask(self):
425     # Tests whether all tasks are run and, since we're only using a single
426     # thread, whether everything is started in order and respects the priority
427     wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
428     try:
429       self._CheckWorkerCount(wp, 1)
430
431       ctx = DeferringTaskContext()
432
433       # Use static seed for this test
434       rnd = random.Random(14921)
435
436       data = {}
437       for i in range(1, 333):
438         ctx.lock.acquire()
439         try:
440           if i % 5 == 0:
441             ctx.samepriodefer[i] = True
442         finally:
443           ctx.lock.release()
444
445         prio = int(rnd.random() * 30)
446         wp.AddTask((ctx, i, prio), priority=50)
447         data.setdefault(prio, set()).add(i)
448
449         # Cause some distortion
450         if i % 24 == 0:
451           time.sleep(.001)
452         if i % 31 == 0:
453           wp.Quiesce()
454
455       wp.Quiesce()
456
457       self._CheckNoTasks(wp)
458
459       # Check result
460       ctx.lock.acquire()
461       try:
462         self.assertEqual(data, ctx.prioresult)
463       finally:
464         ctx.lock.release()
465
466       self._CheckWorkerCount(wp, 1)
467     finally:
468       wp.TerminateWorkers()
469       self._CheckWorkerCount(wp, 0)
470
471
472 if __name__ == '__main__':
473   testutils.GanetiTestProgram()