"""
return (self._current_task is not None)
+ def _GetCurrentOrderAndTaskId(self):
+ """Returns the order and task ID of the current task.
+
+ Should only be called from within L{RunTask}. The pool lock is acquired
+ while the current task entry is read and is always released again before
+ this method returns.
+
+ @rtype: tuple
+ @return: C{(order_id, task_id)} taken from the worker's current task entry
+
+ """
+ self.pool._lock.acquire()
+ try:
+ # Callers must only invoke this while a task is assigned to the worker
+ assert self._HasRunningTaskUnlocked()
+
+ # The task entry has four fields; only the second (order ID) and the
+ # third (task ID) are of interest here
+ (_, order_id, task_id, _) = self._current_task
+
+ return (order_id, task_id)
+ finally:
+ # Release the lock even if the assertion or the unpacking fails
+ self.pool._lock.release()
+
def run(self):
"""Main thread function.
if defer:
assert self._current_task
# Schedule again for later run
- (_, _, _, args) = self._current_task
- pool._AddTaskUnlocked(args, defer.priority, None)
+ (_, _, task_id, args) = self._current_task
+ pool._AddTaskUnlocked(args, defer.priority, task_id)
if self._current_task:
self._current_task = None
"""
assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
assert isinstance(priority, (int, long)), "Priority must be numeric"
+ assert task_id is None or isinstance(task_id, (int, long)), \
+ "Task ID must be numeric or None"
task = [priority, self._counter.next(), task_id, args]
from ganeti import workerpool
from ganeti import errors
from ganeti import utils
+from ganeti import compat
import testutils
self.lock = threading.Lock()
self.prioresult = {}
self.samepriodefer = {}
+ self.num2ordertaskid = {}
class DeferringWorker(workerpool.BaseWorker):
def RunTask(self, ctx, num, targetprio):
ctx.lock.acquire()
try:
+ otilst = ctx.num2ordertaskid.setdefault(num, [])
+ otilst.append(self._GetCurrentOrderAndTaskId())
+
if num in ctx.samepriodefer:
del ctx.samepriodefer[num]
raise workerpool.DeferTask()
rnd = random.Random(14921)
data = {}
+ num2taskid = {}
for i in range(1, 333):
ctx.lock.acquire()
try:
ctx.lock.release()
prio = int(rnd.random() * 30)
- wp.AddTask((ctx, i, prio), priority=50)
+ num2taskid[i] = 1000 * i
+ wp.AddTask((ctx, i, prio), priority=50,
+ task_id=num2taskid[i])
data.setdefault(prio, set()).add(i)
# Cause some distortion
ctx.lock.acquire()
try:
self.assertEqual(data, ctx.prioresult)
+
+ all_order_ids = []
+
+ for (num, numordertaskid) in ctx.num2ordertaskid.items():
+ order_ids = map(compat.fst, numordertaskid)
+ self.assertFalse(utils.FindDuplicates(order_ids),
+ msg="Order ID has been reused")
+ all_order_ids.extend(order_ids)
+
+ for task_id in map(compat.snd, numordertaskid):
+ self.assertEqual(task_id, num2taskid[num],
+ msg=("Task %s used different task IDs" % num))
+
+ self.assertFalse(utils.FindDuplicates(all_order_ids),
+ msg="Order ID has been reused")
finally:
ctx.lock.release()