logging.debug("Job %s is no longer waiting in the queue", self.id)
return (False, "Job %s is no longer waiting in the queue" % self.id)
+ def ChangePriority(self, priority):
+ """Changes the job priority.
+
+ @type priority: int
+ @param priority: New priority
+ @rtype: tuple; (bool, string)
+ @return: Boolean describing whether job's priority was successfully changed
+ and a text message
+
+ """
+ status = self.CalcStatus()
+
+ if status in constants.JOBS_FINALIZED:
+ return (False, "Job %s is finished" % self.id)
+ elif status == constants.JOB_STATUS_CANCELING:
+ return (False, "Job %s is cancelling" % self.id)
+ else:
+ assert status in (constants.JOB_STATUS_QUEUED,
+ constants.JOB_STATUS_WAITING,
+ constants.JOB_STATUS_RUNNING)
+
+ changed = False
+ for op in self.ops:
+ if (op.status == constants.OP_STATUS_RUNNING or
+ op.status in constants.OPS_FINALIZED):
+ assert not changed, \
+ ("Found opcode for which priority should not be changed after"
+ " priority has been changed for previous opcodes")
+ continue
+
+ assert op.status in (constants.OP_STATUS_QUEUED,
+ constants.OP_STATUS_WAITING)
+
+ changed = True
+
+ # Note: this also changes the on-disk priority ("op.priority" is only in
+ # memory)
+ op.input.priority = priority
+ op.priority = priority
+
+ if changed:
+ return (True, ("Priorities of pending opcodes for job %s have been"
+ " changed to %s" % (self.id, priority)))
+ else:
+ return (False, "Job %s had no pending opcodes" % self.id)
+
class _OpExecCallbacks(mcpu.OpExecCbBase):
def __init__(self, queue, job, op):
return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
+ @locking.ssynchronized(_LOCK)
+ @_RequireOpenQueue
+ def ChangeJobPriority(self, job_id, priority):
+ """Changes a job's priority.
+
+ @type job_id: int
+ @param job_id: ID of the job whose priority should be changed
+ @type priority: int
+ @param priority: New priority
+
+ """
+ logging.info("Changing priority of job %s to %s", job_id, priority)
+
+ if priority not in constants.OP_PRIO_SUBMIT_VALID:
+ allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
+ raise errors.GenericError("Invalid priority %s, allowed are %s" %
+ (priority, allowed))
+
+ def fn(job):
+ (success, msg) = job.ChangePriority(priority)
+
+ if success:
+ try:
+ self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
+ except workerpool.NoSuchTask:
+ logging.debug("Job %s is not in workerpool at this time", job.id)
+
+ return (success, msg)
+
+ return self._ModifyJobUnlocked(job_id, fn)
+
def _ModifyJobUnlocked(self, job_id, mod_fn):
"""Modifies a job.
import errno
import itertools
import random
+import operator
from ganeti import constants
from ganeti import utils
class TestQueuedJob(unittest.TestCase):
- def test(self):
+ def testNoOpCodes(self):
self.assertRaises(errors.GenericError, jqueue._QueuedJob,
None, 1, [], False)
job.ops[0].priority -= 19
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
+ def _JobForPriority(self, job_id):
+ ops = [
+ opcodes.OpTagsGet(),
+ opcodes.OpTestDelay(),
+ opcodes.OpTagsGet(),
+ opcodes.OpTestDelay(),
+ ]
+
+ job = jqueue._QueuedJob(None, job_id, ops, True)
+
+ self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
+ for op in job.ops))
+ self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+ self.assertFalse(compat.any(hasattr(op.input, "priority")
+ for op in job.ops))
+
+ return job
+
+ def testChangePriorityAllQueued(self):
+ job = self._JobForPriority(24984)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+ self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
+ for op in job.ops))
+ result = job.ChangePriority(-10)
+ self.assertEqual(job.CalcPriority(), -10)
+ self.assertTrue(compat.all(op.priority == -10 for op in job.ops))
+ self.assertTrue(compat.all(op.input.priority == -10 for op in job.ops))
+ self.assertEqual(result,
+ (True, ("Priorities of pending opcodes for job 24984 have"
+ " been changed to -10")))
+
+ def testChangePriorityAllFinished(self):
+ job = self._JobForPriority(16405)
+
+ for (idx, op) in enumerate(job.ops):
+ if idx > 2:
+ op.status = constants.OP_STATUS_ERROR
+ else:
+ op.status = constants.OP_STATUS_SUCCESS
+
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
+ self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+ result = job.ChangePriority(-10)
+ self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+ self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
+ for op in job.ops))
+ self.assertFalse(compat.any(hasattr(op.input, "priority")
+ for op in job.ops))
+ self.assertEqual(map(operator.attrgetter("status"), job.ops), [
+ constants.OP_STATUS_SUCCESS,
+ constants.OP_STATUS_SUCCESS,
+ constants.OP_STATUS_SUCCESS,
+ constants.OP_STATUS_ERROR,
+ ])
+ self.assertEqual(result, (False, "Job 16405 is finished"))
+
+ def testChangePriorityCancelling(self):
+ job = self._JobForPriority(31572)
+
+ for (idx, op) in enumerate(job.ops):
+ if idx > 1:
+ op.status = constants.OP_STATUS_CANCELING
+ else:
+ op.status = constants.OP_STATUS_SUCCESS
+
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
+ self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+ result = job.ChangePriority(5)
+ self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+ self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
+ for op in job.ops))
+ self.assertFalse(compat.any(hasattr(op.input, "priority")
+ for op in job.ops))
+ self.assertEqual(map(operator.attrgetter("status"), job.ops), [
+ constants.OP_STATUS_SUCCESS,
+ constants.OP_STATUS_SUCCESS,
+ constants.OP_STATUS_CANCELING,
+ constants.OP_STATUS_CANCELING,
+ ])
+ self.assertEqual(result, (False, "Job 31572 is cancelling"))
+
+ def testChangePriorityFirstRunning(self):
+ job = self._JobForPriority(1716215889)
+
+ for (idx, op) in enumerate(job.ops):
+ if idx == 0:
+ op.status = constants.OP_STATUS_RUNNING
+ else:
+ op.status = constants.OP_STATUS_QUEUED
+
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+ self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+ result = job.ChangePriority(7)
+ self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+ self.assertEqual(map(operator.attrgetter("priority"), job.ops),
+ [constants.OP_PRIO_DEFAULT, 7, 7, 7])
+ self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
+ [None, 7, 7, 7])
+ self.assertEqual(map(operator.attrgetter("status"), job.ops), [
+ constants.OP_STATUS_RUNNING,
+ constants.OP_STATUS_QUEUED,
+ constants.OP_STATUS_QUEUED,
+ constants.OP_STATUS_QUEUED,
+ ])
+ self.assertEqual(result,
+ (True, ("Priorities of pending opcodes for job"
+ " 1716215889 have been changed to 7")))
+
+ def testChangePriorityLastRunning(self):
+ job = self._JobForPriority(1308)
+
+ for (idx, op) in enumerate(job.ops):
+ if idx == (len(job.ops) - 1):
+ op.status = constants.OP_STATUS_RUNNING
+ else:
+ op.status = constants.OP_STATUS_SUCCESS
+
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+ self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+ result = job.ChangePriority(-3)
+ self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+ self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
+ for op in job.ops))
+ self.assertFalse(compat.any(hasattr(op.input, "priority")
+ for op in job.ops))
+ self.assertEqual(map(operator.attrgetter("status"), job.ops), [
+ constants.OP_STATUS_SUCCESS,
+ constants.OP_STATUS_SUCCESS,
+ constants.OP_STATUS_SUCCESS,
+ constants.OP_STATUS_RUNNING,
+ ])
+ self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))
+
+ def testChangePrioritySecondOpcodeRunning(self):
+ job = self._JobForPriority(27701)
+
+ self.assertEqual(len(job.ops), 4)
+ job.ops[0].status = constants.OP_STATUS_SUCCESS
+ job.ops[1].status = constants.OP_STATUS_RUNNING
+ job.ops[2].status = constants.OP_STATUS_QUEUED
+ job.ops[3].status = constants.OP_STATUS_QUEUED
+
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+ result = job.ChangePriority(-19)
+ self.assertEqual(job.CalcPriority(), -19)
+ self.assertEqual(map(operator.attrgetter("priority"), job.ops),
+ [constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
+ -19, -19])
+ self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
+ [None, None, -19, -19])
+ self.assertEqual(map(operator.attrgetter("status"), job.ops), [
+ constants.OP_STATUS_SUCCESS,
+ constants.OP_STATUS_RUNNING,
+ constants.OP_STATUS_QUEUED,
+ constants.OP_STATUS_QUEUED,
+ ])
+ self.assertEqual(result,
+ (True, ("Priorities of pending opcodes for job"
+ " 27701 have been changed to -19")))
+
+ def testChangePriorityWithInconsistentJob(self):
+ job = self._JobForPriority(30097)
+
+ self.assertEqual(len(job.ops), 4)
+
+ # This job is invalid (as it has two opcodes marked as running) and make
+ # the call fail because an unprocessed opcode precedes a running one (which
+ # should never happen in reality)
+ job.ops[0].status = constants.OP_STATUS_SUCCESS
+ job.ops[1].status = constants.OP_STATUS_RUNNING
+ job.ops[2].status = constants.OP_STATUS_QUEUED
+ job.ops[3].status = constants.OP_STATUS_RUNNING
+
+ self.assertRaises(AssertionError, job.ChangePriority, 19)
+
def testCalcStatus(self):
def _Queued(ops):
# The default status is "queued"
self.assertRaises(IndexError, self.queue.GetNextUpdate)
+class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
+ def setUp(self):
+ self.queue = _FakeQueueForProc()
+ self.opexecprio = []
+
+ def _BeforeStart(self, timeout, priority):
+ self.assertFalse(self.queue.IsAcquired())
+ self.opexecprio.append(priority)
+
+ def testChangePriorityWhileRunning(self):
+ # Tests changing the priority on a job while it has finished opcodes
+ # (successful) and more, unprocessed ones
+ ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+ for i in range(3)]
+
+ # Create job
+ job_id = 3499
+ job = self._CreateJob(self.queue, job_id, ops)
+
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+ opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, None)
+
+ # Run first opcode
+ self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
+ jqueue._JobProcessor.DEFER)
+
+ # Job goes back to queued
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+ self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+ self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+ [[constants.OP_STATUS_SUCCESS,
+ constants.OP_STATUS_QUEUED,
+ constants.OP_STATUS_QUEUED],
+ ["Res0", None, None]])
+
+ self.assertEqual(self.opexecprio.pop(0), constants.OP_PRIO_DEFAULT)
+ self.assertRaises(IndexError, self.opexecprio.pop, 0)
+
+ # Change priority
+ self.assertEqual(job.ChangePriority(-10),
+ (True,
+ ("Priorities of pending opcodes for job 3499 have"
+ " been changed to -10")))
+ self.assertEqual(job.CalcPriority(), -10)
+
+ # Process second opcode
+ self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
+ jqueue._JobProcessor.DEFER)
+
+ self.assertEqual(self.opexecprio.pop(0), -10)
+ self.assertRaises(IndexError, self.opexecprio.pop, 0)
+
+ # Check status
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+ self.assertEqual(job.CalcPriority(), -10)
+ self.assertEqual(job.GetInfo(["id"]), [job_id])
+ self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
+ self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+ [[constants.OP_STATUS_SUCCESS,
+ constants.OP_STATUS_SUCCESS,
+ constants.OP_STATUS_QUEUED],
+ ["Res0", "Res1", None]])
+
+ # Change priority once more
+ self.assertEqual(job.ChangePriority(5),
+ (True,
+ ("Priorities of pending opcodes for job 3499 have"
+ " been changed to 5")))
+ self.assertEqual(job.CalcPriority(), 5)
+
+ # Process third opcode
+ self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
+
+ self.assertEqual(self.opexecprio.pop(0), 5)
+ self.assertRaises(IndexError, self.opexecprio.pop, 0)
+
+ # Check status
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+ self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+ self.assertEqual(job.GetInfo(["id"]), [job_id])
+ self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+ self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+ [[constants.OP_STATUS_SUCCESS,
+ constants.OP_STATUS_SUCCESS,
+ constants.OP_STATUS_SUCCESS],
+ ["Res0", "Res1", "Res2"]])
+ self.assertEqual(map(operator.attrgetter("priority"), job.ops),
+ [constants.OP_PRIO_DEFAULT, -10, 5])
+
+
class _IdOnlyFakeJob:
def __init__(self, job_id, priority=NotImplemented):
self.id = str(job_id)