Rename OpGetTags and LUGetTags
[ganeti-local] / test / ganeti.jqueue_unittest.py
index dddf7c9..9137927 100755 (executable)
@@ -268,7 +268,7 @@ class TestQueuedOpCode(unittest.TestCase):
       self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
       self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
 
-    inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
+    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
     op1 = jqueue._QueuedOpCode(inpop)
     _Check(op1)
     op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
@@ -284,7 +284,7 @@ class TestQueuedJob(unittest.TestCase):
   def testDefaults(self):
     job_id = 4260
     ops = [
-      opcodes.OpGetTags(),
+      opcodes.OpTagsGet(),
       opcodes.OpTestDelay(),
       ]
 
@@ -314,7 +314,7 @@ class TestQueuedJob(unittest.TestCase):
   def testPriority(self):
     job_id = 4283
     ops = [
-      opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
+      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
       opcodes.OpTestDelay(),
       ]
 
@@ -427,10 +427,14 @@ class TestQueuedJob(unittest.TestCase):
 class _FakeQueueForProc:
   def __init__(self):
     self._acquired = False
+    self._updates = []
 
   def IsAcquired(self):
     return self._acquired
 
+  def GetNextUpdate(self):
+    return self._updates.pop(0)
+
   def acquire(self, shared=0):
     assert shared == 1
     self._acquired = True
@@ -439,18 +443,21 @@ class _FakeQueueForProc:
     assert self._acquired
     self._acquired = False
 
-  def UpdateJobUnlocked(self, job, replicate=None):
-    # TODO: Ensure job is updated at the correct places
-    pass
+  def UpdateJobUnlocked(self, job, replicate=True):
+    assert self._acquired, "Lock not acquired while updating job"
+    self._updates.append((job, bool(replicate)))
 
 
 class _FakeExecOpCodeForProc:
-  def __init__(self, before_start, after_start):
+  def __init__(self, queue, before_start, after_start):
+    self._queue = queue
     self._before_start = before_start
     self._after_start = after_start
 
   def __call__(self, op, cbs, timeout=None, priority=None):
     assert isinstance(op, opcodes.OpTestDummy)
+    assert not self._queue.IsAcquired(), \
+           "Queue lock not released when executing opcode"
 
     if self._before_start:
       self._before_start(timeout, priority)
@@ -460,6 +467,9 @@ class _FakeExecOpCodeForProc:
     if self._after_start:
       self._after_start(op, cbs)
 
+    # Check again after the callbacks
+    assert not self._queue.IsAcquired()
+
     if op.fail:
       raise errors.OpExecError("Error requested (%s)" % op.result)
 
@@ -507,21 +517,31 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       job = self._CreateJob(queue, job_id, ops)
 
       def _BeforeStart(timeout, priority):
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
         self.assertFalse(queue.IsAcquired())
         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+        self.assertFalse(job.cur_opctx)
 
       def _AfterStart(op, cbs):
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
+
         self.assertFalse(queue.IsAcquired())
         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+        self.assertFalse(job.cur_opctx)
 
         # Job is running, cancelling shouldn't be possible
         (success, _) = job.Cancel()
         self.assertFalse(success)
 
-      opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
+      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
 
       for idx in range(len(ops)):
+        self.assertRaises(IndexError, queue.GetNextUpdate)
         result = jqueue._JobProcessor(queue, opexec, job)()
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
         if idx == len(ops) - 1:
           # Last opcode
           self.assert_(result)
@@ -532,6 +552,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
           self.assert_(job.start_timestamp)
           self.assertFalse(job.end_timestamp)
 
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
       self.assertEqual(job.GetInfo(["opresult"]),
@@ -568,10 +590,18 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       # Create job
       job = self._CreateJob(queue, job_id, ops)
 
-      opexec = _FakeExecOpCodeForProc(None, None)
+      opexec = _FakeExecOpCodeForProc(queue, None, None)
 
       for idx in range(len(ops)):
+        self.assertRaises(IndexError, queue.GetNextUpdate)
         result = jqueue._JobProcessor(queue, opexec, job)()
+        # queued to waitlock
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        # waitlock to running
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        # Opcode result
+        self.assertEqual(queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
 
         if idx in (failfrom, len(ops) - 1):
           # Last opcode
@@ -582,6 +612,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
       # Check job status
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
       self.assertEqual(job.GetInfo(["id"]), [job_id])
@@ -631,11 +663,54 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     (success, _) = job.Cancel()
     self.assert_(success)
 
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
                             for op in job.ops))
 
