Convert instance_os_import rpc to new style result
[ganeti-local] / lib / jqueue.py
index 25eb3d9..90757a5 100644 (file)
@@ -344,7 +344,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
     @param job: the job to be processed
 
     """
     @param job: the job to be processed
 
     """
-    logging.debug("Worker %s processing job %s",
+    logging.info("Worker %s processing job %s",
                   self.worker_id, job.id)
     proc = mcpu.Processor(self.pool.queue.context)
     self.queue = queue = job.queue
                   self.worker_id, job.id)
     proc = mcpu.Processor(self.pool.queue.context)
     self.queue = queue = job.queue
@@ -352,11 +352,15 @@ class _JobQueueWorker(workerpool.BaseWorker):
       try:
         count = len(job.ops)
         for idx, op in enumerate(job.ops):
       try:
         count = len(job.ops)
         for idx, op in enumerate(job.ops):
+          op_summary = op.input.Summary()
           try:
           try:
-            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
+            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
+                         op_summary)
 
             queue.acquire()
             try:
 
             queue.acquire()
             try:
+              if op.status == constants.OP_STATUS_CANCELED:
+                raise CancelJob()
               assert op.status == constants.OP_STATUS_QUEUED
               job.run_op_index = idx
               op.status = constants.OP_STATUS_WAITLOCK
               assert op.status == constants.OP_STATUS_QUEUED
               job.run_op_index = idx
               op.status = constants.OP_STATUS_WAITLOCK
@@ -408,8 +412,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
             finally:
               queue.release()
 
             finally:
               queue.release()
 
-            logging.debug("Op %s/%s: Successfully finished %s",
-                          idx + 1, count, op)
+            logging.info("Op %s/%s: Successfully finished opcode %s",
+                         idx + 1, count, op_summary)
           except CancelJob:
             # Will be handled further up
             raise
           except CancelJob:
             # Will be handled further up
             raise
@@ -420,7 +424,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
                 op.status = constants.OP_STATUS_ERROR
                 op.result = str(err)
                 op.end_timestamp = TimeStampNow()
                 op.status = constants.OP_STATUS_ERROR
                 op.result = str(err)
                 op.end_timestamp = TimeStampNow()
-                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
+                logging.info("Op %s/%s: Error in opcode %s: %s",
+                             idx + 1, count, op_summary, err)
               finally:
                 queue.UpdateJobUnlocked(job)
             finally:
               finally:
                 queue.UpdateJobUnlocked(job)
             finally:
@@ -449,8 +454,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
           status = job.CalcStatus()
       finally:
         queue.release()
           status = job.CalcStatus()
       finally:
         queue.release()
-      logging.debug("Worker %s finished job %s, status = %s",
-                    self.worker_id, job_id, status)
+      logging.info("Worker %s finished job %s, status = %s",
+                   self.worker_id, job_id, status)
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
@@ -713,13 +718,14 @@ class JobQueue(object):
     @param rename: List containing tuples mapping old to new names
 
     """
     @param rename: List containing tuples mapping old to new names
 
     """
+    # Rename them locally
     for old, new in rename:
       utils.RenameFile(old, new, mkdir=True)
 
     for old, new in rename:
       utils.RenameFile(old, new, mkdir=True)
 
-      names, addrs = self._GetNodeIp()
-      result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
-      self._CheckRpcResult(result, self._nodes,
-                           "Moving %s to %s" % (old, new))
+    # ... and on all nodes
+    names, addrs = self._GetNodeIp()
+    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
+    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
   def _FormatJobID(self, job_id):
     """Convert a job ID to string format.
 
   def _FormatJobID(self, job_id):
     """Convert a job ID to string format.
@@ -937,9 +943,8 @@ class JobQueue(object):
       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
     return True
 
       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
     return True
 
-  @utils.LockedMethod
   @_RequireOpenQueue
   @_RequireOpenQueue
-  def SubmitJob(self, ops):
+  def _SubmitJobUnlocked(self, ops):
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
@@ -953,7 +958,7 @@ class JobQueue(object):
 
     """
     if self._IsQueueMarkedDrain():
 
     """
     if self._IsQueueMarkedDrain():
-      raise errors.JobQueueDrainError()
+      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
 
     # Check job queue size
     size = len(self._ListJobFiles())
 
     # Check job queue size
     size = len(self._ListJobFiles())
@@ -981,6 +986,37 @@ class JobQueue(object):
 
     return job.id
 
 
     return job.id
 
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitJob(self, ops):
+    """Create and store a new job.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    return self._SubmitJobUnlocked(ops)
+
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitManyJobs(self, jobs):
+    """Create and store multiple jobs.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    results = []
+    for ops in jobs:
+      try:
+        data = self._SubmitJobUnlocked(ops)
+        status = True
+      except errors.GenericError, err:
+        data = str(err)
+        status = False
+      results.append((status, data))
+
+    return results
+
+
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
     """Update a job's on disk storage.
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
     """Update a job's on disk storage.
@@ -1116,7 +1152,7 @@ class JobQueue(object):
     """
     try:
       for op in job.ops:
     """
     try:
       for op in job.ops:
-        op.status = constants.OP_STATUS_ERROR
+        op.status = constants.OP_STATUS_CANCELED
         op.result = "Job canceled by request"
     finally:
       self.UpdateJobUnlocked(job)
         op.result = "Job canceled by request"
     finally:
       self.UpdateJobUnlocked(job)
@@ -1126,7 +1162,7 @@ class JobQueue(object):
     """Archives jobs.
 
     @type jobs: list of L{_QueuedJob}
     """Archives jobs.
 
     @type jobs: list of L{_QueuedJob}
-    @param job: Job objects
+    @param jobs: Job objects
     @rtype: int
     @return: Number of archived jobs
 
     @rtype: int
     @return: Number of archived jobs
 
@@ -1159,7 +1195,7 @@ class JobQueue(object):
   def ArchiveJob(self, job_id):
     """Archives a job.
 
   def ArchiveJob(self, job_id):
     """Archives a job.
 
-    This is just a wrapper over L{_ArchiveJobUnlocked}.
+    This is just a wrapper over L{_ArchiveJobsUnlocked}.
 
     @type job_id: string
     @param job_id: Job ID of job to be archived.
 
     @type job_id: string
     @param job_id: Job ID of job to be archived.
@@ -1174,7 +1210,7 @@ class JobQueue(object):
       logging.debug("Job %s not found", job_id)
       return False
 
       logging.debug("Job %s not found", job_id)
       return False
 
-    return self._ArchiveJobUnlocked([job]) == 1
+    return self._ArchiveJobsUnlocked([job]) == 1
 
   @utils.LockedMethod
   @_RequireOpenQueue
 
   @utils.LockedMethod
   @_RequireOpenQueue