ganeti.http: Add constant for DELETE
[ganeti-local] / lib / jqueue.py
index c6442f5..b3c7dbb 100644 (file)
@@ -47,9 +47,16 @@ from ganeti import utils
 from ganeti import jstore
 from ganeti import rpc
 
+
 JOBQUEUE_THREADS = 25
 
 
+class CancelJob(Exception):
+  """Special exception raised by a job worker to cancel its job.
+
+  """
+
+
 def TimeStampNow():
   """Returns the current timestamp.
 
@@ -232,6 +239,7 @@ class _QueuedJob(object):
         status will be the same
       - otherwise, the last opcode with the status one of:
           - waitlock
+          - canceling
           - running
 
         will determine the job status
@@ -257,6 +265,9 @@ class _QueuedJob(object):
         status = constants.JOB_STATUS_WAITLOCK
       elif op.status == constants.OP_STATUS_RUNNING:
         status = constants.JOB_STATUS_RUNNING
+      elif op.status == constants.OP_STATUS_CANCELING:
+        status = constants.JOB_STATUS_CANCELING
+        break
       elif op.status == constants.OP_STATUS_ERROR:
         status = constants.JOB_STATUS_ERROR
         # The whole job fails if one opcode failed
@@ -311,6 +322,13 @@ class _JobQueueWorker(workerpool.BaseWorker):
 
     self.queue.acquire()
     try:
+      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
+                                    constants.OP_STATUS_CANCELING)
+
+      # Cancel here if we were asked to
+      if self.opcode.status == constants.OP_STATUS_CANCELING:
+        raise CancelJob()
+
       self.opcode.status = constants.OP_STATUS_RUNNING
     finally:
       self.queue.release()
@@ -338,6 +356,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
 
             queue.acquire()
             try:
+              assert op.status == constants.OP_STATUS_QUEUED
               job.run_op_index = idx
               op.status = constants.OP_STATUS_WAITLOCK
               op.result = None
@@ -390,6 +409,9 @@ class _JobQueueWorker(workerpool.BaseWorker):
 
             logging.debug("Op %s/%s: Successfully finished %s",
                           idx + 1, count, op)
+          except CancelJob:
+            # Will be handled further up
+            raise
           except Exception, err:
             queue.acquire()
             try:
@@ -404,6 +426,12 @@ class _JobQueueWorker(workerpool.BaseWorker):
               queue.release()
             raise
 
+      except CancelJob:
+        queue.acquire()
+        try:
+          queue.CancelJobUnlocked(job)
+        finally:
+          queue.release()
       except errors.GenericError, err:
         logging.exception("Ganeti exception")
       except:
@@ -494,12 +522,13 @@ class JobQueue(object):
 
     # Get initial list of nodes
     self._nodes = dict((n.name, n.primary_ip)
-                       for n in self.context.cfg.GetAllNodesInfo().values())
+                       for n in self.context.cfg.GetAllNodesInfo().values()
+                       if n.master_candidate)
 
     # Remove master node
     try:
       del self._nodes[self._my_hostname]
-    except ValueError:
+    except KeyError:
       pass
 
     # TODO: Check consistency across nodes
@@ -514,13 +543,14 @@ class JobQueue(object):
         logging.info("Inspecting job queue")
 
         all_job_ids = self._GetJobIDsUnlocked()
+        jobs_count = len(all_job_ids)
         lastinfo = time.time()
         for idx, job_id in enumerate(all_job_ids):
           # Give an update every 1000 jobs or 10 seconds
-          if idx % 1000 == 0 or time.time() >= (lastinfo + 10.0):
-            jobs_count = len(all_job_ids)
+          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
+              idx == (jobs_count - 1)):
             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
-                         idx, jobs_count, 100.0 * (idx + 1) / jobs_count)
+                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
             lastinfo = time.time()
 
           job = self._LoadJobUnlocked(job_id)
@@ -535,7 +565,8 @@ class JobQueue(object):
             self._wpool.AddTask(job)
 
           elif status in (constants.JOB_STATUS_RUNNING,
-                          constants.JOB_STATUS_WAITLOCK):
+                          constants.JOB_STATUS_WAITLOCK,
+                          constants.JOB_STATUS_CANCELING):
             logging.warning("Unfinished job %s found: %s", job.id, job)
             try:
               for op in job.ops:
@@ -566,6 +597,12 @@ class JobQueue(object):
     # Clean queue directory on added node
     rpc.RpcRunner.call_jobqueue_purge(node_name)
 
+    if not node.master_candidate:
+      # remove if existing, ignoring errors
+      self._nodes.pop(node_name, None)
+      # and skip the replication of the job ids
+      return
+
     # Upload the whole queue excluding archived jobs
     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
 
@@ -1020,21 +1057,42 @@ class JobQueue(object):
     @param job_id: job ID of job to be cancelled.
 
     """
-    logging.debug("Cancelling job %s", job_id)
+    logging.info("Cancelling job %s", job_id)
 
     job = self._LoadJobUnlocked(job_id)
     if not job:
       logging.debug("Job %s not found", job_id)
-      return
+      return (False, "Job %s not found" % job_id)
+
+    job_status = job.CalcStatus()
 
-    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
+    if job_status not in (constants.JOB_STATUS_QUEUED,
+                          constants.JOB_STATUS_WAITLOCK):
       logging.debug("Job %s is no longer in the queue", job.id)
-      return
+      return (False, "Job %s is no longer in the queue" % job.id)
+
+    if job_status == constants.JOB_STATUS_QUEUED:
+      self.CancelJobUnlocked(job)
+      return (True, "Job %s canceled" % job.id)
 
+    elif job_status == constants.JOB_STATUS_WAITLOCK:
+      # The worker will notice the new status and cancel the job
+      try:
+        for op in job.ops:
+          op.status = constants.OP_STATUS_CANCELING
+      finally:
+        self.UpdateJobUnlocked(job)
+      return (True, "Job %s will be canceled" % job.id)
+
+  @_RequireOpenQueue
+  def CancelJobUnlocked(self, job):
+    """Marks a job as canceled.
+
+    """
     try:
       for op in job.ops:
         op.status = constants.OP_STATUS_ERROR
-        op.result = "Job cancelled by request"
+        op.result = "Job canceled by request"
     finally:
       self.UpdateJobUnlocked(job)