jqueue: Allow changing of job priority
author Michael Hanselmann <hansmi@google.com>
Wed, 24 Oct 2012 01:45:23 +0000 (03:45 +0200)
committer Michael Hanselmann <hansmi@google.com>
Tue, 13 Nov 2012 19:20:15 +0000 (20:20 +0100)
This is due to a feature request. Sometimes one wants to change the
priority of a job after it has been submitted, e.g. after submitting an
important job only to later notice many other pending jobs which will be
processed first. Priority changes only take effect at the next lock
acquisition or when the job is re-scheduled.

The design is very similar to how jobs are cancelled.

Unit tests for “_QueuedJob.ChangePriority” are included.

Also rename “TestQueuedJob.test” to “TestQueuedJob.testNoOpCodes”.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Bernardo Dal Seno <bdalseno@google.com>

lib/jqueue.py
test/ganeti.jqueue_unittest.py

index 3319a0a..d1b3b52 100644 (file)
@@ -482,6 +482,52 @@ class _QueuedJob(object):
       logging.debug("Job %s is no longer waiting in the queue", self.id)
       return (False, "Job %s is no longer waiting in the queue" % self.id)
 
+  def ChangePriority(self, priority):
+    """Changes the job priority.
+
+    @type priority: int
+    @param priority: New priority
+    @rtype: tuple; (bool, string)
+    @return: Boolean describing whether job's priority was successfully changed
+      and a text message
+
+    """
+    status = self.CalcStatus()
+
+    if status in constants.JOBS_FINALIZED:
+      return (False, "Job %s is finished" % self.id)
+    elif status == constants.JOB_STATUS_CANCELING:
+      return (False, "Job %s is cancelling" % self.id)
+    else:
+      assert status in (constants.JOB_STATUS_QUEUED,
+                        constants.JOB_STATUS_WAITING,
+                        constants.JOB_STATUS_RUNNING)
+
+      changed = False
+      for op in self.ops:
+        if (op.status == constants.OP_STATUS_RUNNING or
+            op.status in constants.OPS_FINALIZED):
+          assert not changed, \
+            ("Found opcode for which priority should not be changed after"
+             " priority has been changed for previous opcodes")
+          continue
+
+        assert op.status in (constants.OP_STATUS_QUEUED,
+                             constants.OP_STATUS_WAITING)
+
+        changed = True
+
+        # Note: this also changes the on-disk priority ("op.priority" is only in
+        # memory)
+        op.input.priority = priority
+        op.priority = priority
+
+      if changed:
+        return (True, ("Priorities of pending opcodes for job %s have been"
+                       " changed to %s" % (self.id, priority)))
+      else:
+        return (False, "Job %s had no pending opcodes" % self.id)
+
 
 class _OpExecCallbacks(mcpu.OpExecCbBase):
   def __init__(self, queue, job, op):
