Merge branch 'devel-2.1' into stable-2.1
[ganeti-local] / lib / jqueue.py
index dcdce1e..2c2345b 100644 (file)
@@ -69,7 +69,7 @@ def TimeStampNow():
 
 
 class _QueuedOpCode(object):
-  """Encasulates an opcode object.
+  """Encapsulates an opcode object.
 
   @ivar log: holds the execution log and consists of tuples
   of the form C{(log_serial, timestamp, level, message)}
@@ -80,6 +80,10 @@ class _QueuedOpCode(object):
   @ivar stop_timestamp: timestamp for the end of the execution
 
   """
+  __slots__ = ["input", "status", "result", "log",
+               "start_timestamp", "end_timestamp",
+               "__weakref__"]
+
   def __init__(self, op):
     """Constructor for the _QuededOpCode.
 
@@ -141,17 +145,21 @@ class _QueuedJob(object):
   @ivar id: the job ID
   @type ops: list
   @ivar ops: the list of _QueuedOpCode that constitute the job
-  @type run_op_index: int
-  @ivar run_op_index: the currently executing opcode, or -1 if
-      we didn't yet start executing
   @type log_serial: int
   @ivar log_serial: holds the index for the next log entry
   @ivar received_timestamp: the timestamp for when the job was received
   @ivar start_timestmap: the timestamp for start of execution
   @ivar end_timestamp: the timestamp for end of execution
+  @ivar lock_status: In-memory locking information for debugging
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
+  # pylint: disable-msg=W0212
+  __slots__ = ["queue", "id", "ops", "log_serial",
+               "received_timestamp", "start_timestamp", "end_timestamp",
+               "lock_status", "change",
+               "__weakref__"]
+
   def __init__(self, queue, job_id, ops):
     """Constructor for the _QueuedJob.
 
@@ -171,12 +179,14 @@ class _QueuedJob(object):
     self.queue = queue
     self.id = job_id
     self.ops = [_QueuedOpCode(op) for op in ops]
-    self.run_op_index = -1
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
     self.end_timestamp = None
 
+    # In-memory attributes
+    self.lock_status = None
+
     # Condition to wait for changes
     self.change = threading.Condition(self.queue._lock)
 
@@ -195,11 +205,13 @@ class _QueuedJob(object):
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
     obj.id = state["id"]
-    obj.run_op_index = state["run_op_index"]
     obj.received_timestamp = state.get("received_timestamp", None)
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
 
+    # In-memory attributes
+    obj.lock_status = None
+
     obj.ops = []
     obj.log_serial = 0
     for op_state in state["ops"]:
@@ -223,7 +235,6 @@ class _QueuedJob(object):
     return {
       "id": self.id,
       "ops": [op.Serialize() for op in self.ops],
-      "run_op_index": self.run_op_index,
       "start_timestamp": self.start_timestamp,
       "end_timestamp": self.end_timestamp,
       "received_timestamp": self.received_timestamp,
@@ -286,7 +297,7 @@ class _QueuedJob(object):
     """Selectively returns the log entries.
 
     @type newer_than: None or int
-    @param newer_than: if this is None, return all log enties,
+    @param newer_than: if this is None, return all log entries,
         otherwise return only the log entries with serial higher
         than this value
     @rtype: list
@@ -304,37 +315,112 @@ class _QueuedJob(object):
 
     return entries
 
+  def MarkUnfinishedOps(self, status, result):
+    """Mark unfinished opcodes with a given status and result.
 
-class _JobQueueWorker(workerpool.BaseWorker):
-  """The actual job workers.
+    This is an utility function for marking all running or waiting to
+    be run opcodes with a given status. Opcodes which are already
+    finalised are not changed.
 
-  """
-  def _NotifyStart(self):
-    """Mark the opcode as running, not lock-waiting.
+    @param status: a given opcode status
+    @param result: the opcode result
+
+    """
+    not_marked = True
+    for op in self.ops:
+      if op.status in constants.OPS_FINALIZED:
+        assert not_marked, "Finalized opcodes found after non-finalized ones"
+        continue
+      op.status = status
+      op.result = result
+      not_marked = False
+
+
+class _OpExecCallbacks(mcpu.OpExecCbBase):
+  def __init__(self, queue, job, op):
+    """Initializes this class.
 
-    This is called from the mcpu code as a notifier function, when the
-    LU is finally about to start the Exec() method. Of course, to have
-    end-user visible results, the opcode must be initially (before
-    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
+    @type queue: L{JobQueue}
+    @param queue: Job queue
+    @type job: L{_QueuedJob}
+    @param job: Job object
+    @type op: L{_QueuedOpCode}
+    @param op: OpCode
 
     """
-    assert self.queue, "Queue attribute is missing"
-    assert self.opcode, "Opcode attribute is missing"
+    assert queue, "Queue is missing"
+    assert job, "Job is missing"
+    assert op, "Opcode is missing"
 
-    self.queue.acquire()
+    self._queue = queue
+    self._job = job
+    self._op = op
+
+  def NotifyStart(self):
+    """Mark the opcode as running, not lock-waiting.
+
+    This is called from the mcpu code as a notifier function, when the LU is
+    finally about to start the Exec() method. Of course, to have end-user
+    visible results, the opcode must be initially (before calling into
+    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
+
+    """
+    self._queue.acquire()
     try:
-      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
-                                    constants.OP_STATUS_CANCELING)
+      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+                                 constants.OP_STATUS_CANCELING)
+
+      # All locks are acquired by now
+      self._job.lock_status = None
 
       # Cancel here if we were asked to
-      if self.opcode.status == constants.OP_STATUS_CANCELING:
+      if self._op.status == constants.OP_STATUS_CANCELING:
         raise CancelJob()
 
-      self.opcode.status = constants.OP_STATUS_RUNNING
+      self._op.status = constants.OP_STATUS_RUNNING
     finally:
-      self.queue.release()
+      self._queue.release()
+
+  def Feedback(self, *args):
+    """Append a log entry.
+
+    """
+    assert len(args) < 3
+
+    if len(args) == 1:
+      log_type = constants.ELOG_MESSAGE
+      log_msg = args[0]
+    else:
+      (log_type, log_msg) = args
+
+    # The time is split to make serialization easier and not lose
+    # precision.
+    timestamp = utils.SplitTime(time.time())
 
-  def RunTask(self, job):
+    self._queue.acquire()
+    try:
+      self._job.log_serial += 1
+      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
+
+      self._job.change.notifyAll()
+    finally:
+      self._queue.release()
+
+  def ReportLocks(self, msg):
+    """Write locking information to the job.
+
+    Called whenever the LU processor is waiting for a lock or has acquired one.
+
+    """
+    # Not getting the queue lock because this is a single assignment
+    self._job.lock_status = msg
+
+
+class _JobQueueWorker(workerpool.BaseWorker):
+  """The actual job workers.
+
+  """
+  def RunTask(self, job): # pylint: disable-msg=W0221
     """Job executor.
 
     This functions processes a job. It is closely tied to the _QueuedJob and
@@ -346,21 +432,31 @@ class _JobQueueWorker(workerpool.BaseWorker):
     """
     logging.info("Worker %s processing job %s",
                   self.worker_id, job.id)
-    proc = mcpu.Processor(self.pool.queue.context)
-    self.queue = queue = job.queue
+    proc = mcpu.Processor(self.pool.queue.context, job.id)
+    queue = job.queue
     try:
       try:
         count = len(job.ops)
         for idx, op in enumerate(job.ops):
           op_summary = op.input.Summary()
+          if op.status == constants.OP_STATUS_SUCCESS:
+            # this is a job that was partially completed before master
+            # daemon shutdown, so it can be expected that some opcodes
+            # are already completed successfully (if any did error
+            # out, then the whole job should have been aborted and not
+            # resubmitted for processing)
+            logging.info("Op %s/%s: opcode %s already processed, skipping",
+                         idx + 1, count, op_summary)
+            continue
           try:
             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                          op_summary)
 
             queue.acquire()
             try:
+              if op.status == constants.OP_STATUS_CANCELED:
+                raise CancelJob()
               assert op.status == constants.OP_STATUS_QUEUED
-              job.run_op_index = idx
               op.status = constants.OP_STATUS_WAITLOCK
               op.result = None
               op.start_timestamp = TimeStampNow()
@@ -372,34 +468,9 @@ class _JobQueueWorker(workerpool.BaseWorker):
             finally:
               queue.release()
 
-            def _Log(*args):
-              """Append a log entry.
-
-              """
-              assert len(args) < 3
-
-              if len(args) == 1:
-                log_type = constants.ELOG_MESSAGE
-                log_msg = args[0]
-              else:
-                log_type, log_msg = args
-
-              # The time is split to make serialization easier and not lose
-              # precision.
-              timestamp = utils.SplitTime(time.time())
-
-              queue.acquire()
-              try:
-                job.log_serial += 1
-                op.log.append((job.log_serial, timestamp, log_type, log_msg))
-
-                job.change.notifyAll()
-              finally:
-                queue.release()
-
-            # Make sure not to hold lock while _Log is called
-            self.opcode = op
-            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
+            # Make sure not to hold queue lock while calling ExecOpCode
+            result = proc.ExecOpCode(input_opcode,
+                                     _OpExecCallbacks(queue, job, op))
 
             queue.acquire()
             try:
@@ -420,10 +491,13 @@ class _JobQueueWorker(workerpool.BaseWorker):
             try:
               try:
                 op.status = constants.OP_STATUS_ERROR
-                op.result = str(err)
+                if isinstance(err, errors.GenericError):
+                  op.result = errors.EncodeException(err)
+                else:
+                  op.result = str(err)
                 op.end_timestamp = TimeStampNow()
-                logging.info("Op %s/%s: Error in opcode %s", idx + 1, count,
-                             op_summary)
+                logging.info("Op %s/%s: Error in opcode %s: %s",
+                             idx + 1, count, op_summary, err)
               finally:
                 queue.UpdateJobUnlocked(job)
             finally:
@@ -444,7 +518,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
       queue.acquire()
       try:
         try:
-          job.run_op_idx = -1
+          job.lock_status = None
           job.end_timestamp = TimeStampNow()
           queue.UpdateJobUnlocked(job)
         finally:
@@ -452,6 +526,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
           status = job.CalcStatus()
       finally:
         queue.release()
+
       logging.info("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)
 
@@ -466,33 +541,38 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
     self.queue = queue
 
 
-class JobQueue(object):
-  """Quue used to manaage the jobs.
+def _RequireOpenQueue(fn):
+  """Decorator for "public" functions.
 
-  @cvar _RE_JOB_FILE: regex matching the valid job file names
+  This function should be used for all 'public' functions. That is,
+  functions usually called from other classes. Note that this should
+  be applied only to methods (not plain functions), since it expects
+  that the decorated function is called with a first argument that has
+  a '_queue_lock' argument.
 
-  """
-  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
+  @warning: Use this decorator only after utils.LockedMethod!
 
-  def _RequireOpenQueue(fn):
-    """Decorator for "public" functions.
+  Example::
+    @utils.LockedMethod
+    @_RequireOpenQueue
+    def Example(self):
+      pass
 
-    This function should be used for all 'public' functions. That is,
-    functions usually called from other classes.
+  """
+  def wrapper(self, *args, **kwargs):
+    # pylint: disable-msg=W0212
+    assert self._queue_lock is not None, "Queue should be open"
+    return fn(self, *args, **kwargs)
+  return wrapper
 
-    @warning: Use this decorator only after utils.LockedMethod!
 
-    Example::
-      @utils.LockedMethod
-      @_RequireOpenQueue
-      def Example(self):
-        pass
+class JobQueue(object):
+  """Queue used to manage the jobs.
 
-    """
-    def wrapper(self, *args, **kwargs):
-      assert self._queue_lock is not None, "Queue should be open"
-      return fn(self, *args, **kwargs)
-    return wrapper
+  @cvar _RE_JOB_FILE: regex matching the valid job file names
+
+  """
+  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
 
   def __init__(self, context):
     """Constructor for JobQueue.
@@ -573,9 +653,8 @@ class JobQueue(object):
                           constants.JOB_STATUS_CANCELING):
             logging.warning("Unfinished job %s found: %s", job.id, job)
             try:
-              for op in job.ops:
-                op.status = constants.OP_STATUS_ERROR
-                op.result = "Unclean master daemon shutdown"
+              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                    "Unclean master daemon shutdown")
             finally:
               self.UpdateJobUnlocked(job)
 
@@ -599,7 +678,11 @@ class JobQueue(object):
     assert node_name != self._my_hostname
 
     # Clean queue directory on added node
-    rpc.RpcRunner.call_jobqueue_purge(node_name)
+    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
+    msg = result.fail_msg
+    if msg:
+      logging.warning("Cannot cleanup queue directory on node %s: %s",
+                      node_name, msg)
 
     if not node.master_candidate:
       # remove if existing, ignoring errors
@@ -615,17 +698,15 @@ class JobQueue(object):
 
     for file_name in files:
       # Read file content
-      fd = open(file_name, "r")
-      try:
-        content = fd.read()
-      finally:
-        fd.close()
+      content = utils.ReadFile(file_name)
 
       result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                   [node.primary_ip],
                                                   file_name, content)
-      if not result[node_name]:
-        logging.error("Failed to upload %s to %s", file_name, node_name)
+      msg = result[node_name].fail_msg
+      if msg:
+        logging.error("Failed to upload file %s to node %s: %s",
+                      file_name, node_name, msg)
 
     self._nodes[node_name] = node.primary_ip
 
@@ -644,12 +725,13 @@ class JobQueue(object):
     except KeyError:
       pass
 
-  def _CheckRpcResult(self, result, nodes, failmsg):
+  @staticmethod
+  def _CheckRpcResult(result, nodes, failmsg):
     """Verifies the status of an RPC call.
 
     Since we aim to keep consistency should this node (the current
     master) fail, we will log errors if our rpc fail, and especially
-    log the case when more than half of the nodes failes.
+    log the case when more than half of the nodes fails.
 
     @param result: the data as returned from the rpc call
     @type nodes: list
@@ -662,13 +744,13 @@ class JobQueue(object):
     success = []
 
     for node in nodes:
-      if result[node]:
-        success.append(node)
-      else:
+      msg = result[node].fail_msg
+      if msg:
         failed.append(node)
-
-    if failed:
-      logging.error("%s failed on %s", failmsg, ", ".join(failed))
+        logging.error("RPC call %s (%s) failed on node %s: %s",
+                      result[node].call, failmsg, node, msg)
+      else:
+        success.append(node)
 
     # +1 for the master node
     if (len(success) + 1) < len(failed):
@@ -725,7 +807,8 @@ class JobQueue(object):
     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
-  def _FormatJobID(self, job_id):
+  @staticmethod
+  def _FormatJobID(job_id):
     """Convert a job ID to string format.
 
     Currently this just does C{str(job_id)} after performing some
