Core changes
------------
+Job priorities
+~~~~~~~~~~~~~~
+
+Current state and shortcomings
+++++++++++++++++++++++++++++++
+
+.. TODO: Describe current situation
+
+Proposed changes
+++++++++++++++++
+
+.. TODO: Describe changes to job queue and potentially client programs
+
+Worker pool
+^^^^^^^^^^^
+
+To support job priorities in the job queue, the worker pool underlying
+the job queue must be enhanced to support task priorities. Currently
+tasks are processed in the order they are added to the queue (but, due
+to their nature, they don't necessarily finish in that order). All tasks
+are equal. To support tasks with higher or lower priority, a few changes
+have to be made to the queue inside a worker pool.
+
+Each task is assigned a priority when added to the queue. This priority
+cannot be changed until the task is executed (this is fine as in all
+current use-cases, tasks are added to a pool and then forgotten about
+until they're done).
+
+A task's priority can be compared to Unix process priorities. The lower
+the priority number, the closer to the queue's front the task is. A task
+with priority 0 is going to be run before one with priority 10. Tasks
+with the same priority are executed in the order in which they were added.
+
+While a task is running it can query its own priority. If it is not yet
+ready to finish, it can raise an exception to defer itself, optionally
+changing its own priority. This is useful for the following cases:
+
+- A task is trying to acquire locks, but those locks are still held by
+ other tasks. By deferring itself, the task gives others a chance to
+ run. This is especially useful when all workers are busy.
+- If a task decides it hasn't gotten its locks in a long time, it can
+ start to increase its own priority (i.e. lower its priority number).
+- Tasks waiting for long-running operations that run asynchronously can
+ defer themselves while they wait.
+
+With these changes, the job queue will be able to implement per-job
+priorities.
+
Feature changes
---------------
"""
-import collections
import logging
import threading
+import heapq
from ganeti import compat
+from ganeti import errors
_TERMINATE = object()
+_DEFAULT_PRIORITY = 0
+
+
+class DeferTask(Exception):
+ """Special exception class to defer a task.
+
+ This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
+ task. Optionally, the priority of the task can be changed.
+
+ """
+ def __init__(self, priority=None):
+ """Initializes this class.
+
+ @type priority: number
+ @param priority: New task priority (None means no change)
+
+ """
+ Exception.__init__(self)
+ self.priority = priority
class BaseWorker(threading.Thread, object):
finally:
self.pool._lock.release()
+ def GetCurrentPriority(self):
+ """Returns the priority of the current task.
+
+ Should only be called from within L{RunTask}.
+
+ """
+ self.pool._lock.acquire()
+ try:
+ assert self._HasRunningTaskUnlocked()
+
+ (priority, _, _) = self._current_task
+
+ return priority
+ finally:
+ self.pool._lock.release()
+
def _HasRunningTaskUnlocked(self):
"""Returns whether this worker is currently running a task.
while True:
assert self._current_task is None
+
+ defer = None
try:
# Wait on lock to be told either to terminate or to do a task
pool._lock.acquire()
finally:
pool._lock.release()
- # Run the actual task
+ (priority, _, args) = self._current_task
try:
- logging.debug("Starting task %r", self._current_task)
- self.RunTask(*self._current_task)
- logging.debug("Done with task %r", self._current_task)
+ # Run the actual task
+ assert defer is None
+ logging.debug("Starting task %r, priority %s", args, priority)
+ self.RunTask(*args) # pylint: disable-msg=W0142
+ logging.debug("Done with task %r, priority %s", args, priority)
+ except DeferTask, err:
+ defer = err
+
+ if defer.priority is None:
+ # Use same priority
+ defer.priority = priority
+
+ logging.debug("Deferring task %r, new priority %s", args, defer.priority)
+
+ assert self._HasRunningTaskUnlocked()
+
except: # pylint: disable-msg=W0702
logging.exception("Caught unhandled exception")
# Notify pool
pool._lock.acquire()
try:
+ if defer:
+ assert self._current_task
+ # Schedule again for later run
+ (_, _, args) = self._current_task
+ pool._AddTaskUnlocked(args, defer.priority)
+
if self._current_task:
self._current_task = None
pool._worker_to_pool.notifyAll()
self._termworkers = []
# Queued tasks
- self._tasks = collections.deque()
+ self._counter = 0
+ self._tasks = []
# Start workers
self.Resize(num_workers)
while self._quiescing:
self._pool_to_pool.wait()
- def _AddTaskUnlocked(self, args):
+ def _AddTaskUnlocked(self, args, priority):
+ """Adds a task to the internal queue.
+
+ @type args: sequence
+ @param args: Arguments passed to L{BaseWorker.RunTask}
+ @type priority: number
+ @param priority: Task priority
+
+ """
assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
+ assert isinstance(priority, (int, long)), "Priority must be numeric"
- self._tasks.append(args)
+ # This counter is used to ensure elements are processed in their
+ # incoming order. For processing they're sorted by priority and then
+ # counter.
+ self._counter += 1
+
+ heapq.heappush(self._tasks, (priority, self._counter, args))
# Notify a waiting worker
self._pool_to_worker.notify()
- def AddTask(self, args):
+ def AddTask(self, args, priority=_DEFAULT_PRIORITY):
"""Adds a task to the queue.
@type args: sequence
@param args: arguments passed to L{BaseWorker.RunTask}
+ @type priority: number
+ @param priority: Task priority
"""
self._lock.acquire()
try:
self._WaitWhileQuiescingUnlocked()
- self._AddTaskUnlocked(args)
+ self._AddTaskUnlocked(args, priority)
finally:
self._lock.release()
- def AddManyTasks(self, tasks):
+ def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
"""Add a list of tasks to the queue.
@type tasks: list of tuples
@param tasks: list of args passed to L{BaseWorker.RunTask}
+ @type priority: number or list of numbers
+ @param priority: Priority for all added tasks or a list with the priority
+ for each task
"""
assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
"Each task must be a sequence"
+ assert (isinstance(priority, (int, long)) or
+ compat.all(isinstance(prio, (int, long)) for prio in priority)), \
+ "Priority must be numeric or be a list of numeric values"
+
+ if isinstance(priority, (int, long)):
+ priority = [priority] * len(tasks)
+ elif len(priority) != len(tasks):
+ raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
+ " number of tasks (%s)" %
+ (len(priority), len(tasks)))
+
self._lock.acquire()
try:
self._WaitWhileQuiescingUnlocked()
- for args in tasks:
- self._AddTaskUnlocked(args)
+ assert compat.all(isinstance(prio, (int, long)) for prio in priority)
+ assert len(tasks) == len(priority)
+
+ for args, priority in zip(tasks, priority):
+ self._AddTaskUnlocked(args, priority)
finally:
self._lock.release()
# Get task from queue and tell pool about it
try:
- return self._tasks.popleft()
+ return heapq.heappop(self._tasks)
finally:
self._worker_to_pool.notifyAll()
#!/usr/bin/python
#
-# Copyright (C) 2008 Google Inc.
+# Copyright (C) 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
import time
import sys
import zlib
+import random
from ganeti import workerpool
+from ganeti import errors
import testutils
-class CountingContext(object):
+class CountingContext(object):
def __init__(self):
self._lock = threading.Condition(threading.Lock())
self.done = 0
class CountingBaseWorker(workerpool.BaseWorker):
-
def RunTask(self, ctx, text):
ctx.DoneTask()
ctx.lock.release()
+class ListBuilderContext:
+ def __init__(self):
+ self.lock = threading.Lock()
+ self.result = []
+ self.prioresult = {}
+
+
+class ListBuilderWorker(workerpool.BaseWorker):
+ def RunTask(self, ctx, data):
+ ctx.lock.acquire()
+ try:
+ ctx.result.append((self.GetCurrentPriority(), data))
+ ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
+ finally:
+ ctx.lock.release()
+
+
+class DeferringTaskContext:
+ def __init__(self):
+ self.lock = threading.Lock()
+ self.prioresult = {}
+ self.samepriodefer = {}
+
+
+class DeferringWorker(workerpool.BaseWorker):
+ def RunTask(self, ctx, num, targetprio):
+ ctx.lock.acquire()
+ try:
+ if num in ctx.samepriodefer:
+ del ctx.samepriodefer[num]
+ raise workerpool.DeferTask()
+
+ if self.GetCurrentPriority() > targetprio:
+ raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
+
+ ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
+ finally:
+ ctx.lock.release()
+
+
class TestWorkerpool(unittest.TestCase):
"""Workerpool tests"""
finally:
wp._lock.release()
+ def testPriorityChecksum(self):
+ # Tests whether all tasks are run and, since we're only using a single
+ # thread, whether everything is started in order and respects the priority
+ wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
+ try:
+ self._CheckWorkerCount(wp, 1)
+
+ ctx = ChecksumContext()
+
+ data = {}
+ tasks = []
+ priorities = []
+ for i in range(1, 333):
+ prio = i % 7
+ tasks.append((ctx, i))
+ priorities.append(prio)
+ data.setdefault(prio, []).append(i)
+
+ wp.AddManyTasks(tasks, priority=priorities)
+
+ wp.Quiesce()
+
+ self._CheckNoTasks(wp)
+
+ # Check sum
+ ctx.lock.acquire()
+ try:
+ checksum = ChecksumContext.CHECKSUM_START
+ for priority in sorted(data.keys()):
+ for i in data[priority]:
+ checksum = ChecksumContext.UpdateChecksum(checksum, i)
+
+ self.assertEqual(checksum, ctx.checksum)
+ finally:
+ ctx.lock.release()
+
+ self._CheckWorkerCount(wp, 1)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+
+ def testPriorityListManyTasks(self):
+ # Tests whether all tasks are run and, since we're only using a single
+ # thread, whether everything is started in order and respects the priority
+ wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+ try:
+ self._CheckWorkerCount(wp, 1)
+
+ ctx = ListBuilderContext()
+
+ # Use static seed for this test
+ rnd = random.Random(0)
+
+ data = {}
+ tasks = []
+ priorities = []
+ for i in range(1, 333):
+ prio = int(rnd.random() * 10)
+ tasks.append((ctx, i))
+ priorities.append(prio)
+ data.setdefault(prio, []).append((prio, i))
+
+ wp.AddManyTasks(tasks, priority=priorities)
+
+ self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
+ [("x", ), ("y", )], priority=[1] * 5)
+
+ wp.Quiesce()
+
+ self._CheckNoTasks(wp)
+
+ # Check result
+ ctx.lock.acquire()
+ try:
+ expresult = []
+ for priority in sorted(data.keys()):
+ expresult.extend(data[priority])
+
+ self.assertEqual(expresult, ctx.result)
+ finally:
+ ctx.lock.release()
+
+ self._CheckWorkerCount(wp, 1)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+
+ def testPriorityListSingleTasks(self):
+ # Tests whether all tasks are run and, since we're only using a single
+ # thread, whether everything is started in order and respects the priority
+ wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+ try:
+ self._CheckWorkerCount(wp, 1)
+
+ ctx = ListBuilderContext()
+
+ # Use static seed for this test
+ rnd = random.Random(26279)
+
+ data = {}
+ for i in range(1, 333):
+ prio = int(rnd.random() * 30)
+ wp.AddTask((ctx, i), priority=prio)
+ data.setdefault(prio, []).append(i)
+
+ # Cause some distortion
+ if i % 11 == 0:
+ time.sleep(.001)
+ if i % 41 == 0:
+ wp.Quiesce()
+
+ wp.Quiesce()
+
+ self._CheckNoTasks(wp)
+
+ # Check result
+ ctx.lock.acquire()
+ try:
+ self.assertEqual(data, ctx.prioresult)
+ finally:
+ ctx.lock.release()
+
+ self._CheckWorkerCount(wp, 1)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+
+ def testPriorityListSingleTasks(self):
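+ # NOTE(review): this method has the same name and body as the preceding
+ # testPriorityListSingleTasks, so it replaces that definition in the class
+ # namespace and only one copy is run; one of the two should be dropped or
+ # renamed.
+ # Tests whether all tasks are run and, since we're only using a single
+ # thread, whether everything is started in order and respects the priority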
+ wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+ try:
+ self._CheckWorkerCount(wp, 1)
+
+ ctx = ListBuilderContext()
+
+ # Use static seed for this test
+ rnd = random.Random(26279)
+
+ data = {}
+ for i in range(1, 333):
+ prio = int(rnd.random() * 30)
+ wp.AddTask((ctx, i), priority=prio)
+ data.setdefault(prio, []).append(i)
+
+ # Cause some distortion
+ if i % 11 == 0:
+ time.sleep(.001)
+ if i % 41 == 0:
+ wp.Quiesce()
+
+ wp.Quiesce()
+
+ self._CheckNoTasks(wp)
+
+ # Check result
+ ctx.lock.acquire()
+ try:
+ self.assertEqual(data, ctx.prioresult)
+ finally:
+ ctx.lock.release()
+
+ self._CheckWorkerCount(wp, 1)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+
+ def testDeferTask(self):
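+ # Every task is queued with priority 50 and defers itself, lowering its
+ # priority by one each time, until it has reached its target priority;
+ # every fifth task additionally defers once without changing its priority.
+ # At the end each task must have been recorded under its target priority.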
+ wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
+ try:
+ self._CheckWorkerCount(wp, 1)
+
+ ctx = DeferringTaskContext()
+
+ # Use static seed for this test
+ rnd = random.Random(14921)
+
+ data = {}
+ for i in range(1, 333):
+ ctx.lock.acquire()
+ try:
+ if i % 5 == 0:
+ ctx.samepriodefer[i] = True
+ finally:
+ ctx.lock.release()
+
+ prio = int(rnd.random() * 30)
+ wp.AddTask((ctx, i, prio), priority=50)
+ data.setdefault(prio, set()).add(i)
+
+ # Cause some distortion
+ if i % 24 == 0:
+ time.sleep(.001)
+ if i % 31 == 0:
+ wp.Quiesce()
+
+ wp.Quiesce()
+
+ self._CheckNoTasks(wp)
+
+ # Check result
+ ctx.lock.acquire()
+ try:
+ self.assertEqual(data, ctx.prioresult)
+ finally:
+ ctx.lock.release()
+
+ self._CheckWorkerCount(wp, 1)
+ finally:
+ wp.TerminateWorkers()
+ self._CheckWorkerCount(wp, 0)
+
if __name__ == '__main__':
testutils.GanetiTestProgram()