import shutil
import errno
import itertools
+import random
from ganeti import constants
from ganeti import utils
from ganeti import opcodes
from ganeti import compat
from ganeti import mcpu
+from ganeti import query
import testutils
class _FakeJob:
def __init__(self, job_id, status):
self.id = job_id
+ self.writable = False
self._status = status
self._log = []
shutil.rmtree(self.tmpdir)
def _LoadWaitingJob(self):
  """Job loader callback returning a fake job in the waiting state."""
  waiting_job = _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
  return waiting_job
def _LoadLostJob(self):
return None
# No change
self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
- [constants.JOB_STATUS_WAITLOCK], None, 0.1),
+ [constants.JOB_STATUS_WAITING], None, 0.1),
constants.JOB_NOTCHANGED)
# No previous information
self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
["status"], None, None, 1.0),
- ([constants.JOB_STATUS_WAITLOCK], []))
+ ([constants.JOB_STATUS_WAITING], []))
def testLostJob(self):
wfjc = jqueue._WaitForJobChangesHelper()
class TestQueuedJob(unittest.TestCase):
def test(self):
  """Creating a job from an empty opcode list raises a generic error."""
  no_ops = []
  self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                    None, 1, no_ops, False)
def testDefaults(self):
job_id = 4260
]
def _Check(job):
+ self.assertTrue(job.writable)
self.assertEqual(job.id, job_id)
self.assertEqual(job.log_serial, 0)
self.assert_(job.received_timestamp)
self.assertEqual(job.GetInfo(["summary"]),
[[op.input.Summary() for op in job.ops]])
- job1 = jqueue._QueuedJob(None, job_id, ops)
+ job1 = jqueue._QueuedJob(None, job_id, ops, True)
_Check(job1)
- job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
+ job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True)
_Check(job2)
self.assertEqual(job1.Serialize(), job2.Serialize())
def testWritable(self):
  """The constructor's last argument is exposed as C{job.writable}."""
  def _MakeJob(writable):
    return jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], writable)

  readonly_job = _MakeJob(False)
  self.assertFalse(readonly_job.writable)

  writable_job = _MakeJob(True)
  self.assertTrue(writable_job.writable)
+
def testPriority(self):
job_id = 4283
ops = [
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(repr(job).startswith("<"))
- job = jqueue._QueuedJob(None, job_id, ops)
+ job = jqueue._QueuedJob(None, job_id, ops, True)
_Check(job)
self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
for op in job.ops))
for op in ops))
def _Waitlock1(ops):
- ops[0].status = constants.OP_STATUS_WAITLOCK
+ ops[0].status = constants.OP_STATUS_WAITING
def _Waitlock2(ops):
ops[0].status = constants.OP_STATUS_SUCCESS
ops[1].status = constants.OP_STATUS_SUCCESS
- ops[2].status = constants.OP_STATUS_WAITLOCK
+ ops[2].status = constants.OP_STATUS_WAITING
def _Running(ops):
ops[0].status = constants.OP_STATUS_SUCCESS
tests = {
constants.JOB_STATUS_QUEUED: [_Queued],
- constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
+ constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
constants.JOB_STATUS_RUNNING: [_Running],
constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
constants.JOB_STATUS_CANCELED: [_Canceled],
def _NewJob():
job = jqueue._QueuedJob(None, 1,
- [opcodes.OpTestDelay() for _ in range(10)])
+ [opcodes.OpTestDelay() for _ in range(10)],
+ True)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
for op in job.ops))
self.assertEqual(job.CalcStatus(), status)
-class _FakeQueueForProc:
class _FakeDependencyManager:
  """Scripted stand-in for the job queue's dependency manager.

  Tests queue up the answers L{CheckAndRegister} has to give by calling
  L{AddCheckResult}. Each call to L{CheckAndRegister} consumes the oldest
  scripted answer and asserts that it was invoked with the arguments the
  test expected. Job IDs passed to L{NotifyWaiters} are recorded and can be
  retrieved one by one through L{GetNextNotification}.

  """
  def __init__(self):
    # Scripted (job, dep_job_id, dep_status, result) tuples, oldest first
    self._checks = []
    # Job IDs passed to NotifyWaiters, oldest first
    self._notifications = []
    # Jobs whose most recent dependency check told them to wait
    self._waiting = set()

  def AddCheckResult(self, job, dep_job_id, dep_status, result):
    """Scripts the answer for one future L{CheckAndRegister} call.

    @param job: Job expected to perform the check
    @param dep_job_id: Expected ID of the job depended upon
    @param dep_status: Expected list of accepted statuses
    @param result: Tuple of (status, message) to hand back

    """
    self._checks.append((job, dep_job_id, dep_status, result))

  def CountPendingResults(self):
    """Returns the number of scripted answers not yet consumed."""
    return len(self._checks)

  def CountWaitingJobs(self):
    """Returns the number of jobs currently marked as waiting."""
    return len(self._waiting)

  def GetNextNotification(self):
    """Returns the oldest recorded notification.

    @raise IndexError: If no notification is left

    """
    return self._notifications.pop(0)

  def JobWaiting(self, job):
    """Tells whether the given job is marked as waiting."""
    return job in self._waiting

  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Hands out the next scripted answer after verifying the arguments.

    @raise IndexError: If no answer has been scripted
    @return: The scripted (status, message) tuple

    """
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)

    assert exp_job == job
    assert exp_dep_job_id == dep_job_id
    assert exp_dep_status == dep_status

    (result_status, _) = result

    depmgr_cls = jqueue._JobDependencyManager

    if result_status == depmgr_cls.WAIT:
      self._waiting.add(job)
    elif result_status in (depmgr_cls.CONTINUE,
                           depmgr_cls.CANCEL,
                           depmgr_cls.WRONGSTATUS):
      # These outcomes end the wait. "discard" rather than "remove" is used as
      # a dependency may be resolved on the very first check, in which case
      # the job was never marked as waiting.
      self._waiting.discard(job)

    return result

  def NotifyWaiters(self, job_id):
    """Records that waiters of the given job were notified."""
    self._notifications.append(job_id)
+
class _DisabledFakeDependencyManager:
  """Dependency manager stub for tests which don't use job dependencies.

  No job is ever reported as waiting, notifications are dropped and any
  attempt to check a dependency trips an assertion.

  """
  def NotifyWaiters(self, _):
    """Ignores the notification."""

  def CheckAndRegister(self, *args):
    """Fails, as dependency checks are not expected with this manager."""
    assert False, "Should not be called"

  def JobWaiting(self, _):
    """Reports that the job is not waiting."""
    return False
+
+
+class _FakeQueueForProc:
def __init__(self, depmgr=None):
  """Sets up an unacquired fake queue with empty bookkeeping.

  @param depmgr: Dependency manager to expose as C{self.depmgr}; if omitted
    (or otherwise false), a L{_DisabledFakeDependencyManager} is used

  """
  self.depmgr = depmgr or _DisabledFakeDependencyManager()

  self._submit_count = itertools.count(1000)
  self._submitted = []
  self._updates = []
  self._acquired = False
+
def IsAcquired(self):
  """Returns the current value of the queue's acquired flag."""
  acquired = self._acquired
  return acquired