-    opexec = _FakeExecOpCodeForProc(None, None)
-    jqueue._JobProcessor(queue, opexec, job)()
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
+    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+
+    # Check result
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+    self.assertFalse(job.start_timestamp)
+    self.assert_(job.end_timestamp)
+    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
+                                for op in job.ops))
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
+                      ["Job canceled by request" for _ in job.ops]])
+
+  def testCancelWhileWaitlockInQueue(self):
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(5)]
+
+    # Create job
+    job_id = 8645
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    job.ops[0].status = constants.OP_STATUS_WAITLOCK
+
+    assert len(job.ops) == 5
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+    # Mark as cancelling
+    (success, _) = job.Cancel()
+    self.assert_(success)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
+                            for op in job.ops))
+
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
+    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
 
     # Check result
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
@@ -661,6 +736,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
     def _BeforeStart(timeout, priority):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
       self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
 
@@ -670,14 +747,64 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
                               for op in job.ops))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
 
     def _AfterStart(op, cbs):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
       self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
 
-    opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
+    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertEqual(queue.GetNextUpdate(), (job, True))
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    # Check result
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+    self.assert_(job.start_timestamp)
+    self.assert_(job.end_timestamp)
+    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
+                                for op in job.ops))
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
+                      ["Job canceled by request" for _ in job.ops]])
+
+  def testCancelWhileWaitlockWithTimeout(self):
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(5)]
 
-    jqueue._JobProcessor(queue, opexec, job)()
+    # Create job
+    job_id = 24314
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    def _BeforeStart(timeout, priority):
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+      # Mark as cancelling
+      (success, _) = job.Cancel()
+      self.assert_(success)
+
+      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
+                              for op in job.ops))
+
+      # Fake an acquire attempt timing out
+      raise mcpu.LockAcquireTimeout()
+
+    def _AfterStart(op, cbs):
+      self.fail("Should not reach this")
+
+    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
 
     # Check result
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
@@ -703,7 +830,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
-    opexec = _FakeExecOpCodeForProc(None, None)
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
 
     # Run one opcode
     self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
@@ -739,7 +866,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     # program was restarted
     queue = _FakeQueueForProc()
 
-    opexec = _FakeExecOpCodeForProc(None, None)
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
 
     for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
@@ -772,7 +899,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
 
     queue = _FakeQueueForProc()
-    opexec = _FakeExecOpCodeForProc(None, None)
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
 
     for remaining in reversed(range(len(job.ops) - successcount)):
       result = jqueue._JobProcessor(queue, opexec, job)()
@@ -810,7 +937,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     ops = [opcodes.OpTestDummy(result="result", fail=False)]
 
     queue = _FakeQueueForProc()
-    opexec = _FakeExecOpCodeForProc(None, None)
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
 
     # Create job
     job = self._CreateJob(queue, 9571, ops)
@@ -850,10 +977,14 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     job = self._CreateJob(queue, 29386, ops)
 
     def _BeforeStart(timeout, priority):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
       self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
 
     def _AfterStart(op, cbs):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
       self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
 
@@ -861,15 +992,22 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
                         "too", "many", "arguments")
 
       for (log_type, msg) in op.messages:
+        self.assertRaises(IndexError, queue.GetNextUpdate)
         if log_type:
           cbs.Feedback(log_type, msg)
         else:
           cbs.Feedback(msg)
+        # Check for job update without replication
+        self.assertEqual(queue.GetNextUpdate(), (job, False))
+        self.assertRaises(IndexError, queue.GetNextUpdate)
 