@@ -757,26 +840,31 @@ class JobQueue(object):
     """
     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
 
-  def _NewSerialUnlocked(self):
+  def _NewSerialsUnlocked(self, count):
     """Generates a new job identifier.
 
     Job identifiers are unique during the lifetime of a cluster.
 
+    @type count: integer
+    @param count: how many serials to return
     @rtype: str
     @return: a string representing the job identifier.
 
     """
+    assert count > 0
     # New number
-    serial = self._last_serial + 1
+    serial = self._last_serial + count
 
     # Write to file
     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                         "%s\n" % serial)
 
+    result = [self._FormatJobID(v)
+              for v in range(self._last_serial, serial + 1)]
     # Keep it only if we were able to write the file
     self._last_serial = serial
 
-    return self._FormatJobID(serial)
+    return result
 
   @staticmethod
   def _GetJobPath(job_id):
@@ -835,6 +923,7 @@ class JobQueue(object):
     @return: the list of job IDs
 
     """
+    # pylint: disable-msg=W0613
     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
     jlist = utils.NiceSort(jlist)
     return jlist
@@ -869,19 +958,17 @@ class JobQueue(object):
     filepath = self._GetJobPath(job_id)
     logging.debug("Loading job from %s", filepath)
     try:
-      fd = open(filepath, "r")
+      raw_data = utils.ReadFile(filepath)
     except IOError, err:
       if err.errno in (errno.ENOENT, ):
         return None
       raise
-    try:
-      data = serializer.LoadJson(fd.read())
-    finally:
-      fd.close()
+
+    data = serializer.LoadJson(raw_data)
 
     try:
       job = _QueuedJob.Restore(self, data)
-    except Exception, err:
+    except Exception, err: # pylint: disable-msg=W0703
       new_path = self._GetArchivedJobPath(job_id)
       if filepath == new_path:
         # job already archived (future case)
@@ -932,7 +1019,7 @@ class JobQueue(object):
     and in the future we might merge them.
 
     @type drain_flag: boolean
-    @param drain_flag: wheter to set or unset the drain flag
+    @param drain_flag: Whether to set or unset the drain flag
 
     """
     if drain_flag:
@@ -941,14 +1028,15 @@ class JobQueue(object):
       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
     return True
 
-  @utils.LockedMethod
   @_RequireOpenQueue
-  def SubmitJob(self, ops):
+  def _SubmitJobUnlocked(self, job_id, ops):
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
     queue, in order for it to be picked up by the queue processors.
 
+    @type job_id: job ID
+    @param job_id: the job ID for the new job
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
     @rtype: job ID
@@ -957,7 +1045,7 @@ class JobQueue(object):
 
     """
     if self._IsQueueMarkedDrain():
-      raise errors.JobQueueDrainError()
+      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
 
     # Check job queue size
     size = len(self._ListJobFiles())
@@ -970,8 +1058,6 @@ class JobQueue(object):
     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()
 
-    # Get job identifier
-    job_id = self._NewSerialUnlocked()
     job = _QueuedJob(self, job_id, ops)
 
     # Write to disk
@@ -985,6 +1071,38 @@ class JobQueue(object):
 
     return job.id
 
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitJob(self, ops):
+    """Create and store a new job.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    job_id = self._NewSerialsUnlocked(1)[0]
+    return self._SubmitJobUnlocked(job_id, ops)
+
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitManyJobs(self, jobs):
+    """Create and store multiple jobs.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    results = []
+    all_job_ids = self._NewSerialsUnlocked(len(jobs))
+    for job_id, ops in zip(all_job_ids, jobs):
+      try:
+        data = self._SubmitJobUnlocked(job_id, ops)
+        status = True
+      except errors.GenericError, err:
+        data = str(err)
+        status = False
+      results.append((status, data))
+
+    return results
+
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
     """Update a job's on disk storage.
@@ -1031,17 +1149,13 @@ class JobQueue(object):
         as such by the clients
 
     """
-    logging.debug("Waiting for changes in job %s", job_id)
-    end_time = time.time() + timeout
-    while True:
-      delta_time = end_time - time.time()
-      if delta_time < 0:
-        return constants.JOB_NOTCHANGED
+    job = self._LoadJobUnlocked(job_id)
+    if not job:
+      logging.debug("Job %s not found", job_id)
+      return None
 
