Modify gnt-node add to call external script
[ganeti-local] / test / ganeti.workerpool_unittest.py
index 7582741..586cc5e 100755 (executable)
@@ -29,10 +29,37 @@ import zlib
 
 from ganeti import workerpool
 
+import testutils
 
-class DummyBaseWorker(workerpool.BaseWorker):
-  def RunTask(self, text):
-    pass
+class CountingContext(object):
+
+  def __init__(self):
+    self._lock = threading.Condition(threading.Lock())
+    self.done = 0
+
+  def DoneTask(self):
+    self._lock.acquire()
+    try:
+      self.done += 1
+    finally:
+      self._lock.release()
+
+  def GetDoneTasks(self):
+    self._lock.acquire()
+    try:
+      return self.done
+    finally:
+      self._lock.release()
+
+  @staticmethod
+  def UpdateChecksum(current, value):
+    return zlib.adler32(str(value), current)
+
+
+class CountingBaseWorker(workerpool.BaseWorker):
+
+  def RunTask(self, ctx, text):
+    ctx.DoneTask()
 
 
 class ChecksumContext:
@@ -59,21 +86,24 @@ class ChecksumBaseWorker(workerpool.BaseWorker):
 class TestWorkerpool(unittest.TestCase):
   """Workerpool tests"""
 
-  def testDummy(self):
-    wp = workerpool.WorkerPool(3, DummyBaseWorker)
+  def testCounting(self):
+    ctx = CountingContext()
+    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
     try:
       self._CheckWorkerCount(wp, 3)
 
       for i in range(10):
-        wp.AddTask("Hello world %s" % i)
+        wp.AddTask((ctx, "Hello world %s" % i))
 
       wp.Quiesce()
     finally:
       wp.TerminateWorkers()
       self._CheckWorkerCount(wp, 0)
 
+    self.assertEquals(ctx.GetDoneTasks(), 10)
+
   def testNoTasks(self):
-    wp = workerpool.WorkerPool(3, DummyBaseWorker)
+    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
     try:
       self._CheckWorkerCount(wp, 3)
       self._CheckNoTasks(wp)
@@ -82,7 +112,7 @@ class TestWorkerpool(unittest.TestCase):
       self._CheckWorkerCount(wp, 0)
 
   def testNoTasksQuiesce(self):
-    wp = workerpool.WorkerPool(3, DummyBaseWorker)
+    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
     try:
       self._CheckWorkerCount(wp, 3)
       self._CheckNoTasks(wp)
@@ -95,7 +125,7 @@ class TestWorkerpool(unittest.TestCase):
   def testChecksum(self):
     # Tests whether all tasks are run and, since we're only using a single
     # thread, whether everything is started in order.
-    wp = workerpool.WorkerPool(1, ChecksumBaseWorker)
+    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
     try:
       self._CheckWorkerCount(wp, 1)
 
@@ -103,7 +133,7 @@ class TestWorkerpool(unittest.TestCase):
       checksum = ChecksumContext.CHECKSUM_START
       for i in range(1, 100):
         checksum = ChecksumContext.UpdateChecksum(checksum, i)
-        wp.AddTask(ctx, i)
+        wp.AddTask((ctx, i))
 
       wp.Quiesce()
 
@@ -119,6 +149,48 @@ class TestWorkerpool(unittest.TestCase):
       wp.TerminateWorkers()
       self._CheckWorkerCount(wp, 0)
 
+  def testAddManyTasks(self):
+    ctx = CountingContext()
+    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
+    try:
+      self._CheckWorkerCount(wp, 3)
+
+      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
+      wp.AddTask((ctx, "A separate hello"))
+      wp.AddTask((ctx, "Once more, hi!"))
+      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
+    self.assertEquals(ctx.GetDoneTasks(), 22)
+
+  def testManyTasksSequence(self):
+    ctx = CountingContext()
+    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
+    try:
+      self._CheckWorkerCount(wp, 3)
+      self.assertRaises(AssertionError, wp.AddManyTasks,
+                        ["Hello world %s" % i for i in range(10)])
+      self.assertRaises(AssertionError, wp.AddManyTasks,
+                        [i for i in range(10)])
+
+      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
+      wp.AddTask((ctx, "A separate hello"))
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
+    self.assertEquals(ctx.GetDoneTasks(), 11)
+
   def _CheckNoTasks(self, wp):
     wp._lock.acquire()
     try:
@@ -136,4 +208,4 @@ class TestWorkerpool(unittest.TestCase):
 
 
 if __name__ == '__main__':
-  unittest.main()
+  testutils.GanetiTestProgram()