Fix disk count check in LUSetInstanceParams
[ganeti-local] / lib / jqueue.py
index 7411f46..ddf941e 100644 (file)
@@ -376,6 +376,8 @@ class _QueuedJob(object):
         row.append(self.id)
       elif fname == "status":
         row.append(self.CalcStatus())
+      elif fname == "priority":
+        row.append(self.CalcPriority())
       elif fname == "ops":
         row.append([op.input.__getstate__() for op in self.ops])
       elif fname == "opresult":
@@ -390,6 +392,8 @@ class _QueuedJob(object):
         row.append([op.exec_timestamp for op in self.ops])
       elif fname == "opend":
         row.append([op.end_timestamp for op in self.ops])
+      elif fname == "oppriority":
+        row.append([op.priority for op in self.ops])
       elif fname == "received_ts":
         row.append(self.received_timestamp)
       elif fname == "start_ts":
@@ -432,22 +436,19 @@ class _QueuedJob(object):
     """
     status = self.CalcStatus()
 
-    if status not in (constants.JOB_STATUS_QUEUED,
-                      constants.JOB_STATUS_WAITLOCK):
-      logging.debug("Job %s is no longer waiting in the queue", self.id)
-      return (False, "Job %s is no longer waiting in the queue" % self.id)
-
     if status == constants.JOB_STATUS_QUEUED:
       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                              "Job canceled by request")
-      msg = "Job %s canceled" % self.id
+      return (True, "Job %s canceled" % self.id)
 
     elif status == constants.JOB_STATUS_WAITLOCK:
       # The worker will notice the new status and cancel the job
       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
-      msg = "Job %s will be canceled" % self.id
+      return (True, "Job %s will be canceled" % self.id)
 
-    return (True, msg)
+    else:
+      logging.debug("Job %s is no longer waiting in the queue", self.id)
+      return (False, "Job %s is no longer waiting in the queue" % self.id)
 
 
 class _OpExecCallbacks(mcpu.OpExecCbBase):
@@ -889,18 +890,33 @@ class _JobProcessor(object):
 
     @type job: L{_QueuedJob}
     @param job: Job object
-    @type job: L{_QueuedOpCode}
-    @param job: Opcode object
+    @type op: L{_QueuedOpCode}
+    @param op: Opcode object
 
     """
     assert op in job.ops
+    assert op.status in (constants.OP_STATUS_QUEUED,
+                         constants.OP_STATUS_WAITLOCK)
+
+    update = False
 
-    op.status = constants.OP_STATUS_WAITLOCK
     op.result = None
-    op.start_timestamp = TimeStampNow()
+
+    if op.status == constants.OP_STATUS_QUEUED:
+      op.status = constants.OP_STATUS_WAITLOCK
+      update = True
+
+    if op.start_timestamp is None:
+      op.start_timestamp = TimeStampNow()
+      update = True
 
     if job.start_timestamp is None:
       job.start_timestamp = op.start_timestamp
