Merge branch 'devel-2.6'
[ganeti-local] / test / ganeti.jqueue_unittest.py
index 4669320..41b4c4b 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/python
 #
 
-# Copyright (C) 2010, 2011 Google Inc.
+# Copyright (C) 2010, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -38,6 +38,7 @@ from ganeti import opcodes
 from ganeti import compat
 from ganeti import mcpu
 from ganeti import query
+from ganeti import workerpool
 
 import testutils
 
@@ -304,14 +305,15 @@ class TestQueuedJob(unittest.TestCase):
       self.assertEqual(len(job.ops), len(ops))
       self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
                               for (inp, op) in zip(ops, job.ops)))
-      self.assertRaises(errors.OpExecError, job.GetInfo,
+      self.assertRaises(errors.OpPrereqError, job.GetInfo,
                         ["unknown-field"])
       self.assertEqual(job.GetInfo(["summary"]),
                        [[op.input.Summary() for op in job.ops]])
+      self.assertFalse(job.archived)
 
     job1 = jqueue._QueuedJob(None, job_id, ops, True)
     _Check(job1)
-    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True)
+    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True, False)
     _Check(job2)
     self.assertEqual(job1.Serialize(), job2.Serialize())
 
@@ -322,6 +324,16 @@ class TestQueuedJob(unittest.TestCase):
     job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
     self.assertTrue(job.writable)
 
+  def testArchived(self):  # "archived" follows Restore()'s 4th argument
+    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
+    self.assertFalse(job.archived)  # directly constructed job: not archived
+
+    newjob = jqueue._QueuedJob.Restore(None, job.Serialize(), True, True)
+    self.assertTrue(newjob.archived)  # 4th argument was True
+
+    newjob2 = jqueue._QueuedJob.Restore(None, newjob.Serialize(), True, False)
+    self.assertFalse(newjob2.archived)  # Serialize() must not carry the flag
+
   def testPriority(self):
     job_id = 4283
     ops = [
@@ -673,7 +685,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
              for i in range(opcount)]
 
       # Create job
-      job = self._CreateJob(queue, job_id, ops)
+      job = self._CreateJob(queue, str(job_id), ops)
 
       opexec = _FakeExecOpCodeForProc(queue, None, None)
 
@@ -993,7 +1005,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self.assert_(job.ops_iter)
 
       # Serialize and restore (simulates program restart)
-      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
+      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
       self.assertFalse(newjob.ops_iter)
       self._TestPartial(newjob, successcount)
 
@@ -1038,7 +1050,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self.assertRaises(IndexError, queue.GetNextUpdate)
 
     # ... also after being restored
-    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
+    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
     # Calling the processor on a finished job should be a no-op
     self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
                      jqueue._JobProcessor.FINISHED)
@@ -1140,7 +1152,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self._CheckLogMessages(job, logmsgcount)
 
     # Serialize and restore (simulates program restart)
-    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
+    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
     self._CheckLogMessages(newjob, logmsgcount)
 
     # Check each message
@@ -1625,6 +1637,43 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self.assertRaises(IndexError, queue.GetNextUpdate)
 
 
+class TestEvaluateJobProcessorResult(unittest.TestCase):  # one test per result
+  def testFinished(self):  # FINISHED: fake depmgr must report job.id
+    depmgr = _FakeDependencyManager()
+    job = _IdOnlyFakeJob(30953)
+    jqueue._EvaluateJobProcessorResult(depmgr, job,
+                                       jqueue._JobProcessor.FINISHED)
+    self.assertEqual(depmgr.GetNextNotification(), job.id)
+    self.assertRaises(IndexError, depmgr.GetNextNotification)  # none left
+
+  def testDefer(self):  # DEFER: DeferTask must carry the job's priority
+    depmgr = _FakeDependencyManager()
+    job = _IdOnlyFakeJob(11326, priority=5463)
+    try:
+      jqueue._EvaluateJobProcessorResult(depmgr, job,
+                                         jqueue._JobProcessor.DEFER)
+    except workerpool.DeferTask, err:  # Python 2 "except X, name" form
+      self.assertEqual(err.priority, 5463)
+    else:
+      self.fail("Didn't raise exception")
+    self.assertRaises(IndexError, depmgr.GetNextNotification)  # none recorded
+
+  def testWaitdep(self):  # WAITDEP: no exception, no notification
+    depmgr = _FakeDependencyManager()
+    job = _IdOnlyFakeJob(21317)
+    jqueue._EvaluateJobProcessorResult(depmgr, job,
+                                       jqueue._JobProcessor.WAITDEP)
+    self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+  def testOther(self):  # unknown result value: ProgrammerError expected
+    depmgr = _FakeDependencyManager()
+    job = _IdOnlyFakeJob(5813)
+    self.assertRaises(errors.ProgrammerError,
+                      jqueue._EvaluateJobProcessorResult,
+                      depmgr, job, "Other result")
+    self.assertRaises(IndexError, depmgr.GetNextNotification)  # none recorded
+
+
 class _FakeTimeoutStrategy:
   def __init__(self, timeouts):
     self.timeouts = timeouts
@@ -1879,6 +1928,9 @@ class TestJobDependencyManager(unittest.TestCase):
     return result
 
   def _Enqueue(self, jobs):
+    self.assertFalse(self.jdm._lock.is_owned(),  # checked before recording
+                     msg=("Must not own manager lock while re-adding jobs"
+                          " (potential deadlock)"))
     self._queue.append(jobs)
 
   def testNotFinalizedThenCancel(self):