class _JobProcessorTestUtils:
def _CreateJob(self, queue, job_id, ops):
- job = jqueue._QueuedJob(queue, job_id, ops)
+ job = jqueue._QueuedJob(queue, job_id, ops, True)
self.assertFalse(job.start_timestamp)
self.assertFalse(job.end_timestamp)
self.assertEqual(len(ops), len(job.ops))
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertFalse(job.cur_opctx)
def _AfterStart(op, cbs):
self.assertRaises(IndexError, queue.GetNextUpdate)
if idx == len(ops) - 1:
# Last opcode
- self.assert_(result)
+ self.assertEqual(result, jqueue._JobProcessor.FINISHED)
else:
- self.assertFalse(result)
+ self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
self._GenericCheckJob(job)
# Calling the processor on a finished job should be a no-op
- self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testOpcodeError(self):
if idx in (failfrom, len(ops) - 1):
# Last opcode
- self.assert_(result)
+ self.assertEqual(result, jqueue._JobProcessor.FINISHED)
break
- self.assertFalse(result)
+ self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self._GenericCheckJob(job)
# Calling the processor on a finished job should be a no-op
- self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testCancelWhileInQueue(self):
# Simulate processor called in workerpool
opexec = _FakeExecOpCodeForProc(queue, None, None)
- self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
- job.ops[0].status = constants.OP_STATUS_WAITLOCK
+ job.ops[0].status = constants.OP_STATUS_WAITING
assert len(job.ops) == 5
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
# Mark as cancelling
(success, _) = job.Cancel()
for op in job.ops))
opexec = _FakeExecOpCodeForProc(queue, None, None)
- self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
# Mark as cancelled
(success, _) = job.Cancel()
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
self.assertRaises(IndexError, queue.GetNextUpdate)
- self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
def _BeforeStart(timeout, priority):
self.assertFalse(queue.IsAcquired())
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
# Mark as cancelled
(success, _) = job.Cancel()
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
- self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
opexec = _FakeExecOpCodeForProc(queue, None, None)
# Run one opcode
- self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.DEFER)
# Job goes back to queued
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(success)
# Try processing another opcode (this will actually cancel the job)
- self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
for _ in range(successcount):
- self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertEqual(job.GetInfo(["opstatus"]),
self.assert_(job.ops_iter)
# Serialize and restore (simulates program restart)
- newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
+ newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
self.assertFalse(newjob.ops_iter)
self._TestPartial(newjob, successcount)
if remaining == 0:
# Last opcode
- self.assert_(result)
+ self.assertEqual(result, jqueue._JobProcessor.FINISHED)
break
- self.assertFalse(result)
+ self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self._GenericCheckJob(job)
# Calling the processor on a finished job should be a no-op
- self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
# ... also after being restored
- job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
- self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
+ job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
+ # Calling the processor on a finished job should be a no-op
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
+ jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testProcessorOnRunningJob(self):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True))
if remaining == 0:
# Last opcode
- self.assert_(result)
+ self.assertEqual(result, jqueue._JobProcessor.FINISHED)
break
- self.assertFalse(result)
+ self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self._CheckLogMessages(job, logmsgcount)
# Serialize and restore (simulates program restart)
- newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
+ newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
self._CheckLogMessages(newjob, logmsgcount)
# Check each message
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertFalse(job.cur_opctx)
def _AfterStart(op, cbs):
self.assertRaises(IndexError, queue.GetNextUpdate)
if idx == len(ops) - 1:
# Last opcode
- self.assert_(result)
+ self.assertEqual(result, jqueue._JobProcessor.FINISHED)
else:
- self.assertFalse(result)
+ self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
self._GenericCheckJob(job)
# Calling the processor on a finished job should be a no-op
- self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
+ self.assertRaises(IndexError, queue.GetNextUpdate)
+
def testJobDependency(self):
  """Processes a job whose first opcode depends on two other jobs.

  The fake dependency manager is scripted so the first dependency makes the
  job wait twice before letting it continue, and the second one makes it wait
  three more times. Only afterwards both opcodes are expected to succeed.

  """
  wait_status = jqueue._JobDependencyManager.WAIT
  continue_status = jqueue._JobDependencyManager.CONTINUE

  depmgr = _FakeDependencyManager()
  queue = _FakeQueueForProc(depmgr=depmgr)
  self.assertEqual(queue.depmgr, depmgr)

  second_dep_id = 22113
  first_dep_id = 28102
  job_id = 29929

  dependent_op = opcodes.OpTestDummy(result="Res0", fail=False,
                                     depends=[
                                       [first_dep_id, None],
                                       [second_dep_id, None],
                                       ])
  plain_op = opcodes.OpTestDummy(result="Res1", fail=False)

  # Create job
  job = self._CreateJob(queue, job_id, [dependent_op, plain_op])

  def _BeforeStart(timeout, priority):
    # Job should only be updated when it wasn't waiting for another job
    if attempt > 5 or attempt == 0:
      self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertFalse(queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
    self.assertFalse(job.cur_opctx)

  def _AfterStart(op, cbs):
    self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertFalse(queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
    self.assertFalse(job.cur_opctx)

    # Job is running, cancelling shouldn't be possible
    (cancelled, _) = job.Cancel()
    self.assertFalse(cancelled)

  opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

  self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

  # "attempt" is read by _BeforeStart through the enclosing scope
  for attempt in itertools.count():
    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)

    # Script the dependency manager's answers for this round
    if attempt < 2:
      scripted = [(first_dep_id, (wait_status, "wait2"))]
    elif attempt == 2:
      # The processor will ask for the next dependency immediately
      scripted = [
        (first_dep_id, (continue_status, "cont")),
        (second_dep_id, (wait_status, "wait")),
        ]
    elif attempt < 5:
      scripted = [(second_dep_id, (wait_status, "wait"))]
    elif attempt == 5:
      scripted = [(second_dep_id, (continue_status, "cont"))]
    else:
      scripted = []

    for (dep_job_id, answer) in scripted:
      depmgr.AddCheckResult(job, dep_job_id, None, answer)
    self.assertEqual(depmgr.CountPendingResults(), len(scripted))

    result = jqueue._JobProcessor(queue, opexec, job)()

    # Job should only be updated if there was an actual change
    if attempt >= 5 or attempt == 0:
      self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertFalse(depmgr.CountPendingResults())

    if attempt < 5:
      # Simulate waiting for other job
      self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
      self.assertTrue(job.cur_opctx)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertRaises(IndexError, depmgr.GetNextNotification)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)
      continue

    if result == jqueue._JobProcessor.FINISHED:
      # Last opcode
      self.assertFalse(job.cur_opctx)
      break

    self.assertRaises(IndexError, depmgr.GetNextNotification)

    self.assertEqual(result, jqueue._JobProcessor.DEFER)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertTrue(job.start_timestamp)
    self.assertFalse(job.end_timestamp)

  # The job as a whole and each of its opcodes must have succeeded
  self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
  self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
  self.assertEqual(job.GetInfo(["opresult"]),
                   [[op.input.result for op in job.ops]])
  self.assertEqual(job.GetInfo(["opstatus"]),
                   [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
  self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                             for op in job.ops))

  self._GenericCheckJob(job)

  # Nothing may be left behind in the queue or the dependency manager
  self.assertRaises(IndexError, queue.GetNextUpdate)
  self.assertRaises(IndexError, depmgr.GetNextNotification)
  self.assertFalse(depmgr.CountPendingResults())
  self.assertFalse(depmgr.CountWaitingJobs())

  # Calling the processor on a finished job should be a no-op
  self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                   jqueue._JobProcessor.FINISHED)
  self.assertRaises(IndexError, queue.GetNextUpdate)
