"""Workerpool tests"""
def testDummy(self):
- wp = workerpool.WorkerPool(3, DummyBaseWorker)
+ wp = workerpool.WorkerPool("Test", 3, DummyBaseWorker)
try:
self._CheckWorkerCount(wp, 3)
self._CheckWorkerCount(wp, 0)
def testNoTasks(self):
- wp = workerpool.WorkerPool(3, DummyBaseWorker)
+ wp = workerpool.WorkerPool("Test", 3, DummyBaseWorker)
try:
self._CheckWorkerCount(wp, 3)
self._CheckNoTasks(wp)
self._CheckWorkerCount(wp, 0)
def testNoTasksQuiesce(self):
- wp = workerpool.WorkerPool(3, DummyBaseWorker)
+ wp = workerpool.WorkerPool("Test", 3, DummyBaseWorker)
try:
self._CheckWorkerCount(wp, 3)
self._CheckNoTasks(wp)
def testChecksum(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order.
- wp = workerpool.WorkerPool(1, ChecksumBaseWorker)
+ wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
try:
self._CheckWorkerCount(wp, 1)