Merge branch 'devel-2.1' into stable-2.1
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 083031c..2c2345b 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -145,20 +145,19 @@ class _QueuedJob(object):
   @ivar id: the job ID
   @type ops: list
   @ivar ops: the list of _QueuedOpCode that constitute the job
-  @type run_op_index: int
-  @ivar run_op_index: the currently executing opcode, or -1 if
-      we didn't yet start executing
   @type log_serial: int
   @ivar log_serial: holds the index for the next log entry
   @ivar received_timestamp: the timestamp for when the job was received
   @ivar start_timestamp: the timestamp for start of execution
   @ivar end_timestamp: the timestamp for end of execution
+  @ivar lock_status: In-memory locking information for debugging
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
-  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
+  # pylint: disable-msg=W0212
+  __slots__ = ["queue", "id", "ops", "log_serial",
                "received_timestamp", "start_timestamp", "end_timestamp",
                "received_timestamp", "start_timestamp", "end_timestamp",
-               "change",
+               "lock_status", "change",
                "__weakref__"]
 
   def __init__(self, queue, job_id, ops):
                "__weakref__"]
 
   def __init__(self, queue, job_id, ops):
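A note on the __slots__ change above: _QueuedJob instances carry no per-instance __dict__, so every attribute (including the new lock_status) must be declared in __slots__, and "__weakref__" has to stay listed for weak references to keep working. A minimal standalone sketch of the same pattern (not Ganeti code):

import weakref

class _Slotted(object):
  # No per-instance __dict__; "__weakref__" must be declared explicitly
  # so weakref.ref() keeps working on instances.
  __slots__ = ["payload", "__weakref__"]

  def __init__(self, payload):
    self.payload = payload

obj = _Slotted(42)
assert weakref.ref(obj)().payload == 42
try:
  obj.undeclared = True
except AttributeError:
  pass  # expected: the attribute is not listed in __slots__
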
@@ -180,12 +179,14 @@ class _QueuedJob(object):
     self.queue = queue
     self.id = job_id
     self.ops = [_QueuedOpCode(op) for op in ops]
-    self.run_op_index = -1
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
     self.end_timestamp = None
 
+    # In-memory attributes
+    self.lock_status = None
+
     # Condition to wait for changes
     self.change = threading.Condition(self.queue._lock)
 
@@ -204,11 +205,13 @@ class _QueuedJob(object):
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
     obj.id = state["id"]
-    obj.run_op_index = state["run_op_index"]
     obj.received_timestamp = state.get("received_timestamp", None)
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
 
+    # In-memory attributes
+    obj.lock_status = None
+
     obj.ops = []
     obj.log_serial = 0
     for op_state in state["ops"]:
@@ -232,7 +235,6 @@ class _QueuedJob(object):
     return {
       "id": self.id,
       "ops": [op.Serialize() for op in self.ops],
-      "run_op_index": self.run_op_index,
       "start_timestamp": self.start_timestamp,
       "end_timestamp": self.end_timestamp,
       "received_timestamp": self.received_timestamp,
       "start_timestamp": self.start_timestamp,
       "end_timestamp": self.end_timestamp,
       "received_timestamp": self.received_timestamp,
@@ -313,37 +315,112 @@ class _QueuedJob(object):
 
     return entries
 
+  def MarkUnfinishedOps(self, status, result):
+    """Mark unfinished opcodes with a given status and result.
 
-class _JobQueueWorker(workerpool.BaseWorker):
-  """The actual job workers.
+    This is a utility function for marking all running or waiting to
+    be run opcodes with a given status. Opcodes which are already
+    finalised are not changed.
 
-  """
-  def _NotifyStart(self):
-    """Mark the opcode as running, not lock-waiting.
+    @param status: a given opcode status
+    @param result: the opcode result
 
-    This is called from the mcpu code as a notifier function, when the
-    LU is finally about to start the Exec() method. Of course, to have
-    end-user visible results, the opcode must be initially (before
-    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
+    """
+    not_marked = True
+    for op in self.ops:
+      if op.status in constants.OPS_FINALIZED:
+        assert not_marked, "Finalized opcodes found after non-finalized ones"
+        continue
+      op.status = status
+      op.result = result
+      not_marked = False
+
+
+class _OpExecCallbacks(mcpu.OpExecCbBase):
+  def __init__(self, queue, job, op):
+    """Initializes this class.
+
+    @type queue: L{JobQueue}
+    @param queue: Job queue
+    @type job: L{_QueuedJob}
+    @param job: Job object
+    @type op: L{_QueuedOpCode}
+    @param op: OpCode
 
     """
-    assert self.queue, "Queue attribute is missing"
-    assert self.opcode, "Opcode attribute is missing"
+    assert queue, "Queue is missing"
+    assert job, "Job is missing"
+    assert op, "Opcode is missing"
+
+    self._queue = queue
+    self._job = job
+    self._op = op
+
+  def NotifyStart(self):
+    """Mark the opcode as running, not lock-waiting.
+
+    This is called from the mcpu code as a notifier function, when the LU is
+    finally about to start the Exec() method. Of course, to have end-user
+    visible results, the opcode must be initially (before calling into
+    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
 
-    self.queue.acquire()
+    """
+    self._queue.acquire()
     try:
-      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
-                                    constants.OP_STATUS_CANCELING)
+      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+                                 constants.OP_STATUS_CANCELING)
+
+      # All locks are acquired by now
+      self._job.lock_status = None
 
       # Cancel here if we were asked to
-      if self.opcode.status == constants.OP_STATUS_CANCELING:
+      if self._op.status == constants.OP_STATUS_CANCELING:
         raise CancelJob()
 
-      self.opcode.status = constants.OP_STATUS_RUNNING
+      self._op.status = constants.OP_STATUS_RUNNING
     finally:
-      self.queue.release()
+      self._queue.release()
+
+  def Feedback(self, *args):
+    """Append a log entry.
+
+    """
+    assert len(args) < 3
 
-  def RunTask(self, job):
+    if len(args) == 1:
+      log_type = constants.ELOG_MESSAGE
+      log_msg = args[0]
+    else:
+      (log_type, log_msg) = args
+
+    # The time is split to make serialization easier and not lose
+    # precision.
+    timestamp = utils.SplitTime(time.time())
+
+    self._queue.acquire()
+    try:
+      self._job.log_serial += 1
+      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
+
+      self._job.change.notifyAll()
+    finally:
+      self._queue.release()
+
+  def ReportLocks(self, msg):
+    """Write locking information to the job.
+
+    Called whenever the LU processor is waiting for a lock or has acquired one.
+
+    """
+    # Not getting the queue lock because this is a single assignment
+    self._job.lock_status = msg
+
+
+class _JobQueueWorker(workerpool.BaseWorker):
+  """The actual job workers.
+
+  """
+  def RunTask(self, job): # pylint: disable-msg=W0221
     """Job executor.
 
    This function processes a job. It is closely tied to the _QueuedJob and
     """Job executor.
 
     This functions processes a job. It is closely tied to the _QueuedJob and
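The new _OpExecCallbacks class above bundles what used to be the _Log closure and _JobQueueWorker._NotifyStart into one object handed to the opcode processor: ReportLocks while waiting for or acquiring locks, NotifyStart once all locks are held (raising CancelJob aborts a canceling opcode), and Feedback for log messages in its one- and two-argument forms. A rough sketch of the calling convention the processor is expected to follow (the processor internals here are an assumption, not taken from mcpu.py):

class _SketchProcessor(object):
  # Illustrative only: shows the expected callback order, not mcpu logic.
  def __init__(self, cbs):
    self._cbs = cbs

  def ExecOpCode(self, op):
    self._cbs.ReportLocks("waiting for locks for %s" % op)
    # ... all locks are acquired here ...
    self._cbs.NotifyStart()                # may raise CancelJob
    self._cbs.Feedback("single-argument message form")
    result = "result of %s" % op           # stand-in for LU.Exec()
    self._cbs.Feedback("msg-type", "two-argument form")
    return result
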
@@ -355,8 +432,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
     """
     logging.info("Worker %s processing job %s",
                   self.worker_id, job.id)
     """
     logging.info("Worker %s processing job %s",
                   self.worker_id, job.id)
-    proc = mcpu.Processor(self.pool.queue.context)
-    self.queue = queue = job.queue
+    proc = mcpu.Processor(self.pool.queue.context, job.id)
+    queue = job.queue
     try:
       try:
         count = len(job.ops)
@@ -380,7 +457,6 @@ class _JobQueueWorker(workerpool.BaseWorker):
               if op.status == constants.OP_STATUS_CANCELED:
                 raise CancelJob()
               assert op.status == constants.OP_STATUS_QUEUED
-              job.run_op_index = idx
               op.status = constants.OP_STATUS_WAITLOCK
               op.result = None
               op.start_timestamp = TimeStampNow()
@@ -392,34 +468,9 @@ class _JobQueueWorker(workerpool.BaseWorker):
             finally:
               queue.release()
 
-            def _Log(*args):
-              """Append a log entry.
-
-              """
-              assert len(args) < 3
-
-              if len(args) == 1:
-                log_type = constants.ELOG_MESSAGE
-                log_msg = args[0]
-              else:
-                log_type, log_msg = args
-
-              # The time is split to make serialization easier and not lose
-              # precision.
-              timestamp = utils.SplitTime(time.time())
-
-              queue.acquire()
-              try:
-                job.log_serial += 1
-                op.log.append((job.log_serial, timestamp, log_type, log_msg))
-
-                job.change.notifyAll()
-              finally:
-                queue.release()
-
-            # Make sure not to hold lock while _Log is called
-            self.opcode = op
-            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
+            # Make sure not to hold queue lock while calling ExecOpCode
+            result = proc.ExecOpCode(input_opcode,
+                                     _OpExecCallbacks(queue, job, op))
 
             queue.acquire()
             try:
@@ -440,7 +491,10 @@ class _JobQueueWorker(workerpool.BaseWorker):
             try:
               try:
                 op.status = constants.OP_STATUS_ERROR
-                op.result = str(err)
+                if isinstance(err, errors.GenericError):
+                  op.result = errors.EncodeException(err)
+                else:
+                  op.result = str(err)
                 op.end_timestamp = TimeStampNow()
                 logging.info("Op %s/%s: Error in opcode %s: %s",
                              idx + 1, count, op_summary, err)
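Encoding GenericError subclasses instead of flattening them to str lets clients reconstruct the original exception type from the stored job result. Assuming EncodeException records the class name and arguments (the authoritative format lives in errors.py), the intent is a round trip along these lines:

# Hypothetical sketch; see errors.py for the real encoding.
def _EncodeException(err):
  return (err.__class__.__name__, err.args)

class OpExecError(Exception):  # stand-in for errors.OpExecError
  pass

encoded = _EncodeException(OpExecError("disk sync failed"))
assert encoded == ("OpExecError", ("disk sync failed",))
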
@@ -464,7 +518,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
       queue.acquire()
       try:
         try:
-          job.run_op_index = -1
+          job.lock_status = None
           job.end_timestamp = TimeStampNow()
           queue.UpdateJobUnlocked(job)
         finally:
@@ -472,6 +526,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
           status = job.CalcStatus()
       finally:
         queue.release()
+
       logging.info("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)
 
       logging.info("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)
 
@@ -486,33 +541,38 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
     self.queue = queue
 
 
-class JobQueue(object):
-  """Queue used to manage the jobs.
+def _RequireOpenQueue(fn):
+  """Decorator for "public" functions.
 
-  @cvar _RE_JOB_FILE: regex matching the valid job file names
+  This function should be used for all 'public' functions. That is,
+  functions usually called from other classes. Note that this should
+  be applied only to methods (not plain functions), since it expects
+  that the decorated function is called with a first argument that has
+  a '_queue_lock' attribute.
 
-  """
-  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
+  @warning: Use this decorator only after utils.LockedMethod!
 
-  def _RequireOpenQueue(fn):
-    """Decorator for "public" functions.
+  Example::
+    @utils.LockedMethod
+    @_RequireOpenQueue
+    def Example(self):
+      pass
 
-    This function should be used for all 'public' functions. That is,
-    functions usually called from other classes.
+  """
+  def wrapper(self, *args, **kwargs):
+    # pylint: disable-msg=W0212
+    assert self._queue_lock is not None, "Queue should be open"
+    return fn(self, *args, **kwargs)
+  return wrapper
 
-    @warning: Use this decorator only after utils.LockedMethod!
 
-    Example::
-      @utils.LockedMethod
-      @_RequireOpenQueue
-      def Example(self):
-        pass
+class JobQueue(object):
+  """Queue used to manage the jobs.
 
-    """
-    def wrapper(self, *args, **kwargs):
-      assert self._queue_lock is not None, "Queue should be open"
-      return fn(self, *args, **kwargs)
-    return wrapper
+  @cvar _RE_JOB_FILE: regex matching the valid job file names
+
+  """
+  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
 
   def __init__(self, context):
     """Constructor for JobQueue.
@@ -593,9 +653,8 @@ class JobQueue(object):
                           constants.JOB_STATUS_CANCELING):
             logging.warning("Unfinished job %s found: %s", job.id, job)
             try:
-              for op in job.ops:
-                op.status = constants.OP_STATUS_ERROR
-                op.result = "Unclean master daemon shutdown"
+              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                    "Unclean master daemon shutdown")
             finally:
               self.UpdateJobUnlocked(job)
 
@@ -619,7 +678,11 @@ class JobQueue(object):
     assert node_name != self._my_hostname
 
     # Clean queue directory on added node
-    rpc.RpcRunner.call_jobqueue_purge(node_name)
+    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
+    msg = result.fail_msg
+    if msg:
+      logging.warning("Cannot cleanup queue directory on node %s: %s",
+                      node_name, msg)
 
     if not node.master_candidate:
       # remove if existing, ignoring errors
@@ -635,17 +698,15 @@ class JobQueue(object):
 
     for file_name in files:
       # Read file content
-      fd = open(file_name, "r")
-      try:
-        content = fd.read()
-      finally:
-        fd.close()
+      content = utils.ReadFile(file_name)
 
       result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                   [node.primary_ip],
                                                   file_name, content)
-      if not result[node_name]:
-        logging.error("Failed to upload %s to %s", file_name, node_name)
+      msg = result[node_name].fail_msg
+      if msg:
+        logging.error("Failed to upload file %s to node %s: %s",
+                      file_name, node_name, msg)
 
     self._nodes[node_name] = node.primary_ip
 
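Both fixes in this hunk follow the same convention: RPC calls return per-node result objects whose fail_msg attribute is empty on success and carries an error description on failure. A minimal sketch of that shape (the real result class lives in rpc.py and its layout is assumed here):

class _RpcResultSketch(object):
  def __init__(self, payload=None, fail_msg=None):
    self.payload = payload
    self.fail_msg = fail_msg  # empty/None means the call succeeded

result = {"node1": _RpcResultSketch(payload=True),
          "node2": _RpcResultSketch(fail_msg="connection refused")}
for node in ("node1", "node2"):
  msg = result[node].fail_msg
  if msg:
    print("upload to %s failed: %s" % (node, msg))
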
@@ -664,7 +725,8 @@ class JobQueue(object):
     except KeyError:
       pass
 
-  def _CheckRpcResult(self, result, nodes, failmsg):
+  @staticmethod
+  def _CheckRpcResult(result, nodes, failmsg):
     """Verifies the status of an RPC call.
 
     Since we aim to keep consistency should this node (the current
     """Verifies the status of an RPC call.
 
     Since we aim to keep consistency should this node (the current
@@ -682,13 +744,13 @@ class JobQueue(object):
     success = []
 
     for node in nodes:
-      if result[node]:
-        success.append(node)
-      else:
+      msg = result[node].fail_msg
+      if msg:
         failed.append(node)
-
-    if failed:
-      logging.error("%s failed on %s", failmsg, ", ".join(failed))
+        logging.error("RPC call %s (%s) failed on node %s: %s",
+                      result[node].call, failmsg, node, msg)
+      else:
+        success.append(node)
 
     # +1 for the master node
     if (len(success) + 1) < len(failed):
@@ -745,7 +807,8 @@ class JobQueue(object):
     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
-  def _FormatJobID(self, job_id):
+  @staticmethod
+  def _FormatJobID(job_id):
     """Convert a job ID to string format.
 
     Currently this just does C{str(job_id)} after performing some
     """Convert a job ID to string format.
 
     Currently this just does C{str(job_id)} after performing some
@@ -777,26 +840,31 @@ class JobQueue(object):
     """
     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
 
     """
     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
 
-  def _NewSerialUnlocked(self):
+  def _NewSerialsUnlocked(self, count):
     """Generates a new job identifier.
 
     Job identifiers are unique during the lifetime of a cluster.
 
     """Generates a new job identifier.
 
     Job identifiers are unique during the lifetime of a cluster.
 
+    @type count: integer
+    @param count: how many serials to return
    @rtype: list of str
    @return: a list of strings representing the job identifiers.
 
     """
+    assert count > 0
     # New number
-    serial = self._last_serial + 1
+    serial = self._last_serial + count
 
     # Write to file
     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                         "%s\n" % serial)
 
+    result = [self._FormatJobID(v)
+              for v in range(self._last_serial + 1, serial + 1)]
     # Keep it only if we were able to write the file
     self._last_serial = serial
 
-    return self._FormatJobID(serial)
+    return result
 
   @staticmethod
   def _GetJobPath(job_id):
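A worked example of the allocation above: with _last_serial = 42 and count = 3, the serial file is rewritten once with "45\n" and the caller receives the IDs 43, 44 and 45. Self-contained sketch (file write and replication omitted):

class _SerialSketch(object):
  def __init__(self, last_serial):
    self._last_serial = last_serial

  def NewSerials(self, count):
    assert count > 0
    serial = self._last_serial + count
    # The real code first writes "%s\n" % serial to the serial file and
    # replicates it, and only then commits the new value in memory.
    result = [str(v) for v in range(self._last_serial + 1, serial + 1)]
    self._last_serial = serial
    return result

queue = _SerialSketch(42)
assert queue.NewSerials(3) == ["43", "44", "45"]
assert queue.NewSerials(1) == ["46"]
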
@@ -855,6 +923,7 @@ class JobQueue(object):
     @return: the list of job IDs
 
     """
+    # pylint: disable-msg=W0613
     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
     jlist = utils.NiceSort(jlist)
     return jlist
@@ -889,19 +958,17 @@ class JobQueue(object):
     filepath = self._GetJobPath(job_id)
     logging.debug("Loading job from %s", filepath)
     try:
-      fd = open(filepath, "r")
+      raw_data = utils.ReadFile(filepath)
     except IOError, err:
       if err.errno in (errno.ENOENT, ):
         return None
       raise
-    try:
-      data = serializer.LoadJson(fd.read())
-    finally:
-      fd.close()
+
+    data = serializer.LoadJson(raw_data)
 
     try:
       job = _QueuedJob.Restore(self, data)
-    except Exception, err:
+    except Exception, err: # pylint: disable-msg=W0703
       new_path = self._GetArchivedJobPath(job_id)
       if filepath == new_path:
         # job already archived (future case)
@@ -962,12 +1029,14 @@ class JobQueue(object):
     return True
 
   @_RequireOpenQueue
-  def _SubmitJobUnlocked(self, ops):
+  def _SubmitJobUnlocked(self, job_id, ops):
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
     queue, in order for it to be picked up by the queue processors.
 
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
     queue, in order for it to be picked up by the queue processors.
 
+    @type job_id: job ID
+    @param job_id: the job ID for the new job
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
     @rtype: job ID
@@ -989,8 +1058,6 @@ class JobQueue(object):
     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()
 
-    # Get job identifier
-    job_id = self._NewSerialUnlocked()
     job = _QueuedJob(self, job_id, ops)
 
     # Write to disk
@@ -1012,7 +1079,8 @@ class JobQueue(object):
     @see: L{_SubmitJobUnlocked}
 
     """
-    return self._SubmitJobUnlocked(ops)
+    job_id = self._NewSerialsUnlocked(1)[0]
+    return self._SubmitJobUnlocked(job_id, ops)
 
   @utils.LockedMethod
   @_RequireOpenQueue
@@ -1023,9 +1091,10 @@ class JobQueue(object):
 
     """
     results = []
-    for ops in jobs:
+    all_job_ids = self._NewSerialsUnlocked(len(jobs))
+    for job_id, ops in zip(all_job_ids, jobs):
       try:
-        data = self._SubmitJobUnlocked(ops)
+        data = self._SubmitJobUnlocked(job_id, ops)
         status = True
       except errors.GenericError, err:
         data = str(err)
@@ -1034,7 +1103,6 @@ class JobQueue(object):
 
     return results
 
-
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
     """Update a job's on disk storage.
@@ -1081,17 +1149,13 @@ class JobQueue(object):
         as such by the clients
 
     """
-    logging.debug("Waiting for changes in job %s", job_id)
-    end_time = time.time() + timeout
-    while True:
-      delta_time = end_time - time.time()
-      if delta_time < 0:
-        return constants.JOB_NOTCHANGED
+    job = self._LoadJobUnlocked(job_id)
+    if not job:
+      logging.debug("Job %s not found", job_id)
+      return None
 
-      job = self._LoadJobUnlocked(job_id)
-      if not job:
-        logging.debug("Job %s not found", job_id)
-        break
+    def _CheckForChanges():
+      logging.debug("Waiting for changes in job %s", job_id)
 
       status = job.CalcStatus()
       job_info = self._GetJobInfoUnlocked(job, fields)
@@ -1105,25 +1169,24 @@ class JobQueue(object):
       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
 
-      if status not in (constants.JOB_STATUS_QUEUED,
-                        constants.JOB_STATUS_RUNNING,
-                        constants.JOB_STATUS_WAITLOCK):
-        # Don't even try to wait if the job is no longer running, there will be
-        # no changes.
-        break
-
-      if (prev_job_info != job_info or
+      # Don't even try to wait if the job is no longer running, there will be
+      # no changes.
+      if (status not in (constants.JOB_STATUS_QUEUED,
+                         constants.JOB_STATUS_RUNNING,
+                         constants.JOB_STATUS_WAITLOCK) or
+          prev_job_info != job_info or
           (log_entries and prev_log_serial != log_entries[0][0])):
-        break
-
-      logging.debug("Waiting again")
-
-      # Release the queue lock while waiting
-      job.change.wait(delta_time)
+        logging.debug("Job %s changed", job_id)
+        return (job_info, log_entries)
 
-    logging.debug("Job %s changed", job_id)
+      raise utils.RetryAgain()
 
-    return (job_info, log_entries)
+    try:
+      # Setting wait function to release the queue lock while waiting
+      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
+                         wait_fn=job.change.wait)
+    except utils.RetryTimeout:
+      return constants.JOB_NOTCHANGED
 
   @utils.LockedMethod
   @_RequireOpenQueue
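The rewrite above hands the polling loop to utils.Retry: _CheckForChanges raises utils.RetryAgain to keep waiting, wait_fn (the job's condition variable, which releases the queue lock while sleeping) handles the delay, and utils.RetryTimeout is mapped back to JOB_NOTCHANGED. A simplified sketch of such a retry helper, assuming semantics close to the real one in utils.py:

import time

class RetryAgain(Exception):
  pass

class RetryTimeout(Exception):
  pass

def Retry(fn, delay, timeout, wait_fn=time.sleep):
  # Simplified: the real utils.Retry also accepts RETRY_REMAINING_TIME
  # as the delay, meaning "wait for whatever time is left".
  end_time = time.time() + timeout
  while True:
    try:
      return fn()
    except RetryAgain:
      remaining = end_time - time.time()
      if remaining <= 0:
        raise RetryTimeout()
      wait_fn(min(delay, remaining))
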
@@ -1147,8 +1210,8 @@ class JobQueue(object):
 
     if job_status not in (constants.JOB_STATUS_QUEUED,
                           constants.JOB_STATUS_WAITLOCK):
-      logging.debug("Job %s is no longer in the queue", job.id)
-      return (False, "Job %s is no longer in the queue" % job.id)
+      logging.debug("Job %s is no longer waiting in the queue", job.id)
+      return (False, "Job %s is no longer waiting in the queue" % job.id)
 
     if job_status == constants.JOB_STATUS_QUEUED:
       self.CancelJobUnlocked(job)
@@ -1157,8 +1220,7 @@ class JobQueue(object):
     elif job_status == constants.JOB_STATUS_WAITLOCK:
       # The worker will notice the new status and cancel the job
       try:
-        for op in job.ops:
-          op.status = constants.OP_STATUS_CANCELING
+        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
       finally:
         self.UpdateJobUnlocked(job)
       return (True, "Job %s will be canceled" % job.id)
@@ -1169,9 +1231,8 @@ class JobQueue(object):
 
     """
     try:
-      for op in job.ops:
-        op.status = constants.OP_STATUS_CANCELED
-        op.result = "Job canceled by request"
+      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                            "Job canceled by request")
     finally:
       self.UpdateJobUnlocked(job)
 
@@ -1204,7 +1265,7 @@ class JobQueue(object):
     self._RenameFilesUnlocked(rename_files)
 
     logging.debug("Successfully archived job(s) %s",
-                  ", ".join(job.id for job in archive_jobs))
+                  utils.CommaJoin(job.id for job in archive_jobs))
 
     return len(archive_jobs)
 
@@ -1286,7 +1347,8 @@ class JobQueue(object):
 
     return (archived_count, len(all_job_ids) - last_touched - 1)
 
-  def _GetJobInfoUnlocked(self, job, fields):
+  @staticmethod
+  def _GetJobInfoUnlocked(job, fields):
     """Returns information about a job.
 
     @type job: L{_QueuedJob}
     """Returns information about a job.
 
     @type job: L{_QueuedJob}
@@ -1323,6 +1385,8 @@ class JobQueue(object):
         row.append(job.start_timestamp)
       elif fname == "end_ts":
         row.append(job.end_timestamp)
+      elif fname == "lock_status":
+        row.append(job.lock_status)
       elif fname == "summary":
         row.append([op.input.Summary() for op in job.ops])
       else:
       elif fname == "summary":
         row.append([op.input.Summary() for op in job.ops])
       else:
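With the new branch above, "lock_status" becomes a queryable job field alongside the timestamps, so clients can see what a running job is currently waiting on. Illustrative dispatch with stand-in values, mirroring the loop above:

row = []
fields = ["id", "lock_status"]
# Stand-in job values: lock_status is None unless the job is currently
# waiting for or holding locks (it is set via _OpExecCallbacks.ReportLocks).
job_values = {"id": "123", "lock_status": "waiting for locks"}
for fname in fields:
  row.append(job_values[fname])
assert row == ["123", "waiting for locks"]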