from ganeti import workerpool
+import testutils
-class DummyBaseWorker(workerpool.BaseWorker):
- def RunTask(self, text):
- pass
+class CountingContext(object):  # context object handed to tasks; counts DoneTask() calls under a lock
+
+ def __init__(self):
+ self._lock = threading.Condition(threading.Lock())  # guards every access to self.done below
+ self.done = 0  # number of DoneTask() calls so far
+
+ def DoneTask(self):  # record one finished task
+ self._lock.acquire()
+ try:
+ self.done += 1
+ finally:
+ self._lock.release()  # released even if the increment raises
+
+ def GetDoneTasks(self):  # return the current count, read while holding the lock
+ self._lock.acquire()
+ try:
+ return self.done
+ finally:
+ self._lock.release()
+
+ @staticmethod
+ def UpdateChecksum(current, value):  # fold str(value) into the running Adler-32 checksum `current`
+ return zlib.adler32(str(value), current)  # NOTE(review): adler32 accepts str on Python 2 only; Python 3 requires bytes
+
+
+class CountingBaseWorker(workerpool.BaseWorker):  # worker whose task body only calls ctx.DoneTask()
+
+ def RunTask(self, ctx, text):  # `text` is accepted but not used
+ ctx.DoneTask()
class ChecksumContext:
class TestWorkerpool(unittest.TestCase):
"""Workerpool tests"""
- def testDummy(self):
- wp = workerpool.WorkerPool(3, DummyBaseWorker)
+ def testCounting(self):  # queue 10 tasks on 3 workers and check that each one was counted
+ ctx = CountingContext()
+ wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)  # "Test" is presumably the pool name - confirm against workerpool.WorkerPool
try:
self._CheckWorkerCount(wp, 3)
for i in range(10):
- wp.AddTask("Hello world %s" % i)
+ wp.AddTask((ctx, "Hello world %s" % i))  # the tuple lines up with RunTask(ctx, text)
wp.Quiesce()
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
+ self.assertEquals(ctx.GetDoneTasks(), 10)  # one DoneTask() per queued task; NOTE(review): assertEquals is a deprecated alias of assertEqual
+
def testNoTasks(self):
- wp = workerpool.WorkerPool(3, DummyBaseWorker)
+ wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)  # new leading "Test" argument; worker count stays 3
try:
self._CheckWorkerCount(wp, 3)
self._CheckNoTasks(wp)
self._CheckWorkerCount(wp, 0)
def testNoTasksQuiesce(self):
- wp = workerpool.WorkerPool(3, DummyBaseWorker)
+ wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)  # new leading "Test" argument; worker count stays 3
try:
self._CheckWorkerCount(wp, 3)
self._CheckNoTasks(wp)
def testChecksum(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order.
- wp = workerpool.WorkerPool(1, ChecksumBaseWorker)
+ wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)  # new leading "Test" argument; still a single worker
try:
self._CheckWorkerCount(wp, 1)
checksum = ChecksumContext.CHECKSUM_START
for i in range(1, 100):
checksum = ChecksumContext.UpdateChecksum(checksum, i)
- wp.AddTask(ctx, i)
+ wp.AddTask((ctx, i))  # task arguments are now passed as a single tuple
wp.Quiesce()
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
+ def testAddManyTasks(self):  # interleave AddManyTasks batches with single AddTask calls; all 22 tasks must be counted
+ ctx = CountingContext()
+ wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
+ try:
+ self._CheckWorkerCount(wp, 3)
+
+ wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])  # first batch: 10 argument tuples
+ wp.AddTask((ctx, "A separate hello"))
+ wp.AddTask((ctx, "Once more, hi!"))
+ wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])  # second batch: 10 more
+
+ wp.Quiesce()
+
+ self._CheckNoTasks(wp)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+
+ self.assertEquals(ctx.GetDoneTasks(), 22)  # 10 + 1 + 1 + 10; NOTE(review): assertEquals is a deprecated alias of assertEqual
+
+ def testManyTasksSequence(self):  # expects AddManyTasks to raise AssertionError for lists of bare strings or ints
+ ctx = CountingContext()
+ wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
+ try:
+ self._CheckWorkerCount(wp, 3)
+ self.assertRaises(AssertionError, wp.AddManyTasks,  # items are bare strings, not tuples
+ ["Hello world %s" % i for i in range(10)])
+ self.assertRaises(AssertionError, wp.AddManyTasks,  # items are bare integers
+ [i for i in range(10)])
+
+ wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])  # well-formed batch of 10 argument tuples
+ wp.AddTask((ctx, "A separate hello"))
+
+ wp.Quiesce()
+
+ self._CheckNoTasks(wp)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+
+ self.assertEquals(ctx.GetDoneTasks(), 11)  # only the 10 + 1 well-formed tasks are expected to run; NOTE(review): assertEquals is a deprecated alias of assertEqual
+
def _CheckNoTasks(self, wp):
wp._lock.acquire()
try:
if __name__ == '__main__':
- unittest.main()
+ testutils.GanetiTestProgram()  # replaces unittest.main() as the script entry point