Code and docstring style fixes
[ganeti-local] / lib / jqueue.py
index 743870a..91a721a 100644 (file)
@@ -80,6 +80,10 @@ class _QueuedOpCode(object):
   @ivar stop_timestamp: timestamp for the end of the execution
 
   """
   @ivar stop_timestamp: timestamp for the end of the execution
 
   """
+  __slots__ = ["input", "status", "result", "log",
+               "start_timestamp", "end_timestamp",
+               "__weakref__"]
+
   def __init__(self, op):
     """Constructor for the _QuededOpCode.
 
   def __init__(self, op):
     """Constructor for the _QuededOpCode.
 
@@ -141,17 +145,20 @@ class _QueuedJob(object):
   @ivar id: the job ID
   @type ops: list
   @ivar ops: the list of _QueuedOpCode that constitute the job
   @ivar id: the job ID
   @type ops: list
   @ivar ops: the list of _QueuedOpCode that constitute the job
-  @type run_op_index: int
-  @ivar run_op_index: the currently executing opcode, or -1 if
-      we didn't yet start executing
   @type log_serial: int
   @ivar log_serial: holds the index for the next log entry
   @ivar received_timestamp: the timestamp for when the job was received
   @ivar start_timestmap: the timestamp for start of execution
   @ivar end_timestamp: the timestamp for end of execution
   @type log_serial: int
   @ivar log_serial: holds the index for the next log entry
   @ivar received_timestamp: the timestamp for when the job was received
   @ivar start_timestmap: the timestamp for start of execution
   @ivar end_timestamp: the timestamp for end of execution
+  @ivar lock_status: In-memory locking information for debugging
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
+  __slots__ = ["queue", "id", "ops", "log_serial",
+               "received_timestamp", "start_timestamp", "end_timestamp",
+               "lock_status", "change",
+               "__weakref__"]
+
   def __init__(self, queue, job_id, ops):
     """Constructor for the _QueuedJob.
 
   def __init__(self, queue, job_id, ops):
     """Constructor for the _QueuedJob.
 
@@ -171,12 +178,14 @@ class _QueuedJob(object):
     self.queue = queue
     self.id = job_id
     self.ops = [_QueuedOpCode(op) for op in ops]
     self.queue = queue
     self.id = job_id
     self.ops = [_QueuedOpCode(op) for op in ops]
-    self.run_op_index = -1
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
     self.end_timestamp = None
 
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
     self.end_timestamp = None
 
+    # In-memory attributes
+    self.lock_status = None
+
     # Condition to wait for changes
     self.change = threading.Condition(self.queue._lock)
 
     # Condition to wait for changes
     self.change = threading.Condition(self.queue._lock)
 
@@ -195,11 +204,13 @@ class _QueuedJob(object):
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
     obj.id = state["id"]
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
     obj.id = state["id"]
-    obj.run_op_index = state["run_op_index"]
     obj.received_timestamp = state.get("received_timestamp", None)
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
 
     obj.received_timestamp = state.get("received_timestamp", None)
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
 
+    # In-memory attributes
+    obj.lock_status = None
+
     obj.ops = []
     obj.log_serial = 0
     for op_state in state["ops"]:
     obj.ops = []
     obj.log_serial = 0
     for op_state in state["ops"]:
