Merge branch 'stable-2.6'
[ganeti-local] / test / ganeti.workerpool_unittest.py
index 6386862..1ad8d74 100755 (executable)
@@ -77,6 +77,13 @@ class ChecksumContext:
 
 class ChecksumBaseWorker(workerpool.BaseWorker):
   def RunTask(self, ctx, number):
+    name = "number%s" % number
+    self.SetTaskName(name)
+
+    # This assertion needs to be checked before updating the checksum. A
+    # failing assertion will then cause the result to be wrong.
+    assert self.getName() == ("%s/%s" % (self._worker_id, name))
+
     ctx.lock.acquire()
     try:
       ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
@@ -163,6 +170,53 @@ class TestWorkerpool(unittest.TestCase):
       wp.TerminateWorkers()
       self._CheckWorkerCount(wp, 0)
 
+  def testActive(self):
+    """Tests that an inactive pool holds back tasks until it is reactivated.
+
+    A pool with five workers first processes ten tasks. Then, ten times in
+    a row, the pool is deactivated, 15 tasks are queued, and the value of
+    ctx.GetDoneTasks() is checked to be unchanged. After reactivating the
+    pool and waiting for it to quiesce, the value must have grown by
+    exactly 15 and the pool must still have five workers.
+
+    NOTE(review): CountingContext and CountingBaseWorker are defined outside
+    this hunk; GetDoneTasks() presumably reports the number of tasks the
+    workers have finished -- confirm against their definitions.
+
+    """
+    ctx = CountingContext()
+    wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker)
+    try:
+      # A freshly created pool has all five workers and starts out active
+      self._CheckWorkerCount(wp, 5)
+      self.assertTrue(wp._active)
+
+      # Process some tasks
+      for _ in range(10):
+        wp.AddTask((ctx, None))
+
+      # Wait for them; these 10 tasks are the baseline for the sums below
+      wp.Quiesce()
+      self._CheckNoTasks(wp)
+      self.assertEquals(ctx.GetDoneTasks(), 10)
+
+      # Repeat a few times
+      for count in range(10):
+        # Deactivate pool
+        wp.SetActive(False)
+        self._CheckNoTasks(wp)
+
+        # Queue some more tasks
+        for _ in range(10):
+          wp.AddTask((ctx, None))
+
+        for _ in range(5):
+          # Short delays to give other threads a chance to cause breakage
+          time.sleep(.01)
+          # NOTE(review): the meaning of the second tuple element is up to
+          # CountingBaseWorker, which is not visible here
+          wp.AddTask((ctx, "Hello world %s" % 999))
+          # Adding tasks must not switch the pool back on
+          self.assertFalse(wp._active)
+
+        # Nothing queued while inactive may have been counted yet: still the
+        # initial 10 plus 15 (10 + 5 tasks) for every earlier iteration
+        self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15))
+
+        # Start processing again
+        wp.SetActive(True)
+        self.assertTrue(wp._active)
+
+        # Wait for tasks to finish
+        wp.Quiesce()
+        self._CheckNoTasks(wp)
+        # All 15 tasks queued during this iteration are now accounted for
+        self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15)
+
+        # Toggling the active state must leave the number of workers as is
+        self._CheckWorkerCount(wp, 5)
+    finally:
+      # Always shut the workers down, even if an assertion above failed
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
   def testChecksum(self):
     # Tests whether all tasks are run and, since we're only using a single
     # thread, whether everything is started in order.