X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/27caa99307862e7a37a9f0062d009710d875b320..48aaca91efa214b37dba94f28582be73f3c90dbd:/test/ganeti.workerpool_unittest.py?ds=sidebyside diff --git a/test/ganeti.workerpool_unittest.py b/test/ganeti.workerpool_unittest.py index 1ad8d74..8f35a69 100755 --- a/test/ganeti.workerpool_unittest.py +++ b/test/ganeti.workerpool_unittest.py @@ -30,6 +30,8 @@ import random from ganeti import workerpool from ganeti import errors +from ganeti import utils +from ganeti import compat import testutils @@ -113,12 +115,16 @@ class DeferringTaskContext: self.lock = threading.Lock() self.prioresult = {} self.samepriodefer = {} + self.num2ordertaskid = {} class DeferringWorker(workerpool.BaseWorker): def RunTask(self, ctx, num, targetprio): ctx.lock.acquire() try: + otilst = ctx.num2ordertaskid.setdefault(num, []) + otilst.append(self._GetCurrentOrderAndTaskId()) + if num in ctx.samepriodefer: del ctx.samepriodefer[num] raise workerpool.DeferTask() @@ -131,6 +137,26 @@ class DeferringWorker(workerpool.BaseWorker): ctx.lock.release() +class PriorityContext: + def __init__(self): + self.lock = threading.Lock() + self.result = [] + + +class PriorityWorker(workerpool.BaseWorker): + def RunTask(self, ctx, data): + ctx.lock.acquire() + try: + ctx.result.append((self.GetCurrentPriority(), data)) + finally: + ctx.lock.release() + + +class NotImplementedWorker(workerpool.BaseWorker): + def RunTask(self): + raise NotImplementedError + + class TestWorkerpool(unittest.TestCase): """Workerpool tests""" @@ -273,6 +299,7 @@ class TestWorkerpool(unittest.TestCase): ["Hello world %s" % i for i in range(10)]) self.assertRaises(AssertionError, wp.AddManyTasks, [i for i in range(10)]) + self.assertRaises(AssertionError, wp.AddManyTasks, [], task_id=0) wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)]) wp.AddTask((ctx, "A separate hello")) @@ -290,7 +317,8 @@ class TestWorkerpool(unittest.TestCase): wp._lock.acquire() try: # The task queue must be empty now - self.failUnless(not wp._tasks) + self.assertFalse(wp._tasks) + self.assertFalse(wp._taskdata) finally: wp._lock.release() @@ -367,6 +395,8 @@ class TestWorkerpool(unittest.TestCase): self.assertRaises(errors.ProgrammerError, wp.AddManyTasks, [("x", ), ("y", )], priority=[1] * 5) + self.assertRaises(errors.ProgrammerError, wp.AddManyTasks, + [("x", ), ("y", )], task_id=[1] * 5) wp.Quiesce() @@ -428,28 +458,38 @@ class TestWorkerpool(unittest.TestCase): wp.TerminateWorkers() self._CheckWorkerCount(wp, 0) - def testPriorityListSingleTasks(self): + def testDeferTask(self): # Tests whether all tasks are run and, since we're only using a single # thread, whether everything is started in order and respects the priority - wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker) + wp = workerpool.WorkerPool("Test", 1, DeferringWorker) try: self._CheckWorkerCount(wp, 1) - ctx = ListBuilderContext() + ctx = DeferringTaskContext() # Use static seed for this test - rnd = random.Random(26279) + rnd = random.Random(14921) data = {} + num2taskid = {} for i in range(1, 333): + ctx.lock.acquire() + try: + if i % 5 == 0: + ctx.samepriodefer[i] = True + finally: + ctx.lock.release() + prio = int(rnd.random() * 30) - wp.AddTask((ctx, i), priority=prio) - data.setdefault(prio, []).append(i) + num2taskid[i] = 1000 * i + wp.AddTask((ctx, i, prio), priority=50, + task_id=num2taskid[i]) + data.setdefault(prio, set()).add(i) # Cause some distortion - if i % 11 == 0: + if i % 24 == 0: time.sleep(.001) - if i % 41 == 0: + if i % 31 == 0: wp.Quiesce() wp.Quiesce() @@ -460,6 +500,21 @@ class TestWorkerpool(unittest.TestCase): ctx.lock.acquire() try: self.assertEqual(data, ctx.prioresult) + + all_order_ids = [] + + for (num, numordertaskid) in ctx.num2ordertaskid.items(): + order_ids = map(compat.fst, numordertaskid) + self.assertFalse(utils.FindDuplicates(order_ids), + msg="Order ID has been reused") + all_order_ids.extend(order_ids) + + for task_id in map(compat.snd, numordertaskid): + self.assertEqual(task_id, num2taskid[num], + msg=("Task %s used different task IDs" % num)) + + self.assertFalse(utils.FindDuplicates(all_order_ids), + msg="Order ID has been reused") finally: ctx.lock.release() @@ -468,45 +523,83 @@ class TestWorkerpool(unittest.TestCase): wp.TerminateWorkers() self._CheckWorkerCount(wp, 0) - def testDeferTask(self): - # Tests whether all tasks are run and, since we're only using a single - # thread, whether everything is started in order and respects the priority - wp = workerpool.WorkerPool("Test", 1, DeferringWorker) + def testChangeTaskPriority(self): + wp = workerpool.WorkerPool("Test", 1, PriorityWorker) try: self._CheckWorkerCount(wp, 1) - ctx = DeferringTaskContext() + ctx = PriorityContext() # Use static seed for this test - rnd = random.Random(14921) + rnd = random.Random(4727) - data = {} - for i in range(1, 333): - ctx.lock.acquire() - try: - if i % 5 == 0: - ctx.samepriodefer[i] = True - finally: - ctx.lock.release() + # Disable processing of tasks + wp.SetActive(False) - prio = int(rnd.random() * 30) - wp.AddTask((ctx, i, prio), priority=50) - data.setdefault(prio, set()).add(i) + # No task ID + self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority, + None, 0) - # Cause some distortion - if i % 24 == 0: - time.sleep(.001) - if i % 31 == 0: - wp.Quiesce() + # Pre-generate task IDs and priorities + count = 100 + task_ids = range(0, count) + priorities = range(200, 200 + count) * 2 + + rnd.shuffle(task_ids) + rnd.shuffle(priorities) + + # Make sure there are some duplicate priorities, but not all + priorities[count * 2 - 10:count * 2 - 1] = \ + priorities[count - 10: count - 1] + assert len(priorities) == 2 * count + assert priorities[0:(count - 1)] != priorities[count:(2 * count - 1)] + + # Add some tasks; this loop consumes the first half of all previously + # generated priorities + for (idx, task_id) in enumerate(task_ids): + wp.AddTask((ctx, idx), + priority=priorities.pop(), + task_id=task_id) + + self.assertEqual(len(wp._tasks), len(task_ids)) + self.assertEqual(len(wp._taskdata), len(task_ids)) + + # Tasks have been added, so half of the priorities should have been + # consumed + assert len(priorities) == len(task_ids) + + # Change task priority + expected = [] + for ((idx, task_id), prio) in zip(enumerate(task_ids), priorities): + wp.ChangeTaskPriority(task_id, prio) + expected.append((prio, idx)) + + self.assertEqual(len(wp._taskdata), len(task_ids)) + + # Half the entries are now abandoned tasks + self.assertEqual(len(wp._tasks), len(task_ids) * 2) + + assert len(priorities) == count + assert len(task_ids) == count + + # Start processing + wp.SetActive(True) + + # Wait for tasks to finish wp.Quiesce() self._CheckNoTasks(wp) + for task_id in task_ids: + # All tasks are done + self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority, + task_id, 0) + # Check result ctx.lock.acquire() try: - self.assertEqual(data, ctx.prioresult) + self.assertEqual(ctx.result, sorted(expected)) finally: ctx.lock.release() @@ -515,6 +608,124 @@ class TestWorkerpool(unittest.TestCase): wp.TerminateWorkers() self._CheckWorkerCount(wp, 0) + def testChangeTaskPriorityInteralStructures(self): + wp = workerpool.WorkerPool("Test", 1, NotImplementedWorker) + try: + self._CheckWorkerCount(wp, 1) + + # Use static seed for this test + rnd = random.Random(643) + + (num1, num2) = rnd.sample(range(1000), 2) + + # Disable processing of tasks + wp.SetActive(False) + + self.assertFalse(wp._tasks) + self.assertFalse(wp._taskdata) + + # No priority or task ID + wp.AddTask(()) + self.assertEqual(wp._tasks, [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + ]) + self.assertFalse(wp._taskdata) + + # No task ID + wp.AddTask((), priority=7413) + self.assertEqual(wp._tasks, [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [7413, 1, None, ()], + ]) + self.assertFalse(wp._taskdata) + + # Start adding real tasks + wp.AddTask((), priority=10267659, task_id=num1) + self.assertEqual(wp._tasks, [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [7413, 1, None, ()], + [10267659, 2, num1, ()], + ]) + self.assertEqual(wp._taskdata, { + num1: [10267659, 2, num1, ()], + }) + + wp.AddTask((), priority=123, task_id=num2) + self.assertEqual(sorted(wp._tasks), [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [123, 3, num2, ()], + [7413, 1, None, ()], + [10267659, 2, num1, ()], + ]) + self.assertEqual(wp._taskdata, { + num1: [10267659, 2, num1, ()], + num2: [123, 3, num2, ()], + }) + + wp.ChangeTaskPriority(num1, 100) + self.assertEqual(sorted(wp._tasks), [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [100, 2, num1, ()], + [123, 3, num2, ()], + [7413, 1, None, ()], + [10267659, 2, num1, None], + ]) + self.assertEqual(wp._taskdata, { + num1: [100, 2, num1, ()], + num2: [123, 3, num2, ()], + }) + + wp.ChangeTaskPriority(num2, 91337) + self.assertEqual(sorted(wp._tasks), [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [100, 2, num1, ()], + [123, 3, num2, None], + [7413, 1, None, ()], + [91337, 3, num2, ()], + [10267659, 2, num1, None], + ]) + self.assertEqual(wp._taskdata, { + num1: [100, 2, num1, ()], + num2: [91337, 3, num2, ()], + }) + + wp.ChangeTaskPriority(num1, 10139) + self.assertEqual(sorted(wp._tasks), [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [100, 2, num1, None], + [123, 3, num2, None], + [7413, 1, None, ()], + [10139, 2, num1, ()], + [91337, 3, num2, ()], + [10267659, 2, num1, None], + ]) + self.assertEqual(wp._taskdata, { + num1: [10139, 2, num1, ()], + num2: [91337, 3, num2, ()], + }) + + # Change to the same priority once again + wp.ChangeTaskPriority(num1, 10139) + self.assertEqual(sorted(wp._tasks), [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [100, 2, num1, None], + [123, 3, num2, None], + [7413, 1, None, ()], + [10139, 2, num1, None], + [10139, 2, num1, ()], + [91337, 3, num2, ()], + [10267659, 2, num1, None], + ]) + self.assertEqual(wp._taskdata, { + num1: [10139, 2, num1, ()], + num2: [91337, 3, num2, ()], + }) + + self._CheckWorkerCount(wp, 1) + finally: + wp.TerminateWorkers() + self._CheckWorkerCount(wp, 0) + -if __name__ == '__main__': +if __name__ == "__main__": testutils.GanetiTestProgram()