jqueue: Use priority for acquiring locks
authorMichael Hanselmann <hansmi@google.com>
Thu, 23 Sep 2010 16:36:22 +0000 (18:36 +0200)
committerMichael Hanselmann <hansmi@google.com>
Fri, 24 Sep 2010 15:18:48 +0000 (17:18 +0200)
Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: RenĂ© Nussbaumer <rn@google.com>

lib/jqueue.py
test/ganeti.jqueue_unittest.py

index 7411f46..e2b4cf4 100644 (file)
@@ -916,7 +916,7 @@ class _JobProcessor(object):
       # Make sure not to hold queue lock while calling ExecOpCode
       result = self.opexec_fn(op.input,
                               _OpExecCallbacks(self.queue, self.job, op),
-                              timeout=timeout)
+                              timeout=timeout, priority=op.priority)
     except mcpu.LockAcquireTimeout:
       assert timeout is not None, "Received timeout for blocking acquire"
       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
index ba56ae5..dddf7c9 100755 (executable)
@@ -449,11 +449,11 @@ class _FakeExecOpCodeForProc:
     self._before_start = before_start
     self._after_start = after_start
 
-  def __call__(self, op, cbs, timeout=None):
+  def __call__(self, op, cbs, timeout=None, priority=None):
     assert isinstance(op, opcodes.OpTestDummy)
 
     if self._before_start:
-      self._before_start(timeout)
+      self._before_start(timeout, priority)
 
     cbs.NotifyStart()
 
@@ -506,7 +506,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       # Create job
       job = self._CreateJob(queue, job_id, ops)
 
-      def _BeforeStart(_):
+      def _BeforeStart(timeout, priority):
         self.assertFalse(queue.IsAcquired())
         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
 
@@ -660,7 +660,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
-    def _BeforeStart(_):
+    def _BeforeStart(timeout, priority):
       self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
 
@@ -849,7 +849,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     # Create job
     job = self._CreateJob(queue, 29386, ops)
 
-    def _BeforeStart(_):
+    def _BeforeStart(timeout, priority):
       self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
 
@@ -955,7 +955,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
     self.gave_lock = None
     self.done_lock_before_blocking = False
 
-  def _BeforeStart(self, timeout):
+  def _BeforeStart(self, timeout, priority):
     job = self.job
 
     self.assertFalse(self.queue.IsAcquired())
@@ -965,6 +965,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
 
     self.assert_(timeout is None or isinstance(timeout, (int, float)))
     self.assertEqual(timeout, ts.last_timeout)
+    self.assertEqual(priority, job.ops[self.curop].priority)
 
     self.gave_lock = True