import logging
import threading
+from ganeti import compat
+
class BaseWorker(threading.Thread, object):
"""Base worker class for worker pools.
@param tasks: list of args passed to L{BaseWorker.RunTask}
"""
+ assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
+ "Each task must be a sequence"
+
self._lock.acquire()
try:
self._WaitWhileQuiescingUnlocked()
self.assertEquals(ctx.GetDoneTasks(), 22)
+ def testManyTasksSequence(self):
+ # Exercises wp.AddManyTasks with malformed task lists (expected to raise
+ # AssertionError) and then with a well-formed one, and finally checks the
+ # number of tasks the counting context reports as done.
+ ctx = CountingContext()
+ # The worker-count check right below expects 3 workers for this pool.
+ wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
+ try:
+ self._CheckWorkerCount(wp, 3)
+ # A list of bare strings and a list of bare integers must both be
+ # rejected with AssertionError.
+ self.assertRaises(AssertionError, wp.AddManyTasks,
+ ["Hello world %s" % i for i in range(10)])
+ self.assertRaises(AssertionError, wp.AddManyTasks,
+ [i for i in range(10)])
+
+ # Ten tasks, each wrapped in a tuple, plus one task added on its own:
+ # 11 in total, which is the count asserted at the end of the test.
+ wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
+ wp.AddTask(ctx, "A separate hello")
+
+ # NOTE(review): Quiesce() presumably waits until the queued tasks have
+ # been processed - confirm against the WorkerPool implementation.
+ wp.Quiesce()
+
+ self._CheckNoTasks(wp)
+ finally:
+ # NOTE(review): indentation is lost in this patch fragment; the
+ # worker-count check below is presumably still part of the finally
+ # clause - confirm against the real file.
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+
+ self.assertEquals(ctx.GetDoneTasks(), 11)
+
def _CheckNoTasks(self, wp):
wp._lock.acquire()
try: