jqueue/mcpu: Determine priority using callback
author Michael Hanselmann <hansmi@google.com>
Wed, 24 Oct 2012 01:05:05 +0000 (03:05 +0200)
committer Michael Hanselmann <hansmi@google.com>
Thu, 8 Nov 2012 14:40:13 +0000 (15:40 +0100)
Instead of receiving the priority for acquiring locks as a parameter,
mcpu now obtains it through a callback (`CurrentPriority`, which
replaces `CheckCancel` in the opcode execution callbacks). This is in
preparation for implementing a command to change a job's priority on
the fly, and it allows the priority to be changed while locks are being
acquired (taking effect on the next lock acquisition).

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Bernardo Dal Seno <bdalseno@google.com>

lib/jqueue.py
lib/mcpu.py
test/ganeti.jqueue_unittest.py

index 6929d6a..a562e6e 100644 (file)
@@ -564,8 +564,8 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     timestamp = utils.SplitTime(time.time())
     self._AppendFeedback(timestamp, log_type, log_msg)
 
-  def CheckCancel(self):
-    """Check whether job has been cancelled.
+  def CurrentPriority(self):
+    """Returns current priority for opcode.
 
     """
     assert self._op.status in (constants.OP_STATUS_WAITING,
@@ -574,6 +574,8 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     # Cancel here if we were asked to
     self._CheckCancel()
 
+    return self._op.priority
+
   def SubmitManyJobs(self, jobs):
     """Submits jobs for processing.
 
@@ -1043,7 +1045,7 @@ class _JobProcessor(object):
       # Make sure not to hold queue lock while calling ExecOpCode
       result = self.opexec_fn(op.input,
                               _OpExecCallbacks(self.queue, self.job, op),
-                              timeout=timeout, priority=op.priority)
+                              timeout=timeout)
     except mcpu.LockAcquireTimeout:
       assert timeout is not None, "Received timeout for blocking acquire"
       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
index 27f29fc..b227334 100644 (file)
@@ -142,10 +142,11 @@ class OpExecCbBase: # pylint: disable=W0232
 
     """
 
-  def CheckCancel(self):
-    """Check whether job has been cancelled.
+  def CurrentPriority(self): # pylint: disable=R0201
+    """Returns current priority or C{None}.
 
     """
+    return None
 
   def SubmitManyJobs(self, jobs):
     """Submits jobs for processing.
@@ -274,7 +275,7 @@ class Processor(object):
     if not self._enable_locks:
       raise errors.ProgrammerError("Attempted to use disabled locks")
 
-  def _AcquireLocks(self, level, names, shared, timeout, priority):
+  def _AcquireLocks(self, level, names, shared, timeout):
     """Acquires locks via the Ganeti lock manager.
 
     @type level: int
@@ -292,7 +293,9 @@ class Processor(object):
     self._CheckLocksEnabled()
 
     if self._cbs:
-      self._cbs.CheckCancel()
+      priority = self._cbs.CurrentPriority()
+    else:
+      priority = None
 
     acquired = self.context.glm.acquire(level, names, shared=shared,
                                         timeout=timeout, priority=priority)
@@ -342,7 +345,7 @@ class Processor(object):
   def BuildHooksManager(self, lu):
     return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
 
-  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
+  def _LockAndExecLU(self, lu, level, calc_timeout):
     """Execute a Logical Unit, with the needed locks.
 
     This is a recursive function that starts locking the given level, and
@@ -391,7 +394,7 @@ class Processor(object):
           needed_locks = lu.needed_locks[level]
 
           self._AcquireLocks(level, needed_locks, share,
-                             calc_timeout(), priority)
+                             calc_timeout())
         else:
           # Adding locks
           add_locks = lu.add_locks[level]
@@ -408,7 +411,7 @@ class Processor(object):
               errors.ECODE_NOTUNIQUE)
 
         try:
-          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
+          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
         finally:
           if level in lu.remove_locks:
             self.context.glm.remove(level, lu.remove_locks[level])
@@ -417,11 +420,11 @@ class Processor(object):
           self.context.glm.release(level)
 
     else:
-      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
+      result = self._LockAndExecLU(lu, level + 1, calc_timeout)
 
     return result
 
-  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
+  def ExecOpCode(self, op, cbs, timeout=None):
     """Execute an opcode.
 
     @type op: an OpCode instance
@@ -430,8 +433,6 @@ class Processor(object):
     @param cbs: Runtime callbacks
     @type timeout: float or None
     @param timeout: Maximum time to acquire all locks, None for no timeout
-    @type priority: number or None
-    @param priority: Priority for acquiring lock(s)
     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
         amount of time
 
@@ -456,8 +457,7 @@ class Processor(object):
         # and in a shared fashion otherwise (to prevent concurrent run with
         # an exclusive LU.
         self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
-                            not lu_class.REQ_BGL, calc_timeout(),
-                            priority)
+                            not lu_class.REQ_BGL, calc_timeout())
       elif lu_class.REQ_BGL:
         raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                      " disabled" % op.OP_ID)
@@ -468,8 +468,7 @@ class Processor(object):
         assert lu.needed_locks is not None, "needed_locks not set by LU"
 
         try:
-          result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
-                                       priority)
+          result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
         finally:
           if self._ec_id:
             self.context.cfg.DropECReservations(self._ec_id)
index 8b1cb25..a5b19ad 100755 (executable)
@@ -554,13 +554,13 @@ class _FakeExecOpCodeForProc:
     self._before_start = before_start
     self._after_start = after_start
 
-  def __call__(self, op, cbs, timeout=None, priority=None):
+  def __call__(self, op, cbs, timeout=None):
     assert isinstance(op, opcodes.OpTestDummy)
     assert not self._queue.IsAcquired(), \
            "Queue lock not released when executing opcode"
 
     if self._before_start:
-      self._before_start(timeout, priority)
+      self._before_start(timeout, cbs.CurrentPriority())
 
     cbs.NotifyStart()