From 52c47e4e8adce0406f08ad1992ce99c1b8f2f2b7 Mon Sep 17 00:00:00 2001
From: Michael Hanselmann
Date: Sun, 18 Jul 2010 22:01:01 +0200
Subject: [PATCH] workerpool: Add support for task priority

To add job priorities, the worker pool underlying the job queue must
support priorities per task. This patch adds them to the worker pool.

Signed-off-by: Michael Hanselmann
Reviewed-by: Iustin Pop
---
 doc/design-2.3.rst                 |  48 +++++++
 lib/workerpool.py                  | 119 ++++++++++++++--
 test/ganeti.workerpool_unittest.py | 261 +++++++++++++++++++++++++++++++++++-
 3 files changed, 411 insertions(+), 17 deletions(-)

diff --git a/doc/design-2.3.rst b/doc/design-2.3.rst
index efd8bf2..71a9d4d 100644
--- a/doc/design-2.3.rst
+++ b/doc/design-2.3.rst
@@ -20,6 +20,54 @@ As for 2.1 and 2.2 we divide the 2.3 design into three areas:
 Core changes
 ------------
 
+Job priorities
+~~~~~~~~~~~~~~
+
+Current state and shortcomings
+++++++++++++++++++++++++++++++
+
+.. TODO: Describe current situation
+
+Proposed changes
+++++++++++++++++
+
+.. TODO: Describe changes to job queue and potentially client programs
+
+Worker pool
+^^^^^^^^^^^
+
+To support job priorities in the job queue, the worker pool underlying
+the job queue must be enhanced to support task priorities. Currently
+tasks are processed in the order they are added to the queue (but, due
+to their nature, they don't necessarily finish in that order). All tasks
+are equal. To support tasks with higher or lower priority, a few changes
+have to be made to the queue inside a worker pool.
+
+Each task is assigned a priority when added to the queue. This priority
+cannot be changed until the task is executed (this is fine, as in all
+current use cases tasks are added to a pool and then forgotten about
+until they're done).
+
+A task's priority can be compared to Unix process priorities. The lower
+the priority number, the closer to the queue's front it is. A task with
+priority 0 is going to be run before one with priority 10. Tasks with
+the same priority are executed in the order in which they were added.
+
+While a task is running it can query its own priority. If it's not yet
+ready to finish, it can raise an exception to defer itself, optionally
+changing its own priority. This is useful for the following cases:
+
+- A task is trying to acquire locks, but those locks are still held by
+  other tasks. By deferring itself, the task gives others a chance to
+  run. This is especially useful when all workers are busy.
+- If a task decides it hasn't gotten its locks in a long time, it can
+  start to increase its own priority.
+- Tasks waiting for long-running operations running asynchronously can
+  defer themselves while waiting for those operations to finish.
+
+With these changes, the job queue will be able to implement per-job
+priorities.
+
 Feature changes
 ---------------
 
diff --git a/lib/workerpool.py b/lib/workerpool.py
index 8127329..1838d97 100644
--- a/lib/workerpool.py
+++ b/lib/workerpool.py
@@ -23,14 +23,34 @@
 
 """
 
-import collections
 import logging
 import threading
+import heapq
 
 from ganeti import compat
+from ganeti import errors
 
 
 _TERMINATE = object()
+_DEFAULT_PRIORITY = 0
+
+
+class DeferTask(Exception):
+  """Special exception class to defer a task.
+
+  This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
+  task. Optionally, the priority of the task can be changed.
+
+  """
+  def __init__(self, priority=None):
+    """Initializes this class.
+
+    @type priority: number
+    @param priority: New task priority (None means no change)
+
+    """
+    Exception.__init__(self)
+    self.priority = priority
 
 
 class BaseWorker(threading.Thread, object):
@@ -64,6 +84,22 @@ class BaseWorker(threading.Thread, object):
     finally:
       self.pool._lock.release()
 
+  def GetCurrentPriority(self):
+    """Returns the priority of the current task.
+
+    Should only be called from within L{RunTask}.
+
+    """
+    self.pool._lock.acquire()
+    try:
+      assert self._HasRunningTaskUnlocked()
+
+      (priority, _, _) = self._current_task
+
+      return priority
+    finally:
+      self.pool._lock.release()
+
   def _HasRunningTaskUnlocked(self):
     """Returns whether this worker is currently running a task.
 
@@ -80,6 +116,8 @@ class BaseWorker(threading.Thread, object):
 
     while True:
       assert self._current_task is None
+
+      defer = None
       try:
         # Wait on lock to be told either to terminate or to do a task
         pool._lock.acquire()
@@ -104,11 +142,24 @@
         finally:
           pool._lock.release()
 
-        # Run the actual task
+        (priority, _, args) = self._current_task
         try:
-          logging.debug("Starting task %r", self._current_task)
-          self.RunTask(*self._current_task)
-          logging.debug("Done with task %r", self._current_task)
+          # Run the actual task
+          assert defer is None
+          logging.debug("Starting task %r, priority %s", args, priority)
+          self.RunTask(*args) # pylint: disable-msg=W0142
+          logging.debug("Done with task %r, priority %s", args, priority)
+        except DeferTask, err:
+          defer = err
+
+          if defer.priority is None:
+            # Use same priority
+            defer.priority = priority
+
+          logging.debug("Deferring task %r, new priority %s",
+                        args, defer.priority)
+
+          assert self._HasRunningTaskUnlocked()
         except: # pylint: disable-msg=W0702
           logging.exception("Caught unhandled exception")
 
@@ -117,6 +168,12 @@
         # Notify pool
         pool._lock.acquire()
         try:
+          if defer:
+            assert self._current_task
+            # Schedule again for later run
+            (_, _, args) = self._current_task
+            pool._AddTaskUnlocked(args, defer.priority)
+
           if self._current_task:
             self._current_task = None
             pool._worker_to_pool.notifyAll()
@@ -170,7 +227,8 @@ class WorkerPool(object):
     self._termworkers = []
 
     # Queued tasks
-    self._tasks = collections.deque()
+    self._counter = 0
+    self._tasks = []
 
     # Start workers
     self.Resize(num_workers)
@@ -184,44 +242,77 @@
     while self._quiescing:
       self._pool_to_pool.wait()
 
-  def _AddTaskUnlocked(self, args):
+  def _AddTaskUnlocked(self, args, priority):
+    """Adds a task to the internal queue.
+
+    @type args: sequence
+    @param args: Arguments passed to L{BaseWorker.RunTask}
+    @type priority: number
+    @param priority: Task priority
+
+    """
     assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
+    assert isinstance(priority, (int, long)), "Priority must be numeric"
 
-    self._tasks.append(args)
+    # This counter is used to ensure elements are processed in their
+    # incoming order. For processing they're sorted by priority and then
+    # counter.
+    self._counter += 1
+
+    heapq.heappush(self._tasks, (priority, self._counter, args))
 
     # Notify a waiting worker
     self._pool_to_worker.notify()
 
-  def AddTask(self, args):
+  def AddTask(self, args, priority=_DEFAULT_PRIORITY):
     """Adds a task to the queue.
 
     @type args: sequence
     @param args: arguments passed to L{BaseWorker.RunTask}
+    @type priority: number
+    @param priority: Task priority
 
     """
     self._lock.acquire()
     try:
       self._WaitWhileQuiescingUnlocked()
-      self._AddTaskUnlocked(args)
+      self._AddTaskUnlocked(args, priority)
     finally:
       self._lock.release()
 
-  def AddManyTasks(self, tasks):
+  def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
     """Add a list of tasks to the queue.
 
     @type tasks: list of tuples
     @param tasks: list of args passed to L{BaseWorker.RunTask}
+    @type priority: number or list of numbers
+    @param priority: Priority for all added tasks or a list with the priority
+                     for each task
 
     """
     assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
       "Each task must be a sequence"
 
+    assert (isinstance(priority, (int, long)) or
+            compat.all(isinstance(prio, (int, long)) for prio in priority)), \
+           "Priority must be numeric or be a list of numeric values"
+
+    if isinstance(priority, (int, long)):
+      priority = [priority] * len(tasks)
+    elif len(priority) != len(tasks):
+      raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
+                                   " number of tasks (%s)" %
+                                   (len(priority), len(tasks)))
+
     self._lock.acquire()
     try:
       self._WaitWhileQuiescingUnlocked()
 
-      for args in tasks:
-        self._AddTaskUnlocked(args)
+      assert compat.all(isinstance(prio, (int, long)) for prio in priority)
+      assert len(tasks) == len(priority)
+
+      for args, priority in zip(tasks, priority):
+        self._AddTaskUnlocked(args, priority)
     finally:
       self._lock.release()
 
@@ -254,7 +345,7 @@
 
     # Get task from queue and tell pool about it
     try:
-      return self._tasks.popleft()
+      return heapq.heappop(self._tasks)
     finally:
       self._worker_to_pool.notifyAll()
 
diff --git a/test/ganeti.workerpool_unittest.py b/test/ganeti.workerpool_unittest.py
index 586cc5e..6386862 100755
--- a/test/ganeti.workerpool_unittest.py
+++ b/test/ganeti.workerpool_unittest.py
@@ -1,7 +1,7 @@
 #!/usr/bin/python
 #
 
-# Copyright (C) 2008 Google Inc.
+# Copyright (C) 2008, 2009, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -26,13 +26,15 @@ import threading
 import time
 import sys
 import zlib
+import random
 
 from ganeti import workerpool
+from ganeti import errors
 
 import testutils
 
 
-class CountingContext(object):
+class CountingContext(object):
   def __init__(self):
     self._lock = threading.Condition(threading.Lock())
     self.done = 0
@@ -57,7 +59,6 @@ class CountingContext(object):
 
 
 class CountingBaseWorker(workerpool.BaseWorker):
-
   def RunTask(self, ctx, text):
     ctx.DoneTask()
 
@@ -83,6 +84,46 @@ class ChecksumBaseWorker(workerpool.BaseWorker):
       ctx.lock.release()
 
 
+class ListBuilderContext:
+  def __init__(self):
+    self.lock = threading.Lock()
+    self.result = []
+    self.prioresult = {}
+
+
+class ListBuilderWorker(workerpool.BaseWorker):
+  def RunTask(self, ctx, data):
+    ctx.lock.acquire()
+    try:
+      ctx.result.append((self.GetCurrentPriority(), data))
+      ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
+    finally:
+      ctx.lock.release()
+
+
+class DeferringTaskContext:
+  def __init__(self):
+    self.lock = threading.Lock()
+    self.prioresult = {}
+    self.samepriodefer = {}
+
+
+class DeferringWorker(workerpool.BaseWorker):
+  def RunTask(self, ctx, num, targetprio):
+    ctx.lock.acquire()
+    try:
+      if num in ctx.samepriodefer:
+        del ctx.samepriodefer[num]
+        raise workerpool.DeferTask()
+
+      if self.GetCurrentPriority() > targetprio:
+        raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
+
+      ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
+    finally:
+      ctx.lock.release()
+
+
 class TestWorkerpool(unittest.TestCase):
   """Workerpool tests"""
 
@@ -206,6 +247,220 @@ class TestWorkerpool(unittest.TestCase):
     finally:
       wp._lock.release()
 
+  def testPriorityChecksum(self):
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether everything is started in order and respects the priority
+    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = ChecksumContext()
+
+      data = {}
+      tasks = []
+      priorities = []
+      for i in range(1, 333):
+        prio = i % 7
+        tasks.append((ctx, i))
+        priorities.append(prio)
+        data.setdefault(prio, []).append(i)
+
+      wp.AddManyTasks(tasks, priority=priorities)
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      # Check sum
+      ctx.lock.acquire()
+      try:
+        checksum = ChecksumContext.CHECKSUM_START
+        for priority in sorted(data.keys()):
+          for i in data[priority]:
+            checksum = ChecksumContext.UpdateChecksum(checksum, i)
+
+        self.assertEqual(checksum, ctx.checksum)
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
+  def testPriorityListManyTasks(self):
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether everything is started in order and respects the priority
+    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = ListBuilderContext()
+
+      # Use static seed for this test
+      rnd = random.Random(0)
+
+      data = {}
+      tasks = []
+      priorities = []
+      for i in range(1, 333):
+        prio = int(rnd.random() * 10)
+        tasks.append((ctx, i))
+        priorities.append(prio)
+        data.setdefault(prio, []).append((prio, i))
+
+      wp.AddManyTasks(tasks, priority=priorities)
+
+      self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
+                        [("x", ), ("y", )], priority=[1] * 5)
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      # Check result
+      ctx.lock.acquire()
+      try:
+        expresult = []
+        for priority in sorted(data.keys()):
+          expresult.extend(data[priority])
+
+        self.assertEqual(expresult, ctx.result)
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
+  def testPriorityListSingleTasks(self):
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether everything is started in order and respects the priority
+    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = ListBuilderContext()
+
+      # Use static seed for this test
+      rnd = random.Random(26279)
+
+      data = {}
+      for i in range(1, 333):
+        prio = int(rnd.random() * 30)
+        wp.AddTask((ctx, i), priority=prio)
+        data.setdefault(prio, []).append(i)
+
+        # Cause some distortion
+        if i % 11 == 0:
+          time.sleep(.001)
+        if i % 41 == 0:
+          wp.Quiesce()
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      # Check result
+      ctx.lock.acquire()
+      try:
+        self.assertEqual(data, ctx.prioresult)
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
+  def testPriorityListSingleTasks(self):
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether everything is started in order and respects the priority
+    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = ListBuilderContext()
+
+      # Use static seed for this test
+      rnd = random.Random(26279)
+
+      data = {}
+      for i in range(1, 333):
+        prio = int(rnd.random() * 30)
+        wp.AddTask((ctx, i), priority=prio)
+        data.setdefault(prio, []).append(i)
+
+        # Cause some distortion
+        if i % 11 == 0:
+          time.sleep(.001)
+        if i % 41 == 0:
+          wp.Quiesce()
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      # Check result
+      ctx.lock.acquire()
+      try:
+        self.assertEqual(data, ctx.prioresult)
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
+  def testDeferTask(self):
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether everything is started in order and respects the priority
+    wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = DeferringTaskContext()
+
+      # Use static seed for this test
+      rnd = random.Random(14921)
+
+      data = {}
+      for i in range(1, 333):
+        ctx.lock.acquire()
+        try:
+          if i % 5 == 0:
+            ctx.samepriodefer[i] = True
+        finally:
+          ctx.lock.release()
+
+        prio = int(rnd.random() * 30)
+        wp.AddTask((ctx, i, prio), priority=50)
+        data.setdefault(prio, set()).add(i)
+
+        # Cause some distortion
+        if i % 24 == 0:
+          time.sleep(.001)
+        if i % 31 == 0:
+          wp.Quiesce()
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      # Check result
+      ctx.lock.acquire()
+      try:
+        self.assertEqual(data, ctx.prioresult)
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
 
 if __name__ == '__main__':
   testutils.GanetiTestProgram()
-- 
1.7.10.4
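
A minimal usage sketch, not part of the patch and purely for illustration: it shows one way a BaseWorker subclass could combine AddTask priorities with the DeferTask and GetCurrentPriority interface described in the design document above. The ExampleContext and ExampleWorker classes, the pool name and the task arguments are made-up examples, not code from the Ganeti tree.

import threading

from ganeti import workerpool


class ExampleContext:
  def __init__(self):
    # Collects (priority, name) pairs of finished example tasks
    self.lock = threading.Lock()
    self.done = []


class ExampleWorker(workerpool.BaseWorker):
  def RunTask(self, ctx, name, targetprio):
    # Not yet at the target priority: defer this task and move it one
    # step closer to the queue's front (lower number means higher
    # priority)
    if self.GetCurrentPriority() > targetprio:
      raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)

    ctx.lock.acquire()
    try:
      ctx.done.append((self.GetCurrentPriority(), name))
    finally:
      ctx.lock.release()


ctx = ExampleContext()
pool = workerpool.WorkerPool("example", 2, ExampleWorker)
try:
  # Both tasks are queued at priority 10 and defer themselves until
  # they reach their respective target priorities (3 and 0)
  pool.AddTask((ctx, "slow", 3), priority=10)
  pool.AddTask((ctx, "urgent", 0), priority=10)
  pool.Quiesce()
finally:
  pool.TerminateWorkers()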