hooks: Provide variables with post-opcode values
[ganeti-local] / lib / jqueue.py
index c7fd4c7..56f7a66 100644 (file)
@@ -29,7 +29,6 @@ used by all other classes in this module.
 
 """
 
-import os
 import logging
 import errno
 import re
@@ -176,7 +175,7 @@ class _QueuedJob(object):
 
   """
   # pylint: disable-msg=W0212
-  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter",
+  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
                "received_timestamp", "start_timestamp", "end_timestamp",
                "__weakref__"]
 
@@ -211,6 +210,7 @@ class _QueuedJob(object):
 
     """
     obj.ops_iter = None
+    obj.cur_opctx = None
 
   def __repr__(self):
     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
@@ -375,6 +375,8 @@ class _QueuedJob(object):
         row.append(self.id)
       elif fname == "status":
         row.append(self.CalcStatus())
+      elif fname == "priority":
+        row.append(self.CalcPriority())
       elif fname == "ops":
         row.append([op.input.__getstate__() for op in self.ops])
       elif fname == "opresult":
@@ -389,6 +391,8 @@ class _QueuedJob(object):
         row.append([op.exec_timestamp for op in self.ops])
       elif fname == "opend":
         row.append([op.end_timestamp for op in self.ops])
+      elif fname == "oppriority":
+        row.append([op.priority for op in self.ops])
       elif fname == "received_ts":
         row.append(self.received_timestamp)
       elif fname == "start_ts":
@@ -431,22 +435,19 @@ class _QueuedJob(object):
     """
     status = self.CalcStatus()
 
-    if status not in (constants.JOB_STATUS_QUEUED,
-                      constants.JOB_STATUS_WAITLOCK):
-      logging.debug("Job %s is no longer waiting in the queue", self.id)
-      return (False, "Job %s is no longer waiting in the queue" % self.id)
-
     if status == constants.JOB_STATUS_QUEUED:
       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                              "Job canceled by request")
-      msg = "Job %s canceled" % self.id
+      return (True, "Job %s canceled" % self.id)
 
     elif status == constants.JOB_STATUS_WAITLOCK:
       # The worker will notice the new status and cancel the job
       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
-      msg = "Job %s will be canceled" % self.id
+      return (True, "Job %s will be canceled" % self.id)
 
-    return (True, msg)
+    else:
+      logging.debug("Job %s is no longer waiting in the queue", self.id)
+      return (False, "Job %s is no longer waiting in the queue" % self.id)
 
 
 class _OpExecCallbacks(mcpu.OpExecCbBase):
@@ -745,8 +746,40 @@ def _EncodeOpError(err):
   return errors.EncodeException(to_encode)
 
 
+class _TimeoutStrategyWrapper:
+  def __init__(self, fn):
+    """Initializes this class.
+
+    """
+    self._fn = fn
+    self._next = None
+
+  def _Advance(self):
+    """Gets the next timeout if necessary.
+
+    """
+    if self._next is None:
+      self._next = self._fn()
+
+  def Peek(self):
+    """Returns the next timeout.
+
+    """
+    self._Advance()
+    return self._next
+
+  def Next(self):
+    """Returns the current timeout and advances the internal state.
+
+    """
+    self._Advance()
+    result = self._next
+    self._next = None
+    return result
+
+
 class _OpExecContext:
-  def __init__(self, op, index, log_prefix):
+  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
     """Initializes this class.
 
     """
@@ -755,22 +788,60 @@ class _OpExecContext:
     self.log_prefix = log_prefix
     self.summary = op.input.Summary()
 
+    self._timeout_strategy_factory = timeout_strategy_factory
+    self._ResetTimeoutStrategy()
+
+  def _ResetTimeoutStrategy(self):
+    """Creates a new timeout strategy.
+
+    """
+    self._timeout_strategy = \
+      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
+
+  def CheckPriorityIncrease(self):
+    """Checks whether priority can and should be increased.
+
+    Called when locks couldn't be acquired.
+
+    """
+    op = self.op
+
+    # Exhausted all retries and next round should not use blocking acquire
+    # for locks?
+    if (self._timeout_strategy.Peek() is None and
+        op.priority > constants.OP_PRIO_HIGHEST):
+      logging.debug("Increasing priority")
+      op.priority -= 1
+      self._ResetTimeoutStrategy()
+      return True
+
+    return False
+
+  def GetNextLockTimeout(self):
+    """Returns the next lock acquire timeout.
+
+    """
+    return self._timeout_strategy.Next()
+
 
 class _JobProcessor(object):
-  def __init__(self, queue, opexec_fn, job):
+  def __init__(self, queue, opexec_fn, job,
+               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
     """Initializes this class.
 
     """
     self.queue = queue
     self.opexec_fn = opexec_fn
     self.job = job
+    self._timeout_strategy_factory = _timeout_strategy_factory
 
   @staticmethod
-  def _FindNextOpcode(job):
+  def _FindNextOpcode(job, timeout_strategy_factory):
     """Locates the next opcode to run.
 
     @type job: L{_QueuedJob}
     @param job: Job object
+    @param timeout_strategy_factory: Callable to create new timeout strategy
 
     """
     # Create some sort of a cache to speed up locating next opcode for future
@@ -791,7 +862,8 @@ class _JobProcessor(object):
         # Found an opcode already marked as running
         raise errors.ProgrammerError("Called for job marked as running")
 
-      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)))
+      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
+                             timeout_strategy_factory)
 
       if op.status == constants.OP_STATUS_CANCELED:
         # Cancelled jobs are handled by the caller
@@ -817,18 +889,33 @@ class _JobProcessor(object):
 
     @type job: L{_QueuedJob}
     @param job: Job object
-    @type job: L{_QueuedOpCode}
-    @param job: Opcode object
+    @type op: L{_QueuedOpCode}
+    @param op: Opcode object
 
     """
     assert op in job.ops
+    assert op.status in (constants.OP_STATUS_QUEUED,
+                         constants.OP_STATUS_WAITLOCK)
+
+    update = False
 
-    op.status = constants.OP_STATUS_WAITLOCK
     op.result = None
-    op.start_timestamp = TimeStampNow()
+
+    if op.status == constants.OP_STATUS_QUEUED:
+      op.status = constants.OP_STATUS_WAITLOCK
+      update = True
+
+    if op.start_timestamp is None:
+      op.start_timestamp = TimeStampNow()
+      update = True
 
     if job.start_timestamp is None:
       job.start_timestamp = op.start_timestamp
+      update = True
+
+    assert op.status == constants.OP_STATUS_WAITLOCK
+
+    return update
 
   def _ExecOpCodeUnlocked(self, opctx):
     """Processes one opcode and returns the result.
@@ -838,10 +925,26 @@ class _JobProcessor(object):
 
     assert op.status == constants.OP_STATUS_WAITLOCK
 
+    timeout = opctx.GetNextLockTimeout()
+
     try:
       # Make sure not to hold queue lock while calling ExecOpCode
       result = self.opexec_fn(op.input,
-                              _OpExecCallbacks(self.queue, self.job, op))
+                              _OpExecCallbacks(self.queue, self.job, op),
+                              timeout=timeout, priority=op.priority)
+    except mcpu.LockAcquireTimeout:
+      assert timeout is not None, "Received timeout for blocking acquire"
+      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
+
+      assert op.status in (constants.OP_STATUS_WAITLOCK,
+                           constants.OP_STATUS_CANCELING)
+
+      # Was job cancelled while we were waiting for the lock?
+      if op.status == constants.OP_STATUS_CANCELING:
+        return (constants.OP_STATUS_CANCELING, None)
+
+      # Stay in waitlock while trying to re-acquire lock
+      return (constants.OP_STATUS_WAITLOCK, None)
     except CancelJob:
       logging.exception("%s: Canceling job", opctx.log_prefix)
       assert op.status == constants.OP_STATUS_CANCELING
@@ -855,9 +958,10 @@ class _JobProcessor(object):
                     opctx.log_prefix, opctx.summary)
       return (constants.OP_STATUS_SUCCESS, result)
 
-  def __call__(self):
+  def __call__(self, _nextop_fn=None):
     """Continues execution of a job.
 
+    @param _nextop_fn: Callback function for tests
     @rtype: bool
     @return: True if job is finished, False if processor needs to be called
              again
@@ -872,27 +976,44 @@ class _JobProcessor(object):
     try:
       opcount = len(job.ops)
 
-      opctx = self._FindNextOpcode(job)
+      # Is a previous opcode still pending?
+      if job.cur_opctx:
+        opctx = job.cur_opctx
+        job.cur_opctx = None
+      else:
+        if __debug__ and _nextop_fn:
+          _nextop_fn()
+        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
+
       op = opctx.op
 
       # Consistency check
       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
+                                     constants.OP_STATUS_CANCELING,
                                      constants.OP_STATUS_CANCELED)
-                        for i in job.ops[opctx.index:])
+                        for i in job.ops[opctx.index + 1:])
 
       assert op.status in (constants.OP_STATUS_QUEUED,
                            constants.OP_STATUS_WAITLOCK,
+                           constants.OP_STATUS_CANCELING,
                            constants.OP_STATUS_CANCELED)
 
-      if op.status != constants.OP_STATUS_CANCELED:
+      assert (op.priority <= constants.OP_PRIO_LOWEST and
+              op.priority >= constants.OP_PRIO_HIGHEST)
+
+      if op.status not in (constants.OP_STATUS_CANCELING,
+                           constants.OP_STATUS_CANCELED):
+        assert op.status in (constants.OP_STATUS_QUEUED,
+                             constants.OP_STATUS_WAITLOCK)
+
         # Prepare to start opcode
-        self._MarkWaitlock(job, op)
+        if self._MarkWaitlock(job, op):
+          # Write to disk
+          queue.UpdateJobUnlocked(job)
 
         assert op.status == constants.OP_STATUS_WAITLOCK
         assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
-
-        # Write to disk
-        queue.UpdateJobUnlocked(job)
+        assert job.start_timestamp and op.start_timestamp
 
         logging.info("%s: opcode %s waiting for locks",
                      opctx.log_prefix, opctx.summary)
@@ -903,62 +1024,87 @@ class _JobProcessor(object):
         finally:
           queue.acquire(shared=1)
 
-        # Finalize opcode
-        op.end_timestamp = TimeStampNow()
         op.status = op_status
         op.result = op_result
 
-        if op.status == constants.OP_STATUS_CANCELING:
-          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
-                                for i in job.ops[opctx.index:])
+        if op.status == constants.OP_STATUS_WAITLOCK:
+          # Couldn't get locks in time
+          assert not op.end_timestamp
         else:
-          assert op.status in constants.OPS_FINALIZED
+          # Finalize opcode
+          op.end_timestamp = TimeStampNow()
 
-      # Ensure all opcodes so far have been successful
-      assert (opctx.index == 0 or
-              compat.all(i.status == constants.OP_STATUS_SUCCESS
-                         for i in job.ops[:opctx.index]))
+          if op.status == constants.OP_STATUS_CANCELING:
+            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
+                                  for i in job.ops[opctx.index:])
+          else:
+            assert op.status in constants.OPS_FINALIZED
 
-      if op.status == constants.OP_STATUS_SUCCESS:
+      if op.status == constants.OP_STATUS_WAITLOCK:
         finalize = False
 
-      elif op.status == constants.OP_STATUS_ERROR:
-        # Ensure failed opcode has an exception as its result
-        assert errors.GetEncodedError(job.ops[opctx.index].result)
-
-        to_encode = errors.OpExecError("Preceding opcode failed")
-        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
-                              _EncodeOpError(to_encode))
-        finalize = True
+        if opctx.CheckPriorityIncrease():
+          # Priority was changed, need to update on-disk file
+          queue.UpdateJobUnlocked(job)
 
-        # Consistency check
-        assert compat.all(i.status == constants.OP_STATUS_ERROR and
-                          errors.GetEncodedError(i.result)
-                          for i in job.ops[opctx.index:])
+        # Keep around for another round
+        job.cur_opctx = opctx
 
-      elif op.status == constants.OP_STATUS_CANCELING:
-        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
-                              "Job canceled by request")
-        finalize = True
+        assert (op.priority <= constants.OP_PRIO_LOWEST and
+                op.priority >= constants.OP_PRIO_HIGHEST)
 
-      elif op.status == constants.OP_STATUS_CANCELED:
-        finalize = True
+        # In no case must the status be finalized here
+        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
 
       else:
-        raise errors.ProgrammerError("Unknown status '%s'" % op.status)
+        # Ensure all opcodes so far have been successful
+        assert (opctx.index == 0 or
+                compat.all(i.status == constants.OP_STATUS_SUCCESS
+                           for i in job.ops[:opctx.index]))
+
+        # Reset context
+        job.cur_opctx = None
+
+        if op.status == constants.OP_STATUS_SUCCESS:
+          finalize = False
 
-      # Finalizing or last opcode?
-      if finalize or opctx.index == (opcount - 1):
-        # All opcodes have been run, finalize job
-        job.end_timestamp = TimeStampNow()
+        elif op.status == constants.OP_STATUS_ERROR:
+          # Ensure failed opcode has an exception as its result
+          assert errors.GetEncodedError(job.ops[opctx.index].result)
 
-      # Write to disk. If the job status is final, this is the final write
-      # allowed. Once the file has been written, it can be archived anytime.
-      queue.UpdateJobUnlocked(job)
+          to_encode = errors.OpExecError("Preceding opcode failed")
+          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                _EncodeOpError(to_encode))
+          finalize = True
 
-      if finalize or opctx.index == (opcount - 1):
-        logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
-        return True
+          # Consistency check
+          assert compat.all(i.status == constants.OP_STATUS_ERROR and
+                            errors.GetEncodedError(i.result)
+                            for i in job.ops[opctx.index:])
+
+        elif op.status == constants.OP_STATUS_CANCELING:
+          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                                "Job canceled by request")
+          finalize = True
+
+        elif op.status == constants.OP_STATUS_CANCELED:
+          finalize = True
+
+        else:
+          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
+
+        # Finalizing or last opcode?
+        if finalize or opctx.index == (opcount - 1):
+          # All opcodes have been run, finalize job
+          job.end_timestamp = TimeStampNow()
+
+        # Write to disk. If the job status is final, this is the final write
+        # allowed. Once the file has been written, it can be archived anytime.
+        queue.UpdateJobUnlocked(job)
+
+        if finalize or opctx.index == (opcount - 1):
+          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
+          return True
 
       return False
     finally:
@@ -988,7 +1134,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
 
     if not _JobProcessor(queue, proc.ExecOpCode, job)():
       # Schedule again
-      raise workerpool.DeferTask()
+      raise workerpool.DeferTask(priority=job.CalcPriority())
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
@@ -1083,7 +1229,7 @@ class JobQueue(object):
 
     self._queue_size = 0
     self._UpdateQueueSizeUnlocked()
-    self._drained = self._IsQueueMarkedDrain()
+    self._drained = jstore.CheckDrainFlag()
 
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
@@ -1125,15 +1271,22 @@ class JobQueue(object):
 
       status = job.CalcStatus()
 
-      if status in (constants.JOB_STATUS_QUEUED, ):
+      if status == constants.JOB_STATUS_QUEUED:
         restartjobs.append(job)
 
       elif status in (constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK,
                       constants.JOB_STATUS_CANCELING):
         logging.warning("Unfinished job %s found: %s", job.id, job)
-        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
-                              "Unclean master daemon shutdown")
+
+        if status == constants.JOB_STATUS_WAITLOCK:
+          # Restart job
+          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
+          restartjobs.append(job)
+        else:
+          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                "Unclean master daemon shutdown")
+
         self.UpdateJobUnlocked(job)
 
     if restartjobs:
@@ -1475,19 +1628,6 @@ class JobQueue(object):
       logging.exception("Can't load/parse job %s", job_id)
       return None
 
-  @staticmethod
-  def _IsQueueMarkedDrain():
-    """Check if the queue is marked from drain.
-
-    This currently uses the queue drain file, which makes it a
-    per-node flag. In the future this can be moved to the config file.
-
-    @rtype: boolean
-    @return: True of the job queue is marked for draining
-
-    """
-    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
-
   def _UpdateQueueSizeUnlocked(self):
     """Update the queue size.
 
@@ -1503,13 +1643,7 @@ class JobQueue(object):
     @param drain_flag: Whether to set or unset the drain flag
 
     """
-    getents = runtime.GetEnts()
-
-    if drain_flag:
-      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
-                      uid=getents.masterd_uid, gid=getents.masterd_gid)
-    else:
-      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
+    jstore.SetDrainFlag(drain_flag)
 
     self._drained = drain_flag