@@ -2356,6 +2402,37 @@ class JobQueue(object):
 
     return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
 
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  def ChangeJobPriority(self, job_id, priority):
+    """Changes a job's priority.
+
+    @type job_id: int
+    @param job_id: ID of the job whose priority should be changed
+    @type priority: int
+    @param priority: New priority
+
+    """
+    logging.info("Changing priority of job %s to %s", job_id, priority)
+
+    if priority not in constants.OP_PRIO_SUBMIT_VALID:
+      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
+      raise errors.GenericError("Invalid priority %s, allowed are %s" %
+                                (priority, allowed))
+
+    def fn(job):
+      (success, msg) = job.ChangePriority(priority)
+
+      if success:
+        try:
+          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
+        except workerpool.NoSuchTask:
+          logging.debug("Job %s is not in workerpool at this time", job.id)
+
+      return (success, msg)
+
+    return self._ModifyJobUnlocked(job_id, fn)
+
   def _ModifyJobUnlocked(self, job_id, mod_fn):
     """Modifies a job.
 
index a5b19ad..34cb71b 100755 (executable)
@@ -29,6 +29,7 @@ import shutil
 import errno
 import itertools
 import random
+import operator
 
 from ganeti import constants
 from ganeti import utils
@@ -281,7 +282,7 @@ class TestQueuedOpCode(unittest.TestCase):
 
 
 class TestQueuedJob(unittest.TestCase):
-  def test(self):
+  def testNoOpCodes(self):
     self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                       None, 1, [], False)
 
@@ -371,6 +372,181 @@ class TestQueuedJob(unittest.TestCase):
     job.ops[0].priority -= 19
     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
 
+  def _JobForPriority(self, job_id):
+    ops = [
+      opcodes.OpTagsGet(),
+      opcodes.OpTestDelay(),
+      opcodes.OpTagsGet(),
+      opcodes.OpTestDelay(),
+      ]
+
+    job = jqueue._QueuedJob(None, job_id, ops, True)
+
+    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
+                               for op in job.ops))
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    self.assertFalse(compat.any(hasattr(op.input, "priority")
+                                for op in job.ops))
+
+    return job
+
+  def testChangePriorityAllQueued(self):
+    job = self._JobForPriority(24984)
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
+                               for op in job.ops))
+    result = job.ChangePriority(-10)
+    self.assertEqual(job.CalcPriority(), -10)
+    self.assertTrue(compat.all(op.priority == -10 for op in job.ops))
+    self.assertTrue(compat.all(op.input.priority == -10 for op in job.ops))
+    self.assertEqual(result,
+                     (True, ("Priorities of pending opcodes for job 24984 have"
+                             " been changed to -10")))
+
+  def testChangePriorityAllFinished(self):
+    job = self._JobForPriority(16405)
+
+    for (idx, op) in enumerate(job.ops):
+      if idx > 2:
+        op.status = constants.OP_STATUS_ERROR
+      else:
+        op.status = constants.OP_STATUS_SUCCESS
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    result = job.ChangePriority(-10)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
+                               for op in job.ops))
+    self.assertFalse(compat.any(hasattr(op.input, "priority")
+                                for op in job.ops))
+    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
+      constants.OP_STATUS_SUCCESS,
+      constants.OP_STATUS_SUCCESS,
+      constants.OP_STATUS_SUCCESS,
+      constants.OP_STATUS_ERROR,
+      ])
+    self.assertEqual(result, (False, "Job 16405 is finished"))
+
+  def testChangePriorityCancelling(self):
+    job = self._JobForPriority(31572)
+
+    for (idx, op) in enumerate(job.ops):
+      if idx > 1:
+        op.status = constants.OP_STATUS_CANCELING
+      else:
+        op.status = constants.OP_STATUS_SUCCESS
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    result = job.ChangePriority(5)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
+                               for op in job.ops))
+    self.assertFalse(compat.any(hasattr(op.input, "priority")
+                                for op in job.ops))
+    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
+      constants.OP_STATUS_SUCCESS,
+      constants.OP_STATUS_SUCCESS,
+      constants.OP_STATUS_CANCELING,
+      constants.OP_STATUS_CANCELING,
+      ])
+    self.assertEqual(result, (False, "Job 31572 is cancelling"))
+
+  def testChangePriorityFirstRunning(self):
+    job = self._JobForPriority(1716215889)
+
+    for (idx, op) in enumerate(job.ops):
+      if idx == 0:
+        op.status = constants.OP_STATUS_RUNNING
+      else:
+        op.status = constants.OP_STATUS_QUEUED
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    result = job.ChangePriority(7)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
+                     [constants.OP_PRIO_DEFAULT, 7, 7, 7])
+    self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
+                     [None, 7, 7, 7])
+    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
+      constants.OP_STATUS_RUNNING,
+      constants.OP_STATUS_QUEUED,
+      constants.OP_STATUS_QUEUED,
+      constants.OP_STATUS_QUEUED,
+      ])
+    self.assertEqual(result,
+                     (True, ("Priorities of pending opcodes for job"
+                             " 1716215889 have been changed to 7")))
+
+  def testChangePriorityLastRunning(self):
+    job = self._JobForPriority(1308)
+
+    for (idx, op) in enumerate(job.ops):
+      if idx == (len(job.ops) - 1):
+        op.status = constants.OP_STATUS_RUNNING
+      else:
+        op.status = constants.OP_STATUS_SUCCESS
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    result = job.ChangePriority(-3)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
+                               for op in job.ops))
+    self.assertFalse(compat.any(hasattr(op.input, "priority")
+                                for op in job.ops))
+    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
+      constants.OP_STATUS_SUCCESS,
+      constants.OP_STATUS_SUCCESS,
+      constants.OP_STATUS_SUCCESS,
+      constants.OP_STATUS_RUNNING,
+      ])
+    self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))
+
+  def testChangePrioritySecondOpcodeRunning(self):
+    job = self._JobForPriority(27701)
+
+    self.assertEqual(len(job.ops), 4)
+    job.ops[0].status = constants.OP_STATUS_SUCCESS
+    job.ops[1].status = constants.OP_STATUS_RUNNING
+    job.ops[2].status = constants.OP_STATUS_QUEUED
+    job.ops[3].status = constants.OP_STATUS_QUEUED
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+    result = job.ChangePriority(-19)
+    self.assertEqual(job.CalcPriority(), -19)
+    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
+                     [constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
+                      -19, -19])
+    self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
+                     [None, None, -19, -19])
+    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
+      constants.OP_STATUS_SUCCESS,
+      constants.OP_STATUS_RUNNING,
+      constants.OP_STATUS_QUEUED,
+      constants.OP_STATUS_QUEUED,
+      ])
+    self.assertEqual(result,
+                     (True, ("Priorities of pending opcodes for job"
+                             " 27701 have been changed to -19")))
+
+  def testChangePriorityWithInconsistentJob(self):
+    job = self._JobForPriority(30097)
+
+    self.assertEqual(len(job.ops), 4)
+
+    # This job is invalid (as it has two opcodes marked as running) and makes
+    # the call fail because an unprocessed opcode precedes a running one (which
+    # should never happen in reality)
+    job.ops[0].status = constants.OP_STATUS_SUCCESS
+    job.ops[1].status = constants.OP_STATUS_RUNNING
+    job.ops[2].status = constants.OP_STATUS_QUEUED
+    job.ops[3].status = constants.OP_STATUS_RUNNING
+
+    self.assertRaises(AssertionError, job.ChangePriority, 19)
+
   def testCalcStatus(self):
     def _Queued(ops):
       # The default status is "queued"
@@ -2105,6 +2281,98 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
     self.assertRaises(IndexError, self.queue.GetNextUpdate)
 
 
+class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
+  def setUp(self):
+    self.queue = _FakeQueueForProc()
+    self.opexecprio = []
+
+  def _BeforeStart(self, timeout, priority):
+    self.assertFalse(self.queue.IsAcquired())
+    self.opexecprio.append(priority)
+
+  def testChangePriorityWhileRunning(self):
+    # Tests changing the priority on a job while it has finished opcodes
+    # (successful) and more, unprocessed ones
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(3)]
+
+    # Create job
+    job_id = 3499
+    job = self._CreateJob(self.queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, None)
+
+    # Run first opcode
+    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
+                     jqueue._JobProcessor.DEFER)
+
+    # Job goes back to queued
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_QUEUED,
+                       constants.OP_STATUS_QUEUED],
+                      ["Res0", None, None]])
+
+    self.assertEqual(self.opexecprio.pop(0), constants.OP_PRIO_DEFAULT)
+    self.assertRaises(IndexError, self.opexecprio.pop, 0)
+
+    # Change priority
+    self.assertEqual(job.ChangePriority(-10),
+                     (True,
+                      ("Priorities of pending opcodes for job 3499 have"
+                       " been changed to -10")))
+    self.assertEqual(job.CalcPriority(), -10)
+
+    # Process second opcode
+    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
+                     jqueue._JobProcessor.DEFER)
+
+    self.assertEqual(self.opexecprio.pop(0), -10)
+    self.assertRaises(IndexError, self.opexecprio.pop, 0)
+
+    # Check status
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+    self.assertEqual(job.CalcPriority(), -10)
+    self.assertEqual(job.GetInfo(["id"]), [job_id])
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_QUEUED],
+                      ["Res0", "Res1", None]])
+
+    # Change priority once more
+    self.assertEqual(job.ChangePriority(5),
+                     (True,
+                      ("Priorities of pending opcodes for job 3499 have"
+                       " been changed to 5")))
+    self.assertEqual(job.CalcPriority(), 5)
+
+    # Process third opcode
+    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
+
+    self.assertEqual(self.opexecprio.pop(0), 5)
+    self.assertRaises(IndexError, self.opexecprio.pop, 0)
+
+    # Check status
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    self.assertEqual(job.GetInfo(["id"]), [job_id])
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_SUCCESS],
+                      ["Res0", "Res1", "Res2"]])
+    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
+                     [constants.OP_PRIO_DEFAULT, -10, 5])
+
+
 class _IdOnlyFakeJob:
   def __init__(self, job_id, priority=NotImplemented):
     self.id = str(job_id)