-    opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
+    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
 
     for remaining in reversed(range(len(job.ops))):
+      self.assertRaises(IndexError, queue.GetNextUpdate)
       result = jqueue._JobProcessor(queue, opexec, job)()
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
 
       if remaining == 0:
         # Last opcode
@@ -880,6 +1018,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
     self.assertEqual(job.GetInfo(["opresult"]),
                      [[op.input.result for op in job.ops]])
@@ -952,12 +1092,19 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
     self.retries = 0
     self.prev_tsop = None
     self.prev_prio = None
+    self.prev_status = None
+    self.lock_acq_prio = None
     self.gave_lock = None
     self.done_lock_before_blocking = False
 
   def _BeforeStart(self, timeout, priority):
     job = self.job
 
+    # If status has changed, job must've been written
+    if self.prev_status != self.job.ops[self.curop].status:
+      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+    self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
     self.assertFalse(self.queue.IsAcquired())
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
 
@@ -968,6 +1115,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
     self.assertEqual(priority, job.ops[self.curop].priority)
 
     self.gave_lock = True
+    self.lock_acq_prio = priority
 
     if (self.curop == 3 and
         job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
@@ -991,6 +1139,10 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
   def _AfterStart(self, op, cbs):
     job = self.job
 
+    # Setting to "running" requires an update
+    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+    self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
     self.assertFalse(self.queue.IsAcquired())
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
 
@@ -1001,6 +1153,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
   def _NextOpcode(self):
     self.curop = self.opcounter.next()
     self.prev_prio = self.job.ops[self.curop].priority
+    self.prev_status = self.job.ops[self.curop].status
 
   def _NewTimeoutStrategy(self):
     job = self.job
@@ -1066,30 +1219,62 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
 
     self.opcounter = itertools.count(0)
 
-    opexec = _FakeExecOpCodeForProc(self._BeforeStart, self._AfterStart)
+    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
+                                    self._AfterStart)
     tsf = self._NewTimeoutStrategy
 
     self.assertFalse(self.done_lock_before_blocking)
 
-    for i in itertools.count(0):
+    while True:
       proc = jqueue._JobProcessor(self.queue, opexec, job,
                                   _timeout_strategy_factory=tsf)
 
+      self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
+      if self.curop is not None:
+        self.prev_status = self.job.ops[self.curop].status
+
+      self.lock_acq_prio = None
+
       result = proc(_nextop_fn=self._NextOpcode)
+      assert self.curop is not None
+
+      if result or self.gave_lock:
+        # Got lock and/or job is done, result must've been written
+        self.assertFalse(job.cur_opctx)
+        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, self.queue.GetNextUpdate)
+        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
+        self.assert_(job.ops[self.curop].exec_timestamp)
+
       if result:
         self.assertFalse(job.cur_opctx)
         break
 
       self.assertFalse(result)
 
+      if self.curop == 0:
+        self.assertEqual(job.ops[self.curop].start_timestamp,
+                         job.start_timestamp)
+
       if self.gave_lock:
-        self.assertFalse(job.cur_opctx)
+        # Opcode finished, but job not yet done
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
       else:
+        # Did not get locks
         self.assert_(job.cur_opctx)
         self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                          self.timeout_strategy.NextAttempt)
+        self.assertFalse(job.ops[self.curop].exec_timestamp)
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+        # If priority has changed since acquiring locks, the job must've been
+        # updated
+        if self.lock_acq_prio != job.ops[self.curop].priority:
+          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+
+      self.assertRaises(IndexError, self.queue.GetNextUpdate)
 
-      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
       self.assert_(job.start_timestamp)
       self.assertFalse(job.end_timestamp)
 
@@ -1098,6 +1283,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
     self.assertEqual(self.opcounter.next(), len(job.ops))
     self.assert_(self.done_lock_before_blocking)
 
+    self.assertRaises(IndexError, self.queue.GetNextUpdate)
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
     self.assertEqual(job.GetInfo(["opresult"]),