jqueue: Keep jobs in “waitlock” while returning to queue
author Michael Hanselmann <hansmi@google.com>
Tue, 14 Dec 2010 16:56:39 +0000 (17:56 +0100)
committer Michael Hanselmann <hansmi@google.com>
Wed, 15 Dec 2010 13:42:15 +0000 (14:42 +0100)
Iustin Pop reported that a job's file is updated many times while the job
waits for locks held by other thread(s). An investigation concluded that the
cause was a design decision for job priorities: jobs were returned to the
“queued” status if they couldn't acquire all locks. Changing a job's status
or priority requires an update to permanent storage.

At a high level, this is what happens:
1. Mark as waitlock
2. Write to disk as permanent storage (jobs left in this state by a
   crashing master daemon are resumed on restart)
3. Wait for lock (assume lock is held by another thread)
4. Mark as queued
5. Write to disk again
6. Return to workerpool

Another option originally discussed was to leave the job in the
“waitlock” status. Ignoring priority changes, this is what would happen
(step numbers correspond to the list above; step 2, writing to disk, is
folded into step 1):
1. If not in waitlock
1.1. Assert state == queued
1.2. Mark as waitlock
1.3. Set start_timestamp
1.4. Write to disk as permanent storage
3. Wait for locks (assume lock is held by another thread)
4. Leave in waitlock
5. Return to workerpool

Now let's assume the lock is released by the other thread:
[…]
3. Wait for locks and get them
4. Assert state == waitlock
5. Set state to running
6. Set exec_timestamp
7. Write to disk

This change reduces the number of writes from two per lock acquire attempt
to two per opcode plus one per priority increase. A priority increase happens
after 24 acquire attempts (see mcpu._CalculateLockAttemptTimeouts), until the
highest priority is reached. This patch implements it; unit tests are updated
accordingly.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>

lib/jqueue.py
test/ganeti.jqueue_unittest.py

index 1a0c20d..93bc4ee 100644 (file)
@@ -895,13 +895,28 @@ class _JobProcessor(object):
 
     """
     assert op in job.ops
+    assert op.status in (constants.OP_STATUS_QUEUED,
+                         constants.OP_STATUS_WAITLOCK)
+
+    update = False
 
-    op.status = constants.OP_STATUS_WAITLOCK
     op.result = None
-    op.start_timestamp = TimeStampNow()
+
+    if op.status == constants.OP_STATUS_QUEUED:
+      op.status = constants.OP_STATUS_WAITLOCK
+      update = True
+
+    if op.start_timestamp is None:
+      op.start_timestamp = TimeStampNow()
+      update = True
 
     if job.start_timestamp is None:
       job.start_timestamp = op.start_timestamp
+      update = True
+
+    assert op.status == constants.OP_STATUS_WAITLOCK
+
+    return update
 
   def _ExecOpCodeUnlocked(self, opctx):
     """Processes one opcode and returns the result.
@@ -929,7 +944,8 @@ class _JobProcessor(object):
       if op.status == constants.OP_STATUS_CANCELING:
         return (constants.OP_STATUS_CANCELING, None)
 
-      return (constants.OP_STATUS_QUEUED, None)
+      # Stay in waitlock while trying to re-acquire lock
+      return (constants.OP_STATUS_WAITLOCK, None)
     except CancelJob:
       logging.exception("%s: Canceling job", opctx.log_prefix)
       assert op.status == constants.OP_STATUS_CANCELING
@@ -964,6 +980,7 @@ class _JobProcessor(object):
       # Is a previous opcode still pending?
       if job.cur_opctx:
         opctx = job.cur_opctx
+        job.cur_opctx = None
       else:
         if __debug__ and _nextop_fn:
           _nextop_fn()
@@ -974,7 +991,7 @@ class _JobProcessor(object):
       # Consistency check
       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                      constants.OP_STATUS_CANCELED)
-                        for i in job.ops[opctx.index:])
+                        for i in job.ops[opctx.index + 1:])
 
       assert op.status in (constants.OP_STATUS_QUEUED,
                            constants.OP_STATUS_WAITLOCK,
@@ -985,13 +1002,13 @@ class _JobProcessor(object):
 
       if op.status != constants.OP_STATUS_CANCELED:
         # Prepare to start opcode
-        self._MarkWaitlock(job, op)
+        if self._MarkWaitlock(job, op):
+          # Write to disk
+          queue.UpdateJobUnlocked(job)
 
         assert op.status == constants.OP_STATUS_WAITLOCK
         assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
-
-        # Write to disk
-        queue.UpdateJobUnlocked(job)
+        assert job.start_timestamp and op.start_timestamp
 
         logging.info("%s: opcode %s waiting for locks",
                      opctx.log_prefix, opctx.summary)
@@ -1005,7 +1022,7 @@ class _JobProcessor(object):
         op.status = op_status
         op.result = op_result
 
-        if op.status == constants.OP_STATUS_QUEUED:
+        if op.status == constants.OP_STATUS_WAITLOCK:
           # Couldn't get locks in time
           assert not op.end_timestamp
         else:
@@ -1018,10 +1035,12 @@ class _JobProcessor(object):
           else:
             assert op.status in constants.OPS_FINALIZED
 
-      if op.status == constants.OP_STATUS_QUEUED:
+      if op.status == constants.OP_STATUS_WAITLOCK:
         finalize = False
 
-        opctx.CheckPriorityIncrease()
+        if opctx.CheckPriorityIncrease():
+          # Priority was changed, need to update on-disk file
+          queue.UpdateJobUnlocked(job)
 
         # Keep around for another round
         job.cur_opctx = opctx
@@ -1030,9 +1049,7 @@ class _JobProcessor(object):
                 op.priority >= constants.OP_PRIO_HIGHEST)
 
         # In no case must the status be finalized here
