locking: Change locking order, move NAL after instances
[ganeti-local] / test / ganeti.workerpool_unittest.py
index a91858e..8f35a69 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/python
 #
 
-# Copyright (C) 2008 Google Inc.
+# Copyright (C) 2008, 2009, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -26,15 +26,43 @@ import threading
 import time
 import sys
 import zlib
+import random
 
 from ganeti import workerpool
+from ganeti import errors
+from ganeti import utils
+from ganeti import compat
 
 import testutils
 
 
-class DummyBaseWorker(workerpool.BaseWorker):
-  def RunTask(self, text):
-    pass
+class CountingContext(object):
+  def __init__(self):
+    self._lock = threading.Condition(threading.Lock())
+    self.done = 0
+
+  def DoneTask(self):
+    self._lock.acquire()
+    try:
+      self.done += 1
+    finally:
+      self._lock.release()
+
+  def GetDoneTasks(self):
+    self._lock.acquire()
+    try:
+      return self.done
+    finally:
+      self._lock.release()
+
+  @staticmethod
+  def UpdateChecksum(current, value):
+    return zlib.adler32(str(value), current)
+
+
+class CountingBaseWorker(workerpool.BaseWorker):
+  def RunTask(self, ctx, text):
+    ctx.DoneTask()
 
 
 class ChecksumContext:
@@ -51,6 +79,13 @@ class ChecksumContext:
 
 class ChecksumBaseWorker(workerpool.BaseWorker):
   def RunTask(self, ctx, number):
+    name = "number%s" % number
+    self.SetTaskName(name)
+
+    # This assertion needs to be checked before updating the checksum. A
+    # failing assertion will then cause the result to be wrong.
+    assert self.getName() == ("%s/%s" % (self._worker_id, name))
+
     ctx.lock.acquire()
     try:
       ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
@@ -58,24 +93,91 @@ class ChecksumBaseWorker(workerpool.BaseWorker):
       ctx.lock.release()
 
 
+class ListBuilderContext:
+  def __init__(self):
+    self.lock = threading.Lock()
+    self.result = []
+    self.prioresult = {}
+
+
+class ListBuilderWorker(workerpool.BaseWorker):
+  def RunTask(self, ctx, data):
+    ctx.lock.acquire()
+    try:
+      ctx.result.append((self.GetCurrentPriority(), data))
+      ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
+    finally:
+      ctx.lock.release()
+
+
+class DeferringTaskContext:
+  def __init__(self):
+    self.lock = threading.Lock()
+    self.prioresult = {}
+    self.samepriodefer = {}
+    self.num2ordertaskid = {}
+
+
+class DeferringWorker(workerpool.BaseWorker):
+  def RunTask(self, ctx, num, targetprio):
+    ctx.lock.acquire()
+    try:
+      otilst = ctx.num2ordertaskid.setdefault(num, [])
+      otilst.append(self._GetCurrentOrderAndTaskId())
+
+      if num in ctx.samepriodefer:
+        del ctx.samepriodefer[num]
+        raise workerpool.DeferTask()
+
+      if self.GetCurrentPriority() > targetprio:
+        raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
+
+      ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
+    finally:
+      ctx.lock.release()
+
+
+class PriorityContext:
+  def __init__(self):
+    self.lock = threading.Lock()
+    self.result = []
+
+
+class PriorityWorker(workerpool.BaseWorker):
+  def RunTask(self, ctx, data):
+    ctx.lock.acquire()
+    try:
+      ctx.result.append((self.GetCurrentPriority(), data))
+    finally:
+      ctx.lock.release()
+
+
+class NotImplementedWorker(workerpool.BaseWorker):
+  def RunTask(self):
+    raise NotImplementedError
+
+
 class TestWorkerpool(unittest.TestCase):
   """Workerpool tests"""
 
-  def testDummy(self):
-    wp = workerpool.WorkerPool(3, DummyBaseWorker)
+  def testCounting(self):
+    ctx = CountingContext()
+    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
     try:
       self._CheckWorkerCount(wp, 3)
 
       for i in range(10):
-        wp.AddTask("Hello world %s" % i)
+        wp.AddTask((ctx, "Hello world %s" % i))
 
       wp.Quiesce()
     finally:
       wp.TerminateWorkers()
       self._CheckWorkerCount(wp, 0)
 
+    self.assertEquals(ctx.GetDoneTasks(), 10)
+
   def testNoTasks(self):
-    wp = workerpool.WorkerPool(3, DummyBaseWorker)
+    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
     try:
       self._CheckWorkerCount(wp, 3)
       self._CheckNoTasks(wp)
