Add reset_env option to RunCmd
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 25f464e..7824d3f 100644
@@ -190,6 +190,13 @@ class _QueuedJob(object):
     # Condition to wait for changes
     self.change = threading.Condition(self.queue._lock)
 
+  def __repr__(self):
+    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
+              "id=%s" % self.id,
+              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
+
+    return "<%s at %#x>" % (" ".join(status), id(self))
+
   @classmethod
   def Restore(cls, queue, state):
     """Restore a _QueuedJob from serialized state:
@@ -430,8 +437,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
     @param job: the job to be processed
 
     """
-    logging.info("Worker %s processing job %s",
-                  self.worker_id, job.id)
+    logging.info("Processing job %s", job.id)
     proc = mcpu.Processor(self.pool.queue.context, job.id)
     queue = job.queue
     try:
@@ -527,8 +533,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
       finally:
         queue.release()
 
-      logging.info("Worker %s finished job %s, status = %s",
-                   self.worker_id, job_id, status)
+      logging.info("Finished job %s, status = %s", job_id, status)
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
@@ -536,7 +541,8 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
 
   """
   def __init__(self, queue):
-    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
+    super(_JobQueueWorkerPool, self).__init__("JobQueue",
+                                              JOBQUEUE_THREADS,
                                               _JobQueueWorker)
     self.queue = queue
 
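
Note on the pool change: judging from the new call site, the base workerpool.WorkerPool constructor now takes a pool name as its first argument, which presumably also explains dropping the per-worker id from the log messages above (the worker's identity is carried by its name instead). A simplified sketch of that pattern under this assumption, not the actual ganeti.workerpool implementation:

    # The (name, num_workers, worker_class) signature is inferred from the
    # call above, not a verified API; illustrative only.
    class BaseWorker(object):
      def __init__(self, pool, worker_name):
        self.pool = pool
        self.worker_name = worker_name        # e.g. "JobQueue/0"

    class WorkerPool(object):
      def __init__(self, name, num_workers, worker_class):
        self.name = name
        self.workers = [worker_class(self, "%s/%d" % (name, i))
                        for i in range(num_workers)]

    pool = WorkerPool("JobQueue", 3, BaseWorker)
    print([w.worker_name for w in pool.workers])
    # ['JobQueue/0', 'JobQueue/1', 'JobQueue/2']
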
@@ -725,7 +731,8 @@ class JobQueue(object):
     except KeyError:
       pass
 
-  def _CheckRpcResult(self, result, nodes, failmsg):
+  @staticmethod
+  def _CheckRpcResult(result, nodes, failmsg):
     """Verifies the status of an RPC call.
 
     Since we aim to keep consistency should this node (the current
     """Verifies the status of an RPC call.
 
     Since we aim to keep consistency should this node (the current
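
Note on the @staticmethod conversions here and below (_CheckRpcResult, _FormatJobID, _GetJobInfoUnlocked): these methods never touch self, so the conversion is purely cosmetic (typically done to quiet pylint's no-self-use check), and existing call sites keep working because Python still resolves instance.method() through the class. Illustrative sketch, not Ganeti code:

    class Queue(object):
      @staticmethod
      def _FormatJobID(job_id):
        return str(job_id)

      def SubmitJob(self, job_id):
        # Calling via self still works after the conversion.
        return self._FormatJobID(job_id)

    print(Queue().SubmitJob(42))    # prints: 42
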
@@ -746,8 +753,8 @@ class JobQueue(object):
       msg = result[node].fail_msg
       if msg:
         failed.append(node)
-        logging.error("RPC call %s failed on node %s: %s",
-                      result[node].call, node, msg)
+        logging.error("RPC call %s (%s) failed on node %s: %s",
+                      result[node].call, failmsg, node, msg)
       else:
         success.append(node)
 
@@ -806,7 +813,8 @@ class JobQueue(object):
     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
-  def _FormatJobID(self, job_id):
+  @staticmethod
+  def _FormatJobID(job_id):
     """Convert a job ID to string format.
 
     Currently this just does C{str(job_id)} after performing some
     """Convert a job ID to string format.
 
     Currently this just does C{str(job_id)} after performing some
@@ -921,6 +929,7 @@ class JobQueue(object):
     @return: the list of job IDs
 
     """
+    # pylint: disable-msg=W0613
     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
     jlist = utils.NiceSort(jlist)
     return jlist
@@ -1312,7 +1321,7 @@ class JobQueue(object):
     all_job_ids = self._GetJobIDsUnlocked(archived=False)
     pending = []
     for idx, job_id in enumerate(all_job_ids):
-      last_touched = idx
+      last_touched = idx + 1
 
       # Not optimal because jobs could be pending
       # TODO: Measure average duration for job archival and take number of
@@ -1342,9 +1351,10 @@ class JobQueue(object):
     if pending:
       archived_count += self._ArchiveJobsUnlocked(pending)
 
-    return (archived_count, len(all_job_ids) - last_touched - 1)
+    return (archived_count, len(all_job_ids) - last_touched)
 
-  def _GetJobInfoUnlocked(self, job, fields):
+  @staticmethod
+  def _GetJobInfoUnlocked(job, fields):
     """Returns information about a job.
 
     @type job: L{_QueuedJob}
     """Returns information about a job.
 
     @type job: L{_QueuedJob}
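
Note on the archive counting change: last_touched now means "number of job IDs examined" rather than "index of the last examined job", so the remaining count is simply len(all_job_ids) - last_touched. Both conventions agree whenever the loop runs at least once; they differ when it does not, assuming last_touched is pre-initialised to 0 (the initialisation is not visible in this hunk). A small worked example under that assumption:

    # Illustrative numbers only; assumes last_touched starts at 0 when the
    # loop body never runs (not shown in this hunk).
    all_job_ids = ["1", "2", "3", "4", "5"]
    last_touched = 0
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1
      if job_id == "3":              # pretend the time budget ran out here
        break
    print(len(all_job_ids) - last_touched)   # 2 jobs were never examined

    # With the old convention (last_touched = idx, remaining = len - idx - 1)
    # the result is the same 2 here, but an empty queue would have given -1.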