Add reset_env option to RunCmd
[ganeti-local] / lib / jqueue.py
index 5dd5578..7824d3f 100644 (file)
@@ -154,6 +154,7 @@ class _QueuedJob(object):
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
+  # pylint: disable-msg=W0212
   __slots__ = ["queue", "id", "ops", "log_serial",
                "received_timestamp", "start_timestamp", "end_timestamp",
                "lock_status", "change",
@@ -189,6 +190,13 @@ class _QueuedJob(object):
     # Condition to wait for changes
     self.change = threading.Condition(self.queue._lock)
 
+  def __repr__(self):
+    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
+              "id=%s" % self.id,
+              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
+
+    return "<%s at %#x>" % (" ".join(status), id(self))
+
   @classmethod
   def Restore(cls, queue, state):
     """Restore a _QueuedJob from serialized state:
@@ -419,7 +427,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
 
   """
-  def RunTask(self, job):
+  def RunTask(self, job): # pylint: disable-msg=W0221
     """Job executor.
 
     This functions processes a job. It is closely tied to the _QueuedJob and
@@ -429,9 +437,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
     @param job: the job to be processed
 
     """
-    logging.info("Worker %s processing job %s",
-                  self.worker_id, job.id)
-    proc = mcpu.Processor(self.pool.queue.context)
+    logging.info("Processing job %s", job.id)
+    proc = mcpu.Processor(self.pool.queue.context, job.id)
     queue = job.queue
     try:
       try:
@@ -526,8 +533,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
       finally:
         queue.release()
 
-      logging.info("Worker %s finished job %s, status = %s",
-                   self.worker_id, job_id, status)
+      logging.info("Finished job %s, status = %s", job_id, status)
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
@@ -535,38 +541,44 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
 
   """
   def __init__(self, queue):
-    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
+    super(_JobQueueWorkerPool, self).__init__("JobQueue",
+                                              JOBQUEUE_THREADS,
                                               _JobQueueWorker)
     self.queue = queue
 
 
-class JobQueue(object):
-  """Queue used to manage the jobs.
+def _RequireOpenQueue(fn):
+  """Decorator for "public" functions.
 
-  @cvar _RE_JOB_FILE: regex matching the valid job file names
+  This function should be used for all 'public' functions. That is,
+  functions usually called from other classes. Note that this should
+  be applied only to methods (not plain functions), since it expects
+  that the decorated function is called with a first argument that has
+  a '_queue_lock' argument.
 
-  """
-  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
+  @warning: Use this decorator only after utils.LockedMethod!
+
+  Example::
+    @utils.LockedMethod
+    @_RequireOpenQueue
+    def Example(self):
+      pass
 
-  def _RequireOpenQueue(fn):
-    """Decorator for "public" functions.
+  """
+  def wrapper(self, *args, **kwargs):
+    # pylint: disable-msg=W0212
+    assert self._queue_lock is not None, "Queue should be open"
+    return fn(self, *args, **kwargs)
+  return wrapper
 
-    This function should be used for all 'public' functions. That is,
-    functions usually called from other classes.
 
-    @warning: Use this decorator only after utils.LockedMethod!
+class JobQueue(object):
+  """Queue used to manage the jobs.
 
-    Example::
-      @utils.LockedMethod
-      @_RequireOpenQueue
-      def Example(self):
-        pass
+  @cvar _RE_JOB_FILE: regex matching the valid job file names
 
-    """
-    def wrapper(self, *args, **kwargs):
-      assert self._queue_lock is not None, "Queue should be open"
-      return fn(self, *args, **kwargs)
-    return wrapper
+  """
+  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
 
   def __init__(self, context):
     """Constructor for JobQueue.
@@ -719,7 +731,8 @@ class JobQueue(object):
     except KeyError:
       pass
 
-  def _CheckRpcResult(self, result, nodes, failmsg):
+  @staticmethod
+  def _CheckRpcResult(result, nodes, failmsg):
     """Verifies the status of an RPC call.
 
     Since we aim to keep consistency should this node (the current
@@ -740,8 +753,8 @@ class JobQueue(object):
       msg = result[node].fail_msg
       if msg:
         failed.append(node)
-        logging.error("RPC call %s failed on node %s: %s",
-                      result[node].call, node, msg)
+        logging.error("RPC call %s (%s) failed on node %s: %s",
+                      result[node].call, failmsg, node, msg)
       else:
         success.append(node)
 
@@ -800,7 +813,8 @@ class JobQueue(object):
     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
-  def _FormatJobID(self, job_id):
+  @staticmethod
+  def _FormatJobID(job_id):
     """Convert a job ID to string format.
 
     Currently this just does C{str(job_id)} after performing some
@@ -915,6 +929,7 @@ class JobQueue(object):
     @return: the list of job IDs
 
     """
+    # pylint: disable-msg=W0613
     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
     jlist = utils.NiceSort(jlist)
     return jlist
@@ -959,7 +974,7 @@ class JobQueue(object):
 
     try:
       job = _QueuedJob.Restore(self, data)
-    except Exception, err:
+    except Exception, err: # pylint: disable-msg=W0703
       new_path = self._GetArchivedJobPath(job_id)
       if filepath == new_path:
         # job already archived (future case)
@@ -1027,7 +1042,7 @@ class JobQueue(object):
     queue, in order for it to be picked up by the queue processors.
 
     @type job_id: job ID
-    @param jod_id: the job ID for the new job
+    @param job_id: the job ID for the new job
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
     @rtype: job ID
@@ -1140,21 +1155,13 @@ class JobQueue(object):
         as such by the clients
 
     """
-    logging.debug("Waiting for changes in job %s", job_id)
-
-    job_info = None
-    log_entries = None
-
-    end_time = time.time() + timeout
-    while True:
-      delta_time = end_time - time.time()
-      if delta_time < 0:
-        return constants.JOB_NOTCHANGED
+    job = self._LoadJobUnlocked(job_id)
+    if not job:
+      logging.debug("Job %s not found", job_id)
+      return None
 
-      job = self._LoadJobUnlocked(job_id)
-      if not job:
-        logging.debug("Job %s not found", job_id)
-        break
+    def _CheckForChanges():
+      logging.debug("Waiting for changes in job %s", job_id)
 
       status = job.CalcStatus()
       job_info = self._GetJobInfoUnlocked(job, fields)
@@ -1168,28 +1175,24 @@ class JobQueue(object):
       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
 
-      if status not in (constants.JOB_STATUS_QUEUED,
-                        constants.JOB_STATUS_RUNNING,
-                        constants.JOB_STATUS_WAITLOCK):
-        # Don't even try to wait if the job is no longer running, there will be
-        # no changes.
-        break
-
-      if (prev_job_info != job_info or
+      # Don't even try to wait if the job is no longer running, there will be
+      # no changes.
+      if (status not in (constants.JOB_STATUS_QUEUED,
+                         constants.JOB_STATUS_RUNNING,
+                         constants.JOB_STATUS_WAITLOCK) or
+          prev_job_info != job_info or
           (log_entries and prev_log_serial != log_entries[0][0])):
-        break
-
-      logging.debug("Waiting again")
-
-      # Release the queue lock while waiting
-      job.change.wait(delta_time)
+        logging.debug("Job %s changed", job_id)
+        return (job_info, log_entries)
 
-    logging.debug("Job %s changed", job_id)
+      raise utils.RetryAgain()
 
-    if job_info is None and log_entries is None:
-      return None
-    else:
-      return (job_info, log_entries)
+    try:
+      # Setting wait function to release the queue lock while waiting
+      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
+                         wait_fn=job.change.wait)
+    except utils.RetryTimeout:
+      return constants.JOB_NOTCHANGED
 
   @utils.LockedMethod
   @_RequireOpenQueue
@@ -1268,7 +1271,7 @@ class JobQueue(object):
     self._RenameFilesUnlocked(rename_files)
 
     logging.debug("Successfully archived job(s) %s",
-                  ", ".join(job.id for job in archive_jobs))
+                  utils.CommaJoin(job.id for job in archive_jobs))
 
     return len(archive_jobs)
 
@@ -1318,7 +1321,7 @@ class JobQueue(object):
     all_job_ids = self._GetJobIDsUnlocked(archived=False)
     pending = []
     for idx, job_id in enumerate(all_job_ids):
-      last_touched = idx
+      last_touched = idx + 1
 
       # Not optimal because jobs could be pending
       # TODO: Measure average duration for job archival and take number of
@@ -1348,9 +1351,10 @@ class JobQueue(object):
     if pending:
       archived_count += self._ArchiveJobsUnlocked(pending)
 
-    return (archived_count, len(all_job_ids) - last_touched - 1)
+    return (archived_count, len(all_job_ids) - last_touched)
 
-  def _GetJobInfoUnlocked(self, job, fields):
+  @staticmethod
+  def _GetJobInfoUnlocked(job, fields):
     """Returns information about a job.
 
     @type job: L{_QueuedJob}