@@ -223,7 +234,6 @@ class _QueuedJob(object):
     return {
       "id": self.id,
       "ops": [op.Serialize() for op in self.ops],
     return {
       "id": self.id,
       "ops": [op.Serialize() for op in self.ops],
-      "run_op_index": self.run_op_index,
       "start_timestamp": self.start_timestamp,
       "end_timestamp": self.end_timestamp,
       "received_timestamp": self.received_timestamp,
       "start_timestamp": self.start_timestamp,
       "end_timestamp": self.end_timestamp,
       "received_timestamp": self.received_timestamp,
@@ -304,36 +314,111 @@ class _QueuedJob(object):
 
     return entries
 
 
     return entries
 
+  def MarkUnfinishedOps(self, status, result):
+    """Mark unfinished opcodes with a given status and result.
 
 
-class _JobQueueWorker(workerpool.BaseWorker):
-  """The actual job workers.
+    This is an utility function for marking all running or waiting to
+    be run opcodes with a given status. Opcodes which are already
+    finalised are not changed.
 
 
-  """
-  def _NotifyStart(self):
-    """Mark the opcode as running, not lock-waiting.
+    @param status: a given opcode status
+    @param result: the opcode result
+
+    """
+    not_marked = True
+    for op in self.ops:
+      if op.status in constants.OPS_FINALIZED:
+        assert not_marked, "Finalized opcodes found after non-finalized ones"
+        continue
+      op.status = status
+      op.result = result
+      not_marked = False
+
+
+class _OpExecCallbacks(mcpu.OpExecCbBase):
+  def __init__(self, queue, job, op):
+    """Initializes this class.
 
 
-    This is called from the mcpu code as a notifier function, when the
-    LU is finally about to start the Exec() method. Of course, to have
-    end-user visible results, the opcode must be initially (before
-    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
+    @type queue: L{JobQueue}
+    @param queue: Job queue
+    @type job: L{_QueuedJob}
+    @param job: Job object
+    @type op: L{_QueuedOpCode}
+    @param op: OpCode
 
     """
 
     """
-    assert self.queue, "Queue attribute is missing"
-    assert self.opcode, "Opcode attribute is missing"
+    assert queue, "Queue is missing"
+    assert job, "Job is missing"
+    assert op, "Opcode is missing"
+
+    self._queue = queue
+    self._job = job
+    self._op = op
 
 
-    self.queue.acquire()
+  def NotifyStart(self):
+    """Mark the opcode as running, not lock-waiting.
+
+    This is called from the mcpu code as a notifier function, when the LU is
+    finally about to start the Exec() method. Of course, to have end-user
+    visible results, the opcode must be initially (before calling into
+    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
+
+    """
+    self._queue.acquire()
     try:
     try:
-      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
-                                    constants.OP_STATUS_CANCELING)
+      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+                                 constants.OP_STATUS_CANCELING)
+
+      # All locks are acquired by now
+      self._job.lock_status = None
 
       # Cancel here if we were asked to
 
       # Cancel here if we were asked to
-      if self.opcode.status == constants.OP_STATUS_CANCELING:
+      if self._op.status == constants.OP_STATUS_CANCELING:
         raise CancelJob()
 
         raise CancelJob()
 
-      self.opcode.status = constants.OP_STATUS_RUNNING
+      self._op.status = constants.OP_STATUS_RUNNING
     finally:
     finally:
-      self.queue.release()
+      self._queue.release()
 
 
+  def Feedback(self, *args):
+    """Append a log entry.
+
+    """
+    assert len(args) < 3
+
+    if len(args) == 1:
+      log_type = constants.ELOG_MESSAGE
+      log_msg = args[0]
+    else:
+      (log_type, log_msg) = args
+
+    # The time is split to make serialization easier and not lose
+    # precision.
+    timestamp = utils.SplitTime(time.time())
+
+    self._queue.acquire()
+    try:
+      self._job.log_serial += 1
+      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
+
+      self._job.change.notifyAll()
+    finally:
+      self._queue.release()
+
+  def ReportLocks(self, msg):
+    """Write locking information to the job.
+
+    Called whenever the LU processor is waiting for a lock or has acquired one.
+
+    """
+    # Not getting the queue lock because this is a single assignment
+    self._job.lock_status = msg
+
+
+class _JobQueueWorker(workerpool.BaseWorker):
+  """The actual job workers.
+
+  """
   def RunTask(self, job):
     """Job executor.
 
   def RunTask(self, job):
     """Job executor.
 
@@ -347,12 +432,21 @@ class _JobQueueWorker(workerpool.BaseWorker):
     logging.info("Worker %s processing job %s",
                   self.worker_id, job.id)
     proc = mcpu.Processor(self.pool.queue.context)
     logging.info("Worker %s processing job %s",
                   self.worker_id, job.id)
     proc = mcpu.Processor(self.pool.queue.context)
-    self.queue = queue = job.queue
+    queue = job.queue
     try:
       try:
         count = len(job.ops)
         for idx, op in enumerate(job.ops):
           op_summary = op.input.Summary()
     try:
       try:
         count = len(job.ops)
         for idx, op in enumerate(job.ops):
           op_summary = op.input.Summary()
+          if op.status == constants.OP_STATUS_SUCCESS:
+            # this is a job that was partially completed before master
+            # daemon shutdown, so it can be expected that some opcodes
+            # are already completed successfully (if any did error
+            # out, then the whole job should have been aborted and not
+            # resubmitted for processing)
+            logging.info("Op %s/%s: opcode %s already processed, skipping",
+                         idx + 1, count, op_summary)
+            continue
           try:
             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                          op_summary)
           try:
             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                          op_summary)
