Implement BuildHooksEnv for NoHooksLU
[ganeti-local] / lib / jqueue.py
index c13e303..8657fed 100644 (file)
@@ -313,6 +313,26 @@ class _QueuedJob(object):
 
     return entries
 
+  def MarkUnfinishedOps(self, status, result):
+    """Mark unfinished opcodes with a given status and result.
+
+    This is an utility function for marking all running or waiting to
+    be run opcodes with a given status. Opcodes which are already
+    finalised are not changed.
+
+    @param status: a given opcode status
+    @param result: the opcode result
+
+    """
+    not_marked = True
+    for op in self.ops:
+      if op.status in constants.OPS_FINALIZED:
+        assert not_marked, "Finalized opcodes found after non-finalized ones"
+        continue
+      op.status = status
+      op.result = result
+      not_marked = False
+
 
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
@@ -440,7 +460,10 @@ class _JobQueueWorker(workerpool.BaseWorker):
             try:
               try:
                 op.status = constants.OP_STATUS_ERROR
-                op.result = str(err)
+                if isinstance(err, errors.GenericError):
+                  op.result = errors.EncodeException(err)
+                else:
+                  op.result = str(err)
                 op.end_timestamp = TimeStampNow()
                 logging.info("Op %s/%s: Error in opcode %s: %s",
                              idx + 1, count, op_summary, err)
@@ -486,33 +509,37 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
     self.queue = queue
 
 
-class JobQueue(object):
-  """Queue used to manage the jobs.
+def _RequireOpenQueue(fn):
+  """Decorator for "public" functions.
 
-  @cvar _RE_JOB_FILE: regex matching the valid job file names
+  This function should be used for all 'public' functions. That is,
+  functions usually called from other classes. Note that this should
+  be applied only to methods (not plain functions), since it expects
+  that the decorated function is called with a first argument that has
+  a '_queue_lock' argument.
 
-  """
-  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
+  @warning: Use this decorator only after utils.LockedMethod!
 
-  def _RequireOpenQueue(fn):
-    """Decorator for "public" functions.
+  Example::
+    @utils.LockedMethod
+    @_RequireOpenQueue
+    def Example(self):
+      pass
 
-    This function should be used for all 'public' functions. That is,
-    functions usually called from other classes.
+  """
+  def wrapper(self, *args, **kwargs):
+    assert self._queue_lock is not None, "Queue should be open"
+    return fn(self, *args, **kwargs)
+  return wrapper
 
-    @warning: Use this decorator only after utils.LockedMethod!
 
-    Example::
-      @utils.LockedMethod
-      @_RequireOpenQueue
-      def Example(self):
-        pass
+class JobQueue(object):
+  """Queue used to manage the jobs.
 
-    """
-    def wrapper(self, *args, **kwargs):
-      assert self._queue_lock is not None, "Queue should be open"
-      return fn(self, *args, **kwargs)
-    return wrapper
+  @cvar _RE_JOB_FILE: regex matching the valid job file names
+
+  """
+  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
 
   def __init__(self, context):
     """Constructor for JobQueue.
@@ -593,9 +620,8 @@ class JobQueue(object):
                           constants.JOB_STATUS_CANCELING):
             logging.warning("Unfinished job %s found: %s", job.id, job)
             try:
-              for op in job.ops:
-                op.status = constants.OP_STATUS_ERROR
-                op.result = "Unclean master daemon shutdown"
+              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                    "Unclean master daemon shutdown")
             finally:
               self.UpdateJobUnlocked(job)
 
@@ -777,26 +803,31 @@ class JobQueue(object):
     """
     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
 
-  def _NewSerialUnlocked(self):
+  def _NewSerialsUnlocked(self, count):
     """Generates a new job identifier.
 
     Job identifiers are unique during the lifetime of a cluster.
 
+    @type count: integer
+    @param count: how many serials to return
     @rtype: str
     @return: a string representing the job identifier.
 
     """
+    assert count > 0
     # New number
