Support arguments in utils.RunInSeparateProcess
[ganeti-local] / lib / jqueue.py
index ec2f52a..62a5f4a 100644 (file)
@@ -190,6 +190,13 @@ class _QueuedJob(object):
     # Condition to wait for changes
     self.change = threading.Condition(self.queue._lock)
 
+  def __repr__(self):
+    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
+              "id=%s" % self.id,
+              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
+
+    return "<%s at %#x>" % (" ".join(status), id(self))
+
   @classmethod
   def Restore(cls, queue, state):
     """Restore a _QueuedJob from serialized state:
@@ -430,8 +437,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
     @param job: the job to be processed
 
     """
-    logging.info("Worker %s processing job %s",
-                  self.worker_id, job.id)
+    logging.info("Processing job %s", job.id)
     proc = mcpu.Processor(self.pool.queue.context, job.id)
     queue = job.queue
     try:
@@ -527,8 +533,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
       finally:
         queue.release()
 
-      logging.info("Worker %s finished job %s, status = %s",
-                   self.worker_id, job_id, status)
+      logging.info("Finished job %s, status = %s", job_id, status)
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
@@ -536,7 +541,8 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
 
   """
   def __init__(self, queue):
-    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
+    super(_JobQueueWorkerPool, self).__init__("JobQueue",
+                                              JOBQUEUE_THREADS,
                                               _JobQueueWorker)
     self.queue = queue
 
@@ -747,8 +753,8 @@ class JobQueue(object):
       msg = result[node].fail_msg
       if msg:
         failed.append(node)
-        logging.error("RPC call %s failed on node %s: %s",
-                      result[node].call, node, msg)
+        logging.error("RPC call %s (%s) failed on node %s: %s",
+                      result[node].call, failmsg, node, msg)
       else:
         success.append(node)
 
@@ -876,7 +882,7 @@ class JobQueue(object):
     @return: the path to the job file
 
     """
-    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
+    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
 
   @classmethod
   def _GetArchivedJobPath(cls, job_id):
@@ -888,8 +894,8 @@ class JobQueue(object):
     @return: the path to the archived job file
 
     """
-    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
-    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
+    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
+                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
 
   @classmethod
   def _ExtractJobID(cls, name):
@@ -923,6 +929,7 @@ class JobQueue(object):
     @return: the list of job IDs
 
     """
+    # pylint: disable-msg=W0613
     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
     jlist = utils.NiceSort(jlist)
     return jlist
@@ -1314,7 +1321,7 @@ class JobQueue(object):
     all_job_ids = self._GetJobIDsUnlocked(archived=False)
     pending = []
     for idx, job_id in enumerate(all_job_ids):
-      last_touched = idx
+      last_touched = idx + 1
 
       # Not optimal because jobs could be pending
       # TODO: Measure average duration for job archival and take number of
@@ -1344,7 +1351,7 @@ class JobQueue(object):
     if pending:
       archived_count += self._ArchiveJobsUnlocked(pending)
 
-    return (archived_count, len(all_job_ids) - last_touched - 1)
+    return (archived_count, len(all_job_ids) - last_touched)
 
   @staticmethod
   def _GetJobInfoUnlocked(job, fields):