Merge branch 'devel-2.1' into stable-2.1
[ganeti-local] / lib / jqueue.py
index 91a721a..2c2345b 100644 (file)
@@ -154,6 +154,7 @@ class _QueuedJob(object):
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
+  # pylint: disable-msg=W0212
   __slots__ = ["queue", "id", "ops", "log_serial",
                "received_timestamp", "start_timestamp", "end_timestamp",
                "lock_status", "change",
@@ -419,7 +420,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
 
   """
-  def RunTask(self, job):
+  def RunTask(self, job): # pylint: disable-msg=W0221
     """Job executor.
 
     This functions processes a job. It is closely tied to the _QueuedJob and
@@ -431,7 +432,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
     """
     logging.info("Worker %s processing job %s",
                   self.worker_id, job.id)
-    proc = mcpu.Processor(self.pool.queue.context)
+    proc = mcpu.Processor(self.pool.queue.context, job.id)
     queue = job.queue
     try:
       try:
@@ -540,33 +541,38 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
     self.queue = queue
 
 
-class JobQueue(object):
-  """Queue used to manage the jobs.
+def _RequireOpenQueue(fn):
+  """Decorator for "public" functions.
 
-  @cvar _RE_JOB_FILE: regex matching the valid job file names
+  This function should be used for all 'public' functions. That is,
+  functions usually called from other classes. Note that this should
+  be applied only to methods (not plain functions), since it expects
+  that the decorated function is called with a first argument that has
+  a '_queue_lock' argument.
 
-  """
-  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
+  @warning: Use this decorator only after utils.LockedMethod!
 
-  def _RequireOpenQueue(fn):
-    """Decorator for "public" functions.
+  Example::
+    @utils.LockedMethod
+    @_RequireOpenQueue
+    def Example(self):
+      pass
 
-    This function should be used for all 'public' functions. That is,
-    functions usually called from other classes.
+  """
+  def wrapper(self, *args, **kwargs):
+    # pylint: disable-msg=W0212
+    assert self._queue_lock is not None, "Queue should be open"
+    return fn(self, *args, **kwargs)
+  return wrapper
 
-    @warning: Use this decorator only after utils.LockedMethod!
 
-    Example::
-      @utils.LockedMethod
-      @_RequireOpenQueue
-      def Example(self):
-        pass
+class JobQueue(object):
+  """Queue used to manage the jobs.
 
-    """
-    def wrapper(self, *args, **kwargs):
-      assert self._queue_lock is not None, "Queue should be open"
-      return fn(self, *args, **kwargs)
-    return wrapper
+  @cvar _RE_JOB_FILE: regex matching the valid job file names
+
+  """
+  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
 
   def __init__(self, context):
     """Constructor for JobQueue.
@@ -719,7 +725,8 @@ class JobQueue(object):
     except KeyError:
       pass
 
-  def _CheckRpcResult(self, result, nodes, failmsg):
+  @staticmethod
+  def _CheckRpcResult(result, nodes, failmsg):
     """Verifies the status of an RPC call.
 
     Since we aim to keep consistency should this node (the current
@@ -740,8 +747,8 @@ class JobQueue(object):
       msg = result[node].fail_msg
       if msg:
         failed.append(node)
-        logging.error("RPC call %s failed on node %s: %s",
-                      result[node].call, node, msg)
+        logging.error("RPC call %s (%s) failed on node %s: %s",
+                      result[node].call, failmsg, node, msg)
       else:
         success.append(node)
 
@@ -800,7 +807,8 @@ class JobQueue(object):
     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
-  def _FormatJobID(self, job_id):
+  @staticmethod
+  def _FormatJobID(job_id):
     """Convert a job ID to string format.
 
     Currently this just does C{str(job_id)} after performing some
@@ -915,6 +923,7 @@ class JobQueue(object):
     @return: the list of job IDs
 
     """
+    # pylint: disable-msg=W0613
     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
     jlist = utils.NiceSort(jlist)
     return jlist
@@ -959,7 +968,7 @@ class JobQueue(object):
 
     try:
       job = _QueuedJob.Restore(self, data)
-    except Exception, err:
+    except Exception, err: # pylint: disable-msg=W0703
       new_path = self._GetArchivedJobPath(job_id)
       if filepath == new_path:
         # job already archived (future case)
@@ -1140,21 +1149,13 @@ class JobQueue(object):
         as such by the clients
 
     """
-    logging.debug("Waiting for changes in job %s", job_id)
-
-    job_info = None
-    log_entries = None
-
-    end_time = time.time() + timeout
-    while True:
-      delta_time = end_time - time.time()
-      if delta_time < 0:
-        return constants.JOB_NOTCHANGED
+    job = self._LoadJobUnlocked(job_id)
+    if not job:
+      logging.debug("Job %s not found", job_id)
+      return None
 
-      job = self._LoadJobUnlocked(job_id)
-      if not job:
-        logging.debug("Job %s not found", job_id)
-        break
+    def _CheckForChanges():
+      logging.debug("Waiting for changes in job %s", job_id)
 
       status = job.CalcStatus()
       job_info = self._GetJobInfoUnlocked(job, fields)
@@ -1168,28 +1169,24 @@ class JobQueue(object):
       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
 
-      if status not in (constants.JOB_STATUS_QUEUED,
-                        constants.JOB_STATUS_RUNNING,
-                        constants.JOB_STATUS_WAITLOCK):
-        # Don't even try to wait if the job is no longer running, there will be
-        # no changes.
-        break
-
-      if (prev_job_info != job_info or
+      # Don't even try to wait if the job is no longer running, there will be
+      # no changes.
+      if (status not in (constants.JOB_STATUS_QUEUED,
+                         constants.JOB_STATUS_RUNNING,
+                         constants.JOB_STATUS_WAITLOCK) or
+          prev_job_info != job_info or
           (log_entries and prev_log_serial != log_entries[0][0])):
-        break
-
-      logging.debug("Waiting again")
-
-      # Release the queue lock while waiting
-      job.change.wait(delta_time)
+        logging.debug("Job %s changed", job_id)
+        return (job_info, log_entries)
 
-    logging.debug("Job %s changed", job_id)
+      raise utils.RetryAgain()
 
-    if job_info is None and log_entries is None:
-      return None
-    else:
-      return (job_info, log_entries)
+    try:
+      # Setting wait function to release the queue lock while waiting
+      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
+                         wait_fn=job.change.wait)
+    except utils.RetryTimeout:
+      return constants.JOB_NOTCHANGED
 
   @utils.LockedMethod
   @_RequireOpenQueue
@@ -1268,7 +1265,7 @@ class JobQueue(object):
     self._RenameFilesUnlocked(rename_files)
 
     logging.debug("Successfully archived job(s) %s",
-                  ", ".join(job.id for job in archive_jobs))
+                  utils.CommaJoin(job.id for job in archive_jobs))
 
     return len(archive_jobs)
 
@@ -1350,7 +1347,8 @@ class JobQueue(object):
 
     return (archived_count, len(all_job_ids) - last_touched - 1)
 
-  def _GetJobInfoUnlocked(self, job, fields):
+  @staticmethod
+  def _GetJobInfoUnlocked(job, fields):
     """Returns information about a job.
 
     @type job: L{_QueuedJob}