+
def testJobDependencyCancel(self):
  """Processes a job whose second opcode depends on a cancelled job.

  The first opcode runs normally. The dependency check for the second one is
  scripted to answer "wait" three times and "cancel" afterwards, upon which
  the remaining opcodes are expected to end up cancelled.

  """
  wait_status = jqueue._JobDependencyManager.WAIT
  cancel_status = jqueue._JobDependencyManager.CANCEL

  depmgr = _FakeDependencyManager()
  queue = _FakeQueueForProc(depmgr=depmgr)
  self.assertEqual(queue.depmgr, depmgr)

  dep_job_id = 13623
  job_id = 30876

  ops = [
    opcodes.OpTestDummy(result="Res0", fail=False),
    opcodes.OpTestDummy(result="Res1", fail=False,
                        depends=[
                          [dep_job_id, None],
                          ]),
    opcodes.OpTestDummy(result="Res2", fail=False),
    ]

  # Create job
  job = self._CreateJob(queue, job_id, ops)

  def _BeforeStart(timeout, priority):
    # Job should only be updated when it wasn't waiting for another job
    if attempt > 5 or attempt == 0:
      self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertFalse(queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
    self.assertFalse(job.cur_opctx)

  def _AfterStart(op, cbs):
    self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertFalse(queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
    self.assertFalse(job.cur_opctx)

    # Job is running, cancelling shouldn't be possible
    (cancelled, _) = job.Cancel()
    self.assertFalse(cancelled)

  opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

  self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

  # "attempt" is read by _BeforeStart through the enclosing scope
  for attempt in itertools.count():
    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)

    # Attempt 0 handles the first opcode and needs no scripted answer
    if 0 < attempt < 4:
      depmgr.AddCheckResult(job, dep_job_id, None, (wait_status, "wait"))
    elif attempt == 4:
      # Other job was cancelled
      depmgr.AddCheckResult(job, dep_job_id, None,
                            (cancel_status, "cancel"))

    if attempt == 0:
      expected_pending = 0
    else:
      expected_pending = 1
    self.assertEqual(depmgr.CountPendingResults(), expected_pending)

    result = jqueue._JobProcessor(queue, opexec, job)()

    # Job should only be updated if there was an actual change
    if not (1 < attempt < 4):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertFalse(depmgr.CountPendingResults())

    if 0 < attempt < 4:
      # Simulate waiting for other job
      self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
      self.assertTrue(job.cur_opctx)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertRaises(IndexError, depmgr.GetNextNotification)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)
      continue

    if result == jqueue._JobProcessor.FINISHED:
      # Last opcode
      self.assertFalse(job.cur_opctx)
      break

    self.assertRaises(IndexError, depmgr.GetNextNotification)

    self.assertEqual(result, jqueue._JobProcessor.DEFER)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertTrue(job.start_timestamp)
    self.assertFalse(job.end_timestamp)

  # First opcode succeeded, the dependent one and its successor are cancelled
  expected_opstatus = [
    constants.OP_STATUS_SUCCESS,
    constants.OP_STATUS_CANCELED,
    constants.OP_STATUS_CANCELED,
    ]
  expected_opresult = [
    "Res0",
    "Job canceled by request",
    "Job canceled by request",
    ]

  self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
  self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
  self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                   [expected_opstatus, expected_opresult])

  self._GenericCheckJob(job)

  self.assertRaises(IndexError, queue.GetNextUpdate)
  self.assertRaises(IndexError, depmgr.GetNextNotification)
  self.assertFalse(depmgr.CountPendingResults())

  # Calling the processor on a finished job should be a no-op
  self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                   jqueue._JobProcessor.FINISHED)
  self.assertRaises(IndexError, queue.GetNextUpdate)
