InitCluster: force a config file update
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 4ad1397..b3c7dbb 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -47,11 +47,16 @@ from ganeti import utils
 from ganeti import jstore
 from ganeti import rpc
 
-from ganeti.rpc import RpcRunner
 
 JOBQUEUE_THREADS = 25
 
 
+class CancelJob(Exception):
+  """Special exception to cancel a job.
+
+  """
+
+
 def TimeStampNow():
   """Returns the current timestamp.
 
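
The new CancelJob class is a control-flow sentinel rather than an error: a worker raises it as soon as it notices an opcode flagged for cancellation, and only one handler, at the job level, is meant to catch it. A minimal, self-contained sketch of the pattern, with illustrative status strings instead of Ganeti's constants:

    class CancelJob(Exception):
      """Sentinel exception used to unwind out of a job, not an error."""


    class Op(object):
      def __init__(self, status):
        self.status = status


    def run_opcode(op):
      # Bail out before doing any work if the opcode was flagged while waiting.
      if op.status == "canceling":
        raise CancelJob()
      op.status = "running"


    def run_job(ops):
      try:
        for op in ops:
          run_opcode(op)
      except CancelJob:
        # Only this job-level handler consumes the sentinel.
        for op in ops:
          op.status = "error"
        return "canceled"
      return "success"


    print(run_job([Op("waitlock"), Op("canceling")]))  # -> canceled
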
@@ -234,6 +239,7 @@ class _QueuedJob(object):
         status will be the same
       - otherwise, the last opcode with the status one of:
           - waitlock
+          - canceling
           - running
 
         will determine the job status
@@ -259,6 +265,9 @@ class _QueuedJob(object):
         status = constants.JOB_STATUS_WAITLOCK
       elif op.status == constants.OP_STATUS_RUNNING:
         status = constants.JOB_STATUS_RUNNING
+      elif op.status == constants.OP_STATUS_CANCELING:
+        status = constants.JOB_STATUS_CANCELING
+        break
       elif op.status == constants.OP_STATUS_ERROR:
         status = constants.JOB_STATUS_ERROR
         # The whole job fails if one opcode failed
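
CalcStatus reduces the per-opcode statuses to a single job status; the new branch makes a canceling opcode dominate whatever follows it, which is why it breaks out of the loop immediately. A standalone sketch of that reduction with simplified status strings (the real method also handles the all-success and already-canceled cases, omitted here):

    def calc_status(op_statuses):
      # Simplified reduction: a canceling or error opcode decides the job at once.
      status = "queued"
      for st in op_statuses:
        if st == "queued":
          break
        elif st == "waitlock":
          status = "waitlock"
        elif st == "running":
          status = "running"
        elif st == "canceling":
          status = "canceling"
          break
        elif st == "error":
          status = "error"
          break
      return status


    print(calc_status(["success", "canceling", "queued"]))  # -> canceling
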
@@ -313,6 +322,13 @@ class _JobQueueWorker(workerpool.BaseWorker):
 
     self.queue.acquire()
     try:
+      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
+                                    constants.OP_STATUS_CANCELING)
+
+      # Cancel here if we were asked to
+      if self.opcode.status == constants.OP_STATUS_CANCELING:
+        raise CancelJob()
+
       self.opcode.status = constants.OP_STATUS_RUNNING
     finally:
       self.queue.release()
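
Both the status check and the transition to RUNNING happen while the queue lock is held, so a concurrent cancel request cannot slip in between the check and the write. The same check-then-transition idea with a plain threading.Lock (illustrative only, not the queue's real locking API):

    import threading


    class CancelJob(Exception):
      """Raised to abort an opcode that was flagged for cancellation."""


    queue_lock = threading.Lock()


    def start_opcode(op):
      # The check and the state change share one critical section, so a
      # canceller holding the same lock can never interleave between them.
      queue_lock.acquire()
      try:
        if op.status == "canceling":
          raise CancelJob()
        op.status = "running"
      finally:
        queue_lock.release()


    class _Op(object):
      status = "waitlock"

    start_opcode(_Op())  # transitions to "running"; "canceling" would raise
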
@@ -340,6 +356,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
 
             queue.acquire()
             try:
+              assert op.status == constants.OP_STATUS_QUEUED
               job.run_op_index = idx
               op.status = constants.OP_STATUS_WAITLOCK
               op.result = None
@@ -392,6 +409,9 @@ class _JobQueueWorker(workerpool.BaseWorker):
 
             logging.debug("Op %s/%s: Successfully finished %s",
                           idx + 1, count, op)
+          except CancelJob:
+            # Will be handled further up
+            raise
           except Exception, err:
             queue.acquire()
             try:
@@ -406,6 +426,12 @@ class _JobQueueWorker(workerpool.BaseWorker):
               queue.release()
             raise
 
+      except CancelJob:
+        queue.acquire()
+        try:
+          queue.CancelJobUnlocked(job)
+        finally:
+          queue.release()
       except errors.GenericError, err:
         logging.exception("Ganeti exception")
       except:
@@ -496,44 +522,65 @@ class JobQueue(object):
 
     # Get initial list of nodes
     self._nodes = dict((n.name, n.primary_ip)
-                       for n in self.context.cfg.GetAllNodesInfo().values())
+                       for n in self.context.cfg.GetAllNodesInfo().values()
+                       if n.master_candidate)
 
     # Remove master node
     try:
       del self._nodes[self._my_hostname]
-    except ValueError:
+    except KeyError:
       pass
 
     # TODO: Check consistency across nodes
 
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
-
-    # We need to lock here because WorkerPool.AddTask() may start a job while
-    # we're still doing our work.
-    self.acquire()
     try:
-      for job in self._GetJobsUnlocked(None):
-        # a failure in loading the job can cause 'None' to be returned
-        if job is None:
-          continue
+      # We need to lock here because WorkerPool.AddTask() may start a job while
+      # we're still doing our work.
+      self.acquire()
+      try:
+        logging.info("Inspecting job queue")
 
-        status = job.CalcStatus()
+        all_job_ids = self._GetJobIDsUnlocked()
+        jobs_count = len(all_job_ids)
+        lastinfo = time.time()
+        for idx, job_id in enumerate(all_job_ids):
+          # Give an update every 1000 jobs or 10 seconds
+          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
+              idx == (jobs_count - 1)):
+            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
+                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
+            lastinfo = time.time()
 
-        if status in (constants.JOB_STATUS_QUEUED, ):
-          self._wpool.AddTask(job)
+          job = self._LoadJobUnlocked(job_id)
 
-        elif status in (constants.JOB_STATUS_RUNNING,
-                        constants.JOB_STATUS_WAITLOCK):
-          logging.warning("Unfinished job %s found: %s", job.id, job)
-          try:
-            for op in job.ops:
-              op.status = constants.OP_STATUS_ERROR
-              op.result = "Unclean master daemon shutdown"
-          finally:
-            self.UpdateJobUnlocked(job)
-    finally:
-      self.release()
+          # a failure in loading the job can cause 'None' to be returned
+          if job is None:
+            continue
+
+          status = job.CalcStatus()
+
+          if status in (constants.JOB_STATUS_QUEUED, ):
+            self._wpool.AddTask(job)
+
+          elif status in (constants.JOB_STATUS_RUNNING,
+                          constants.JOB_STATUS_WAITLOCK,
+                          constants.JOB_STATUS_CANCELING):
+            logging.warning("Unfinished job %s found: %s", job.id, job)
+            try:
+              for op in job.ops:
+                op.status = constants.OP_STATUS_ERROR
+                op.result = "Unclean master daemon shutdown"
+            finally:
+              self.UpdateJobUnlocked(job)
+
+        logging.info("Job queue inspection finished")
+      finally:
+        self.release()
+    except:
+      self._wpool.TerminateWorkers()
+      raise
 
   @utils.LockedMethod
   @_RequireOpenQueue
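
The rewritten startup path now reports its progress: the inspection loop logs every 1000 jobs, every 10 seconds, or on the last job, whichever comes first, so a large queue still gives periodic feedback without flooding the log, and any failure during inspection terminates the freshly created worker pool before re-raising. The rate-limited progress pattern in isolation (simplified, with stand-in item handling):

    import logging
    import time

    logging.basicConfig(level=logging.INFO)


    def inspect(items):
      count = len(items)
      lastinfo = time.time()
      for idx, _item in enumerate(items):
        # Log every 1000 items, every 10 seconds, or on the final item.
        if (idx % 1000 == 0 or time.time() >= lastinfo + 10.0 or
            idx == count - 1):
          logging.info("Inspection: %d/%d (%0.1f %%)",
                       idx + 1, count, 100.0 * (idx + 1) / count)
          lastinfo = time.time()
        # ... per-item work would go here ...


    inspect(list(range(2500)))
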
@@ -548,7 +595,13 @@ class JobQueue(object):
     assert node_name != self._my_hostname
 
     # Clean queue directory on added node
-    RpcRunner.call_jobqueue_purge(node_name)
+    rpc.RpcRunner.call_jobqueue_purge(node_name)
+
+    if not node.master_candidate:
+      # remove if existing, ignoring errors
+      self._nodes.pop(node_name, None)
+      # and skip the replication of the job ids
+      return
 
     # Upload the whole queue excluding archived jobs
     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
@@ -564,8 +617,9 @@ class JobQueue(object):
       finally:
         fd.close()
 
-      result = RpcRunner.call_jobqueue_update([node_name], [node.primary_ip],
-                                              file_name, content)
+      result = rpc.RpcRunner.call_jobqueue_update([node_name],
+                                                  [node.primary_ip],
+                                                  file_name, content)
       if not result[node_name]:
         logging.error("Failed to upload %s to %s", file_name, node_name)
 
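
With this hunk only master candidates receive a replica of the job queue: a non-candidate node is dropped from self._nodes (pop with a default, so a missing entry is not an error) and the file upload below is skipped entirely. A small sketch of that gating, with stand-in node objects and without the actual RPC replication:

    class Node(object):
      def __init__(self, name, ip, master_candidate):
        self.name = name
        self.primary_ip = ip
        self.master_candidate = master_candidate


    def add_node(nodes, node):
      """Track a node for job queue replication only if it is a candidate."""
      if not node.master_candidate:
        # Remove a stale entry if present; the default avoids a KeyError.
        nodes.pop(node.name, None)
        return
      nodes[node.name] = node.primary_ip
      # ... the real code would now replicate the queue files to the node ...


    nodes = {}
    add_node(nodes, Node("node1", "192.0.2.1", True))
    add_node(nodes, Node("node2", "192.0.2.2", False))
    print(nodes)  # {'node1': '192.0.2.1'}
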
@@ -644,7 +698,7 @@ class JobQueue(object):
     utils.WriteFile(file_name, data=data)
 
     names, addrs = self._GetNodeIp()
-    result = RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
+    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
     self._CheckRpcResult(result, self._nodes,
                          "Updating %s" % file_name)
 
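
This and the following hunk follow from the removed top-level "from ganeti.rpc import RpcRunner": the class is now reached through the already-imported rpc module at call time. A side effect of module-qualified access (a property of Python, not something the patch states as its motivation) is late binding: the lookup sees whatever rpc.RpcRunner is at the moment of the call, for instance after a test replaces it, whereas a from-import keeps the original object. A tiny demonstration with a throwaway module object:

    import types

    # Stand-in for the ganeti.rpc module.
    rpc = types.ModuleType("rpc")
    rpc.RpcRunner = lambda: "original"

    bound_early = rpc.RpcRunner        # snapshot, like "from rpc import RpcRunner"

    rpc.RpcRunner = lambda: "patched"  # e.g. a test replacing the class

    print(bound_early())               # original
    print(rpc.RpcRunner())             # patched
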
@@ -663,7 +717,7 @@ class JobQueue(object):
     os.rename(old, new)
 
     names, addrs = self._GetNodeIp()
-    result = RpcRunner.call_jobqueue_rename(names, addrs, old, new)
+    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
     self._CheckRpcResult(result, self._nodes,
                          "Moving %s to %s" % (old, new))
 
@@ -1003,21 +1057,42 @@ class JobQueue(object):
     @param job_id: job ID of job to be cancelled.
 
     """
-    logging.debug("Cancelling job %s", job_id)
+    logging.info("Cancelling job %s", job_id)
 
     job = self._LoadJobUnlocked(job_id)
     if not job:
       logging.debug("Job %s not found", job_id)
-      return
+      return (False, "Job %s not found" % job_id)
 
-    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
+    job_status = job.CalcStatus()
+
+    if job_status not in (constants.JOB_STATUS_QUEUED,
+                          constants.JOB_STATUS_WAITLOCK):
       logging.debug("Job %s is no longer in the queue", job.id)
-      return
+      return (False, "Job %s is no longer in the queue" % job.id)
+
+    if job_status == constants.JOB_STATUS_QUEUED:
+      self.CancelJobUnlocked(job)
+      return (True, "Job %s canceled" % job.id)
 
+    elif job_status == constants.JOB_STATUS_WAITLOCK:
+      # The worker will notice the new status and cancel the job
+      try:
+        for op in job.ops:
+          op.status = constants.OP_STATUS_CANCELING
+      finally:
+        self.UpdateJobUnlocked(job)
+      return (True, "Job %s will be canceled" % job.id)
+
+  @_RequireOpenQueue
+  def CancelJobUnlocked(self, job):
+    """Marks a job as canceled.
+
+    """
     try:
       for op in job.ops:
         op.status = constants.OP_STATUS_ERROR
-        op.result = "Job cancelled by request"
+        op.result = "Job canceled by request"
     finally:
       self.UpdateJobUnlocked(job)
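
CancelJob now distinguishes two situations: a job still in QUEUED is canceled on the spot via CancelJobUnlocked, while a job in WAITLOCK is only flagged (every opcode set to CANCELING) and the worker completes the cancellation when it next examines the status; in both paths the caller gets a (success, message) tuple instead of a bare return. A condensed sketch of that decision, with simplified status strings and a dict standing in for the job object:

    def cancel_job(job):
      """Return (success, message) for a cancel request, mirroring both paths."""
      status = job["status"]

      if status not in ("queued", "waitlock"):
        return (False, "Job %s is no longer in the queue" % job["id"])

      if status == "queued":
        # Nothing is running yet: mark the job canceled immediately.
        job["status"] = "canceled"
        return (True, "Job %s canceled" % job["id"])

      # waitlock: flag it and let the worker notice and abort.
      job["status"] = "canceling"
      return (True, "Job %s will be canceled" % job["id"])


    print(cancel_job({"id": 7, "status": "waitlock"}))
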