@@ -84,7 +186,7 @@ class TestWorkerpool(unittest.TestCase):
       self._CheckWorkerCount(wp, 0)
 
   def testNoTasksQuiesce(self):
-    wp = workerpool.WorkerPool(3, DummyBaseWorker)
+    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
     try:
       self._CheckWorkerCount(wp, 3)
       self._CheckNoTasks(wp)
@@ -94,10 +196,57 @@ class TestWorkerpool(unittest.TestCase):
       wp.TerminateWorkers()
       self._CheckWorkerCount(wp, 0)
 
+  def testActive(self):
+    ctx = CountingContext()
+    wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker)
+    try:
+      self._CheckWorkerCount(wp, 5)
+      self.assertTrue(wp._active)
+
+      # Process some tasks
+      for _ in range(10):
+        wp.AddTask((ctx, None))
+
+      wp.Quiesce()
+      self._CheckNoTasks(wp)
+      self.assertEquals(ctx.GetDoneTasks(), 10)
+
+      # Repeat a few times
+      for count in range(10):
+        # Deactivate pool
+        wp.SetActive(False)
+        self._CheckNoTasks(wp)
+
+        # Queue some more tasks
+        for _ in range(10):
+          wp.AddTask((ctx, None))
+
+        for _ in range(5):
+          # Short delays to give other threads a chance to cause breakage
+          time.sleep(.01)
+          wp.AddTask((ctx, "Hello world %s" % 999))
+          self.assertFalse(wp._active)
+
+        self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15))
+
+        # Start processing again
+        wp.SetActive(True)
+        self.assertTrue(wp._active)
+
+        # Wait for tasks to finish
+        wp.Quiesce()
+        self._CheckNoTasks(wp)
+        self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15)
+
+        self._CheckWorkerCount(wp, 5)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
   def testChecksum(self):
     # Tests whether all tasks are run and, since we're only using a single
     # thread, whether everything is started in order.
-    wp = workerpool.WorkerPool(1, ChecksumBaseWorker)
+    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
     try:
       self._CheckWorkerCount(wp, 1)
 
@@ -105,7 +254,7 @@ class TestWorkerpool(unittest.TestCase):
       checksum = ChecksumContext.CHECKSUM_START
       for i in range(1, 100):
         checksum = ChecksumContext.UpdateChecksum(checksum, i)
-        wp.AddTask(ctx, i)
+        wp.AddTask((ctx, i))
 
       wp.Quiesce()
 
@@ -121,11 +270,55 @@ class TestWorkerpool(unittest.TestCase):
       wp.TerminateWorkers()
       self._CheckWorkerCount(wp, 0)
 
+  def testAddManyTasks(self):
+    ctx = CountingContext()
+    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
+    try:
+      self._CheckWorkerCount(wp, 3)
+
+      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
+      wp.AddTask((ctx, "A separate hello"))
+      wp.AddTask((ctx, "Once more, hi!"))
+      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
+    self.assertEquals(ctx.GetDoneTasks(), 22)
+
+  def testManyTasksSequence(self):
+    ctx = CountingContext()
+    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
+    try:
+      self._CheckWorkerCount(wp, 3)
+      self.assertRaises(AssertionError, wp.AddManyTasks,
+                        ["Hello world %s" % i for i in range(10)])
+      self.assertRaises(AssertionError, wp.AddManyTasks,
+                        [i for i in range(10)])
+      self.assertRaises(AssertionError, wp.AddManyTasks, [], task_id=0)
+
+      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
+      wp.AddTask((ctx, "A separate hello"))
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
+    self.assertEquals(ctx.GetDoneTasks(), 11)
+
   def _CheckNoTasks(self, wp):
     wp._lock.acquire()
     try:
       # The task queue must be empty now
-      self.failUnless(not wp._tasks)
+      self.assertFalse(wp._tasks)
+      self.assertFalse(wp._taskdata)
     finally:
       wp._lock.release()
 
@@ -136,6 +329,403 @@ class TestWorkerpool(unittest.TestCase):
     finally:
       wp._lock.release()
 