+
def testJobDependencyWrongstatus(self):
  """Processes a job whose second opcode depends on a job that failed.

  The first opcode runs normally. The dependency check for the second one is
  scripted to answer "wait" three times and "wrong status" afterwards, upon
  which the remaining opcodes are expected to end up with an error.

  """
  wait_status = jqueue._JobDependencyManager.WAIT
  wrong_status = jqueue._JobDependencyManager.WRONGSTATUS

  depmgr = _FakeDependencyManager()
  queue = _FakeQueueForProc(depmgr=depmgr)
  self.assertEqual(queue.depmgr, depmgr)

  dep_job_id = 9741
  job_id = 11763

  ops = [
    opcodes.OpTestDummy(result="Res0", fail=False),
    opcodes.OpTestDummy(result="Res1", fail=False,
                        depends=[
                          [dep_job_id, None],
                          ]),
    opcodes.OpTestDummy(result="Res2", fail=False),
    ]

  # Create job
  job = self._CreateJob(queue, job_id, ops)

  def _BeforeStart(timeout, priority):
    # Job should only be updated when it wasn't waiting for another job
    if attempt == 0 or attempt > 5:
      self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertFalse(queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
    self.assertFalse(job.cur_opctx)

  def _AfterStart(op, cbs):
    self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertFalse(queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
    self.assertFalse(job.cur_opctx)

    # Job is running, cancelling shouldn't be possible
    (success, _) = job.Cancel()
    self.assertFalse(success)

  opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

  self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

  # "attempt" is read by _BeforeStart through the enclosing scope
  for attempt in itertools.count():
    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)

    if attempt == 0:
      # This will handle the first opcode
      pass
    elif attempt < 4:
      depmgr.AddCheckResult(job, dep_job_id, None, (wait_status, "wait"))
    elif attempt == 4:
      # Other job failed
      depmgr.AddCheckResult(job, dep_job_id, None, (wrong_status, "w"))

    if attempt == 0:
      self.assertEqual(depmgr.CountPendingResults(), 0)
    else:
      self.assertEqual(depmgr.CountPendingResults(), 1)

    result = jqueue._JobProcessor(queue, opexec, job)()
    if attempt <= 1 or attempt >= 4:
      # Job should only be updated if there was an actual change
      self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertFalse(depmgr.CountPendingResults())

    if attempt > 0 and attempt < 4:
      # Simulate waiting for other job
      self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
      self.assertTrue(job.cur_opctx)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertRaises(IndexError, depmgr.GetNextNotification)
      self.assert_(job.start_timestamp)
      self.assertFalse(job.end_timestamp)
      continue

    if result == jqueue._JobProcessor.FINISHED:
      # Last opcode
      self.assertFalse(job.cur_opctx)
      break

    self.assertRaises(IndexError, depmgr.GetNextNotification)

    self.assertEqual(result, jqueue._JobProcessor.DEFER)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assert_(job.start_timestamp)
    self.assertFalse(job.end_timestamp)

  # First opcode succeeded, the dependent one and its successor failed
  self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
  self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
  self.assertEqual(job.GetInfo(["opstatus"]),
                   [[constants.OP_STATUS_SUCCESS,
                     constants.OP_STATUS_ERROR,
                     constants.OP_STATUS_ERROR]])

  (opresult, ) = job.GetInfo(["opresult"])
  self.assertEqual(len(opresult), len(ops))
  self.assertEqual(opresult[0], "Res0")
  self.assertTrue(errors.GetEncodedError(opresult[1]))
  self.assertTrue(errors.GetEncodedError(opresult[2]))

  self._GenericCheckJob(job)

  self.assertRaises(IndexError, queue.GetNextUpdate)
  self.assertRaises(IndexError, depmgr.GetNextNotification)
  self.assertFalse(depmgr.CountPendingResults())

  # Calling the processor on a finished job should be a no-op
  self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                   jqueue._JobProcessor.FINISHED)
  self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assertFalse(self.queue.IsAcquired())
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
ts = self.timeout_strategy
result = proc(_nextop_fn=self._NextOpcode)
assert self.curop is not None
- if result or self.gave_lock:
+ if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
# Got lock and/or job is done, result must've been written
self.assertFalse(job.cur_opctx)
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
self.assert_(job.ops[self.curop].exec_timestamp)
- if result:
+ if result == jqueue._JobProcessor.FINISHED:
self.assertFalse(job.cur_opctx)
break
- self.assertFalse(result)
+ self.assertEqual(result, jqueue._JobProcessor.DEFER)
if self.curop == 0:
self.assertEqual(job.ops[self.curop].start_timestamp,
self.assertEqual(job.cur_opctx._timeout_strategy._fn,
self.timeout_strategy.NextAttempt)
self.assertFalse(job.ops[self.curop].exec_timestamp)
- self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+ self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
# If priority has changed since acquiring locks, the job must've been
# updated
for op in job.ops))
# Calling the processor on a finished job should be a no-op
- self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
+ self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
+ jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, self.queue.GetNextUpdate)
+class TestJobDependencyManager(unittest.TestCase):
class _FakeJob:
  """Minimal job stand-in which only carries its ID as a string."""
  def __init__(self, job_id):
    job_id_text = str(job_id)
    self.id = job_id_text
