4 # Copyright (C) 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the workerpool module"""
30 from ganeti import workerpool
# Worker class passed to workerpool.WorkerPool by the tests below that only
# exercise pool start-up and shutdown, not task results.
33 class DummyBaseWorker(workerpool.BaseWorker):
# Task handler taking a single "text" argument (the tests queue
# "Hello world N" strings).
# NOTE(review): the method body is not visible in this excerpt (embedded
# numbering jumps from 34 to 38); presumably a no-op -- confirm against the
# full file.
34 def RunTask(self, text):
# Holds the state shared between the checksum test and its worker: a lock
# object and a running Adler-32 checksum.
38 class ChecksumContext:
# Adler-32 of the empty string; starting value for every checksum chain.
# NOTE(review): passing a str to zlib.adler32 only works on Python 2
# (Python 3 requires bytes); the file also uses xrange, so it targets Python 2.
39 CHECKSUM_START = zlib.adler32("")
# NOTE(review): the def line enclosing the next two assignments (presumably
# __init__) is not visible in this excerpt.
# Condition built on top of a plain Lock, stored as self.lock.
42 self.lock = threading.Condition(threading.Lock())
# Every context starts from the shared CHECKSUM_START value.
43 self.checksum = self.CHECKSUM_START
# Folds str(value) into the running checksum "current" and returns the new
# Adler-32 value.
# NOTE(review): there is no self parameter and the test calls this as
# ChecksumContext.UpdateChecksum(checksum, i), so a @staticmethod decorator
# presumably sits on the missing line above -- confirm.
46 def UpdateChecksum(current, value):
47 return zlib.adler32(str(value), current)
# Worker whose tasks fold a number into the checksum stored on a shared
# context object.
50 class ChecksumBaseWorker(workerpool.BaseWorker):
# ctx: object exposing .checksum and .UpdateChecksum (ChecksumContext in the
# test); number: value folded into ctx.checksum.
51 def RunTask(self, ctx, number):
# Read-modify-write of the shared checksum.
# NOTE(review): the lines around this statement are not visible (embedded
# numbering jumps 51 -> 54 -> 59); presumably ctx.lock is held here -- confirm.
54 ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
# Test case exercising workerpool.WorkerPool creation, task handling and
# worker shutdown.
# NOTE(review): this excerpt has gaps in its embedded line numbering; method
# headers, try/finally scaffolding and several calls are not visible.
59 class TestWorkerpool(unittest.TestCase):
60 """Workerpool tests"""
# NOTE(review): the def line of this first test (embedded line 62) is not
# visible in this excerpt.
63 wp = workerpool.WorkerPool(3, DummyBaseWorker)
# A pool created with 3 must report three workers.
65 self._CheckWorkerCount(wp, 3)
# Queue a string task; "i" presumably comes from a loop header on the
# missing line above -- confirm.
68 wp.AddTask("Hello world %s" % i)
# No workers may remain; presumably follows a shutdown call on a missing
# line, as in testChecksum -- confirm.
73 self._CheckWorkerCount(wp, 0)
# Pool that is never given a task: three workers and an empty queue.
75 def testNoTasks(self):
76 wp = workerpool.WorkerPool(3, DummyBaseWorker)
78 self._CheckWorkerCount(wp, 3)
79 self._CheckNoTasks(wp)
# Presumably preceded by a shutdown call on a missing line -- confirm.
82 self._CheckWorkerCount(wp, 0)
# Same as testNoTasks but the empty-queue check is made twice.
# NOTE(review): judging by the test name, a quiesce call presumably sits
# between the two checks on missing line 89 -- confirm.
84 def testNoTasksQuiesce(self):
85 wp = workerpool.WorkerPool(3, DummyBaseWorker)
87 self._CheckWorkerCount(wp, 3)
88 self._CheckNoTasks(wp)
90 self._CheckNoTasks(wp)
93 self._CheckWorkerCount(wp, 0)
95 def testChecksum(self):
96 # Tests whether all tasks are run and, since we're only using a single
97 # thread, whether everything is started in order.
98 wp = workerpool.WorkerPool(1, ChecksumBaseWorker)
100 self._CheckWorkerCount(wp, 1)
102 ctx = ChecksumContext()
# Compute the expected checksum locally by folding 1..99 in order.
103 checksum = ChecksumContext.CHECKSUM_START
104 for i in xrange(1, 100):
105 checksum = ChecksumContext.UpdateChecksum(checksum, i)
# NOTE(review): the lines that submit the tasks to the pool and wait for them
# are not visible in this excerpt (embedded numbering jumps 105 -> 110).
110 self._CheckNoTasks(wp)
# The checksum accumulated on ctx must equal the locally computed one.
115 self.assertEqual(checksum, ctx.checksum)
119 wp.TerminateWorkers()
# After TerminateWorkers the pool must report zero workers.
120 self._CheckWorkerCount(wp, 0)
# Helper: asserts that the pool's private _tasks attribute is empty (falsy).
# NOTE(review): lines 123-124 and 127+ are not visible; presumably the pool's
# lock is taken around the check -- confirm.
122 def _CheckNoTasks(self, wp):
125 # The task queue must be empty now
# NOTE(review): failUnless is a deprecated alias of assertTrue.
126 self.failUnless(not wp._tasks)
# Helper: asserts that the pool's private _workers collection has exactly
# num_workers entries.
# NOTE(review): lines 131-132 and 134+ are not visible in this excerpt.
130 def _CheckWorkerCount(self, wp, num_workers):
133 self.assertEqual(len(wp._workers), num_workers)
# Script entry-point guard.
# NOTE(review): the guarded statement is not visible in this excerpt.
138 if __name__ == '__main__':