#!/usr/bin/python
#
-# Copyright (C) 2010, 2011 Google Inc.
+# Copyright (C) 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
from ganeti import compat
from ganeti import mcpu
from ganeti import query
+from ganeti import workerpool
import testutils
self.assertEqual(len(job.ops), len(ops))
self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
for (inp, op) in zip(ops, job.ops)))
- self.assertRaises(errors.OpExecError, job.GetInfo,
+ self.assertRaises(errors.OpPrereqError, job.GetInfo,
["unknown-field"])
self.assertEqual(job.GetInfo(["summary"]),
[[op.input.Summary() for op in job.ops]])
+ self.assertFalse(job.archived)
job1 = jqueue._QueuedJob(None, job_id, ops, True)
_Check(job1)
- job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True)
+ job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True, False)
_Check(job2)
self.assertEqual(job1.Serialize(), job2.Serialize())
job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
self.assertTrue(job.writable)
+ def testArchived(self):
+ # Checks that the "archived" attribute of a job follows the final argument
+ # passed to _QueuedJob.Restore.
+ #
+ # A job created directly through the constructor is not archived.
+ job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
+ self.assertFalse(job.archived)
+
+ # Restoring with True as the last argument yields an archived job.
+ newjob = jqueue._QueuedJob.Restore(None, job.Serialize(), True, True)
+ self.assertTrue(newjob.archived)
+
+ # Restoring the archived job's serialized form with False as the last
+ # argument yields a non-archived job again, i.e. the flag is taken from the
+ # Restore argument and not from the serialized data.
+ newjob2 = jqueue._QueuedJob.Restore(None, newjob.Serialize(), True, False)
+ self.assertFalse(newjob2.archived)
+
def testPriority(self):
job_id = 4283
ops = [
for i in range(opcount)]
# Create job
- job = self._CreateJob(queue, job_id, ops)
+ job = self._CreateJob(queue, str(job_id), ops)
opexec = _FakeExecOpCodeForProc(queue, None, None)
self.assert_(job.ops_iter)
# Serialize and restore (simulates program restart)
- newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
+ newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
self.assertFalse(newjob.ops_iter)
self._TestPartial(newjob, successcount)
self.assertRaises(IndexError, queue.GetNextUpdate)
# ... also after being restored
- job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
+ job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
# Calling the processor on a finished job should be a no-op
self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
jqueue._JobProcessor.FINISHED)
self._CheckLogMessages(job, logmsgcount)
# Serialize and restore (simulates program restart)
- newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
+ newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
self._CheckLogMessages(newjob, logmsgcount)
# Check each message
self.assertRaises(IndexError, queue.GetNextUpdate)
+class TestEvaluateJobProcessorResult(unittest.TestCase):
+ # Tests for jqueue._EvaluateJobProcessorResult, one test per processor
+ # result value. Every test ends by checking that the dependency manager has
+ # no further notification queued (GetNextNotification raises IndexError).
+ # NOTE(review): _FakeDependencyManager and _IdOnlyFakeJob are defined
+ # elsewhere in this file; presumably the fake manager records job IDs it is
+ # notified about -- confirm against their definitions.
+ def testFinished(self):
+ # FINISHED: exactly one notification is recorded, carrying the job's ID.
+ depmgr = _FakeDependencyManager()
+ job = _IdOnlyFakeJob(30953)
+ jqueue._EvaluateJobProcessorResult(depmgr, job,
+ jqueue._JobProcessor.FINISHED)
+ self.assertEqual(depmgr.GetNextNotification(), job.id)
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+ def testDefer(self):
+ # DEFER: workerpool.DeferTask must be raised and its "priority" attribute
+ # must equal the priority the fake job was created with; no notification
+ # is recorded.
+ depmgr = _FakeDependencyManager()
+ job = _IdOnlyFakeJob(11326, priority=5463)
+ try:
+ jqueue._EvaluateJobProcessorResult(depmgr, job,
+ jqueue._JobProcessor.DEFER)
+ # "except X, err" is the Python 2 form of binding the caught exception
+ except workerpool.DeferTask, err:
+ self.assertEqual(err.priority, 5463)
+ else:
+ self.fail("Didn't raise exception")
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+ def testWaitdep(self):
+ # WAITDEP: the call returns without raising and records no notification.
+ depmgr = _FakeDependencyManager()
+ job = _IdOnlyFakeJob(21317)
+ jqueue._EvaluateJobProcessorResult(depmgr, job,
+ jqueue._JobProcessor.WAITDEP)
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+ def testOther(self):
+ # Any other result value is rejected with errors.ProgrammerError and
+ # records no notification.
+ depmgr = _FakeDependencyManager()
+ job = _IdOnlyFakeJob(5813)
+ self.assertRaises(errors.ProgrammerError,
+ jqueue._EvaluateJobProcessorResult,
+ depmgr, job, "Other result")
+ self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+
class _FakeTimeoutStrategy:
def __init__(self, timeouts):
self.timeouts = timeouts
return result
def _Enqueue(self, jobs):
+ # Test-side enqueue hook: appends the given jobs to self._queue after
+ # verifying that self.jdm._lock is not owned at the time of the call.
+ # NOTE(review): "jdm" is presumably the dependency manager under test,
+ # which calls this hook to re-add jobs -- confirm against setUp.
+ self.assertFalse(self.jdm._lock.is_owned(),
+ msg=("Must not own manager lock while re-adding jobs"
+ " (potential deadlock)"))
self._queue.append(jobs)
def testNotFinalizedThenCancel(self):