+
def setUp(self):
  """Creates a dependency manager wired to this test's callbacks."""
  # Jobs handed to the enqueue callback
  self._queue = []
  # Scripted (job ID, status) answers for the status callback
  self._status = []
  self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
+
def _GetStatus(self, job_id):
  """Status callback returning the next scripted status.

  The oldest scripted entry is consumed and must be for the requested job.

  """
  expected_id, status = self._status.pop(0)
  self.assertEqual(expected_id, job_id)
  return status
+
def _Enqueue(self, jobs):
  """Enqueue callback recording the jobs it was given.

  Also verifies the manager's lock is not owned while the callback runs.

  """
  lock_owned = self.jdm._lock.is_owned()
  self.assertFalse(lock_owned,
                   msg=("Must not own manager lock while re-adding jobs"
                        " (potential deadlock)"))
  self._queue.append(jobs)
+
def testNotFinalizedThenCancel(self):
  """A job waits on a running dependency and is cancelled along with it."""
  waiter = self._FakeJob(17697)
  dep_id = str(28625)

  # Dependency is still running
  self._status.append((dep_id, constants.JOB_STATUS_RUNNING))
  outcome, _ = self.jdm.CheckAndRegister(waiter, dep_id, [])
  self.assertEqual(outcome, self.jdm.WAIT)
  self.assertFalse(self._status)
  self.assertFalse(self._queue)
  self.assertTrue(self.jdm.JobWaiting(waiter))
  self.assertEqual(self.jdm._waiters, {
    dep_id: set([waiter]),
    })
  pending = self.jdm.GetLockInfo([query.LQ_PENDING])
  self.assertEqual(pending, [
    ("job/28625", None, None, [("job", [waiter.id])])
    ])

  # Dependency got cancelled in the meantime
  self._status.append((dep_id, constants.JOB_STATUS_CANCELED))
  outcome, _ = self.jdm.CheckAndRegister(waiter, dep_id, [])
  self.assertEqual(outcome, self.jdm.CANCEL)
  self.assertFalse(self._status)
  self.assertFalse(self._queue)
  self.assertFalse(self.jdm.JobWaiting(waiter))
  self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
+
def testRequireCancel(self):
  """A job requiring a cancelled dependency continues once it is cancelled."""
  waiter = self._FakeJob(5278)
  dep_id = str(9610)
  accepted = [constants.JOB_STATUS_CANCELED]

  # Dependency has not finished yet
  self._status.append((dep_id, constants.JOB_STATUS_WAITING))
  outcome, _ = self.jdm.CheckAndRegister(waiter, dep_id, accepted)
  self.assertEqual(outcome, self.jdm.WAIT)
  self.assertFalse(self._status)
  self.assertFalse(self._queue)
  self.assertTrue(self.jdm.JobWaiting(waiter))
  self.assertEqual(self.jdm._waiters, {
    dep_id: set([waiter]),
    })
  pending = self.jdm.GetLockInfo([query.LQ_PENDING])
  self.assertEqual(pending, [
    ("job/9610", None, None, [("job", [waiter.id])])
    ])

  # Dependency ended with the requested status
  self._status.append((dep_id, constants.JOB_STATUS_CANCELED))
  outcome, _ = self.jdm.CheckAndRegister(waiter, dep_id, accepted)
  self.assertEqual(outcome, self.jdm.CONTINUE)
  self.assertFalse(self._status)
  self.assertFalse(self._queue)
  self.assertFalse(self.jdm.JobWaiting(waiter))
  self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
+
def testRequireError(self):
  """A job requiring a failed dependency continues once it has failed."""
  waiter = self._FakeJob(21459)
  dep_id = str(25519)
  accepted = [constants.JOB_STATUS_ERROR]

  # Dependency has not finished yet
  self._status.append((dep_id, constants.JOB_STATUS_WAITING))
  outcome, _ = self.jdm.CheckAndRegister(waiter, dep_id, accepted)
  self.assertEqual(outcome, self.jdm.WAIT)
  self.assertFalse(self._status)
  self.assertFalse(self._queue)
  self.assertTrue(self.jdm.JobWaiting(waiter))
  self.assertEqual(self.jdm._waiters, {
    dep_id: set([waiter]),
    })

  # Dependency ended with the requested status
  self._status.append((dep_id, constants.JOB_STATUS_ERROR))
  outcome, _ = self.jdm.CheckAndRegister(waiter, dep_id, accepted)
  self.assertEqual(outcome, self.jdm.CONTINUE)
  self.assertFalse(self._status)
  self.assertFalse(self._queue)
  self.assertFalse(self.jdm.JobWaiting(waiter))
  self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
+
def testRequireMultiple(self):
  """Each finalized status lets the job continue if all are accepted."""
  accepted = list(constants.JOBS_FINALIZED)

  for final_status in accepted:
    waiter = self._FakeJob(21343)
    dep_id = str(14609)

    # Dependency has not finished yet
    self._status.append((dep_id, constants.JOB_STATUS_WAITING))
    outcome, _ = self.jdm.CheckAndRegister(waiter, dep_id, accepted)
    self.assertEqual(outcome, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(waiter))
    self.assertEqual(self.jdm._waiters, {
      dep_id: set([waiter]),
      })
    pending = self.jdm.GetLockInfo([query.LQ_PENDING])
    self.assertEqual(pending, [
      ("job/14609", None, None, [("job", [waiter.id])])
      ])

    # Dependency ended with one of the accepted statuses
    self._status.append((dep_id, final_status))
    outcome, _ = self.jdm.CheckAndRegister(waiter, dep_id, accepted)
    self.assertEqual(outcome, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(waiter))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
