import-export daemon: Allow changing compression method
[ganeti-local] / test / ganeti.workerpool_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the workerpool module"""
23
24 import unittest
25 import threading
26 import time
27 import sys
28 import zlib
29
30 from ganeti import workerpool
31
32 import testutils
33
34
class DummyBaseWorker(workerpool.BaseWorker):
  """Worker whose tasks do nothing.

  """
  def RunTask(self, text):
    # The task argument is accepted and deliberately ignored
    return None
38
39
class ChecksumContext:
  """Shared state for the checksum test: a lock and a running Adler-32 value.

  """
  # Adler-32 of the empty input. The string is encoded explicitly because
  # zlib only accepts byte strings on Python 3; ".encode" is used instead of
  # a b"" literal so that older Python 2 interpreters still parse this file.
  CHECKSUM_START = zlib.adler32("".encode("ascii"))

  def __init__(self):
    # Lock to be held while reading or updating "checksum"
    self.lock = threading.Condition(threading.Lock())
    self.checksum = self.CHECKSUM_START

  @staticmethod
  def UpdateChecksum(current, value):
    """Folds the string form of a value into a running checksum.

    @param current: checksum accumulated so far
    @param value: object whose C{str()} form is added to the checksum; the
        string form must be plain ASCII (this file only passes integers)
    @return: the updated Adler-32 checksum

    """
    return zlib.adler32(str(value).encode("ascii"), current)
50
51
class ChecksumBaseWorker(workerpool.BaseWorker):
  """Worker folding each task's number into a shared checksum context.

  """
  def RunTask(self, ctx, number):
    # The read-modify-write of ctx.checksum happens entirely under ctx.lock
    guard = ctx.lock
    guard.acquire()
    try:
      updated = ctx.UpdateChecksum(ctx.checksum, number)
      ctx.checksum = updated
    finally:
      guard.release()
59
60
class TestWorkerpool(unittest.TestCase):
  """Workerpool tests"""

  def testDummy(self):
    # Feeds ten no-op tasks to a pool of three workers, quiesces the pool and
    # checks that no workers are left once they have been terminated.
    wp = workerpool.WorkerPool("Test", 3, DummyBaseWorker)
    try:
      self._CheckWorkerCount(wp, 3)

      for i in range(10):
        wp.AddTask("Hello world %s" % i)

      wp.Quiesce()
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

  def testNoTasks(self):
    # A freshly created pool must have its workers but an empty task queue.
    wp = workerpool.WorkerPool("Test", 3, DummyBaseWorker)
    try:
      self._CheckWorkerCount(wp, 3)
      self._CheckNoTasks(wp)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

  def testNoTasksQuiesce(self):
    # Quiescing a pool that never received a task must leave the task queue
    # empty.
    wp = workerpool.WorkerPool("Test", 3, DummyBaseWorker)
    try:
      self._CheckWorkerCount(wp, 3)
      self._CheckNoTasks(wp)
      wp.Quiesce()
      self._CheckNoTasks(wp)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

  def testChecksum(self):
    # Tests whether all tasks are run and, since we're only using a single
    # thread, whether everything is started in order.
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
    try:
      self._CheckWorkerCount(wp, 1)

      ctx = ChecksumContext()
      # Expected value, computed locally in the same order the tasks are
      # queued
      checksum = ChecksumContext.CHECKSUM_START
      for i in range(1, 100):
        checksum = ChecksumContext.UpdateChecksum(checksum, i)
        wp.AddTask(ctx, i)

      wp.Quiesce()

      self._CheckNoTasks(wp)

      # Compare the locally computed checksum with the one built by the worker
      ctx.lock.acquire()
      try:
        self.assertEqual(checksum, ctx.checksum)
      finally:
        ctx.lock.release()
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

  def _CheckNoTasks(self, wp):
    """Asserts that the pool's task queue is empty.

    Reads the pool's private C{_tasks} attribute while holding its C{_lock}.

    @param wp: worker pool to inspect

    """
    wp._lock.acquire()
    try:
      # The task queue must be empty now. assertFalse is used because the
      # failUnless alias is deprecated and no longer exists in Python 3.12.
      self.assertFalse(wp._tasks)
    finally:
      wp._lock.release()

  def _CheckWorkerCount(self, wp, num_workers):
    """Asserts that the pool currently holds the given number of workers.

    Reads the pool's private C{_workers} attribute while holding its C{_lock}.

    @param wp: worker pool to inspect
    @param num_workers: expected length of C{wp._workers}

    """
    wp._lock.acquire()
    try:
      self.assertEqual(len(wp._workers), num_workers)
    finally:
      wp._lock.release()
138
139
# When run as a script, hand control to the test program from testutils
if __name__ == "__main__":
  testutils.GanetiTestProgram()