+      update = True
+
+    assert op.status == constants.OP_STATUS_WAITLOCK
+
+    return update
 
   def _ExecOpCodeUnlocked(self, opctx):
     """Processes one opcode and returns the result.
@@ -916,12 +932,20 @@ class _JobProcessor(object):
       # Make sure not to hold queue lock while calling ExecOpCode
       result = self.opexec_fn(op.input,
                               _OpExecCallbacks(self.queue, self.job, op),
-                              timeout=timeout)
+                              timeout=timeout, priority=op.priority)
     except mcpu.LockAcquireTimeout:
       assert timeout is not None, "Received timeout for blocking acquire"
       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
-      assert op.status == constants.OP_STATUS_WAITLOCK
-      return (constants.OP_STATUS_QUEUED, None)
+
+      assert op.status in (constants.OP_STATUS_WAITLOCK,
+                           constants.OP_STATUS_CANCELING)
+
+      # Was job cancelled while we were waiting for the lock?
+      if op.status == constants.OP_STATUS_CANCELING:
+        return (constants.OP_STATUS_CANCELING, None)
+
+      # Stay in waitlock while trying to re-acquire lock
+      return (constants.OP_STATUS_WAITLOCK, None)
     except CancelJob:
       logging.exception("%s: Canceling job", opctx.log_prefix)
       assert op.status == constants.OP_STATUS_CANCELING
@@ -956,6 +980,7 @@ class _JobProcessor(object):
       # Is a previous opcode still pending?
       if job.cur_opctx:
         opctx = job.cur_opctx
+        job.cur_opctx = None
       else:
         if __debug__ and _nextop_fn:
           _nextop_fn()
@@ -965,25 +990,31 @@ class _JobProcessor(object):
 
       # Consistency check
       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
+                                     constants.OP_STATUS_CANCELING,
                                      constants.OP_STATUS_CANCELED)
-                        for i in job.ops[opctx.index:])
+                        for i in job.ops[opctx.index + 1:])
 
       assert op.status in (constants.OP_STATUS_QUEUED,
                            constants.OP_STATUS_WAITLOCK,
+                           constants.OP_STATUS_CANCELING,
                            constants.OP_STATUS_CANCELED)
 
       assert (op.priority <= constants.OP_PRIO_LOWEST and
               op.priority >= constants.OP_PRIO_HIGHEST)
 
-      if op.status != constants.OP_STATUS_CANCELED:
+      if op.status not in (constants.OP_STATUS_CANCELING,
+                           constants.OP_STATUS_CANCELED):
+        assert op.status in (constants.OP_STATUS_QUEUED,
+                             constants.OP_STATUS_WAITLOCK)
+
         # Prepare to start opcode
-        self._MarkWaitlock(job, op)
+        if self._MarkWaitlock(job, op):
+          # Write to disk
+          queue.UpdateJobUnlocked(job)
 
         assert op.status == constants.OP_STATUS_WAITLOCK
         assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
-
-        # Write to disk
-        queue.UpdateJobUnlocked(job)
+        assert job.start_timestamp and op.start_timestamp
 
         logging.info("%s: opcode %s waiting for locks",
                      opctx.log_prefix, opctx.summary)
@@ -997,7 +1028,7 @@ class _JobProcessor(object):
         op.status = op_status
         op.result = op_result
 
-        if op.status == constants.OP_STATUS_QUEUED:
+        if op.status == constants.OP_STATUS_WAITLOCK:
           # Couldn't get locks in time
           assert not op.end_timestamp
         else:
@@ -1010,10 +1041,12 @@ class _JobProcessor(object):
           else:
             assert op.status in constants.OPS_FINALIZED
 
-      if op.status == constants.OP_STATUS_QUEUED:
+      if op.status == constants.OP_STATUS_WAITLOCK:
         finalize = False
 
-        opctx.CheckPriorityIncrease()
+        if opctx.CheckPriorityIncrease():
+          # Priority was changed, need to update on-disk file
+          queue.UpdateJobUnlocked(job)
 
         # Keep around for another round
         job.cur_opctx = opctx
@@ -1022,9 +1055,7 @@ class _JobProcessor(object):
                 op.priority >= constants.OP_PRIO_HIGHEST)
 
         # In no case must the status be finalized here
-        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
-
-        queue.UpdateJobUnlocked(job)
+        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
 
       else:
         # Ensure all opcodes so far have been successful
@@ -1241,15 +1272,22 @@ class JobQueue(object):
 
       status = job.CalcStatus()
 
-      if status in (constants.JOB_STATUS_QUEUED, ):
+      if status == constants.JOB_STATUS_QUEUED:
         restartjobs.append(job)
 
       elif status in (constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK,
                       constants.JOB_STATUS_CANCELING):
         logging.warning("Unfinished job %s found: %s", job.id, job)
-        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
-                              "Unclean master daemon shutdown")
+
+        if status == constants.JOB_STATUS_WAITLOCK:
+          # Restart job
+          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
+          restartjobs.append(job)
+        else:
+          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                "Unclean master daemon shutdown")
+
         self.UpdateJobUnlocked(job)
 
     if restartjobs: