hv_chroot: remove hard-coded path constructs
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 71a6ad7..1b10b71 100644
@@ -145,20 +145,19 @@ class _QueuedJob(object):
   @ivar id: the job ID
   @type ops: list
   @ivar ops: the list of _QueuedOpCode that constitute the job
-  @type run_op_index: int
-  @ivar run_op_index: the currently executing opcode, or -1 if
-      we didn't yet start executing
   @type log_serial: int
   @ivar log_serial: holds the index for the next log entry
   @ivar received_timestamp: the timestamp for when the job was received
   @ivar start_timestamp: the timestamp for start of execution
   @ivar end_timestamp: the timestamp for end of execution
+  @ivar lock_status: In-memory locking information for debugging
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
-  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
+  # pylint: disable-msg=W0212
+  __slots__ = ["queue", "id", "ops", "log_serial",
                "received_timestamp", "start_timestamp", "end_timestamp",
-               "change",
+               "lock_status", "change",
                "__weakref__"]
 
   def __init__(self, queue, job_id, ops):
@@ -180,15 +179,24 @@ class _QueuedJob(object):
     self.queue = queue
     self.id = job_id
     self.ops = [_QueuedOpCode(op) for op in ops]
-    self.run_op_index = -1
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
     self.end_timestamp = None
 
+    # In-memory attributes
+    self.lock_status = None
+
     # Condition to wait for changes
     self.change = threading.Condition(self.queue._lock)
 
+  def __repr__(self):
+    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
+              "id=%s" % self.id,
+              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
+
+    return "<%s at %#x>" % (" ".join(status), id(self))
+
   @classmethod
   def Restore(cls, queue, state):
     """Restore a _QueuedJob from serialized state:
@@ -204,11 +212,13 @@ class _QueuedJob(object):
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
     obj.id = state["id"]
-    obj.run_op_index = state["run_op_index"]
     obj.received_timestamp = state.get("received_timestamp", None)
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
 
+    # In-memory attributes
+    obj.lock_status = None
+
     obj.ops = []
     obj.log_serial = 0
     for op_state in state["ops"]:
@@ -232,7 +242,6 @@ class _QueuedJob(object):
     return {
       "id": self.id,
       "ops": [op.Serialize() for op in self.ops],
-      "run_op_index": self.run_op_index,
       "start_timestamp": self.start_timestamp,
       "end_timestamp": self.end_timestamp,
       "received_timestamp": self.received_timestamp,
@@ -334,7 +343,7 @@ class _QueuedJob(object):
       not_marked = False
 
 
-class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
+class _OpExecCallbacks(mcpu.OpExecCbBase):
   def __init__(self, queue, job, op):
     """Initializes this class.
 
@@ -368,6 +377,9 @@ class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
       assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                  constants.OP_STATUS_CANCELING)
 
+      # All locks are acquired by now
+      self._job.lock_status = None
+
       # Cancel here if we were asked to
       if self._op.status == constants.OP_STATUS_CANCELING:
         raise CancelJob()
@@ -401,12 +413,21 @@ class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
     finally:
       self._queue.release()
 
+  def ReportLocks(self, msg):
+    """Write locking information to the job.
+
+    Called whenever the LU processor is waiting for a lock or has acquired one.
+
+    """
+    # Not getting the queue lock because this is a single assignment
+    self._job.lock_status = msg
+
 
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
 
   """
-  def RunTask(self, job):
+  def RunTask(self, job): # pylint: disable-msg=W0221
     """Job executor.
 
     This function processes a job. It is closely tied to the _QueuedJob and
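
ReportLocks is called by the LU processor whenever it waits for or acquires a
lock; the message simply overwrites job.lock_status, and the callback that
fires once all locks are held (first hunk in _OpExecCallbacks above) resets it
to None. An illustrative driver, not ganeti code:

    class FakeProcessor(object):
      """Sketch of how a processor could publish lock progress."""
      def __init__(self, cbs):
        self._cbs = cbs

      def AcquireLocks(self, names):
        for name in names:
          self._cbs.ReportLocks("waiting for lock %s" % name)
          # ... the blocking acquire would happen here ...
        self._cbs.NotifyStart()  # assumed to be the hunk clearing lock_status
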
@@ -416,9 +437,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
     @param job: the job to be processed
 
     """
-    logging.info("Worker %s processing job %s",
-                  self.worker_id, job.id)
-    proc = mcpu.Processor(self.pool.queue.context)
+    logging.info("Processing job %s", job.id)
+    proc = mcpu.Processor(self.pool.queue.context, job.id)
     queue = job.queue
     try:
       try:
@@ -443,7 +463,6 @@ class _JobQueueWorker(workerpool.BaseWorker):
               if op.status == constants.OP_STATUS_CANCELED:
                 raise CancelJob()
               assert op.status == constants.OP_STATUS_QUEUED
-              job.run_op_index = idx
               op.status = constants.OP_STATUS_WAITLOCK
               op.result = None
               op.start_timestamp = TimeStampNow()
@@ -457,7 +476,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
 
             # Make sure not to hold queue lock while calling ExecOpCode
             result = proc.ExecOpCode(input_opcode,
-                                     _OpCodeExecCallbacks(queue, job, op))
+                                     _OpExecCallbacks(queue, job, op))
 
             queue.acquire()
             try:
@@ -505,7 +524,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
       queue.acquire()
       try:
         try:
-          job.run_op_index = -1
+          job.lock_status = None
           job.end_timestamp = TimeStampNow()
           queue.UpdateJobUnlocked(job)
         finally:
@@ -513,8 +532,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
           status = job.CalcStatus()
       finally:
         queue.release()
-      logging.info("Worker %s finished job %s, status = %s",
-                   self.worker_id, job_id, status)
+
+      logging.info("Finished job %s, status = %s", job_id, status)
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
@@ -522,38 +541,44 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
 
   """
   def __init__(self, queue):
-    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
+    super(_JobQueueWorkerPool, self).__init__("JobQueue",
+                                              JOBQUEUE_THREADS,
                                               _JobQueueWorker)
     self.queue = queue
 
 
-class JobQueue(object):
-  """Queue used to manage the jobs.
+def _RequireOpenQueue(fn):
+  """Decorator for "public" functions.
 
-  @cvar _RE_JOB_FILE: regex matching the valid job file names
+  This function should be used for all 'public' functions. That is,
+  functions usually called from other classes. Note that this should
+  be applied only to methods (not plain functions), since it expects
+  that the decorated function is called with a first argument that has
+  a '_queue_lock' attribute.
 
-  """
-  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
+  @warning: Use this decorator only after utils.LockedMethod!
 
-  def _RequireOpenQueue(fn):
-    """Decorator for "public" functions.
+  Example::
+    @utils.LockedMethod
+    @_RequireOpenQueue
+    def Example(self):
+      pass
 
-    This function should be used for all 'public' functions. That is,
-    functions usually called from other classes.
+  """
+  def wrapper(self, *args, **kwargs):
+    # pylint: disable-msg=W0212
+    assert self._queue_lock is not None, "Queue should be open"
+    return fn(self, *args, **kwargs)
+  return wrapper
 
-    @warning: Use this decorator only after utils.LockedMethod!
 
-    Example::
-      @utils.LockedMethod
-      @_RequireOpenQueue
-      def Example(self):
-        pass
+class JobQueue(object):
+  """Queue used to manage the jobs.
 
-    """
-    def wrapper(self, *args, **kwargs):
-      assert self._queue_lock is not None, "Queue should be open"
-      return fn(self, *args, **kwargs)
-    return wrapper
+  @cvar _RE_JOB_FILE: regex matching the valid job file names
+
+  """
+  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
 
   def __init__(self, context):
     """Constructor for JobQueue.
@@ -660,7 +685,7 @@ class JobQueue(object):
 
     # Clean queue directory on added node
     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
-    msg = result.RemoteFailMsg()
+    msg = result.fail_msg
     if msg:
       logging.warning("Cannot cleanup queue directory on node %s: %s",
                       node_name, msg)
@@ -684,7 +709,7 @@ class JobQueue(object):
       result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                   [node.primary_ip],
                                                   file_name, content)
-      msg = result[node_name].RemoteFailMsg()
+      msg = result[node_name].fail_msg
       if msg:
         logging.error("Failed to upload file %s to node %s: %s",
                       file_name, node_name, msg)
@@ -706,7 +731,8 @@ class JobQueue(object):
     except KeyError:
       pass
 
-  def _CheckRpcResult(self, result, nodes, failmsg):
+  @staticmethod
+  def _CheckRpcResult(result, nodes, failmsg):
     """Verifies the status of an RPC call.
 
     Since we aim to keep consistency should this node (the current
@@ -724,11 +750,11 @@ class JobQueue(object):
     success = []
 
     for node in nodes:
-      msg = result[node].RemoteFailMsg()
+      msg = result[node].fail_msg
       if msg:
         failed.append(node)
-        logging.error("RPC call %s failed on node %s: %s",
-                      result[node].call, node, msg)
+        logging.error("RPC call %s (%s) failed on node %s: %s",
+                      result[node].call, failmsg, node, msg)
       else:
         success.append(node)
 
@@ -787,7 +813,8 @@ class JobQueue(object):
     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
-  def _FormatJobID(self, job_id):
+  @staticmethod
+  def _FormatJobID(job_id):
     """Convert a job ID to string format.
 
     Currently this just does C{str(job_id)} after performing some
@@ -855,7 +882,7 @@ class JobQueue(object):
     @return: the path to the job file
 
     """
-    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
+    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
 
   @classmethod
   def _GetArchivedJobPath(cls, job_id):
@@ -868,7 +895,7 @@ class JobQueue(object):
 
     """
     path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
-    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
+    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, path)
 
   @classmethod
   def _ExtractJobID(cls, name):
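
These two hunks are the path-construct cleanup from the commit title:
utils.PathJoin replaces bare os.path.join so that components are validated
instead of blindly concatenated. A sketch of what such a helper guards
against, assuming it rejects components that escape the base directory (the
real utils.PathJoin may differ in details):

    import os.path

    def PathJoinChecked(base, *components):
      """Join paths, refusing a result outside the base directory."""
      result = os.path.normpath(os.path.join(base, *components))
      if not result.startswith(os.path.normpath(base) + os.sep):
        raise ValueError("path %r escapes base %r" % (result, base))
      return result

    PathJoinChecked("/var/lib/ganeti/queue", "job-123")    # ok
    PathJoinChecked("/var/lib/ganeti/queue", "../../etc")  # raises ValueError
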
@@ -902,6 +929,7 @@ class JobQueue(object):
     @return: the list of job IDs
 
     """
+    # pylint: disable-msg=W0613
     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
     jlist = utils.NiceSort(jlist)
     return jlist
@@ -946,7 +974,7 @@ class JobQueue(object):
 
     try:
       job = _QueuedJob.Restore(self, data)
-    except Exception, err:
+    except Exception, err: # pylint: disable-msg=W0703
       new_path = self._GetArchivedJobPath(job_id)
       if filepath == new_path:
         # job already archived (future case)
@@ -1014,7 +1042,7 @@ class JobQueue(object):
     queue, in order for it to be picked up by the queue processors.
 
     @type job_id: job ID
-    @param jod_id: the job ID for the new job
+    @param job_id: the job ID for the new job
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
     @rtype: job ID
@@ -1081,7 +1109,6 @@ class JobQueue(object):
 
     return results
 
-
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
     """Update a job's on disk storage.
@@ -1128,21 +1155,13 @@ class JobQueue(object):
         as such by the clients
 
     """
-    logging.debug("Waiting for changes in job %s", job_id)
-
-    job_info = None
-    log_entries = None
-
-    end_time = time.time() + timeout
-    while True:
-      delta_time = end_time - time.time()
-      if delta_time < 0:
-        return constants.JOB_NOTCHANGED
+    job = self._LoadJobUnlocked(job_id)
+    if not job:
+      logging.debug("Job %s not found", job_id)
+      return None
 
-      job = self._LoadJobUnlocked(job_id)
-      if not job:
-        logging.debug("Job %s not found", job_id)
-        break
+    def _CheckForChanges():
+      logging.debug("Waiting for changes in job %s", job_id)
 
       status = job.CalcStatus()
       job_info = self._GetJobInfoUnlocked(job, fields)
@@ -1156,28 +1175,24 @@ class JobQueue(object):
       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
 
-      if status not in (constants.JOB_STATUS_QUEUED,
-                        constants.JOB_STATUS_RUNNING,
-                        constants.JOB_STATUS_WAITLOCK):
-        # Don't even try to wait if the job is no longer running, there will be
-        # no changes.
-        break
-
-      if (prev_job_info != job_info or
+      # Don't even try to wait if the job is no longer running, there will be
+      # no changes.
+      if (status not in (constants.JOB_STATUS_QUEUED,
+                         constants.JOB_STATUS_RUNNING,
+                         constants.JOB_STATUS_WAITLOCK) or
+          prev_job_info != job_info or
           (log_entries and prev_log_serial != log_entries[0][0])):
-        break
-
-      logging.debug("Waiting again")
+        logging.debug("Job %s changed", job_id)
+        return (job_info, log_entries)
 
-      # Release the queue lock while waiting
-      job.change.wait(delta_time)
+      raise utils.RetryAgain()
 
-    logging.debug("Job %s changed", job_id)
-
-    if job_info is None and log_entries is None:
-      return None
-    else:
-      return (job_info, log_entries)
+    try:
+      # Setting wait function to release the queue lock while waiting
+      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
+                         wait_fn=job.change.wait)
+    except utils.RetryTimeout:
+      return constants.JOB_NOTCHANGED
 
   @utils.LockedMethod
   @_RequireOpenQueue
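
The hand-written polling loop becomes a call to utils.Retry: _CheckForChanges
either returns a result or raises utils.RetryAgain, and Retry re-invokes it
until the timeout expires, sleeping through the supplied wait_fn (here
job.change.wait, which releases the queue lock while blocking). A reduced
model of that control flow, not the real utils.Retry (which is more general):

    import time

    class RetryAgain(Exception):    # stands in for utils.RetryAgain
      pass

    class RetryTimeout(Exception):  # stands in for utils.RetryTimeout
      pass

    def Retry(fn, timeout, wait_fn):
      """Call fn() until it stops raising RetryAgain or time runs out."""
      deadline = time.time() + timeout
      while True:
        try:
          return fn()
        except RetryAgain:
          remaining = deadline - time.time()
          if remaining <= 0:
            raise RetryTimeout()
          wait_fn(remaining)  # e.g. a condition variable's wait()
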
@@ -1256,7 +1271,7 @@ class JobQueue(object):
     self._RenameFilesUnlocked(rename_files)
 
     logging.debug("Successfully archived job(s) %s",
-                  ", ".join(job.id for job in archive_jobs))
+                  utils.CommaJoin(job.id for job in archive_jobs))
 
     return len(archive_jobs)
 
@@ -1306,7 +1321,7 @@ class JobQueue(object):
     all_job_ids = self._GetJobIDsUnlocked(archived=False)
     pending = []
     for idx, job_id in enumerate(all_job_ids):
-      last_touched = idx
+      last_touched = idx + 1
 
       # Not optimal because jobs could be pending
       # TODO: Measure average duration for job archival and take number of
@@ -1336,9 +1351,10 @@ class JobQueue(object):
     if pending:
       archived_count += self._ArchiveJobsUnlocked(pending)
 
-    return (archived_count, len(all_job_ids) - last_touched - 1)
+    return (archived_count, len(all_job_ids) - last_touched)
 
-  def _GetJobInfoUnlocked(self, job, fields):
+  @staticmethod
+  def _GetJobInfoUnlocked(job, fields):
     """Returns information about a job.
 
     @type job: L{_QueuedJob}
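
The idx + 1 change makes last_touched count jobs inspected rather than the
index of the last one, fixing the off-by-one in the "jobs left" figure.
Assuming last_touched is initialised to 0 before the loop (the initialisation
is outside this hunk), an empty queue previously reported
len(all_job_ids) - 0 - 1 = -1 unchecked jobs and now reports 0; for non-empty
runs the two forms agree, e.g. 20 jobs with the loop ending at idx 19 give
20 - 19 - 1 = 0 and 20 - 20 = 0 respectively.
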
@@ -1375,6 +1391,8 @@ class JobQueue(object):
         row.append(job.start_timestamp)
       elif fname == "end_ts":
         row.append(job.end_timestamp)
+      elif fname == "lock_status":
+        row.append(job.lock_status)
       elif fname == "summary":
         row.append([op.input.Summary() for op in job.ops])
       else:
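
With the final hunk, lock_status becomes an ordinary queryable job field next
to start_ts, end_ts and summary. An illustrative use, assuming
JobQueue.QueryJobs(job_ids, fields) returns rows ordered like the requested
field list (the job id is invented):

    fields = ["id", "status", "lock_status", "summary"]
    for job_id, status, lock_status, summary in queue.QueryJobs(["1"], fields):
      if lock_status:
        print "job %s is waiting on: %s" % (job_id, lock_status)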