+
def testNotify(self):
  """Notifying waiters hands the waiting jobs to the enqueue callback."""
  waiter = self._FakeJob(8227)
  dep_id = str(4113)

  # Dependency is still running, job gets registered as waiter
  self._status.append((dep_id, constants.JOB_STATUS_RUNNING))
  outcome, _ = self.jdm.CheckAndRegister(waiter, dep_id, [])
  self.assertEqual(outcome, self.jdm.WAIT)
  self.assertFalse(self._status)
  self.assertFalse(self._queue)
  self.assertTrue(self.jdm.JobWaiting(waiter))
  self.assertEqual(self.jdm._waiters, {
    dep_id: set([waiter]),
    })

  # Notification empties the waiter list and re-adds the job
  self.jdm.NotifyWaiters(dep_id)
  self.assertFalse(self._status)
  self.assertFalse(self.jdm._waiters)
  self.assertFalse(self.jdm.JobWaiting(waiter))
  requeued = [set([waiter])]
  self.assertEqual(self._queue, requeued)
+
def testWrongStatus(self):
  """A dependency ending with a status not asked for is reported as such."""
  waiter = self._FakeJob(10102)
  dep_id = str(1271)
  accepted = [constants.JOB_STATUS_SUCCESS]

  # Dependency is still queued
  self._status.append((dep_id, constants.JOB_STATUS_QUEUED))
  outcome, _ = self.jdm.CheckAndRegister(waiter, dep_id, list(accepted))
  self.assertEqual(outcome, self.jdm.WAIT)
  self.assertFalse(self._status)
  self.assertFalse(self._queue)
  self.assertTrue(self.jdm.JobWaiting(waiter))
  self.assertEqual(self.jdm._waiters, {
    dep_id: set([waiter]),
    })

  # Dependency failed although success was required
  self._status.append((dep_id, constants.JOB_STATUS_ERROR))
  outcome, _ = self.jdm.CheckAndRegister(waiter, dep_id, list(accepted))
  self.assertEqual(outcome, self.jdm.WRONGSTATUS)
  self.assertFalse(self._status)
  self.assertFalse(self._queue)
  self.assertFalse(self.jdm.JobWaiting(waiter))
