#!/usr/bin/python
#
-# Copyright (C) 2010 Google Inc.
+# Copyright (C) 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
import tempfile
import shutil
import errno
+import itertools
from ganeti import constants
from ganeti import utils
from ganeti import errors
from ganeti import jqueue
+from ganeti import opcodes
+from ganeti import compat
+from ganeti import mcpu
import testutils
self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
class TestQueuedOpCode(unittest.TestCase):
  """Tests for L{jqueue._QueuedOpCode}."""

  def testDefaults(self):
    """A freshly wrapped opcode must be queued at default priority with an
    empty log and no timestamps, and must survive serialization."""
    def _verify(qop):
      self.assertFalse(hasattr(qop.input, "dry_run"))
      self.assertEqual(qop.priority, constants.OP_PRIO_DEFAULT)
      self.assertFalse(qop.log)
      self.assert_(qop.start_timestamp is None)
      self.assert_(qop.exec_timestamp is None)
      self.assert_(qop.end_timestamp is None)
      self.assert_(qop.result is None)
      self.assertEqual(qop.status, constants.OP_STATUS_QUEUED)

    original = jqueue._QueuedOpCode(opcodes.OpTestDelay())
    _verify(original)
    restored = jqueue._QueuedOpCode.Restore(original.Serialize())
    _verify(restored)
    self.assertEqual(original.Serialize(), restored.Serialize())

  def testPriority(self):
    """A non-default priority on the input opcode must be kept, both on the
    live object and across a serialization round-trip."""
    def _verify(qop):
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
        "Default priority equals high priority; test can't work"
      self.assertEqual(qop.priority, constants.OP_PRIO_HIGH)
      self.assertEqual(qop.status, constants.OP_STATUS_QUEUED)

    input_op = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
    original = jqueue._QueuedOpCode(input_op)
    _verify(original)
    restored = jqueue._QueuedOpCode.Restore(original.Serialize())
    _verify(restored)
    self.assertEqual(original.Serialize(), restored.Serialize())