+  def testPriorityChecksum(self):
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether everything is started in order and respects the priority
+    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = ChecksumContext()
+
+      data = {}
+      tasks = []
+      priorities = []
+      for i in range(1, 333):
+        prio = i % 7
+        tasks.append((ctx, i))
+        priorities.append(prio)
+        data.setdefault(prio, []).append(i)
+
+      wp.AddManyTasks(tasks, priority=priorities)
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      # Check sum
+      ctx.lock.acquire()
+      try:
+        checksum = ChecksumContext.CHECKSUM_START
+        for priority in sorted(data.keys()):
+          for i in data[priority]:
+            checksum = ChecksumContext.UpdateChecksum(checksum, i)
+
+        self.assertEqual(checksum, ctx.checksum)
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
+  def testPriorityListManyTasks(self):
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether everything is started in order and respects the priority
+    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = ListBuilderContext()
+
+      # Use static seed for this test
+      rnd = random.Random(0)
+
+      data = {}
+      tasks = []
+      priorities = []
+      for i in range(1, 333):
+        prio = int(rnd.random() * 10)
+        tasks.append((ctx, i))
+        priorities.append(prio)
+        data.setdefault(prio, []).append((prio, i))
+
+      wp.AddManyTasks(tasks, priority=priorities)
+
+      self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
+                        [("x", ), ("y", )], priority=[1] * 5)
+      self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
+                        [("x", ), ("y", )], task_id=[1] * 5)
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      # Check result
+      ctx.lock.acquire()
+      try:
+        expresult = []
+        for priority in sorted(data.keys()):
+          expresult.extend(data[priority])
+
+        self.assertEqual(expresult, ctx.result)
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
+  def testPriorityListSingleTasks(self):
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether everything is started in order and respects the priority
+    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = ListBuilderContext()
+
+      # Use static seed for this test
+      rnd = random.Random(26279)
+
+      data = {}
+      for i in range(1, 333):
+        prio = int(rnd.random() * 30)
+        wp.AddTask((ctx, i), priority=prio)
+        data.setdefault(prio, []).append(i)
+
+        # Cause some distortion
+        if i % 11 == 0:
+          time.sleep(.001)
+        if i % 41 == 0:
+          wp.Quiesce()
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      # Check result
+      ctx.lock.acquire()
+      try:
+        self.assertEqual(data, ctx.prioresult)
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
+  def testDeferTask(self):
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether everything is started in order and respects the priority
+    wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = DeferringTaskContext()
+
+      # Use static seed for this test
+      rnd = random.Random(14921)
+
+      data = {}
+      num2taskid = {}
+      for i in range(1, 333):
+        ctx.lock.acquire()
+        try:
+          if i % 5 == 0:
+            ctx.samepriodefer[i] = True
+        finally:
+          ctx.lock.release()
+
+        prio = int(rnd.random() * 30)
+        num2taskid[i] = 1000 * i
+        wp.AddTask((ctx, i, prio), priority=50,
+                   task_id=num2taskid[i])
+        data.setdefault(prio, set()).add(i)
+
+        # Cause some distortion
+        if i % 24 == 0:
+          time.sleep(.001)
+        if i % 31 == 0:
+          wp.Quiesce()
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      # Check result
+      ctx.lock.acquire()
+      try:
+        self.assertEqual(data, ctx.prioresult)
+
+        all_order_ids = []
+
+        for (num, numordertaskid) in ctx.num2ordertaskid.items():
+          order_ids = map(compat.fst, numordertaskid)
+          self.assertFalse(utils.FindDuplicates(order_ids),
+                           msg="Order ID has been reused")
+          all_order_ids.extend(order_ids)
+
+          for task_id in map(compat.snd, numordertaskid):
+            self.assertEqual(task_id, num2taskid[num],
+                             msg=("Task %s used different task IDs" % num))
+
+        self.assertFalse(utils.FindDuplicates(all_order_ids),
+                         msg="Order ID has been reused")
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
+  def testChangeTaskPriority(self):
+    wp = workerpool.WorkerPool("Test", 1, PriorityWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = PriorityContext()
+
+      # Use static seed for this test
+      rnd = random.Random(4727)
+
+      # Disable processing of tasks
+      wp.SetActive(False)
+
+      # No task ID
+      self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
+                        None, 0)
+
+      # Pre-generate task IDs and priorities
+      count = 100
+      task_ids = range(0, count)
+      priorities = range(200, 200 + count) * 2
+
+      rnd.shuffle(task_ids)
+      rnd.shuffle(priorities)
+
+      # Make sure there are some duplicate priorities, but not all
+      priorities[count * 2 - 10:count * 2 - 1] = \
+        priorities[count - 10: count - 1]
+
+      assert len(priorities) == 2 * count
+      assert priorities[0:(count - 1)] != priorities[count:(2 * count - 1)]
+
+      # Add some tasks; this loop consumes the first half of all previously
+      # generated priorities
+      for (idx, task_id) in enumerate(task_ids):
+        wp.AddTask((ctx, idx),
+                   priority=priorities.pop(),
+                   task_id=task_id)
+
+      self.assertEqual(len(wp._tasks), len(task_ids))
+      self.assertEqual(len(wp._taskdata), len(task_ids))
+
+      # Tasks have been added, so half of the priorities should have been
+      # consumed
+      assert len(priorities) == len(task_ids)
+
+      # Change task priority
+      expected = []
+      for ((idx, task_id), prio) in zip(enumerate(task_ids), priorities):
+        wp.ChangeTaskPriority(task_id, prio)
+        expected.append((prio, idx))
+
+      self.assertEqual(len(wp._taskdata), len(task_ids))
+
+      # Half the entries are now abandoned tasks
+      self.assertEqual(len(wp._tasks), len(task_ids) * 2)
+
+      assert len(priorities) == count
+      assert len(task_ids) == count
+
+      # Start processing
+      wp.SetActive(True)
+
+      # Wait for tasks to finish
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      for task_id in task_ids:
+        # All tasks are done
+        self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
+                          task_id, 0)
+
+      # Check result
+      ctx.lock.acquire()
+      try:
+        self.assertEqual(ctx.result, sorted(expected))
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
+  def testChangeTaskPriorityInteralStructures(self):
+    wp = workerpool.WorkerPool("Test", 1, NotImplementedWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      # Use static seed for this test
+      rnd = random.Random(643)
+
+      (num1, num2) = rnd.sample(range(1000), 2)
+
+      # Disable processing of tasks
+      wp.SetActive(False)
+
+      self.assertFalse(wp._tasks)
+      self.assertFalse(wp._taskdata)
+
+      # No priority or task ID
+      wp.AddTask(())
+      self.assertEqual(wp._tasks, [
+        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+        ])
+      self.assertFalse(wp._taskdata)
+
+      # No task ID
+      wp.AddTask((), priority=7413)
+      self.assertEqual(wp._tasks, [
+        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+        [7413, 1, None, ()],
+        ])
+      self.assertFalse(wp._taskdata)
+
+      # Start adding real tasks
+      wp.AddTask((), priority=10267659, task_id=num1)
+      self.assertEqual(wp._tasks, [
+        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+        [7413, 1, None, ()],
+        [10267659, 2, num1, ()],
+        ])
+      self.assertEqual(wp._taskdata, {
+        num1: [10267659, 2, num1, ()],
+        })
+
+      wp.AddTask((), priority=123, task_id=num2)
+      self.assertEqual(sorted(wp._tasks), [
+        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+        [123, 3, num2, ()],
+        [7413, 1, None, ()],
+        [10267659, 2, num1, ()],
+        ])
+      self.assertEqual(wp._taskdata, {
+        num1: [10267659, 2, num1, ()],
+        num2: [123, 3, num2, ()],
+        })
+
+      wp.ChangeTaskPriority(num1, 100)
+      self.assertEqual(sorted(wp._tasks), [
+        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+        [100, 2, num1, ()],
+        [123, 3, num2, ()],
+        [7413, 1, None, ()],
+        [10267659, 2, num1, None],
+        ])
+      self.assertEqual(wp._taskdata, {
+        num1: [100, 2, num1, ()],
+        num2: [123, 3, num2, ()],
+        })
+
+      wp.ChangeTaskPriority(num2, 91337)
+      self.assertEqual(sorted(wp._tasks), [
+        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+        [100, 2, num1, ()],
+        [123, 3, num2, None],
+        [7413, 1, None, ()],
+        [91337, 3, num2, ()],
+        [10267659, 2, num1, None],
+        ])
+      self.assertEqual(wp._taskdata, {
+        num1: [100, 2, num1, ()],
+        num2: [91337, 3, num2, ()],
+        })
+
+      wp.ChangeTaskPriority(num1, 10139)
+      self.assertEqual(sorted(wp._tasks), [
+        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+        [100, 2, num1, None],
+        [123, 3, num2, None],
+        [7413, 1, None, ()],
+        [10139, 2, num1, ()],
+        [91337, 3, num2, ()],
+        [10267659, 2, num1, None],
+        ])
+      self.assertEqual(wp._taskdata, {
+        num1: [10139, 2, num1, ()],
+        num2: [91337, 3, num2, ()],
+        })
+
+      # Change to the same priority once again
+      wp.ChangeTaskPriority(num1, 10139)
+      self.assertEqual(sorted(wp._tasks), [
+        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
+        [100, 2, num1, None],
+        [123, 3, num2, None],
+        [7413, 1, None, ()],
+        [10139, 2, num1, None],
+        [10139, 2, num1, ()],
+        [91337, 3, num2, ()],
+        [10267659, 2, num1, None],
+        ])
+      self.assertEqual(wp._taskdata, {
+        num1: [10139, 2, num1, ()],
+        num2: [91337, 3, num2, ()],
+        })
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
 
-if __name__ == '__main__':
+if __name__ == "__main__":
   testutils.GanetiTestProgram()