+
+ def testCorrectStatus(self):
+   # The dependency is required to succeed and eventually does
+   waiter = self._FakeJob(24273)
+   dep_id = str(23885)
+
+   def _Register(dep_status):
+     # Report "dep_status" for the dependency and (re-)check the waiter
+     self._status.append((dep_id, dep_status))
+     (outcome, _) = self.jdm.CheckAndRegister(waiter, dep_id,
+                                              [constants.JOB_STATUS_SUCCESS])
+     return outcome
+
+   # Still queued: the job has to wait
+   self.assertEqual(_Register(constants.JOB_STATUS_QUEUED), self.jdm.WAIT)
+   self.assertFalse(self._status)
+   self.assertFalse(self._queue)
+   self.assertTrue(self.jdm.JobWaiting(waiter))
+   self.assertEqual(self.jdm._waiters, {dep_id: set([waiter])})
+
+   # Finalized with exactly the required status
+   self.assertEqual(_Register(constants.JOB_STATUS_SUCCESS),
+                    self.jdm.CONTINUE)
+   self.assertFalse(self._status)
+   self.assertFalse(self._queue)
+   self.assertFalse(self.jdm.JobWaiting(waiter))
+
+ def testFinalizedRightAway(self):
+   # Dependency already has the required status on the very first check
+   waiter = self._FakeJob(224)
+   dep_id = str(3081)
+
+   self._status.append((dep_id, constants.JOB_STATUS_SUCCESS))
+   (outcome, _) = self.jdm.CheckAndRegister(waiter, dep_id,
+                                            [constants.JOB_STATUS_SUCCESS])
+   self.assertEqual(outcome, self.jdm.CONTINUE)
+   self.assertFalse(self._status)
+   self.assertFalse(self._queue)
+   self.assertFalse(self.jdm.JobWaiting(waiter))
+
+   # An empty waiter set is left behind for the dependency
+   self.assertEqual(self.jdm._waiters, {dep_id: set()})
+
+   # Force cleanup
+   self.jdm.NotifyWaiters("0")
+   self.assertFalse(self.jdm._waiters)
+   self.assertFalse(self._status)
+   self.assertFalse(self._queue)
+
+ def testMultipleWaiting(self):
+   # Registers a random number of waiters for each of ten dependencies, then
+   # finalizes the dependencies in random order, verifying the manager's
+   # bookkeeping and lock information after every step
+
+   # Use a deterministic random generator
+   rnd = random.Random(21402)
+
+   # Must be a real list (not a lazy "map" object), IDs are pop()ed below.
+   # NOTE(review): up to 10 * (1 + 20) = 210 IDs could be consumed while only
+   # 150 are drawn; presumably enough for this fixed seed, confirm if changed
+   job_ids = [str(i) for i in rnd.sample(range(1, 10000), 150)]
+
+   # Maps dependency job ID to the set of jobs waiting for it
+   waiters = {}
+   for _ in range(10):
+     dep_id = job_ids.pop()
+     waiters[dep_id] = set(self._FakeJob(job_ids.pop())
+                           for _ in range(rnd.randint(1, 20)))
+
+   # Ensure there are no duplicate job IDs; a unittest assertion is used as
+   # plain "assert" statements are removed when running with "python -O"
+   all_ids = list(waiters.keys())
+   all_ids.extend(job.id for jobs in waiters.values() for job in jobs)
+   self.assertFalse(utils.FindDuplicates(all_ids))
+
+   # Register all jobs as waiters
+   for (dep_id, jobs) in waiters.items():
+     for job in jobs:
+       self._status.append((dep_id, constants.JOB_STATUS_QUEUED))
+       (result, _) = self.jdm.CheckAndRegister(job, dep_id,
+                                               [constants.JOB_STATUS_SUCCESS])
+       self.assertEqual(result, self.jdm.WAIT)
+       self.assertFalse(self._status)
+       self.assertFalse(self._queue)
+       self.assertTrue(self.jdm.JobWaiting(job))
+
+   self.assertEqual(self.jdm._waiters, waiters)
+
+   def _MakeSet(entry):
+     # Converts the pending job IDs to sets for order-independent comparison;
+     # unpacked explicitly since tuple parameters don't exist in Python 3
+     (name, mode, owner_names, pending) = entry
+     return (name, mode, owner_names,
+             [(pendmode, set(pend)) for (pendmode, pend) in pending])
+
+   def _CheckLockInfo():
+     # Lock information must list exactly the dependencies still in "waiters"
+     info = self.jdm.GetLockInfo([query.LQ_PENDING])
+     self.assertEqual(sorted(map(_MakeSet, info)), sorted([
+       ("job/%s" % job_id, None, None,
+        [("job", set([job.id for job in jobs]))])
+       for job_id, jobs in waiters.items()
+       if jobs
+       ]))
+
+   _CheckLockInfo()
+
+   # Finalize dependencies in random order; sampling from an explicit list
+   # of keys as newer Python versions refuse to sample from a dictionary
+   for dep_id in rnd.sample(list(waiters), len(waiters)):
+     # Remove from pending waiter list
+     jobs = waiters.pop(dep_id)
+     for job in jobs:
+       self._status.append((dep_id, constants.JOB_STATUS_SUCCESS))
+       (result, _) = self.jdm.CheckAndRegister(job, dep_id,
+                                               [constants.JOB_STATUS_SUCCESS])
+       self.assertEqual(result, self.jdm.CONTINUE)
+       self.assertFalse(self._status)
+       self.assertFalse(self._queue)
+       self.assertFalse(self.jdm.JobWaiting(job))
+
+     _CheckLockInfo()
+
+   self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
+
+   # Every dependency must have been processed by the loop above
+   self.assertFalse(waiters)
+
+ def testSelfDependency(self):
+   # Using a job's own ID as its dependency must be reported as an error
+   own_job = self._FakeJob(18937)
+
+   self._status.append((own_job.id, constants.JOB_STATUS_SUCCESS))
+   (outcome, _) = self.jdm.CheckAndRegister(own_job, own_job.id, [])
+   self.assertEqual(outcome, self.jdm.ERROR)
+
+ def testJobDisappears(self):
+   # The status callback raising "JobLost" must result in an error without
+   # leaving a waiter registration behind
+   waiter = self._FakeJob(30540)
+   dep_id = str(23769)
+
+   def _RaiseJobLost(_):
+     raise errors.JobLost("#msg#")
+
+   # Dedicated manager whose status lookup always fails; no enqueue function
+   lost_jdm = jqueue._JobDependencyManager(_RaiseJobLost, None)
+   (outcome, _) = lost_jdm.CheckAndRegister(waiter, dep_id, [])
+   self.assertEqual(outcome, self.jdm.ERROR)
+   self.assertFalse(lost_jdm.JobWaiting(waiter))
+   self.assertFalse(lost_jdm.GetLockInfo([query.LQ_PENDING]))
+
+
# Script entry point: hand control to testutils.GanetiTestProgram when this
# module is executed directly rather than imported
if __name__ == "__main__":
testutils.GanetiTestProgram()