X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/125b74b274849e717637438c672bbb17a43b5c58..48aaca91efa214b37dba94f28582be73f3c90dbd:/test/ganeti.workerpool_unittest.py diff --git a/test/ganeti.workerpool_unittest.py b/test/ganeti.workerpool_unittest.py index 2449cef..8f35a69 100755 --- a/test/ganeti.workerpool_unittest.py +++ b/test/ganeti.workerpool_unittest.py @@ -31,6 +31,7 @@ import random from ganeti import workerpool from ganeti import errors from ganeti import utils +from ganeti import compat import testutils @@ -114,12 +115,16 @@ class DeferringTaskContext: self.lock = threading.Lock() self.prioresult = {} self.samepriodefer = {} + self.num2ordertaskid = {} class DeferringWorker(workerpool.BaseWorker): def RunTask(self, ctx, num, targetprio): ctx.lock.acquire() try: + otilst = ctx.num2ordertaskid.setdefault(num, []) + otilst.append(self._GetCurrentOrderAndTaskId()) + if num in ctx.samepriodefer: del ctx.samepriodefer[num] raise workerpool.DeferTask() @@ -132,6 +137,26 @@ class DeferringWorker(workerpool.BaseWorker): ctx.lock.release() +class PriorityContext: + def __init__(self): + self.lock = threading.Lock() + self.result = [] + + +class PriorityWorker(workerpool.BaseWorker): + def RunTask(self, ctx, data): + ctx.lock.acquire() + try: + ctx.result.append((self.GetCurrentPriority(), data)) + finally: + ctx.lock.release() + + +class NotImplementedWorker(workerpool.BaseWorker): + def RunTask(self): + raise NotImplementedError + + class TestWorkerpool(unittest.TestCase): """Workerpool tests""" @@ -274,6 +299,7 @@ class TestWorkerpool(unittest.TestCase): ["Hello world %s" % i for i in range(10)]) self.assertRaises(AssertionError, wp.AddManyTasks, [i for i in range(10)]) + self.assertRaises(AssertionError, wp.AddManyTasks, [], task_id=0) wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)]) wp.AddTask((ctx, "A separate hello")) @@ -445,6 +471,7 @@ class TestWorkerpool(unittest.TestCase): rnd = random.Random(14921) data = {} + num2taskid = {} for i in range(1, 333): ctx.lock.acquire() try: @@ -454,7 +481,9 @@ class TestWorkerpool(unittest.TestCase): ctx.lock.release() prio = int(rnd.random() * 30) - wp.AddTask((ctx, i, prio), priority=50) + num2taskid[i] = 1000 * i + wp.AddTask((ctx, i, prio), priority=50, + task_id=num2taskid[i]) data.setdefault(prio, set()).add(i) # Cause some distortion @@ -471,6 +500,106 @@ class TestWorkerpool(unittest.TestCase): ctx.lock.acquire() try: self.assertEqual(data, ctx.prioresult) + + all_order_ids = [] + + for (num, numordertaskid) in ctx.num2ordertaskid.items(): + order_ids = map(compat.fst, numordertaskid) + self.assertFalse(utils.FindDuplicates(order_ids), + msg="Order ID has been reused") + all_order_ids.extend(order_ids) + + for task_id in map(compat.snd, numordertaskid): + self.assertEqual(task_id, num2taskid[num], + msg=("Task %s used different task IDs" % num)) + + self.assertFalse(utils.FindDuplicates(all_order_ids), + msg="Order ID has been reused") + finally: + ctx.lock.release() + + self._CheckWorkerCount(wp, 1) + finally: + wp.TerminateWorkers() + self._CheckWorkerCount(wp, 0) + + def testChangeTaskPriority(self): + wp = workerpool.WorkerPool("Test", 1, PriorityWorker) + try: + self._CheckWorkerCount(wp, 1) + + ctx = PriorityContext() + + # Use static seed for this test + rnd = random.Random(4727) + + # Disable processing of tasks + wp.SetActive(False) + + # No task ID + self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority, + None, 0) + + # Pre-generate task IDs and priorities + count = 100 + task_ids = range(0, count) + priorities = range(200, 200 + count) * 2 + + rnd.shuffle(task_ids) + rnd.shuffle(priorities) + + # Make sure there are some duplicate priorities, but not all + priorities[count * 2 - 10:count * 2 - 1] = \ + priorities[count - 10: count - 1] + + assert len(priorities) == 2 * count + assert priorities[0:(count - 1)] != priorities[count:(2 * count - 1)] + + # Add some tasks; this loop consumes the first half of all previously + # generated priorities + for (idx, task_id) in enumerate(task_ids): + wp.AddTask((ctx, idx), + priority=priorities.pop(), + task_id=task_id) + + self.assertEqual(len(wp._tasks), len(task_ids)) + self.assertEqual(len(wp._taskdata), len(task_ids)) + + # Tasks have been added, so half of the priorities should have been + # consumed + assert len(priorities) == len(task_ids) + + # Change task priority + expected = [] + for ((idx, task_id), prio) in zip(enumerate(task_ids), priorities): + wp.ChangeTaskPriority(task_id, prio) + expected.append((prio, idx)) + + self.assertEqual(len(wp._taskdata), len(task_ids)) + + # Half the entries are now abandoned tasks + self.assertEqual(len(wp._tasks), len(task_ids) * 2) + + assert len(priorities) == count + assert len(task_ids) == count + + # Start processing + wp.SetActive(True) + + # Wait for tasks to finish + wp.Quiesce() + + self._CheckNoTasks(wp) + + for task_id in task_ids: + # All tasks are done + self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority, + task_id, 0) + + # Check result + ctx.lock.acquire() + try: + self.assertEqual(ctx.result, sorted(expected)) finally: ctx.lock.release() @@ -479,6 +608,124 @@ class TestWorkerpool(unittest.TestCase): wp.TerminateWorkers() self._CheckWorkerCount(wp, 0) + def testChangeTaskPriorityInteralStructures(self): + wp = workerpool.WorkerPool("Test", 1, NotImplementedWorker) + try: + self._CheckWorkerCount(wp, 1) + + # Use static seed for this test + rnd = random.Random(643) + + (num1, num2) = rnd.sample(range(1000), 2) + + # Disable processing of tasks + wp.SetActive(False) + + self.assertFalse(wp._tasks) + self.assertFalse(wp._taskdata) + + # No priority or task ID + wp.AddTask(()) + self.assertEqual(wp._tasks, [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + ]) + self.assertFalse(wp._taskdata) + + # No task ID + wp.AddTask((), priority=7413) + self.assertEqual(wp._tasks, [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [7413, 1, None, ()], + ]) + self.assertFalse(wp._taskdata) + + # Start adding real tasks + wp.AddTask((), priority=10267659, task_id=num1) + self.assertEqual(wp._tasks, [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [7413, 1, None, ()], + [10267659, 2, num1, ()], + ]) + self.assertEqual(wp._taskdata, { + num1: [10267659, 2, num1, ()], + }) + + wp.AddTask((), priority=123, task_id=num2) + self.assertEqual(sorted(wp._tasks), [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [123, 3, num2, ()], + [7413, 1, None, ()], + [10267659, 2, num1, ()], + ]) + self.assertEqual(wp._taskdata, { + num1: [10267659, 2, num1, ()], + num2: [123, 3, num2, ()], + }) + + wp.ChangeTaskPriority(num1, 100) + self.assertEqual(sorted(wp._tasks), [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [100, 2, num1, ()], + [123, 3, num2, ()], + [7413, 1, None, ()], + [10267659, 2, num1, None], + ]) + self.assertEqual(wp._taskdata, { + num1: [100, 2, num1, ()], + num2: [123, 3, num2, ()], + }) + + wp.ChangeTaskPriority(num2, 91337) + self.assertEqual(sorted(wp._tasks), [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [100, 2, num1, ()], + [123, 3, num2, None], + [7413, 1, None, ()], + [91337, 3, num2, ()], + [10267659, 2, num1, None], + ]) + self.assertEqual(wp._taskdata, { + num1: [100, 2, num1, ()], + num2: [91337, 3, num2, ()], + }) + + wp.ChangeTaskPriority(num1, 10139) + self.assertEqual(sorted(wp._tasks), [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [100, 2, num1, None], + [123, 3, num2, None], + [7413, 1, None, ()], + [10139, 2, num1, ()], + [91337, 3, num2, ()], + [10267659, 2, num1, None], + ]) + self.assertEqual(wp._taskdata, { + num1: [10139, 2, num1, ()], + num2: [91337, 3, num2, ()], + }) + + # Change to the same priority once again + wp.ChangeTaskPriority(num1, 10139) + self.assertEqual(sorted(wp._tasks), [ + [workerpool._DEFAULT_PRIORITY, 0, None, ()], + [100, 2, num1, None], + [123, 3, num2, None], + [7413, 1, None, ()], + [10139, 2, num1, None], + [10139, 2, num1, ()], + [91337, 3, num2, ()], + [10267659, 2, num1, None], + ]) + self.assertEqual(wp._taskdata, { + num1: [10139, 2, num1, ()], + num2: [91337, 3, num2, ()], + }) + + self._CheckWorkerCount(wp, 1) + finally: + wp.TerminateWorkers() + self._CheckWorkerCount(wp, 0) + if __name__ == "__main__": testutils.GanetiTestProgram()