4 # Copyright (C) 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the workerpool module"""
30 from ganeti import workerpool
class DummyBaseWorker(workerpool.BaseWorker):
    """Worker used by the tests that only look at pool bookkeeping."""

    def RunTask(self, text):
        """Accept a queued task and do nothing with it.

        NOTE(review): the body of this method was not present in the
        reviewed excerpt; a no-op is assumed from the class name. Confirm
        against the original file.

        Args:
            text: payload handed to ``WorkerPool.AddTask`` (unused here).
        """
class ChecksumContext:
    """Shared state for the checksum test: a lock and a running Adler-32."""

    # Adler-32 of empty input. A bytes literal is used because
    # zlib.adler32() rejects text strings on Python 3; on Python 2 the
    # literal is an ordinary str, so the value is unchanged.
    CHECKSUM_START = zlib.adler32(b"")

    def __init__(self):
        """Create the lock and start the checksum at CHECKSUM_START."""
        # Lock for callers that update ``checksum`` from worker threads.
        self.lock = threading.Condition(threading.Lock())
        self.checksum = self.CHECKSUM_START

    @staticmethod
    def UpdateChecksum(current, value):
        """Fold the string form of ``value`` into a running Adler-32.

        Args:
            current: checksum accumulated so far.
            value: object whose ``str()`` representation is added.

        Returns:
            The updated Adler-32 checksum.
        """
        data = str(value)
        # On Python 3 str() yields text, which zlib cannot checksum directly.
        if not isinstance(data, bytes):
            data = data.encode("utf-8")
        return zlib.adler32(data, current)
class ChecksumBaseWorker(workerpool.BaseWorker):
    """Worker that folds each task's number into a shared checksum."""

    def RunTask(self, ctx, number):
        """Update ``ctx.checksum`` with ``number``.

        Args:
            ctx: object providing ``lock``, ``checksum`` and
                ``UpdateChecksum`` (a ChecksumContext in the tests).
            number: value to fold into the running checksum.
        """
        # Read-modify-write on state shared between threads, so hold the
        # context's lock for the duration of the update.
        # NOTE(review): the lines surrounding the update were missing from
        # the reviewed excerpt; guarding it with ctx.lock is assumed from
        # the lock the context provides. Confirm against the original file.
        ctx.lock.acquire()
        try:
            ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
        finally:
            ctx.lock.release()
class TestWorkerpool(unittest.TestCase):
    """Workerpool tests"""

    # NOTE(review): the reviewed excerpt of this class had lost lines (method
    # headers, loop headers, try/finally scaffolding). Every statement marked
    # NOTE(review) below was reconstructed rather than read from the file and
    # must be confirmed against the original.

    def testDummy(self):
        """Queue a few no-op tasks on a three-worker pool and shut it down."""
        # NOTE(review): method name, loop bound and Quiesce() call are inferred.
        wp = workerpool.WorkerPool("Test", 3, DummyBaseWorker)
        try:
            self._CheckWorkerCount(wp, 3)

            for i in range(10):
                wp.AddTask("Hello world %s" % i)

            wp.Quiesce()
        finally:
            # Workers must be stopped before the pool can report zero workers.
            wp.TerminateWorkers()
            self._CheckWorkerCount(wp, 0)

    def testNoTasks(self):
        """A fresh pool has its workers and an empty task queue."""
        wp = workerpool.WorkerPool("Test", 3, DummyBaseWorker)
        try:
            self._CheckWorkerCount(wp, 3)
            self._CheckNoTasks(wp)
        finally:
            wp.TerminateWorkers()
            self._CheckWorkerCount(wp, 0)

    def testNoTasksQuiesce(self):
        """Quiescing an idle pool leaves the task queue empty."""
        wp = workerpool.WorkerPool("Test", 3, DummyBaseWorker)
        try:
            self._CheckWorkerCount(wp, 3)
            self._CheckNoTasks(wp)
            # NOTE(review): Quiesce() is inferred from the test name.
            wp.Quiesce()
            self._CheckNoTasks(wp)
        finally:
            wp.TerminateWorkers()
            self._CheckWorkerCount(wp, 0)

    def testChecksum(self):
        """All queued numbers reach the worker, in submission order."""
        # Tests whether all tasks are run and, since we're only using a single
        # thread, whether everything is started in order.
        wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
        try:
            self._CheckWorkerCount(wp, 1)

            ctx = ChecksumContext()
            checksum = ChecksumContext.CHECKSUM_START
            for i in range(1, 100):
                checksum = ChecksumContext.UpdateChecksum(checksum, i)
                # NOTE(review): submission line inferred from the
                # RunTask(ctx, number) signature of ChecksumBaseWorker.
                wp.AddTask(ctx, i)

            # NOTE(review): waiting for the queue to drain is inferred; the
            # comparison below is only meaningful once every task has run.
            wp.Quiesce()

            self._CheckNoTasks(wp)

            # The order-sensitive checksum matches only if every task ran,
            # in the order it was queued.
            self.assertEqual(checksum, ctx.checksum)
        finally:
            wp.TerminateWorkers()
            self._CheckWorkerCount(wp, 0)

    def _CheckNoTasks(self, wp):
        """Assert that the pool's internal task queue is empty."""
        # NOTE(review): lines around this check were missing from the
        # excerpt (possibly locking) -- confirm.
        # The task queue must be empty now
        self.assertFalse(wp._tasks)

    def _CheckWorkerCount(self, wp, num_workers):
        """Assert that the pool currently tracks ``num_workers`` workers."""
        # NOTE(review): lines around this check were missing from the
        # excerpt (possibly locking) -- confirm.
        self.assertEqual(len(wp._workers), num_workers)
# Run the tests through the project's test program when executed directly.
if __name__ == '__main__':
    testutils.GanetiTestProgram()