import shutil
import errno
import itertools
+import random
from ganeti import constants
from ganeti import utils
from ganeti import opcodes
from ganeti import compat
from ganeti import mcpu
+from ganeti import query
import testutils
shutil.rmtree(self.tmpdir)
def _LoadWaitingJob(self):
- return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
+ return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
def _LoadLostJob(self):
return None
# No change
self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
- [constants.JOB_STATUS_WAITLOCK], None, 0.1),
+ [constants.JOB_STATUS_WAITING], None, 0.1),
constants.JOB_NOTCHANGED)
# No previous information
self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
["status"], None, None, 1.0),
- ([constants.JOB_STATUS_WAITLOCK], []))
+ ([constants.JOB_STATUS_WAITING], []))
def testLostJob(self):
wfjc = jqueue._WaitForJobChangesHelper()
for op in ops))
def _Waitlock1(ops):
- ops[0].status = constants.OP_STATUS_WAITLOCK
+ ops[0].status = constants.OP_STATUS_WAITING
def _Waitlock2(ops):
ops[0].status = constants.OP_STATUS_SUCCESS
ops[1].status = constants.OP_STATUS_SUCCESS
- ops[2].status = constants.OP_STATUS_WAITLOCK
+ ops[2].status = constants.OP_STATUS_WAITING
def _Running(ops):
ops[0].status = constants.OP_STATUS_SUCCESS
tests = {
constants.JOB_STATUS_QUEUED: [_Queued],
- constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
+ constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
constants.JOB_STATUS_RUNNING: [_Running],
constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
constants.JOB_STATUS_CANCELED: [_Canceled],
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertFalse(job.cur_opctx)
def _AfterStart(op, cbs):
self.assertRaises(IndexError, queue.GetNextUpdate)
if idx == len(ops) - 1:
# Last opcode
- self.assert_(result)
+ self.assertEqual(result, jqueue._JobProcessor.FINISHED)
else:
- self.assertFalse(result)
+ self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
self._GenericCheckJob(job)
# Calling the processor on a finished job should be a no-op
- self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testOpcodeError(self):
if idx in (failfrom, len(ops) - 1):
# Last opcode
- self.assert_(result)
+ self.assertEqual(result, jqueue._JobProcessor.FINISHED)
break
- self.assertFalse(result)
+ self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self._GenericCheckJob(job)
# Calling the processor on a finished job should be a no-op
- self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testCancelWhileInQueue(self):
# Simulate processor called in workerpool
opexec = _FakeExecOpCodeForProc(queue, None, None)
- self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
- job.ops[0].status = constants.OP_STATUS_WAITLOCK
+ job.ops[0].status = constants.OP_STATUS_WAITING
assert len(job.ops) == 5
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
# Mark as cancelling
(success, _) = job.Cancel()
for op in job.ops))
opexec = _FakeExecOpCodeForProc(queue, None, None)
- self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
# Mark as cancelled
(success, _) = job.Cancel()
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
self.assertRaises(IndexError, queue.GetNextUpdate)
- self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
def _BeforeStart(timeout, priority):
self.assertFalse(queue.IsAcquired())
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
# Mark as cancelled
(success, _) = job.Cancel()
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
- self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
opexec = _FakeExecOpCodeForProc(queue, None, None)
# Run one opcode
- self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.DEFER)
# Job goes back to queued
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(success)
# Try processing another opcode (this will actually cancel the job)
- self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
for _ in range(successcount):
- self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertEqual(job.GetInfo(["opstatus"]),
if remaining == 0:
# Last opcode
- self.assert_(result)
+ self.assertEqual(result, jqueue._JobProcessor.FINISHED)
break
- self.assertFalse(result)
+ self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self._GenericCheckJob(job)
# Calling the processor on a finished job should be a no-op
- self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
# ... also after being restored
job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
# Calling the processor on a finished job should be a no-op
- self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
+ jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testProcessorOnRunningJob(self):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True))
if remaining == 0:
# Last opcode
- self.assert_(result)
+ self.assertEqual(result, jqueue._JobProcessor.FINISHED)
break
- self.assertFalse(result)
+ self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertFalse(job.cur_opctx)
def _AfterStart(op, cbs):
self.assertRaises(IndexError, queue.GetNextUpdate)
if idx == len(ops) - 1:
# Last opcode
- self.assert_(result)
+ self.assertEqual(result, jqueue._JobProcessor.FINISHED)
else:
- self.assertFalse(result)
+ self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
self._GenericCheckJob(job)
# Calling the processor on a finished job should be a no-op
- self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testJobDependency(self):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertFalse(job.cur_opctx)
def _AfterStart(op, cbs):
if attempt < 5:
# Simulate waiting for other job
- self.assertTrue(result)
+ self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
self.assertTrue(job.cur_opctx)
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertRaises(IndexError, depmgr.GetNextNotification)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
continue
- if result:
+ if result == jqueue._JobProcessor.FINISHED:
# Last opcode
self.assertFalse(job.cur_opctx)
- self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
break
self.assertRaises(IndexError, depmgr.GetNextNotification)
- self.assertFalse(result)
+ self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
self.assertFalse(depmgr.CountWaitingJobs())
# Calling the processor on a finished job should be a no-op
- self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testJobDependencyCancel(self):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertFalse(job.cur_opctx)
def _AfterStart(op, cbs):
if attempt > 0 and attempt < 4:
# Simulate waiting for other job
- self.assertTrue(result)
+ self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
self.assertTrue(job.cur_opctx)
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertRaises(IndexError, depmgr.GetNextNotification)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
continue
- if result:
+ if result == jqueue._JobProcessor.FINISHED:
# Last opcode
self.assertFalse(job.cur_opctx)
- self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
break
self.assertRaises(IndexError, depmgr.GetNextNotification)
- self.assertFalse(result)
+ self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
self.assertFalse(depmgr.CountPendingResults())
# Calling the processor on a finished job should be a no-op
- self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testJobDependencyWrongstatus(self):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertFalse(job.cur_opctx)
def _AfterStart(op, cbs):
if attempt > 0 and attempt < 4:
# Simulate waiting for other job
- self.assertTrue(result)
+ self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
self.assertTrue(job.cur_opctx)
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertRaises(IndexError, depmgr.GetNextNotification)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
continue
- if result:
+ if result == jqueue._JobProcessor.FINISHED:
# Last opcode
self.assertFalse(job.cur_opctx)
- self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
break
self.assertRaises(IndexError, depmgr.GetNextNotification)
- self.assertFalse(result)
+ self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
self.assertFalse(depmgr.CountPendingResults())
# Calling the processor on a finished job should be a no-op
- self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assertFalse(self.queue.IsAcquired())
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
ts = self.timeout_strategy
result = proc(_nextop_fn=self._NextOpcode)
assert self.curop is not None
- if result or self.gave_lock:
+ if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
# Got lock and/or job is done, result must've been written
self.assertFalse(job.cur_opctx)
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
self.assert_(job.ops[self.curop].exec_timestamp)
- if result:
+ if result == jqueue._JobProcessor.FINISHED:
self.assertFalse(job.cur_opctx)
break
- self.assertFalse(result)
+ self.assertEqual(result, jqueue._JobProcessor.DEFER)
if self.curop == 0:
self.assertEqual(job.ops[self.curop].start_timestamp,
self.assertEqual(job.cur_opctx._timeout_strategy._fn,
self.timeout_strategy.NextAttempt)
self.assertFalse(job.ops[self.curop].exec_timestamp)
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
# If priority has changed since acquiring locks, the job must've been
# updated
for op in job.ops))
# Calling the processor on a finished job should be a no-op
- self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assertEqual(self.jdm._waiters, {
job_id: set([job]),
})
+ self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
+ ("job/28625", None, None, [("job", [job.id])])
+ ])
self._status.append((job_id, constants.JOB_STATUS_CANCELED))
(result, _) = self.jdm.CheckAndRegister(job, job_id, [])
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertFalse(self.jdm.JobWaiting(job))
+ self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
def testRequireCancel(self):
job = self._FakeJob(5278)
job_id = str(9610)
dep_status = [constants.JOB_STATUS_CANCELED]
- self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
+ self._status.append((job_id, constants.JOB_STATUS_WAITING))
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
self.assertEqual(result, self.jdm.WAIT)
self.assertFalse(self._status)
self.assertEqual(self.jdm._waiters, {
job_id: set([job]),
})
+ self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
+ ("job/9610", None, None, [("job", [job.id])])
+ ])
self._status.append((job_id, constants.JOB_STATUS_CANCELED))
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertFalse(self.jdm.JobWaiting(job))
+ self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
def testRequireError(self):
job = self._FakeJob(21459)
job_id = str(25519)
dep_status = [constants.JOB_STATUS_ERROR]
- self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
+ self._status.append((job_id, constants.JOB_STATUS_WAITING))
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
self.assertEqual(result, self.jdm.WAIT)
self.assertFalse(self._status)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertFalse(self.jdm.JobWaiting(job))
+ self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
def testRequireMultiple(self):
dep_status = list(constants.JOBS_FINALIZED)
job = self._FakeJob(21343)
job_id = str(14609)
- self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
+ self._status.append((job_id, constants.JOB_STATUS_WAITING))
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
self.assertEqual(result, self.jdm.WAIT)
self.assertFalse(self._status)
self.assertEqual(self.jdm._waiters, {
job_id: set([job]),
})
+ self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
+ ("job/14609", None, None, [("job", [job.id])])
+ ])
self._status.append((job_id, end_status))
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertFalse(self.jdm.JobWaiting(job))
+ self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
def testNotify(self):
job = self._FakeJob(8227)
self.assertFalse(self._status)
self.assertFalse(self._queue)
+ def testMultipleWaiting(self):
+ # Many waiters on several job IDs; use a deterministic random generator
+ rnd = random.Random(21402)
+ # 150 distinct IDs as strings; .pop() below assumes map() returns a list
+ job_ids = map(str, rnd.sample(range(1, 10000), 150))
+ # Ten dependency job IDs, each with a set of fake jobs built from 1-20 IDs
+ waiters = dict((job_ids.pop(),
+ set(map(self._FakeJob,
+ [job_ids.pop()
+ for _ in range(rnd.randint(1, 20))])))
+ for _ in range(10))
+
+ # Ensure there are no duplicate job IDs
+ assert not utils.FindDuplicates(waiters.keys() +
+ [job.id
+ for jobs in waiters.values()
+ for job in jobs])
+
+ # Register all jobs as waiters (a queued status is fed in, WAIT is expected)
+ for job_id, job in [(job_id, job)
+ for (job_id, jobs) in waiters.items()
+ for job in jobs]:
+ self._status.append((job_id, constants.JOB_STATUS_QUEUED))
+ (result, _) = self.jdm.CheckAndRegister(job, job_id,
+ [constants.JOB_STATUS_SUCCESS])
+ self.assertEqual(result, self.jdm.WAIT)
+ self.assertFalse(self._status)
+ self.assertFalse(self._queue)
+ self.assertTrue(self.jdm.JobWaiting(job))
+ # The manager's waiter table must match the expected mapping exactly
+ self.assertEqual(self.jdm._waiters, waiters)
+ # Pending lists become sets so order is ignored (tuple parameter: Python 2)
+ def _MakeSet((name, mode, owner_names, pending)):
+ return (name, mode, owner_names,
+ [(pendmode, set(pend)) for (pendmode, pend) in pending])
+ # Compare reported lock info with the entries still left in "waiters"
+ def _CheckLockInfo():
+ info = self.jdm.GetLockInfo([query.LQ_PENDING])
+ self.assertEqual(sorted(map(_MakeSet, info)), sorted([
+ ("job/%s" % job_id, None, None,
+ [("job", set([job.id for job in jobs]))])
+ for job_id, jobs in waiters.items()
+ if jobs
+ ]))
+
+ _CheckLockInfo()
+
+ # Notify in random order
+ for job_id in rnd.sample(waiters, len(waiters)):
+ # Remove from pending waiter list
+ jobs = waiters.pop(job_id)
+ for job in jobs:
+ self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
+ (result, _) = self.jdm.CheckAndRegister(job, job_id,
+ [constants.JOB_STATUS_SUCCESS])
+ self.assertEqual(result, self.jdm.CONTINUE)
+ self.assertFalse(self._status)
+ self.assertFalse(self._queue)
+ self.assertFalse(self.jdm.JobWaiting(job))
+ # Lock info must reflect the waiters that remain
+ _CheckLockInfo()
+ # With all waiters notified nothing may be reported as pending
+ self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
+ # Every dependency job ID must have been consumed by the loop above
+ assert not waiters
+
def testSelfDependency(self):
job = self._FakeJob(18937)
(result, _) = jdm.CheckAndRegister(job, job_id, [])
self.assertEqual(result, self.jdm.ERROR)
self.assertFalse(jdm.JobWaiting(job))
+ self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
if __name__ == "__main__":