Fix heading to the one of epydoc
[ganeti-local] / lib / jqueue.py
index ed862dd..62a5f4a 100644 (file)
@@ -154,6 +154,7 @@ class _QueuedJob(object):
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
+  # pylint: disable-msg=W0212
   __slots__ = ["queue", "id", "ops", "log_serial",
                "received_timestamp", "start_timestamp", "end_timestamp",
                "lock_status", "change",
@@ -189,6 +190,13 @@ class _QueuedJob(object):
     # Condition to wait for changes
     self.change = threading.Condition(self.queue._lock)
 
+  def __repr__(self):
+    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
+              "id=%s" % self.id,
+              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
+
+    return "<%s at %#x>" % (" ".join(status), id(self))
+
   @classmethod
   def Restore(cls, queue, state):
     """Restore a _QueuedJob from serialized state:
@@ -419,7 +427,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
 
   """
-  def RunTask(self, job):
+  def RunTask(self, job): # pylint: disable-msg=W0221
     """Job executor.
 
     This functions processes a job. It is closely tied to the _QueuedJob and
@@ -429,8 +437,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
     @param job: the job to be processed
 
     """
-    logging.info("Worker %s processing job %s",
-                  self.worker_id, job.id)
+    logging.info("Processing job %s", job.id)
     proc = mcpu.Processor(self.pool.queue.context, job.id)
     queue = job.queue
     try:
@@ -526,8 +533,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
       finally:
         queue.release()
 
-      logging.info("Worker %s finished job %s, status = %s",
-                   self.worker_id, job_id, status)
+      logging.info("Finished job %s, status = %s", job_id, status)
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
@@ -535,7 +541,8 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
 
   """
   def __init__(self, queue):
-    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
+    super(_JobQueueWorkerPool, self).__init__("JobQueue",
+                                              JOBQUEUE_THREADS,
                                               _JobQueueWorker)
     self.queue = queue
 
@@ -559,6 +566,7 @@ def _RequireOpenQueue(fn):
 
   """
   def wrapper(self, *args, **kwargs):
+    # pylint: disable-msg=W0212
     assert self._queue_lock is not None, "Queue should be open"
     return fn(self, *args, **kwargs)
   return wrapper
@@ -723,7 +731,8 @@ class JobQueue(object):
     except KeyError:
       pass
 
-  def _CheckRpcResult(self, result, nodes, failmsg):
+  @staticmethod
+  def _CheckRpcResult(result, nodes, failmsg):
     """Verifies the status of an RPC call.
 
     Since we aim to keep consistency should this node (the current
@@ -744,8 +753,8 @@ class JobQueue(object):
       msg = result[node].fail_msg
       if msg:
         failed.append(node)
-        logging.error("RPC call %s failed on node %s: %s",
-                      result[node].call, node, msg)
+        logging.error("RPC call %s (%s) failed on node %s: %s",
+                      result[node].call, failmsg, node, msg)
       else:
         success.append(node)
 
@@ -804,7 +813,8 @@ class JobQueue(object):
     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
-  def _FormatJobID(self, job_id):
+  @staticmethod
+  def _FormatJobID(job_id):
     """Convert a job ID to string format.
 
     Currently this just does C{str(job_id)} after performing some
@@ -872,7 +882,7 @@ class JobQueue(object):
     @return: the path to the job file
 
     """
-    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
+    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
 
   @classmethod
   def _GetArchivedJobPath(cls, job_id):
@@ -884,8 +894,8 @@ class JobQueue(object):
     @return: the path to the archived job file
 
     """
-    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
-    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
+    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
+                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
 
   @classmethod
   def _ExtractJobID(cls, name):
@@ -919,6 +929,7 @@ class JobQueue(object):
     @return: the list of job IDs
 
     """
+    # pylint: disable-msg=W0613
     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
     jlist = utils.NiceSort(jlist)
     return jlist
@@ -963,7 +974,7 @@ class JobQueue(object):
 
     try:
       job = _QueuedJob.Restore(self, data)
-    except Exception, err:
+    except Exception, err: # pylint: disable-msg=W0703
       new_path = self._GetArchivedJobPath(job_id)
       if filepath == new_path:
         # job already archived (future case)
@@ -1260,7 +1271,7 @@ class JobQueue(object):
     self._RenameFilesUnlocked(rename_files)
 
     logging.debug("Successfully archived job(s) %s",
-                  ", ".join(job.id for job in archive_jobs))
+                  utils.CommaJoin(job.id for job in archive_jobs))
 
     return len(archive_jobs)
 
@@ -1310,7 +1321,7 @@ class JobQueue(object):
     all_job_ids = self._GetJobIDsUnlocked(archived=False)
     pending = []
     for idx, job_id in enumerate(all_job_ids):
-      last_touched = idx
+      last_touched = idx + 1
 
       # Not optimal because jobs could be pending
       # TODO: Measure average duration for job archival and take number of
@@ -1340,9 +1351,10 @@ class JobQueue(object):
     if pending:
       archived_count += self._ArchiveJobsUnlocked(pending)
 
-    return (archived_count, len(all_job_ids) - last_touched - 1)
+    return (archived_count, len(all_job_ids) - last_touched)
 
-  def _GetJobInfoUnlocked(self, job, fields):
+  @staticmethod
+  def _GetJobInfoUnlocked(job, fields):
     """Returns information about a job.
 
     @type job: L{_QueuedJob}