@@ -362,7 +456,6 @@ class _JobQueueWorker(workerpool.BaseWorker):
               if op.status == constants.OP_STATUS_CANCELED:
                 raise CancelJob()
               assert op.status == constants.OP_STATUS_QUEUED
               if op.status == constants.OP_STATUS_CANCELED:
                 raise CancelJob()
               assert op.status == constants.OP_STATUS_QUEUED
-              job.run_op_index = idx
               op.status = constants.OP_STATUS_WAITLOCK
               op.result = None
               op.start_timestamp = TimeStampNow()
               op.status = constants.OP_STATUS_WAITLOCK
               op.result = None
               op.start_timestamp = TimeStampNow()
@@ -374,34 +467,9 @@ class _JobQueueWorker(workerpool.BaseWorker):
             finally:
               queue.release()
 
             finally:
               queue.release()
 
-            def _Log(*args):
-              """Append a log entry.
-
-              """
-              assert len(args) < 3
-
-              if len(args) == 1:
-                log_type = constants.ELOG_MESSAGE
-                log_msg = args[0]
-              else:
-                log_type, log_msg = args
-
-              # The time is split to make serialization easier and not lose
-              # precision.
-              timestamp = utils.SplitTime(time.time())
-
-              queue.acquire()
-              try:
-                job.log_serial += 1
-                op.log.append((job.log_serial, timestamp, log_type, log_msg))
-
-                job.change.notifyAll()
-              finally:
-                queue.release()
-
-            # Make sure not to hold lock while _Log is called
-            self.opcode = op
-            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
+            # Make sure not to hold queue lock while calling ExecOpCode
+            result = proc.ExecOpCode(input_opcode,
+                                     _OpExecCallbacks(queue, job, op))
 
             queue.acquire()
             try:
 
             queue.acquire()
             try:
@@ -422,7 +490,10 @@ class _JobQueueWorker(workerpool.BaseWorker):
             try:
               try:
                 op.status = constants.OP_STATUS_ERROR
             try:
               try:
                 op.status = constants.OP_STATUS_ERROR
-                op.result = str(err)
+                if isinstance(err, errors.GenericError):
+                  op.result = errors.EncodeException(err)
+                else:
+                  op.result = str(err)
                 op.end_timestamp = TimeStampNow()
                 logging.info("Op %s/%s: Error in opcode %s: %s",
                              idx + 1, count, op_summary, err)
                 op.end_timestamp = TimeStampNow()
                 logging.info("Op %s/%s: Error in opcode %s: %s",
                              idx + 1, count, op_summary, err)
@@ -446,7 +517,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
       queue.acquire()
       try:
         try:
       queue.acquire()
       try:
         try:
-          job.run_op_idx = -1
+          job.lock_status = None
           job.end_timestamp = TimeStampNow()
           queue.UpdateJobUnlocked(job)
         finally:
           job.end_timestamp = TimeStampNow()
           queue.UpdateJobUnlocked(job)
         finally:
@@ -454,6 +525,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
           status = job.CalcStatus()
       finally:
         queue.release()
           status = job.CalcStatus()
       finally:
         queue.release()
+
       logging.info("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)
 
       logging.info("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)
 
@@ -575,9 +647,8 @@ class JobQueue(object):
                           constants.JOB_STATUS_CANCELING):
             logging.warning("Unfinished job %s found: %s", job.id, job)
             try:
                           constants.JOB_STATUS_CANCELING):
             logging.warning("Unfinished job %s found: %s", job.id, job)
             try:
-              for op in job.ops:
-                op.status = constants.OP_STATUS_ERROR
-                op.result = "Unclean master daemon shutdown"
+              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                    "Unclean master daemon shutdown")
             finally:
               self.UpdateJobUnlocked(job)
 
             finally:
               self.UpdateJobUnlocked(job)
 
@@ -601,7 +672,11 @@ class JobQueue(object):
     assert node_name != self._my_hostname
 
     # Clean queue directory on added node
     assert node_name != self._my_hostname
 
     # Clean queue directory on added node
-    rpc.RpcRunner.call_jobqueue_purge(node_name)
+    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
+    msg = result.fail_msg
+    if msg:
+      logging.warning("Cannot cleanup queue directory on node %s: %s",
+                      node_name, msg)
 
     if not node.master_candidate:
       # remove if existing, ignoring errors
 
     if not node.master_candidate:
       # remove if existing, ignoring errors
@@ -617,17 +692,15 @@ class JobQueue(object):
 
     for file_name in files:
       # Read file content
 
     for file_name in files:
       # Read file content