+
class TestQueuedJob(unittest.TestCase):
  """Tests for L{jqueue._QueuedJob}: construction, priority and status
  calculation, and serialization round-trips."""

  def test(self):
    # A job must contain at least one opcode
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                      None, 1, [])

  def testDefaults(self):
    """Checks a new job's default state and a Serialize/Restore round-trip."""
    job_id = 4260
    ops = [
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.log_serial, 0)
      self.assert_(job.received_timestamp)
      self.assert_(job.start_timestamp is None)
      self.assert_(job.end_timestamp is None)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
      self.assert_(repr(job).startswith("<"))
      self.assertEqual(len(job.ops), len(ops))
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
                              for (inp, op) in zip(ops, job.ops)))
      self.assertRaises(errors.OpExecError, job.GetInfo,
                        ["unknown-field"])
      self.assertEqual(job.GetInfo(["summary"]),
                       [[op.input.Summary() for op in job.ops]])

    job1 = jqueue._QueuedJob(None, job_id, ops)
    _Check(job1)
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
    _Check(job2)
    self.assertEqual(job1.Serialize(), job2.Serialize())

  def testPriority(self):
    """Job priority must track the highest (numerically lowest) priority
    among unfinished opcodes; finished opcodes are ignored."""
    job_id = 4283
    ops = [
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assert_(repr(job).startswith("<"))

    job = jqueue._QueuedJob(None, job_id, ops)
    _Check(job)
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                            for op in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase first
    job.ops[0].priority -= 1
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)

    # Mark opcode as finished
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase second
    job.ops[1].priority -= 10
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)

    # Test increasing first
    job.ops[0].status = constants.OP_STATUS_RUNNING
    job.ops[0].priority -= 19
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)

  def testCalcStatus(self):
    """Exercises CalcStatus for every job status by preparing matching
    combinations of per-opcode statuses."""
    def _Queued(ops):
      # The default status is "queued"
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
                              for op in ops))

    def _Waitlock1(ops):
      ops[0].status = constants.OP_STATUS_WAITLOCK

    def _Waitlock2(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      ops[2].status = constants.OP_STATUS_WAITLOCK

    def _Running(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_RUNNING
      for op in ops[2:]:
        op.status = constants.OP_STATUS_QUEUED

    def _Canceling1(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      for op in ops[2:]:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceling2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceled(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELED

    def _Error1(ops):
      for idx, op in enumerate(ops):
        if idx > 3:
          op.status = constants.OP_STATUS_ERROR
        else:
          op.status = constants.OP_STATUS_SUCCESS

    def _Error2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_ERROR

    def _Success(ops):
      for op in ops:
        op.status = constants.OP_STATUS_SUCCESS

    # Maps each expected job status to status-preparation functions
    tests = {
      constants.JOB_STATUS_QUEUED: [_Queued],
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
      constants.JOB_STATUS_RUNNING: [_Running],
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
      constants.JOB_STATUS_CANCELED: [_Canceled],
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
      constants.JOB_STATUS_SUCCESS: [_Success],
      }

    def _NewJob():
      job = jqueue._QueuedJob(None, 1,
                              [opcodes.OpTestDelay() for _ in range(10)])
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
                              for op in job.ops))
      return job

    # Iterating over JOB_STATUS_ALL also verifies the table above is complete
    for status in constants.JOB_STATUS_ALL:
      sttests = tests[status]
      assert sttests
      for fn in sttests:
        job = _NewJob()
        fn(job.ops)
        self.assertEqual(job.CalcStatus(), status)
+
+class _FakeQueueForProc:
+ def __init__(self):
+ self._acquired = False
+ self._updates = []
+ self._submitted = []
+
+ self._submit_count = itertools.count(1000)
+
+ def IsAcquired(self):
+ return self._acquired
+
+ def GetNextUpdate(self):
+ return self._updates.pop(0)
+
+ def GetNextSubmittedJob(self):
+ return self._submitted.pop(0)
+
+ def acquire(self, shared=0):
+ assert shared == 1
+ self._acquired = True
+
+ def release(self):
+ assert self._acquired
+ self._acquired = False
+
+ def UpdateJobUnlocked(self, job, replicate=True):
+ assert self._acquired, "Lock not acquired while updating job"
+ self._updates.append((job, bool(replicate)))
+
+ def SubmitManyJobs(self, jobs):
+ assert not self._acquired, "Lock acquired while submitting jobs"
+ job_ids = [self._submit_count.next() for _ in jobs]
+ self._submitted.extend(zip(job_ids, jobs))
+ return job_ids
+
+
class _FakeExecOpCodeForProc:
  """Stand-in for the opcode-execution function used by the job processor.

  Runs optional hooks before and after C{cbs.NotifyStart} and verifies the
  queue lock is never held while an opcode "executes".

  """
  def __init__(self, queue, before_start, after_start):
    self._queue = queue
    self._pre_hook = before_start
    self._post_hook = after_start

  def __call__(self, op, cbs, timeout=None, priority=None):
    assert isinstance(op, opcodes.OpTestDummy)
    assert not self._queue.IsAcquired(), \
      "Queue lock not released when executing opcode"

    if self._pre_hook:
      self._pre_hook(timeout, priority)

    cbs.NotifyStart()

    if self._post_hook:
      self._post_hook(op, cbs)

    # Check again after the callbacks
    assert not self._queue.IsAcquired()

    if op.fail:
      raise errors.OpExecError("Error requested (%s)" % op.result)

    submitted = getattr(op, "submit_jobs", None)
    if submitted is not None:
      return cbs.SubmitManyJobs(submitted)

    return op.result
+
class _JobProcessorTestUtils:
  """Mixin with helpers shared by the job-processor test cases."""

  def _CreateJob(self, queue, job_id, ops):
    """Builds a L{jqueue._QueuedJob} from C{ops} and checks its initial
    state (no timestamps, inputs preserved in order)."""
    new_job = jqueue._QueuedJob(queue, job_id, ops)
    self.assertFalse(new_job.start_timestamp)
    self.assertFalse(new_job.end_timestamp)
    self.assertEqual(len(ops), len(new_job.ops))
    self.assert_(compat.all(qop.input == inop
                            for (qop, inop) in zip(new_job.ops, ops)))
    self.assertEqual(new_job.GetInfo(["ops"]),
                     [[inop.__getstate__() for inop in ops]])
    return new_job
+
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
  """Tests for L{jqueue._JobProcessor}: opcode execution order, status
  transitions, cancellation at various stages, log messages and job
  submission from within opcodes."""

  def _GenericCheckJob(self, job):
    """Common checks for a finished job: GetInfo timestamp fields must
    match the attributes on the job and its opcodes."""
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
                      for op in job.ops)

    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
                     [[op.start_timestamp for op in job.ops],
                      [op.exec_timestamp for op in job.ops],
                      [op.end_timestamp for op in job.ops]])
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
                     [job.received_timestamp,
                      job.start_timestamp,
                      job.end_timestamp])
    self.assert_(job.start_timestamp)
    self.assert_(job.end_timestamp)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)

  def testSuccess(self):
    """All opcodes succeed; one processor call per opcode, job finishes
    with status "success"."""
    queue = _FakeQueueForProc()

    for (job_id, opcount) in [(25351, 1), (6637, 3),
                              (24644, 10), (32207, 100)]:
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
             for i in range(opcount)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      def _BeforeStart(timeout, priority):
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)
        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
        self.assertFalse(job.cur_opctx)

      def _AfterStart(op, cbs):
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)

        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
        self.assertFalse(job.cur_opctx)

        # Job is running, cancelling shouldn't be possible
        (success, _) = job.Cancel()
        self.assertFalse(success)

      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

      for idx in range(len(ops)):
        self.assertRaises(IndexError, queue.GetNextUpdate)
        result = jqueue._JobProcessor(queue, opexec, job)()
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)
        if idx == len(ops) - 1:
          # Last opcode
          self.assert_(result)
        else:
          self.assertFalse(result)

          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
          self.assert_(job.start_timestamp)
          self.assertFalse(job.end_timestamp)

      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
      self.assertEqual(job.GetInfo(["opresult"]),
                       [[op.input.result for op in job.ops]])
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
                              for op in job.ops))

      self._GenericCheckJob(job)

      # Calling the processor on a finished job should be a no-op
      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
      self.assertRaises(IndexError, queue.GetNextUpdate)

  def testOpcodeError(self):
    """One or more opcodes fail; remaining opcodes must be marked as
    errors and the job ends with status "error"."""
    queue = _FakeQueueForProc()

    # (job id, number of opcodes, first failing opcode, last failing opcode)
    testdata = [
      (17077, 1, 0, 0),
      (1782, 5, 2, 2),
      (18179, 10, 9, 9),
      (4744, 10, 3, 8),
      (23816, 100, 39, 45),
      ]

    for (job_id, opcount, failfrom, failto) in testdata:
      # Prepare opcodes
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
                                 fail=(failfrom <= i and
                                       i <= failto))
             for i in range(opcount)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      opexec = _FakeExecOpCodeForProc(queue, None, None)

      for idx in range(len(ops)):
        self.assertRaises(IndexError, queue.GetNextUpdate)
        result = jqueue._JobProcessor(queue, opexec, job)()
        # queued to waitlock
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        # waitlock to running
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        # Opcode result
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)

        if idx in (failfrom, len(ops) - 1):
          # Last opcode
          self.assert_(result)
          break

        self.assertFalse(result)

        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      self.assertRaises(IndexError, queue.GetNextUpdate)

      # Check job status
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
      self.assertEqual(job.GetInfo(["id"]), [job_id])
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])

      # Check opcode status
      data = zip(job.ops,
                 job.GetInfo(["opstatus"])[0],
                 job.GetInfo(["opresult"])[0])

      for idx, (op, opstatus, opresult) in enumerate(data):
        if idx < failfrom:
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
          self.assertEqual(opresult, op.input.result)
        elif idx <= failto:
          assert op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
        else:
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)

      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
                              for op in job.ops[:failfrom]))

      self._GenericCheckJob(job)

      # Calling the processor on a finished job should be a no-op
      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
      self.assertRaises(IndexError, queue.GetNextUpdate)

  def testCancelWhileInQueue(self):
    """Cancelling a job that hasn't started yet takes effect immediately
    and the processor call afterwards must not change anything."""
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 17045
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Mark as cancelled
    (success, _) = job.Cancel()
    self.assert_(success)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
                            for op in job.ops))

    # Serialize to check for differences
    before_proc = job.Serialize()

    # Simulate processor called in workerpool
    opexec = _FakeExecOpCodeForProc(queue, None, None)
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])

    # Must not have changed or written
    self.assertEqual(before_proc, job.Serialize())
    self.assertRaises(IndexError, queue.GetNextUpdate)

  def testCancelWhileWaitlockInQueue(self):
    """Cancelling a job whose first opcode waits for locks (but before the
    processor runs) moves it to "canceling", then "canceled"."""
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 8645
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    job.ops[0].status = constants.OP_STATUS_WAITLOCK

    assert len(job.ops) == 5

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

    # Mark as cancelling
    (success, _) = job.Cancel()
    self.assert_(success)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
                            for op in job.ops))

    opexec = _FakeExecOpCodeForProc(queue, None, None)
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertFalse(job.start_timestamp)
    self.assert_(job.end_timestamp)
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])

  def testCancelWhileWaitlock(self):
    """Cancels a job from within the lock-waiting callback; the opcode
    still starts but the job must end up canceled."""
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 11009
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

      # Mark as cancelled
      (success, _) = job.Cancel()
      self.assert_(success)

      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
                              for op in job.ops))
      self.assertRaises(IndexError, queue.GetNextUpdate)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
    self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assert_(job.start_timestamp)
    self.assert_(job.end_timestamp)
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])

  def testCancelWhileWaitlockWithTimeout(self):
    """Cancels a job while its lock acquire attempt times out; the opcode
    must never actually start."""
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 24314
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    def _BeforeStart(timeout, priority):
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

      # Mark as cancelled
      (success, _) = job.Cancel()
      self.assert_(success)

      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
                              for op in job.ops))

      # Fake an acquire attempt timing out
      raise mcpu.LockAcquireTimeout()

    def _AfterStart(op, cbs):
      self.fail("Should not reach this")

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assert_(jqueue._JobProcessor(queue, opexec, job)())

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assert_(job.start_timestamp)
    self.assert_(job.end_timestamp)
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])

  def testCancelWhileRunning(self):
    """Tests canceling a job with finished opcodes and more, unprocessed
    ones; already-finished opcodes keep their results."""
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(3)]

    # Create job
    job_id = 28492
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    opexec = _FakeExecOpCodeForProc(queue, None, None)

    # Run one opcode
    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())

    # Job goes back to queued
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_QUEUED,
                       constants.OP_STATUS_QUEUED],
                      ["Res0", None, None]])

    # Mark as cancelled
    (success, _) = job.Cancel()
    self.assert_(success)

    # Try processing another opcode (this will actually cancel the job)
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_CANCELED,
                       constants.OP_STATUS_CANCELED],
                      ["Res0", "Job canceled by request",
                       "Job canceled by request"]])

  def testPartiallyRun(self):
    """Tests calling the processor on a job that's been partially run
    before the program was restarted."""
    queue = _FakeQueueForProc()

    opexec = _FakeExecOpCodeForProc(queue, None, None)

    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
             for i in range(10)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      for _ in range(successcount):
        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [[constants.OP_STATUS_SUCCESS
                         for _ in range(successcount)] +
                        [constants.OP_STATUS_QUEUED
                         for _ in range(len(ops) - successcount)]])

      self.assert_(job.ops_iter)

      # Serialize and restore (simulates program restart)
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
      self.assertFalse(newjob.ops_iter)
      self._TestPartial(newjob, successcount)

  def _TestPartial(self, job, successcount):
    """Runs the remaining opcodes of a restored, partially-run job and
    verifies the final state."""
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)

    queue = _FakeQueueForProc()
    opexec = _FakeExecOpCodeForProc(queue, None, None)

    for remaining in reversed(range(len(job.ops) - successcount)):
      result = jqueue._JobProcessor(queue, opexec, job)()
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      if remaining == 0:
        # Last opcode
        self.assert_(result)
        break

      self.assertFalse(result)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
                            for op in job.ops))

    self._GenericCheckJob(job)

    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
    self.assertRaises(IndexError, queue.GetNextUpdate)

    # ... also after being restored
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
    self.assertRaises(IndexError, queue.GetNextUpdate)

  def testProcessorOnRunningJob(self):
    """The processor must refuse to run a job already marked "running"."""
    ops = [opcodes.OpTestDummy(result="result", fail=False)]

    queue = _FakeQueueForProc()
    opexec = _FakeExecOpCodeForProc(queue, None, None)

    # Create job
    job = self._CreateJob(queue, 9571, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    job.ops[0].status = constants.OP_STATUS_RUNNING

    assert len(job.ops) == 1

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Calling on running job must fail
    self.assertRaises(errors.ProgrammerError,
                      jqueue._JobProcessor(queue, opexec, job))

  def testLogMessages(self):
    """Tests the "Feedback" callback function."""
    queue = _FakeQueueForProc()

    # Per-opcode-index log messages as (type, message); None type means
    # the default ELOG_MESSAGE
    messages = {
      1: [
        (None, "Hello"),
        (None, "World"),
        (constants.ELOG_MESSAGE, "there"),
        ],
      4: [
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
        ],
      }
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
                               messages=messages.get(i, []))
           for i in range(5)]

    # Create job
    job = self._CreateJob(queue, 29386, ops)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

      self.assertRaises(AssertionError, cbs.Feedback,
                        "too", "many", "arguments")

      for (log_type, msg) in op.messages:
        self.assertRaises(IndexError, queue.GetNextUpdate)
        if log_type:
          cbs.Feedback(log_type, msg)
        else:
          cbs.Feedback(msg)
        # Check for job update without replication
        self.assertEqual(queue.GetNextUpdate(), (job, False))
        self.assertRaises(IndexError, queue.GetNextUpdate)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    for remaining in reversed(range(len(job.ops))):
      self.assertRaises(IndexError, queue.GetNextUpdate)
      result = jqueue._JobProcessor(queue, opexec, job)()
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      if remaining == 0:
        # Last opcode
        self.assert_(result)
        break

      self.assertFalse(result)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])

    logmsgcount = sum(len(m) for m in messages.values())

    self._CheckLogMessages(job, logmsgcount)

    # Serialize and restore (simulates program restart)
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
    self._CheckLogMessages(newjob, logmsgcount)

    # Check each message
    prevserial = -1
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
      for (serial, timestamp, log_type, msg) in oplog:
        (exptype, expmsg) = messages.get(idx).pop(0)
        if exptype:
          self.assertEqual(log_type, exptype)
        else:
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
        self.assertEqual(expmsg, msg)
        self.assert_(serial > prevserial)
        prevserial = serial

  def _CheckLogMessages(self, job, count):
    """Checks log serial and log-entry retrieval, with and without a
    serial filter."""
    # Check serial
    self.assertEqual(job.log_serial, count)

    # No filter
    self.assertEqual(job.GetLogEntries(None),
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
                      for entry in entries])

    # Filter with serial
    assert count > 3
    self.assert_(job.GetLogEntries(3))
    self.assertEqual(job.GetLogEntries(3),
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
                      for entry in entries][3:])

    # No log message after highest serial
    self.assertFalse(job.GetLogEntries(count))
    self.assertFalse(job.GetLogEntries(count + 3))

  def testSubmitManyJobs(self):
    """Opcodes submitting new jobs via the callbacks; the submitted job
    IDs become the opcode results."""
    queue = _FakeQueueForProc()

    job_id = 15656
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False,
                          submit_jobs=[]),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          submit_jobs=[
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False,
                          submit_jobs=[
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
                            ]),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    for idx in range(len(ops)):
      self.assertRaises(IndexError, queue.GetNextUpdate)
      result = jqueue._JobProcessor(queue, opexec, job)()
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      if idx == len(ops) - 1:
        # Last opcode
        self.assert_(result)
      else:
        self.assertFalse(result)

        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
        self.assert_(job.start_timestamp)
        self.assertFalse(job.end_timestamp)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    for idx, submitted_ops in enumerate(job_ops
                                        for op in ops
                                        for job_ops in op.submit_jobs):
      self.assertEqual(queue.GetNextSubmittedJob(),
                       (1000 + idx, submitted_ops))
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[[], [1000], [1001, 1002, 1003]]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])

    self._GenericCheckJob(job)

    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+class _FakeTimeoutStrategy:
