except mcpu.LockAcquireTimeout:
assert timeout is not None, "Received timeout for blocking acquire"
logging.debug("Couldn't acquire locks in %0.6fs", timeout)
- assert op.status == constants.OP_STATUS_WAITLOCK
+
+ assert op.status in (constants.OP_STATUS_WAITLOCK,
+ constants.OP_STATUS_CANCELING)
+
+ # Was job cancelled while we were waiting for the lock?
+ if op.status == constants.OP_STATUS_CANCELING:
+ return (constants.OP_STATUS_CANCELING, None)
+
return (constants.OP_STATUS_QUEUED, None)
except CancelJob:
logging.exception("%s: Canceling job", opctx.log_prefix)
[[constants.OP_STATUS_CANCELED for _ in job.ops],
["Job canceled by request" for _ in job.ops]])
+ def testCancelWhileWaitlockWithTimeout(self):
+ queue = _FakeQueueForProc()
+
+ ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+ for i in range(5)]
+
+ # Create job
+ job_id = 24314
+ job = self._CreateJob(queue, job_id, ops)
+
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+ def _BeforeStart(timeout, priority):
+ self.assertFalse(queue.IsAcquired())
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+ # Mark as cancelled
+ (success, _) = job.Cancel()
+ self.assert_(success)
+
+ self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
+ for op in job.ops))
+
+ # Fake an acquire attempt timing out
+ raise mcpu.LockAcquireTimeout()
+
+ def _AfterStart(op, cbs):
+ self.fail("Should not reach this")
+
+ opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
+
+ self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+
+ # Check result
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+ self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+ self.assert_(job.start_timestamp)
+ self.assert_(job.end_timestamp)
+ self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
+ for op in job.ops))
+ self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+ [[constants.OP_STATUS_CANCELED for _ in job.ops],
+ ["Job canceled by request" for _ in job.ops]])
+
def testCancelWhileRunning(self):
# Tests canceling a job with finished opcodes and more, unprocessed ones
queue = _FakeQueueForProc()