-    serial = self._last_serial + 1
+    serial = self._last_serial + count
 
     # Write to file
     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                         "%s\n" % serial)
 
+    result = [self._FormatJobID(v)
+              for v in range(self._last_serial, serial + 1)]
     # Keep it only if we were able to write the file
     self._last_serial = serial
 
-    return self._FormatJobID(serial)
+    return result
 
   @staticmethod
   def _GetJobPath(job_id):
@@ -961,14 +992,15 @@ class JobQueue(object):
       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
     return True
 
-  @utils.LockedMethod
   @_RequireOpenQueue
-  def SubmitJob(self, ops):
+  def _SubmitJobUnlocked(self, job_id, ops):
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
     queue, in order for it to be picked up by the queue processors.
 
+    @type job_id: job ID
+    @param jod_id: the job ID for the new job
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
     @rtype: job ID
@@ -977,7 +1009,7 @@ class JobQueue(object):
 
     """
     if self._IsQueueMarkedDrain():
-      raise errors.JobQueueDrainError()
+      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
 
     # Check job queue size
     size = len(self._ListJobFiles())
@@ -990,8 +1022,6 @@ class JobQueue(object):
     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()
 
-    # Get job identifier
-    job_id = self._NewSerialUnlocked()
     job = _QueuedJob(self, job_id, ops)
 
     # Write to disk
@@ -1005,6 +1035,39 @@ class JobQueue(object):
 
     return job.id
 
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitJob(self, ops):
+    """Create and store a new job.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    job_id = self._NewSerialsUnlocked(1)[0]
+    return self._SubmitJobUnlocked(job_id, ops)
+
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitManyJobs(self, jobs):
+    """Create and store multiple jobs.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    results = []
+    all_job_ids = self._NewSerialsUnlocked(len(jobs))
+    for job_id, ops in zip(all_job_ids, jobs):
+      try:
+        data = self._SubmitJobUnlocked(job_id, ops)
+        status = True
+      except errors.GenericError, err:
+        data = str(err)
+        status = False
+      results.append((status, data))
+
+    return results
+
+
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
     """Update a job's on disk storage.
@@ -1052,6 +1115,10 @@ class JobQueue(object):
 
     """
     logging.debug("Waiting for changes in job %s", job_id)
+
+    job_info = None
+    log_entries = None
+
     end_time = time.time() + timeout
     while True:
       delta_time = end_time - time.time()
@@ -1093,7 +1160,10 @@ class JobQueue(object):
 
     logging.debug("Job %s changed", job_id)
 
-    return (job_info, log_entries)
+    if job_info is None and log_entries is None:
+      return None
+    else:
+      return (job_info, log_entries)
 
   @utils.LockedMethod
   @_RequireOpenQueue
@@ -1117,8 +1187,8 @@ class JobQueue(object):
 
     if job_status not in (constants.JOB_STATUS_QUEUED,
                           constants.JOB_STATUS_WAITLOCK):
-      logging.debug("Job %s is no longer in the queue", job.id)
-      return (False, "Job %s is no longer in the queue" % job.id)
+      logging.debug("Job %s is no longer waiting in the queue", job.id)
+      return (False, "Job %s is no longer waiting in the queue" % job.id)
 
     if job_status == constants.JOB_STATUS_QUEUED:
       self.CancelJobUnlocked(job)
@@ -1127,8 +1197,7 @@ class JobQueue(object):
     elif job_status == constants.JOB_STATUS_WAITLOCK:
       # The worker will notice the new status and cancel the job
       try:
-        for op in job.ops:
-          op.status = constants.OP_STATUS_CANCELING
+        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
       finally:
         self.UpdateJobUnlocked(job)
       return (True, "Job %s will be canceled" % job.id)
@@ -1139,9 +1208,8 @@ class JobQueue(object):
 
     """
     try:
-      for op in job.ops:
-        op.status = constants.OP_STATUS_CANCELED
-        op.result = "Job canceled by request"
+      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                            "Job canceled by request")
     finally:
       self.UpdateJobUnlocked(job)