-      fd = open(file_name, "r")
-      try:
-        content = fd.read()
-      finally:
-        fd.close()
+      content = utils.ReadFile(file_name)
 
       result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                   [node.primary_ip],
                                                   file_name, content)
 
       result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                   [node.primary_ip],
                                                   file_name, content)
-      if not result[node_name]:
-        logging.error("Failed to upload %s to %s", file_name, node_name)
+      msg = result[node_name].fail_msg
+      if msg:
+        logging.error("Failed to upload file %s to node %s: %s",
+                      file_name, node_name, msg)
 
     self._nodes[node_name] = node.primary_ip
 
 
     self._nodes[node_name] = node.primary_ip
 
@@ -664,13 +737,13 @@ class JobQueue(object):
     success = []
 
     for node in nodes:
     success = []
 
     for node in nodes:
-      if result[node]:
-        success.append(node)
-      else:
+      msg = result[node].fail_msg
+      if msg:
         failed.append(node)
         failed.append(node)
-
-    if failed:
-      logging.error("%s failed on %s", failmsg, ", ".join(failed))
+        logging.error("RPC call %s failed on node %s: %s",
+                      result[node].call, node, msg)
+      else:
+        success.append(node)
 
     # +1 for the master node
     if (len(success) + 1) < len(failed):
 
     # +1 for the master node
     if (len(success) + 1) < len(failed):
@@ -759,26 +832,31 @@ class JobQueue(object):
     """
     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
 
     """
     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
 
-  def _NewSerialUnlocked(self):
+  def _NewSerialsUnlocked(self, count):
     """Generates a new job identifier.
 
     Job identifiers are unique during the lifetime of a cluster.
 
     """Generates a new job identifier.
 
     Job identifiers are unique during the lifetime of a cluster.
 
+    @type count: integer
+    @param count: how many serials to return
     @rtype: str
     @return: a string representing the job identifier.
 
     """
     @rtype: str
     @return: a string representing the job identifier.
 
     """
+    assert count > 0
     # New number
     # New number
-    serial = self._last_serial + 1
+    serial = self._last_serial + count
 
     # Write to file
     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                         "%s\n" % serial)
 
 
     # Write to file
     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                         "%s\n" % serial)
 
+    result = [self._FormatJobID(v)
+              for v in range(self._last_serial, serial + 1)]
     # Keep it only if we were able to write the file
     self._last_serial = serial
 
     # Keep it only if we were able to write the file
     self._last_serial = serial
 
-    return self._FormatJobID(serial)
+    return result
 
   @staticmethod
   def _GetJobPath(job_id):
 
   @staticmethod
   def _GetJobPath(job_id):
@@ -871,15 +949,13 @@ class JobQueue(object):
     filepath = self._GetJobPath(job_id)
     logging.debug("Loading job from %s", filepath)
     try:
     filepath = self._GetJobPath(job_id)
     logging.debug("Loading job from %s", filepath)
     try:
-      fd = open(filepath, "r")
+      raw_data = utils.ReadFile(filepath)
     except IOError, err:
       if err.errno in (errno.ENOENT, ):
         return None
       raise
     except IOError, err:
       if err.errno in (errno.ENOENT, ):
         return None
       raise
-    try:
-      data = serializer.LoadJson(fd.read())
-    finally:
-      fd.close()
+
+    data = serializer.LoadJson(raw_data)
 
     try:
       job = _QueuedJob.Restore(self, data)
 
     try:
       job = _QueuedJob.Restore(self, data)
@@ -943,14 +1019,15 @@ class JobQueue(object):
       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
     return True
 
       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
     return True
 
-  @utils.LockedMethod
   @_RequireOpenQueue
   @_RequireOpenQueue
-  def SubmitJob(self, ops):
+  def _SubmitJobUnlocked(self, job_id, ops):
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
     queue, in order for it to be picked up by the queue processors.
 
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
     queue, in order for it to be picked up by the queue processors.
 
+    @type job_id: job ID
+    @param job_id: the job ID for the new job
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
     @rtype: job ID
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
     @rtype: job ID
@@ -959,7 +1036,7 @@ class JobQueue(object):
 
     """
     if self._IsQueueMarkedDrain():
 
     """
     if self._IsQueueMarkedDrain():
-      raise errors.JobQueueDrainError()
+      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
 
     # Check job queue size
     size = len(self._ListJobFiles())
 
     # Check job queue size
     size = len(self._ListJobFiles())