+ def __init__(self, timeouts):
+ self.timeouts = timeouts
+ self.attempts = 0
+ self.last_timeout = None
+
+ def NextAttempt(self):
+ self.attempts += 1
+ if self.timeouts:
+ timeout = self.timeouts.pop(0)
+ else:
+ timeout = None
+ self.last_timeout = timeout
+ return timeout
+
+
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
  """Tests how jqueue._JobProcessor deals with lock acquire timeouts.

  A fake opcode executor reports back via the C{_BeforeStart}/C{_AfterStart}
  callbacks; C{_BeforeStart} simulates failed lock acquisitions by raising
  L{mcpu.LockAcquireTimeout} a pre-defined number of times per opcode, and
  the test verifies the processor's retry, priority-increase and
  job-update behaviour.

  """
  def setUp(self):
    self.queue = _FakeQueueForProc()
    self.job = None
    # Index of the opcode currently being processed
    self.curop = None
    # Iterator handed to the processor via _nextop_fn
    self.opcounter = None
    # The most recently created _FakeTimeoutStrategy
    self.timeout_strategy = None
    # How many times _BeforeStart still has to simulate a lock timeout
    self.retries = 0
    # Opcode index for which the last timeout strategy was created
    self.prev_tsop = None
    # Priority/status of the current opcode as seen on the previous step
    self.prev_prio = None
    self.prev_status = None
    # Priority in effect when the lock was (supposedly) acquired
    self.lock_acq_prio = None
    # Whether _BeforeStart granted the lock on its last invocation
    self.gave_lock = None
    # Set once opcode 3 got its locks right before a blocking acquire
    self.done_lock_before_blocking = False

  def _BeforeStart(self, timeout, priority):
    """Callback run by the fake executor before an opcode starts.

    Either grants the lock (plain return) or simulates a failed
    acquisition by raising L{mcpu.LockAcquireTimeout}, depending on the
    retry counter set up in L{_NewTimeoutStrategy}.

    """
    job = self.job

    # If status has changed, job must've been written
    if self.prev_status != self.job.ops[self.curop].status:
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

    ts = self.timeout_strategy

    # The timeout passed in must be the one the strategy returned last
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
    self.assertEqual(timeout, ts.last_timeout)
    self.assertEqual(priority, job.ops[self.curop].priority)

    self.gave_lock = True
    self.lock_acq_prio = priority

    if (self.curop == 3 and
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
      # Give locks before running into blocking acquire
      assert self.retries == 7
      self.retries = 0
      self.done_lock_before_blocking = True
      return

    if self.retries > 0:
      # Simulate a failed lock acquisition
      self.assert_(timeout is not None)
      self.retries -= 1
      self.gave_lock = False
      raise mcpu.LockAcquireTimeout()

    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
      assert not ts.timeouts
      # At the highest priority the acquire must be blocking (no timeout)
      self.assert_(timeout is None)

  def _AfterStart(self, op, cbs):
    """Callback run by the fake executor once an opcode is running.

    """
    job = self.job

    # Setting to "running" requires an update
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Job is running, cancelling shouldn't be possible
    (success, _) = job.Cancel()
    self.assertFalse(success)

  def _NextOpcode(self):
    """Records which opcode the processor picks next (used as _nextop_fn).

    """
    self.curop = self.opcounter.next()
    self.prev_prio = self.job.ops[self.curop].priority
    self.prev_status = self.job.ops[self.curop].status

  def _NewTimeoutStrategy(self):
    """Timeout strategy factory; defines the retry scenario per opcode.

    Opcodes not listed below get no timeouts and no retries (locks are
    granted on the first attempt).

    """
    job = self.job

    self.assertEqual(self.retries, 0)

    if self.prev_tsop == self.curop:
      # Still on the same opcode, priority must've been increased
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)

    if self.curop == 1:
      # Normal retry
      timeouts = range(10, 31, 10)
      self.retries = len(timeouts) - 1

    elif self.curop == 2:
      # Let this run into a blocking acquire
      timeouts = range(11, 61, 12)
      self.retries = len(timeouts)

    elif self.curop == 3:
      # Wait for priority to increase, but give lock before blocking acquire
      timeouts = range(12, 100, 14)
      self.retries = len(timeouts)

      self.assertFalse(self.done_lock_before_blocking)

    elif self.curop == 4:
      self.assert_(self.done_lock_before_blocking)

      # Timeouts, but no need to retry
      timeouts = range(10, 31, 10)
      self.retries = 0

    elif self.curop == 5:
      # Normal retry
      timeouts = range(19, 100, 11)
      self.retries = len(timeouts)

    else:
      timeouts = []
      self.retries = 0

    assert len(job.ops) == 10
    assert self.retries <= len(timeouts)

    ts = _FakeTimeoutStrategy(timeouts)

    self.timeout_strategy = ts
    self.prev_tsop = self.curop
    self.prev_prio = job.ops[self.curop].priority

    return ts

  def testTimeout(self):
    """Runs a ten-opcode job through all the retry scenarios above.

    """
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(10)]

    # Create job
    job_id = 15801
    job = self._CreateJob(self.queue, job_id, ops)
    self.job = job

    self.opcounter = itertools.count(0)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
                                    self._AfterStart)
    tsf = self._NewTimeoutStrategy

    self.assertFalse(self.done_lock_before_blocking)

    # Each iteration runs the processor once; it returns False until the
    # whole job is finished
    while True:
      proc = jqueue._JobProcessor(self.queue, opexec, job,
                                  _timeout_strategy_factory=tsf)

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      if self.curop is not None:
        self.prev_status = self.job.ops[self.curop].status

      self.lock_acq_prio = None

      result = proc(_nextop_fn=self._NextOpcode)
      assert self.curop is not None

      if result or self.gave_lock:
        # Got lock and/or job is done, result must've been written
        self.assertFalse(job.cur_opctx)
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
        self.assert_(job.ops[self.curop].exec_timestamp)

      if result:
        self.assertFalse(job.cur_opctx)
        break

      self.assertFalse(result)

      if self.curop == 0:
        self.assertEqual(job.ops[self.curop].start_timestamp,
                         job.start_timestamp)

      if self.gave_lock:
        # Opcode finished, but job not yet done
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      else:
        # Did not get locks
        self.assert_(job.cur_opctx)
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                         self.timeout_strategy.NextAttempt)
        self.assertFalse(job.ops[self.curop].exec_timestamp)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

        # If priority has changed since acquiring locks, the job must've been
        # updated
        if self.lock_acq_prio != job.ops[self.curop].priority:
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      self.assert_(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    # All opcodes must have been processed and the special scenario for
    # opcode 3 must have been triggered
    self.assertEqual(self.curop, len(job.ops) - 1)
    self.assertEqual(self.job, job)
    self.assertEqual(self.opcounter.next(), len(job.ops))
    self.assert_(self.done_lock_before_blocking)

    self.assertRaises(IndexError, self.queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
                            for op in job.ops))

    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
if __name__ == "__main__":
  # Run all tests in this module via Ganeti's unittest wrapper (presumably a
  # thin shim around unittest.main — see testutils)
  testutils.GanetiTestProgram()