"""
assert op in job.ops
+ assert op.status in (constants.OP_STATUS_QUEUED,
+ constants.OP_STATUS_WAITLOCK)
+
+ update = False
- op.status = constants.OP_STATUS_WAITLOCK
op.result = None
- op.start_timestamp = TimeStampNow()
+
+ if op.status == constants.OP_STATUS_QUEUED:
+ op.status = constants.OP_STATUS_WAITLOCK
+ update = True
+
+ if op.start_timestamp is None:
+ op.start_timestamp = TimeStampNow()
+ update = True
if job.start_timestamp is None:
job.start_timestamp = op.start_timestamp
+ update = True
+
+ assert op.status == constants.OP_STATUS_WAITLOCK
+
+ return update
def _ExecOpCodeUnlocked(self, opctx):
"""Processes one opcode and returns the result.
if op.status == constants.OP_STATUS_CANCELING:
return (constants.OP_STATUS_CANCELING, None)
- return (constants.OP_STATUS_QUEUED, None)
+ # Stay in waitlock while trying to re-acquire lock
+ return (constants.OP_STATUS_WAITLOCK, None)
except CancelJob:
logging.exception("%s: Canceling job", opctx.log_prefix)
assert op.status == constants.OP_STATUS_CANCELING
# Is a previous opcode still pending?
if job.cur_opctx:
opctx = job.cur_opctx
+ job.cur_opctx = None
else:
if __debug__ and _nextop_fn:
_nextop_fn()
# Consistency check
assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
constants.OP_STATUS_CANCELED)
- for i in job.ops[opctx.index:])
+ for i in job.ops[opctx.index + 1:])
assert op.status in (constants.OP_STATUS_QUEUED,
constants.OP_STATUS_WAITLOCK,
if op.status != constants.OP_STATUS_CANCELED:
# Prepare to start opcode
- self._MarkWaitlock(job, op)
+ if self._MarkWaitlock(job, op):
+ # Write to disk
+ queue.UpdateJobUnlocked(job)
assert op.status == constants.OP_STATUS_WAITLOCK
assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
-
- # Write to disk
- queue.UpdateJobUnlocked(job)
+ assert job.start_timestamp and op.start_timestamp
logging.info("%s: opcode %s waiting for locks",
opctx.log_prefix, opctx.summary)
op.status = op_status
op.result = op_result
- if op.status == constants.OP_STATUS_QUEUED:
+ if op.status == constants.OP_STATUS_WAITLOCK:
# Couldn't get locks in time
assert not op.end_timestamp
else:
else:
assert op.status in constants.OPS_FINALIZED
- if op.status == constants.OP_STATUS_QUEUED:
+ if op.status == constants.OP_STATUS_WAITLOCK:
finalize = False
- opctx.CheckPriorityIncrease()
+ if opctx.CheckPriorityIncrease():
+ # Priority was changed, need to update on-disk file
+ queue.UpdateJobUnlocked(job)
# Keep around for another round
job.cur_opctx = opctx
op.priority >= constants.OP_PRIO_HIGHEST)
# In no case must the status be finalized here
- assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
-
- queue.UpdateJobUnlocked(job)
+ assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
else:
# Ensure all opcodes so far have been successful
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertFalse(job.cur_opctx)
def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+ self.assertFalse(job.cur_opctx)
# Job is running, cancelling shouldn't be possible
(success, _) = job.Cancel()
self.retries = 0
self.prev_tsop = None
self.prev_prio = None
+ self.prev_status = None
+ self.lock_acq_prio = None
self.gave_lock = None
self.done_lock_before_blocking = False
def _BeforeStart(self, timeout, priority):
job = self.job
- self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+ # If status has changed, job must've been written
+ if self.prev_status != self.job.ops[self.curop].status:
+ self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
self.assertFalse(self.queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
self.assertEqual(priority, job.ops[self.curop].priority)
self.gave_lock = True
+ self.lock_acq_prio = priority
if (self.curop == 3 and
job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
def _AfterStart(self, op, cbs):
job = self.job
+ # Setting to "running" requires an update
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
self.assertFalse(self.queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
def _NextOpcode(self):
self.curop = self.opcounter.next()
self.prev_prio = self.job.ops[self.curop].priority
+ self.prev_status = self.job.ops[self.curop].status
def _NewTimeoutStrategy(self):
job = self.job
self.assertFalse(self.done_lock_before_blocking)
- for i in itertools.count(0):
+ while True:
proc = jqueue._JobProcessor(self.queue, opexec, job,
_timeout_strategy_factory=tsf)
self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
+ if self.curop is not None:
+ self.prev_status = self.job.ops[self.curop].status
+
+ self.lock_acq_prio = None
+
result = proc(_nextop_fn=self._NextOpcode)
- self.assertEqual(self.queue.GetNextUpdate(), (job, True))
- self.assertRaises(IndexError, self.queue.GetNextUpdate)
+ assert self.curop is not None
+
+ if result or self.gave_lock:
+ # Got lock and/or job is done, result must've been written
+ self.assertFalse(job.cur_opctx)
+ self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+ self.assertRaises(IndexError, self.queue.GetNextUpdate)
+ self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
+ self.assert_(job.ops[self.curop].exec_timestamp)
+
if result:
self.assertFalse(job.cur_opctx)
break
self.assertFalse(result)
+ if self.curop == 0:
+ self.assertEqual(job.ops[self.curop].start_timestamp,
+ job.start_timestamp)
+
if self.gave_lock:
- self.assertFalse(job.cur_opctx)
+ # Opcode finished, but job not yet done
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
else:
+ # Did not get locks
self.assert_(job.cur_opctx)
self.assertEqual(job.cur_opctx._timeout_strategy._fn,
self.timeout_strategy.NextAttempt)
+ self.assertFalse(job.ops[self.curop].exec_timestamp)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+ # If priority has changed since acquiring locks, the job must've been
+ # updated
+ if self.lock_acq_prio != job.ops[self.curop].priority:
+ self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+
+ self.assertRaises(IndexError, self.queue.GetNextUpdate)
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)