@@ -972,8 +1049,6 @@ class JobQueue(object):
     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()
 
     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()
 
-    # Get job identifier
-    job_id = self._NewSerialUnlocked()
     job = _QueuedJob(self, job_id, ops)
 
     # Write to disk
     job = _QueuedJob(self, job_id, ops)
 
     # Write to disk
@@ -987,6 +1062,38 @@ class JobQueue(object):
 
     return job.id
 
 
     return job.id
 
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitJob(self, ops):
+    """Create and store a new job.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    job_id = self._NewSerialsUnlocked(1)[0]
+    return self._SubmitJobUnlocked(job_id, ops)
+
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitManyJobs(self, jobs):
+    """Create and store multiple jobs.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    results = []
+    all_job_ids = self._NewSerialsUnlocked(len(jobs))
+    for job_id, ops in zip(all_job_ids, jobs):
+      try:
+        data = self._SubmitJobUnlocked(job_id, ops)
+        status = True
+      except errors.GenericError, err:
+        data = str(err)
+        status = False
+      results.append((status, data))
+
+    return results
+
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
     """Update a job's on disk storage.
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
     """Update a job's on disk storage.
@@ -1034,6 +1141,10 @@ class JobQueue(object):
 
     """
     logging.debug("Waiting for changes in job %s", job_id)
 
     """
     logging.debug("Waiting for changes in job %s", job_id)
+
+    job_info = None
+    log_entries = None
+
     end_time = time.time() + timeout
     while True:
       delta_time = end_time - time.time()
     end_time = time.time() + timeout
     while True:
       delta_time = end_time - time.time()
@@ -1075,7 +1186,10 @@ class JobQueue(object):
 
     logging.debug("Job %s changed", job_id)
 
 
     logging.debug("Job %s changed", job_id)
 
-    return (job_info, log_entries)
+    if job_info is None and log_entries is None:
+      return None
+    else:
+      return (job_info, log_entries)
 
   @utils.LockedMethod
   @_RequireOpenQueue
 
   @utils.LockedMethod
   @_RequireOpenQueue
@@ -1099,8 +1213,8 @@ class JobQueue(object):
 
     if job_status not in (constants.JOB_STATUS_QUEUED,
                           constants.JOB_STATUS_WAITLOCK):
 
     if job_status not in (constants.JOB_STATUS_QUEUED,
                           constants.JOB_STATUS_WAITLOCK):
-      logging.debug("Job %s is no longer in the queue", job.id)
-      return (False, "Job %s is no longer in the queue" % job.id)
+      logging.debug("Job %s is no longer waiting in the queue", job.id)
+      return (False, "Job %s is no longer waiting in the queue" % job.id)
 
     if job_status == constants.JOB_STATUS_QUEUED:
       self.CancelJobUnlocked(job)
 
     if job_status == constants.JOB_STATUS_QUEUED:
       self.CancelJobUnlocked(job)
@@ -1109,8 +1223,7 @@ class JobQueue(object):
     elif job_status == constants.JOB_STATUS_WAITLOCK:
       # The worker will notice the new status and cancel the job
       try:
     elif job_status == constants.JOB_STATUS_WAITLOCK:
       # The worker will notice the new status and cancel the job
       try:
-        for op in job.ops:
-          op.status = constants.OP_STATUS_CANCELING
+        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
       finally:
         self.UpdateJobUnlocked(job)
       return (True, "Job %s will be canceled" % job.id)
       finally:
         self.UpdateJobUnlocked(job)
       return (True, "Job %s will be canceled" % job.id)
@@ -1121,9 +1234,8 @@ class JobQueue(object):
 
     """
     try:
 
     """
     try:
-      for op in job.ops:
-        op.status = constants.OP_STATUS_CANCELED
-        op.result = "Job canceled by request"
+      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                            "Job canceled by request")
     finally:
       self.UpdateJobUnlocked(job)
 
     finally:
       self.UpdateJobUnlocked(job)
 
@@ -1275,6 +1387,8 @@ class JobQueue(object):
         row.append(job.start_timestamp)
       elif fname == "end_ts":
         row.append(job.end_timestamp)
         row.append(job.start_timestamp)
       elif fname == "end_ts":
         row.append(job.end_timestamp)
+      elif fname == "lock_status":
+        row.append(job.lock_status)
       elif fname == "summary":
         row.append([op.input.Summary() for op in job.ops])
       else:
       elif fname == "summary":
         row.append([op.input.Summary() for op in job.ops])
       else: