Simplify handling of regular fields in LUQuery*
[ganeti-local] / lib / jqueue.py
index 71a6ad7..5dd5578 100644 (file)
@@ -145,20 +145,18 @@ class _QueuedJob(object):
   @ivar id: the job ID
   @type ops: list
   @ivar ops: the list of _QueuedOpCode that constitute the job
-  @type run_op_index: int
-  @ivar run_op_index: the currently executing opcode, or -1 if
-      we didn't yet start executing
   @type log_serial: int
   @ivar log_serial: holds the index for the next log entry
   @ivar received_timestamp: the timestamp for when the job was received
   @ivar start_timestamp: the timestamp for start of execution
   @ivar end_timestamp: the timestamp for end of execution
+  @ivar lock_status: In-memory locking information for debugging
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
-  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
+  __slots__ = ["queue", "id", "ops", "log_serial",
                "received_timestamp", "start_timestamp", "end_timestamp",
-               "change",
+               "lock_status", "change",
                "__weakref__"]
 
   def __init__(self, queue, job_id, ops):
@@ -180,12 +178,14 @@ class _QueuedJob(object):
     self.queue = queue
     self.id = job_id
     self.ops = [_QueuedOpCode(op) for op in ops]
-    self.run_op_index = -1
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
     self.end_timestamp = None
 
+    # In-memory attributes
+    self.lock_status = None
+
     # Condition to wait for changes
     self.change = threading.Condition(self.queue._lock)
 
@@ -204,11 +204,13 @@ class _QueuedJob(object):
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
     obj.id = state["id"]
-    obj.run_op_index = state["run_op_index"]
     obj.received_timestamp = state.get("received_timestamp", None)
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
 
+    # In-memory attributes
+    obj.lock_status = None
+
     obj.ops = []
     obj.log_serial = 0
     for op_state in state["ops"]:
@@ -232,7 +234,6 @@ class _QueuedJob(object):
     return {
       "id": self.id,
       "ops": [op.Serialize() for op in self.ops],
-      "run_op_index": self.run_op_index,
       "start_timestamp": self.start_timestamp,
       "end_timestamp": self.end_timestamp,
       "received_timestamp": self.received_timestamp,
@@ -334,7 +335,7 @@ class _QueuedJob(object):
       not_marked = False
 
 
-class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
+class _OpExecCallbacks(mcpu.OpExecCbBase):
   def __init__(self, queue, job, op):
     """Initializes this class.
 
@@ -368,6 +369,9 @@ class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
       assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                  constants.OP_STATUS_CANCELING)
 
+      # All locks are acquired by now
+      self._job.lock_status = None
+
       # Cancel here if we were asked to
       if self._op.status == constants.OP_STATUS_CANCELING:
         raise CancelJob()
@@ -401,6 +405,15 @@ class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
     finally:
       self._queue.release()
 
+  def ReportLocks(self, msg):
+    """Write locking information to the job.
+
+    Called whenever the LU processor is waiting for a lock or has acquired one.
+
+    """
+    # Not getting the queue lock because this is a single assignment
+    self._job.lock_status = msg
+
 
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
@@ -443,7 +456,6 @@ class _JobQueueWorker(workerpool.BaseWorker):
               if op.status == constants.OP_STATUS_CANCELED:
                 raise CancelJob()
               assert op.status == constants.OP_STATUS_QUEUED
-              job.run_op_index = idx
               op.status = constants.OP_STATUS_WAITLOCK
               op.result = None
               op.start_timestamp = TimeStampNow()
@@ -457,7 +469,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
 
             # Make sure not to hold queue lock while calling ExecOpCode
             result = proc.ExecOpCode(input_opcode,
-                                     _OpCodeExecCallbacks(queue, job, op))
+                                     _OpExecCallbacks(queue, job, op))
 
             queue.acquire()
             try:
@@ -505,7 +517,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
       queue.acquire()
       try:
         try:
-          job.run_op_index = -1
+          job.lock_status = None
           job.end_timestamp = TimeStampNow()
           queue.UpdateJobUnlocked(job)
         finally:
@@ -513,6 +525,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
           status = job.CalcStatus()
       finally:
         queue.release()
+
       logging.info("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)
 
@@ -660,7 +673,7 @@ class JobQueue(object):
 
     # Clean queue directory on added node
     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
-    msg = result.RemoteFailMsg()
+    msg = result.fail_msg
     if msg:
       logging.warning("Cannot cleanup queue directory on node %s: %s",
                       node_name, msg)
@@ -684,7 +697,7 @@ class JobQueue(object):
       result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                   [node.primary_ip],
                                                   file_name, content)
-      msg = result[node_name].RemoteFailMsg()
+      msg = result[node_name].fail_msg
       if msg:
         logging.error("Failed to upload file %s to node %s: %s",
                       file_name, node_name, msg)
@@ -724,7 +737,7 @@ class JobQueue(object):
     success = []
 
     for node in nodes:
-      msg = result[node].RemoteFailMsg()
+      msg = result[node].fail_msg
       if msg:
         failed.append(node)
         logging.error("RPC call %s failed on node %s: %s",
@@ -1081,7 +1094,6 @@ class JobQueue(object):
 
     return results
 
-
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
     """Update a job's on disk storage.
@@ -1375,6 +1387,8 @@ class JobQueue(object):
         row.append(job.start_timestamp)
       elif fname == "end_ts":
         row.append(job.end_timestamp)
+      elif fname == "lock_status":
+        row.append(job.lock_status)
       elif fname == "summary":
         row.append([op.input.Summary() for op in job.ops])
       else:
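
With the field wired into the query loop, asking for lock_status in a job query returns either None or the most recent ReportLocks message. A reduced, hypothetical sketch of that row-building logic follows; the fake job class and helper are illustrative, not the actual JobQueue code:

```python
def _BuildJobRow(job, fields):
  """Reduced sketch of the per-job row assembly for a query."""
  row = []
  for fname in fields:
    if fname == "id":
      row.append(job.id)
    elif fname == "lock_status":
      # None unless an opcode is currently waiting for or holding locks
      row.append(job.lock_status)
    else:
      raise KeyError("Unhandled field: %s" % fname)
  return row


class _FakeJob(object):
  """Stand-in job carrying only the fields used above."""

  def __init__(self, job_id, lock_status=None):
    self.id = job_id
    self.lock_status = lock_status


print(_BuildJobRow(_FakeJob(123, "waiting for lock 'cluster'"),
                   ["id", "lock_status"]))
# -> [123, "waiting for lock 'cluster'"]
```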