-        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
-
-        queue.UpdateJobUnlocked(job)
+        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
 
       else:
         # Ensure all opcodes so far have been successful
index 3a2f0ed..02ea67f 100755 (executable)
@@ -521,6 +521,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
         self.assertRaises(IndexError, queue.GetNextUpdate)
         self.assertFalse(queue.IsAcquired())
         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+        self.assertFalse(job.cur_opctx)
 
       def _AfterStart(op, cbs):
         self.assertEqual(queue.GetNextUpdate(), (job, True))
@@ -528,6 +529,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
         self.assertFalse(queue.IsAcquired())
         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+        self.assertFalse(job.cur_opctx)
 
         # Job is running, cancelling shouldn't be possible
         (success, _) = job.Cancel()
@@ -1049,14 +1051,19 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
     self.retries = 0
     self.prev_tsop = None
     self.prev_prio = None
+    self.prev_status = None
+    self.lock_acq_prio = None
     self.gave_lock = None
     self.done_lock_before_blocking = False
 
   def _BeforeStart(self, timeout, priority):
     job = self.job
 
-    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+    # If status has changed, job must've been written
+    if self.prev_status != self.job.ops[self.curop].status:
+      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
     self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
     self.assertFalse(self.queue.IsAcquired())
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
 
@@ -1067,6 +1074,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
     self.assertEqual(priority, job.ops[self.curop].priority)
 
     self.gave_lock = True
+    self.lock_acq_prio = priority
 
     if (self.curop == 3 and
         job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
@@ -1090,8 +1098,10 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
   def _AfterStart(self, op, cbs):
     job = self.job
 
+    # Setting to "running" requires an update
     self.assertEqual(self.queue.GetNextUpdate(), (job, True))
     self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
     self.assertFalse(self.queue.IsAcquired())
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
 
@@ -1102,6 +1112,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
   def _NextOpcode(self):
     self.curop = self.opcounter.next()
     self.prev_prio = self.job.ops[self.curop].priority
+    self.prev_status = self.job.ops[self.curop].status
 
   def _NewTimeoutStrategy(self):
     job = self.job
@@ -1173,28 +1184,56 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
 
     self.assertFalse(self.done_lock_before_blocking)
 
-    for i in itertools.count(0):
+    while True:
       proc = jqueue._JobProcessor(self.queue, opexec, job,
                                   _timeout_strategy_factory=tsf)
 
       self.assertRaises(IndexError, self.queue.GetNextUpdate)
+
+      if self.curop is not None:
+        self.prev_status = self.job.ops[self.curop].status
+
+      self.lock_acq_prio = None
+
       result = proc(_nextop_fn=self._NextOpcode)
-      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
-      self.assertRaises(IndexError, self.queue.GetNextUpdate)
+      assert self.curop is not None
+
+      if result or self.gave_lock:
+        # Got lock and/or job is done, result must've been written
+        self.assertFalse(job.cur_opctx)
+        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+        self.assertRaises(IndexError, self.queue.GetNextUpdate)
+        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
+        self.assert_(job.ops[self.curop].exec_timestamp)
+
       if result:
         self.assertFalse(job.cur_opctx)
         break
 
       self.assertFalse(result)
 
+      if self.curop == 0:
+        self.assertEqual(job.ops[self.curop].start_timestamp,
+                         job.start_timestamp)
+
       if self.gave_lock:
-        self.assertFalse(job.cur_opctx)
+        # Opcode finished, but job not yet done
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
       else:
+        # Did not get locks
         self.assert_(job.cur_opctx)
         self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                          self.timeout_strategy.NextAttempt)
+        self.assertFalse(job.ops[self.curop].exec_timestamp)
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+        # If priority has changed since acquiring locks, the job must've been
+        # updated
+        if self.lock_acq_prio != job.ops[self.curop].priority:
+          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
+
+      self.assertRaises(IndexError, self.queue.GetNextUpdate)
 
-      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
       self.assert_(job.start_timestamp)
       self.assertFalse(job.end_timestamp)