-      job = self._LoadJobUnlocked(job_id)
-      if not job:
-        logging.debug("Job %s not found", job_id)
-        break
+    def _CheckForChanges():
+      logging.debug("Waiting for changes in job %s", job_id)
 
       status = job.CalcStatus()
       job_info = self._GetJobInfoUnlocked(job, fields)
@@ -1055,25 +1169,24 @@ class JobQueue(object):
       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
 
-      if status not in (constants.JOB_STATUS_QUEUED,
-                        constants.JOB_STATUS_RUNNING,
-                        constants.JOB_STATUS_WAITLOCK):
-        # Don't even try to wait if the job is no longer running, there will be
-        # no changes.
-        break
-
-      if (prev_job_info != job_info or
+      # Don't even try to wait if the job is no longer running, there will be
+      # no changes.
+      if (status not in (constants.JOB_STATUS_QUEUED,
+                         constants.JOB_STATUS_RUNNING,
+                         constants.JOB_STATUS_WAITLOCK) or
+          prev_job_info != job_info or
           (log_entries and prev_log_serial != log_entries[0][0])):
-        break
-
-      logging.debug("Waiting again")
+        logging.debug("Job %s changed", job_id)
+        return (job_info, log_entries)
 
-      # Release the queue lock while waiting
-      job.change.wait(delta_time)
+      raise utils.RetryAgain()
 
-    logging.debug("Job %s changed", job_id)
-
-    return (job_info, log_entries)
+    try:
+      # Setting wait function to release the queue lock while waiting
+      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
+                         wait_fn=job.change.wait)
+    except utils.RetryTimeout:
+      return constants.JOB_NOTCHANGED
 
   @utils.LockedMethod
   @_RequireOpenQueue
@@ -1097,8 +1210,8 @@ class JobQueue(object):
 
     if job_status not in (constants.JOB_STATUS_QUEUED,
                           constants.JOB_STATUS_WAITLOCK):
-      logging.debug("Job %s is no longer in the queue", job.id)
-      return (False, "Job %s is no longer in the queue" % job.id)
+      logging.debug("Job %s is no longer waiting in the queue", job.id)
+      return (False, "Job %s is no longer waiting in the queue" % job.id)
 
     if job_status == constants.JOB_STATUS_QUEUED:
       self.CancelJobUnlocked(job)
@@ -1107,8 +1220,7 @@ class JobQueue(object):
     elif job_status == constants.JOB_STATUS_WAITLOCK:
       # The worker will notice the new status and cancel the job
       try:
-        for op in job.ops:
-          op.status = constants.OP_STATUS_CANCELING
+        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
       finally:
         self.UpdateJobUnlocked(job)
       return (True, "Job %s will be canceled" % job.id)
@@ -1119,9 +1231,8 @@ class JobQueue(object):
 
     """
     try:
-      for op in job.ops:
-        op.status = constants.OP_STATUS_ERROR
-        op.result = "Job canceled by request"
+      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                            "Job canceled by request")
     finally:
       self.UpdateJobUnlocked(job)
 
@@ -1154,7 +1265,7 @@ class JobQueue(object):
     self._RenameFilesUnlocked(rename_files)
 
     logging.debug("Successfully archived job(s) %s",
-                  ", ".join(job.id for job in archive_jobs))
+                  utils.CommaJoin(job.id for job in archive_jobs))
 
     return len(archive_jobs)
 
@@ -1236,7 +1347,8 @@ class JobQueue(object):
 
     return (archived_count, len(all_job_ids) - last_touched - 1)
 
-  def _GetJobInfoUnlocked(self, job, fields):
+  @staticmethod
+  def _GetJobInfoUnlocked(job, fields):
     """Returns information about a job.
 
     @type job: L{_QueuedJob}
@@ -1273,6 +1385,8 @@ class JobQueue(object):
         row.append(job.start_timestamp)
       elif fname == "end_ts":
         row.append(job.end_timestamp)
+      elif fname == "lock_status":
+        row.append(job.lock_status)
       elif fname == "summary":
         row.append([op.input.Summary() for op in job.ops])
       else: