htools/Ganeti/Errors: Add ECodeTempNoRes
[ganeti-local] / test / ganeti.workerpool_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the workerpool module"""
23
24 import unittest
25 import threading
26 import time
27 import sys
28 import zlib
29 import random
30
31 from ganeti import workerpool
32 from ganeti import errors
33 from ganeti import utils
34 from ganeti import compat
35
36 import testutils
37
38
39 class CountingContext(object):
40   def __init__(self):
41     self._lock = threading.Condition(threading.Lock())
42     self.done = 0
43
44   def DoneTask(self):
45     self._lock.acquire()
46     try:
47       self.done += 1
48     finally:
49       self._lock.release()
50
51   def GetDoneTasks(self):
52     self._lock.acquire()
53     try:
54       return self.done
55     finally:
56       self._lock.release()
57
58   @staticmethod
59   def UpdateChecksum(current, value):
60     return zlib.adler32(str(value), current)
61
62
63 class CountingBaseWorker(workerpool.BaseWorker):
64   def RunTask(self, ctx, text):
65     ctx.DoneTask()
66
67
68 class ChecksumContext:
69   CHECKSUM_START = zlib.adler32("")
70
71   def __init__(self):
72     self.lock = threading.Condition(threading.Lock())
73     self.checksum = self.CHECKSUM_START
74
75   @staticmethod
76   def UpdateChecksum(current, value):
77     return zlib.adler32(str(value), current)
78
79
80 class ChecksumBaseWorker(workerpool.BaseWorker):
81   def RunTask(self, ctx, number):
82     name = "number%s" % number
83     self.SetTaskName(name)
84
85     # This assertion needs to be checked before updating the checksum. A
86     # failing assertion will then cause the result to be wrong.
87     assert self.getName() == ("%s/%s" % (self._worker_id, name))
88
89     ctx.lock.acquire()
90     try:
91       ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
92     finally:
93       ctx.lock.release()
94
95
96 class ListBuilderContext:
97   def __init__(self):
98     self.lock = threading.Lock()
99     self.result = []
100     self.prioresult = {}
101
102
103 class ListBuilderWorker(workerpool.BaseWorker):
104   def RunTask(self, ctx, data):
105     ctx.lock.acquire()
106     try:
107       ctx.result.append((self.GetCurrentPriority(), data))
108       ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
109     finally:
110       ctx.lock.release()
111
112
113 class DeferringTaskContext:
114   def __init__(self):
115     self.lock = threading.Lock()
116     self.prioresult = {}
117     self.samepriodefer = {}
118     self.num2ordertaskid = {}
119
120
121 class DeferringWorker(workerpool.BaseWorker):
122   def RunTask(self, ctx, num, targetprio):
123     ctx.lock.acquire()
124     try:
125       otilst = ctx.num2ordertaskid.setdefault(num, [])
126       otilst.append(self._GetCurrentOrderAndTaskId())
127
128       if num in ctx.samepriodefer:
129         del ctx.samepriodefer[num]
130         raise workerpool.DeferTask()
131
132       if self.GetCurrentPriority() > targetprio:
133         raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
134
135       ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
136     finally:
137       ctx.lock.release()
138
139
140 class PriorityContext:
141   def __init__(self):
142     self.lock = threading.Lock()
143     self.result = []
144
145
146 class PriorityWorker(workerpool.BaseWorker):
147   def RunTask(self, ctx, data):
148     ctx.lock.acquire()
149     try:
150       ctx.result.append((self.GetCurrentPriority(), data))
151     finally:
152       ctx.lock.release()
153
154
155 class NotImplementedWorker(workerpool.BaseWorker):
156   def RunTask(self):
157     raise NotImplementedError
158
159
160 class TestWorkerpool(unittest.TestCase):
161   """Workerpool tests"""
162
163   def testCounting(self):
164     ctx = CountingContext()
165     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
166     try:
167       self._CheckWorkerCount(wp, 3)
168
169       for i in range(10):
170         wp.AddTask((ctx, "Hello world %s" % i))
171
172       wp.Quiesce()
173     finally:
174       wp.TerminateWorkers()
175       self._CheckWorkerCount(wp, 0)
176
177     self.assertEquals(ctx.GetDoneTasks(), 10)
178
179   def testNoTasks(self):
180     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
181     try:
182       self._CheckWorkerCount(wp, 3)
183       self._CheckNoTasks(wp)
184     finally:
185       wp.TerminateWorkers()
186       self._CheckWorkerCount(wp, 0)
187
188   def testNoTasksQuiesce(self):
189     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
190     try:
191       self._CheckWorkerCount(wp, 3)
192       self._CheckNoTasks(wp)
193       wp.Quiesce()
194       self._CheckNoTasks(wp)
195     finally:
196       wp.TerminateWorkers()
197       self._CheckWorkerCount(wp, 0)
198
199   def testActive(self):
200     ctx = CountingContext()
201     wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker)
202     try:
203       self._CheckWorkerCount(wp, 5)
204       self.assertTrue(wp._active)
205
206       # Process some tasks
207       for _ in range(10):
208         wp.AddTask((ctx, None))
209
210       wp.Quiesce()
211       self._CheckNoTasks(wp)
212       self.assertEquals(ctx.GetDoneTasks(), 10)
213
214       # Repeat a few times
215       for count in range(10):
216         # Deactivate pool
217         wp.SetActive(False)
218         self._CheckNoTasks(wp)
219
220         # Queue some more tasks
221         for _ in range(10):
222           wp.AddTask((ctx, None))
223
224         for _ in range(5):
225           # Short delays to give other threads a chance to cause breakage
226           time.sleep(.01)
227           wp.AddTask((ctx, "Hello world %s" % 999))
228           self.assertFalse(wp._active)
229
230         self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15))
231
232         # Start processing again
233         wp.SetActive(True)
234         self.assertTrue(wp._active)
235
236         # Wait for tasks to finish
237         wp.Quiesce()
238         self._CheckNoTasks(wp)
239         self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15)
240
241         self._CheckWorkerCount(wp, 5)
242     finally:
243       wp.TerminateWorkers()
244       self._CheckWorkerCount(wp, 0)
245
246   def testChecksum(self):
247     # Tests whether all tasks are run and, since we're only using a single
248     # thread, whether everything is started in order.
249     wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
250     try:
251       self._CheckWorkerCount(wp, 1)
252
253       ctx = ChecksumContext()
254       checksum = ChecksumContext.CHECKSUM_START
255       for i in range(1, 100):
256         checksum = ChecksumContext.UpdateChecksum(checksum, i)
257         wp.AddTask((ctx, i))
258
259       wp.Quiesce()
260
261       self._CheckNoTasks(wp)
262
263       # Check sum
264       ctx.lock.acquire()
265       try:
266         self.assertEqual(checksum, ctx.checksum)
267       finally:
268         ctx.lock.release()
269     finally:
270       wp.TerminateWorkers()
271       self._CheckWorkerCount(wp, 0)
272
273   def testAddManyTasks(self):
274     ctx = CountingContext()
275     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
276     try:
277       self._CheckWorkerCount(wp, 3)
278
279       wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
280       wp.AddTask((ctx, "A separate hello"))
281       wp.AddTask((ctx, "Once more, hi!"))
282       wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
283
284       wp.Quiesce()
285
286       self._CheckNoTasks(wp)
287     finally:
288       wp.TerminateWorkers()
289       self._CheckWorkerCount(wp, 0)
290
291     self.assertEquals(ctx.GetDoneTasks(), 22)
292
293   def testManyTasksSequence(self):
294     ctx = CountingContext()
295     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
296     try:
297       self._CheckWorkerCount(wp, 3)
298       self.assertRaises(AssertionError, wp.AddManyTasks,
299                         ["Hello world %s" % i for i in range(10)])
300       self.assertRaises(AssertionError, wp.AddManyTasks,
301                         [i for i in range(10)])
302       self.assertRaises(AssertionError, wp.AddManyTasks, [], task_id=0)
303
304       wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
305       wp.AddTask((ctx, "A separate hello"))
306
307       wp.Quiesce()
308
309       self._CheckNoTasks(wp)
310     finally:
311       wp.TerminateWorkers()
312       self._CheckWorkerCount(wp, 0)
313
314     self.assertEquals(ctx.GetDoneTasks(), 11)
315
316   def _CheckNoTasks(self, wp):
317     wp._lock.acquire()
318     try:
319       # The task queue must be empty now
320       self.assertFalse(wp._tasks)
321       self.assertFalse(wp._taskdata)
322     finally:
323       wp._lock.release()
324
325   def _CheckWorkerCount(self, wp, num_workers):
326     wp._lock.acquire()
327     try:
328       self.assertEqual(len(wp._workers), num_workers)
329     finally:
330       wp._lock.release()
331
332   def testPriorityChecksum(self):
333     # Tests whether all tasks are run and, since we're only using a single
334     # thread, whether everything is started in order and respects the priority
335     wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
336     try:
337       self._CheckWorkerCount(wp, 1)
338
339       ctx = ChecksumContext()
340
341       data = {}
342       tasks = []
343       priorities = []
344       for i in range(1, 333):
345         prio = i % 7
346         tasks.append((ctx, i))
347         priorities.append(prio)
348         data.setdefault(prio, []).append(i)
349
350       wp.AddManyTasks(tasks, priority=priorities)
351
352       wp.Quiesce()
353
354       self._CheckNoTasks(wp)
355
356       # Check sum
357       ctx.lock.acquire()
358       try:
359         checksum = ChecksumContext.CHECKSUM_START
360         for priority in sorted(data.keys()):
361           for i in data[priority]:
362             checksum = ChecksumContext.UpdateChecksum(checksum, i)
363
364         self.assertEqual(checksum, ctx.checksum)
365       finally:
366         ctx.lock.release()
367
368       self._CheckWorkerCount(wp, 1)
369     finally:
370       wp.TerminateWorkers()
371       self._CheckWorkerCount(wp, 0)
372
373   def testPriorityListManyTasks(self):
374     # Tests whether all tasks are run and, since we're only using a single
375     # thread, whether everything is started in order and respects the priority
376     wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
377     try:
378       self._CheckWorkerCount(wp, 1)
379
380       ctx = ListBuilderContext()
381
382       # Use static seed for this test
383       rnd = random.Random(0)
384
385       data = {}
386       tasks = []
387       priorities = []
388       for i in range(1, 333):
389         prio = int(rnd.random() * 10)
390         tasks.append((ctx, i))
391         priorities.append(prio)
392         data.setdefault(prio, []).append((prio, i))
393
394       wp.AddManyTasks(tasks, priority=priorities)
395
396       self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
397                         [("x", ), ("y", )], priority=[1] * 5)
398       self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
399                         [("x", ), ("y", )], task_id=[1] * 5)
400
401       wp.Quiesce()
402
403       self._CheckNoTasks(wp)
404
405       # Check result
406       ctx.lock.acquire()
407       try:
408         expresult = []
409         for priority in sorted(data.keys()):
410           expresult.extend(data[priority])
411
412         self.assertEqual(expresult, ctx.result)
413       finally:
414         ctx.lock.release()
415
416       self._CheckWorkerCount(wp, 1)
417     finally:
418       wp.TerminateWorkers()
419       self._CheckWorkerCount(wp, 0)
420
421   def testPriorityListSingleTasks(self):
422     # Tests whether all tasks are run and, since we're only using a single
423     # thread, whether everything is started in order and respects the priority
424     wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
425     try:
426       self._CheckWorkerCount(wp, 1)
427
428       ctx = ListBuilderContext()
429
430       # Use static seed for this test
431       rnd = random.Random(26279)
432
433       data = {}
434       for i in range(1, 333):
435         prio = int(rnd.random() * 30)
436         wp.AddTask((ctx, i), priority=prio)
437         data.setdefault(prio, []).append(i)
438
439         # Cause some distortion
440         if i % 11 == 0:
441           time.sleep(.001)
442         if i % 41 == 0:
443           wp.Quiesce()
444
445       wp.Quiesce()
446
447       self._CheckNoTasks(wp)
448
449       # Check result
450       ctx.lock.acquire()
451       try:
452         self.assertEqual(data, ctx.prioresult)
453       finally:
454         ctx.lock.release()
455
456       self._CheckWorkerCount(wp, 1)
457     finally:
458       wp.TerminateWorkers()
459       self._CheckWorkerCount(wp, 0)
460
461   def testDeferTask(self):
462     # Tests whether all tasks are run and, since we're only using a single
463     # thread, whether everything is started in order and respects the priority
464     wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
465     try:
466       self._CheckWorkerCount(wp, 1)
467
468       ctx = DeferringTaskContext()
469
470       # Use static seed for this test
471       rnd = random.Random(14921)
472
473       data = {}
474       num2taskid = {}
475       for i in range(1, 333):
476         ctx.lock.acquire()
477         try:
478           if i % 5 == 0:
479             ctx.samepriodefer[i] = True
480         finally:
481           ctx.lock.release()
482
483         prio = int(rnd.random() * 30)
484         num2taskid[i] = 1000 * i
485         wp.AddTask((ctx, i, prio), priority=50,
486                    task_id=num2taskid[i])
487         data.setdefault(prio, set()).add(i)
488
489         # Cause some distortion
490         if i % 24 == 0:
491           time.sleep(.001)
492         if i % 31 == 0:
493           wp.Quiesce()
494
495       wp.Quiesce()
496
497       self._CheckNoTasks(wp)
498
499       # Check result
500       ctx.lock.acquire()
501       try:
502         self.assertEqual(data, ctx.prioresult)
503
504         all_order_ids = []
505
506         for (num, numordertaskid) in ctx.num2ordertaskid.items():
507           order_ids = map(compat.fst, numordertaskid)
508           self.assertFalse(utils.FindDuplicates(order_ids),
509                            msg="Order ID has been reused")
510           all_order_ids.extend(order_ids)
511
512           for task_id in map(compat.snd, numordertaskid):
513             self.assertEqual(task_id, num2taskid[num],
514                              msg=("Task %s used different task IDs" % num))
515
516         self.assertFalse(utils.FindDuplicates(all_order_ids),
517                          msg="Order ID has been reused")
518       finally:
519         ctx.lock.release()
520
521       self._CheckWorkerCount(wp, 1)
522     finally:
523       wp.TerminateWorkers()
524       self._CheckWorkerCount(wp, 0)
525
526   def testChangeTaskPriority(self):
527     wp = workerpool.WorkerPool("Test", 1, PriorityWorker)
528     try:
529       self._CheckWorkerCount(wp, 1)
530
531       ctx = PriorityContext()
532
533       # Use static seed for this test
534       rnd = random.Random(4727)
535
536       # Disable processing of tasks
537       wp.SetActive(False)
538
539       # No task ID
540       self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
541                         None, 0)
542
543       # Pre-generate task IDs and priorities
544       count = 100
545       task_ids = range(0, count)
546       priorities = range(200, 200 + count) * 2
547
548       rnd.shuffle(task_ids)
549       rnd.shuffle(priorities)
550
551       # Make sure there are some duplicate priorities, but not all
552       priorities[count * 2 - 10:count * 2 - 1] = \
553         priorities[count - 10: count - 1]
554
555       assert len(priorities) == 2 * count
556       assert priorities[0:(count - 1)] != priorities[count:(2 * count - 1)]
557
558       # Add some tasks; this loop consumes the first half of all previously
559       # generated priorities
560       for (idx, task_id) in enumerate(task_ids):
561         wp.AddTask((ctx, idx),
562                    priority=priorities.pop(),
563                    task_id=task_id)
564
565       self.assertEqual(len(wp._tasks), len(task_ids))
566       self.assertEqual(len(wp._taskdata), len(task_ids))
567
568       # Tasks have been added, so half of the priorities should have been
569       # consumed
570       assert len(priorities) == len(task_ids)
571
572       # Change task priority
573       expected = []
574       for ((idx, task_id), prio) in zip(enumerate(task_ids), priorities):
575         wp.ChangeTaskPriority(task_id, prio)
576         expected.append((prio, idx))
577
578       self.assertEqual(len(wp._taskdata), len(task_ids))
579
580       # Half the entries are now abandoned tasks
581       self.assertEqual(len(wp._tasks), len(task_ids) * 2)
582
583       assert len(priorities) == count
584       assert len(task_ids) == count
585
586       # Start processing
587       wp.SetActive(True)
588
589       # Wait for tasks to finish
590       wp.Quiesce()
591
592       self._CheckNoTasks(wp)
593
594       for task_id in task_ids:
595         # All tasks are done
596         self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
597                           task_id, 0)
598
599       # Check result
600       ctx.lock.acquire()
601       try:
602         self.assertEqual(ctx.result, sorted(expected))
603       finally:
604         ctx.lock.release()
605
606       self._CheckWorkerCount(wp, 1)
607     finally:
608       wp.TerminateWorkers()
609       self._CheckWorkerCount(wp, 0)
610
611   def testChangeTaskPriorityInteralStructures(self):
612     wp = workerpool.WorkerPool("Test", 1, NotImplementedWorker)
613     try:
614       self._CheckWorkerCount(wp, 1)
615
616       # Use static seed for this test
617       rnd = random.Random(643)
618
619       (num1, num2) = rnd.sample(range(1000), 2)
620
621       # Disable processing of tasks
622       wp.SetActive(False)
623
624       self.assertFalse(wp._tasks)
625       self.assertFalse(wp._taskdata)
626
627       # No priority or task ID
628       wp.AddTask(())
629       self.assertEqual(wp._tasks, [
630         [workerpool._DEFAULT_PRIORITY, 0, None, ()],
631         ])
632       self.assertFalse(wp._taskdata)
633
634       # No task ID
635       wp.AddTask((), priority=7413)
636       self.assertEqual(wp._tasks, [
637         [workerpool._DEFAULT_PRIORITY, 0, None, ()],
638         [7413, 1, None, ()],
639         ])
640       self.assertFalse(wp._taskdata)
641
642       # Start adding real tasks
643       wp.AddTask((), priority=10267659, task_id=num1)
644       self.assertEqual(wp._tasks, [
645         [workerpool._DEFAULT_PRIORITY, 0, None, ()],
646         [7413, 1, None, ()],
647         [10267659, 2, num1, ()],
648         ])
649       self.assertEqual(wp._taskdata, {
650         num1: [10267659, 2, num1, ()],
651         })
652
653       wp.AddTask((), priority=123, task_id=num2)
654       self.assertEqual(sorted(wp._tasks), [
655         [workerpool._DEFAULT_PRIORITY, 0, None, ()],
656         [123, 3, num2, ()],
657         [7413, 1, None, ()],
658         [10267659, 2, num1, ()],
659         ])
660       self.assertEqual(wp._taskdata, {
661         num1: [10267659, 2, num1, ()],
662         num2: [123, 3, num2, ()],
663         })
664
665       wp.ChangeTaskPriority(num1, 100)
666       self.assertEqual(sorted(wp._tasks), [
667         [workerpool._DEFAULT_PRIORITY, 0, None, ()],
668         [100, 2, num1, ()],
669         [123, 3, num2, ()],
670         [7413, 1, None, ()],
671         [10267659, 2, num1, None],
672         ])
673       self.assertEqual(wp._taskdata, {
674         num1: [100, 2, num1, ()],
675         num2: [123, 3, num2, ()],
676         })
677
678       wp.ChangeTaskPriority(num2, 91337)
679       self.assertEqual(sorted(wp._tasks), [
680         [workerpool._DEFAULT_PRIORITY, 0, None, ()],
681         [100, 2, num1, ()],
682         [123, 3, num2, None],
683         [7413, 1, None, ()],
684         [91337, 3, num2, ()],
685         [10267659, 2, num1, None],
686         ])
687       self.assertEqual(wp._taskdata, {
688         num1: [100, 2, num1, ()],
689         num2: [91337, 3, num2, ()],
690         })
691
692       wp.ChangeTaskPriority(num1, 10139)
693       self.assertEqual(sorted(wp._tasks), [
694         [workerpool._DEFAULT_PRIORITY, 0, None, ()],
695         [100, 2, num1, None],
696         [123, 3, num2, None],
697         [7413, 1, None, ()],
698         [10139, 2, num1, ()],
699         [91337, 3, num2, ()],
700         [10267659, 2, num1, None],
701         ])
702       self.assertEqual(wp._taskdata, {
703         num1: [10139, 2, num1, ()],
704         num2: [91337, 3, num2, ()],
705         })
706
707       # Change to the same priority once again
708       wp.ChangeTaskPriority(num1, 10139)
709       self.assertEqual(sorted(wp._tasks), [
710         [workerpool._DEFAULT_PRIORITY, 0, None, ()],
711         [100, 2, num1, None],
712         [123, 3, num2, None],
713         [7413, 1, None, ()],
714         [10139, 2, num1, None],
715         [10139, 2, num1, ()],
716         [91337, 3, num2, ()],
717         [10267659, 2, num1, None],
718         ])
719       self.assertEqual(wp._taskdata, {
720         num1: [10139, 2, num1, ()],
721         num2: [91337, 3, num2, ()],
722         })
723
724       self._CheckWorkerCount(wp, 1)
725     finally:
726       wp.TerminateWorkers()
727       self._CheckWorkerCount(wp, 0)
728
729
730 if __name__ == "__main__":
731   testutils.GanetiTestProgram()