hooks: Provide variables with post-opcode values
[ganeti-local] / lib / jqueue.py
index 2c2345b..56f7a66 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
 #
 #
 
-# Copyright (C) 2006, 2007, 2008 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -29,28 +29,41 @@ used by all other classes in this module.
 
 """
 
 
 """
 
-import os
 import logging
 import logging
-import threading
 import errno
 import re
 import time
 import weakref
 
 import errno
 import re
 import time
 import weakref
 
+try:
+  # pylint: disable-msg=E0611
+  from pyinotify import pyinotify
+except ImportError:
+  import pyinotify
+
+from ganeti import asyncnotifier
 from ganeti import constants
 from ganeti import serializer
 from ganeti import workerpool
 from ganeti import constants
 from ganeti import serializer
 from ganeti import workerpool
+from ganeti import locking
 from ganeti import opcodes
 from ganeti import errors
 from ganeti import mcpu
 from ganeti import utils
 from ganeti import jstore
 from ganeti import rpc
 from ganeti import opcodes
 from ganeti import errors
 from ganeti import mcpu
 from ganeti import utils
 from ganeti import jstore
 from ganeti import rpc
+from ganeti import runtime
+from ganeti import netutils
+from ganeti import compat
 
 
 JOBQUEUE_THREADS = 25
 JOBS_PER_ARCHIVE_DIRECTORY = 10000
 
 
 
 JOBQUEUE_THREADS = 25
 JOBS_PER_ARCHIVE_DIRECTORY = 10000
 
+# member lock names to be passed to @ssynchronized decorator
+_LOCK = "_lock"
+_QUEUE = "_queue"
+
 
 class CancelJob(Exception):
   """Special exception to cancel a job.
 
 class CancelJob(Exception):
   """Special exception to cancel a job.
@@ -77,11 +90,12 @@ class _QueuedOpCode(object):
   @ivar status: the current status
   @ivar result: the result of the LU execution
   @ivar start_timestamp: timestamp for the start of the execution
   @ivar status: the current status
   @ivar result: the result of the LU execution
   @ivar start_timestamp: timestamp for the start of the execution
+  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
   @ivar stop_timestamp: timestamp for the end of the execution
 
   """
   @ivar stop_timestamp: timestamp for the end of the execution
 
   """
-  __slots__ = ["input", "status", "result", "log",
-               "start_timestamp", "end_timestamp",
+  __slots__ = ["input", "status", "result", "log", "priority",
+               "start_timestamp", "exec_timestamp", "end_timestamp",
                "__weakref__"]
 
   def __init__(self, op):
                "__weakref__"]
 
   def __init__(self, op):
@@ -96,8 +110,12 @@ class _QueuedOpCode(object):
     self.result = None
     self.log = []
     self.start_timestamp = None
     self.result = None
     self.log = []
     self.start_timestamp = None
+    self.exec_timestamp = None
     self.end_timestamp = None
 
     self.end_timestamp = None
 
+    # Get initial priority (it might change during the lifetime of this opcode)
+    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
+
   @classmethod
   def Restore(cls, state):
     """Restore the _QueuedOpCode from the serialized form.
   @classmethod
   def Restore(cls, state):
     """Restore the _QueuedOpCode from the serialized form.
@@ -114,7 +132,9 @@ class _QueuedOpCode(object):
     obj.result = state["result"]
     obj.log = state["log"]
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.result = state["result"]
     obj.log = state["log"]
     obj.start_timestamp = state.get("start_timestamp", None)
+    obj.exec_timestamp = state.get("exec_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
+    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
     return obj
 
   def Serialize(self):
     return obj
 
   def Serialize(self):
@@ -130,7 +150,9 @@ class _QueuedOpCode(object):
       "result": self.result,
       "log": self.log,
       "start_timestamp": self.start_timestamp,
       "result": self.result,
       "log": self.log,
       "start_timestamp": self.start_timestamp,
+      "exec_timestamp": self.exec_timestamp,
       "end_timestamp": self.end_timestamp,
       "end_timestamp": self.end_timestamp,
+      "priority": self.priority,
       }
 
 
       }
 
 
@@ -150,14 +172,11 @@ class _QueuedJob(object):
   @ivar received_timestamp: the timestamp for when the job was received
   @ivar start_timestmap: the timestamp for start of execution
   @ivar end_timestamp: the timestamp for end of execution
   @ivar received_timestamp: the timestamp for when the job was received
   @ivar start_timestmap: the timestamp for start of execution
   @ivar end_timestamp: the timestamp for end of execution
-  @ivar lock_status: In-memory locking information for debugging
-  @ivar change: a Condition variable we use for waiting for job changes
 
   """
   # pylint: disable-msg=W0212
 
   """
   # pylint: disable-msg=W0212
-  __slots__ = ["queue", "id", "ops", "log_serial",
+  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
                "received_timestamp", "start_timestamp", "end_timestamp",
                "received_timestamp", "start_timestamp", "end_timestamp",
-               "lock_status", "change",
                "__weakref__"]
 
   def __init__(self, queue, job_id, ops):
                "__weakref__"]
 
   def __init__(self, queue, job_id, ops):
@@ -173,8 +192,7 @@ class _QueuedJob(object):
 
     """
     if not ops:
 
     """
     if not ops:
-      # TODO: use a better exception
-      raise Exception("No opcodes")
+      raise errors.GenericError("A job needs at least one opcode")
 
     self.queue = queue
     self.id = job_id
 
     self.queue = queue
     self.id = job_id
@@ -184,11 +202,22 @@ class _QueuedJob(object):
     self.start_timestamp = None
     self.end_timestamp = None
 
     self.start_timestamp = None
     self.end_timestamp = None
 
-    # In-memory attributes
-    self.lock_status = None
+    self._InitInMemory(self)
+
+  @staticmethod
+  def _InitInMemory(obj):
+    """Initializes in-memory variables.
+
+    """
+    obj.ops_iter = None
+    obj.cur_opctx = None
+
+  def __repr__(self):
+    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
+              "id=%s" % self.id,
+              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
 
 
-    # Condition to wait for changes
-    self.change = threading.Condition(self.queue._lock)
+    return "<%s at %#x>" % (" ".join(status), id(self))
 
   @classmethod
   def Restore(cls, queue, state):
 
   @classmethod
   def Restore(cls, queue, state):
@@ -209,9 +238,6 @@ class _QueuedJob(object):
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
 
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
 
-    # In-memory attributes
-    obj.lock_status = None
-
     obj.ops = []
     obj.log_serial = 0
     for op_state in state["ops"]:
     obj.ops = []
     obj.log_serial = 0
     for op_state in state["ops"]:
@@ -220,8 +246,7 @@ class _QueuedJob(object):
         obj.log_serial = max(obj.log_serial, log_entry[0])
       obj.ops.append(op)
 
         obj.log_serial = max(obj.log_serial, log_entry[0])
       obj.ops.append(op)
 
-    # Condition to wait for changes
-    obj.change = threading.Condition(obj.queue._lock)
+    cls._InitInMemory(obj)
 
     return obj
 
 
     return obj
 
@@ -293,6 +318,24 @@ class _QueuedJob(object):
 
     return status
 
 
     return status
 
+  def CalcPriority(self):
+    """Gets the current priority for this job.
+
+    Only unfinished opcodes are considered. When all are done, the default
+    priority is used.
+
+    @rtype: int
+
+    """
+    priorities = [op.priority for op in self.ops
+                  if op.status not in constants.OPS_FINALIZED]
+
+    if not priorities:
+      # All opcodes are done, assume default priority
+      return constants.OP_PRIO_DEFAULT
+
+    return min(priorities)
+
   def GetLogEntries(self, newer_than):
     """Selectively returns the log entries.
 
   def GetLogEntries(self, newer_than):
     """Selectively returns the log entries.
 
@@ -315,6 +358,53 @@ class _QueuedJob(object):
 
     return entries
 
 
     return entries
 
+  def GetInfo(self, fields):
+    """Returns information about a job.
+
+    @type fields: list
+    @param fields: names of fields to return
+    @rtype: list
+    @return: list with one element for each field
+    @raise errors.OpExecError: when an invalid field
+        has been passed
+
+    """
+    row = []
+    for fname in fields:
+      if fname == "id":
+        row.append(self.id)
+      elif fname == "status":
+        row.append(self.CalcStatus())
+      elif fname == "priority":
+        row.append(self.CalcPriority())
+      elif fname == "ops":
+        row.append([op.input.__getstate__() for op in self.ops])
+      elif fname == "opresult":
+        row.append([op.result for op in self.ops])
+      elif fname == "opstatus":
+        row.append([op.status for op in self.ops])
+      elif fname == "oplog":
+        row.append([op.log for op in self.ops])
+      elif fname == "opstart":
+        row.append([op.start_timestamp for op in self.ops])
+      elif fname == "opexec":
+        row.append([op.exec_timestamp for op in self.ops])
+      elif fname == "opend":
+        row.append([op.end_timestamp for op in self.ops])
+      elif fname == "oppriority":
+        row.append([op.priority for op in self.ops])
+      elif fname == "received_ts":
+        row.append(self.received_timestamp)
+      elif fname == "start_ts":
+        row.append(self.start_timestamp)
+      elif fname == "end_ts":
+        row.append(self.end_timestamp)
+      elif fname == "summary":
+        row.append([op.input.Summary() for op in self.ops])
+      else:
+        raise errors.OpExecError("Invalid self query field '%s'" % fname)
+    return row
+
   def MarkUnfinishedOps(self, status, result):
     """Mark unfinished opcodes with a given status and result.
 
   def MarkUnfinishedOps(self, status, result):
     """Mark unfinished opcodes with a given status and result.
 
@@ -335,6 +425,30 @@ class _QueuedJob(object):
       op.result = result
       not_marked = False
 
       op.result = result
       not_marked = False
 
+  def Cancel(self):
+    """Marks job as canceled/-ing if possible.
+
+    @rtype: tuple; (bool, string)
+    @return: Boolean describing whether job was successfully canceled or marked
+      as canceling and a text message
+
+    """
+    status = self.CalcStatus()
+
+    if status == constants.JOB_STATUS_QUEUED:
+      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                             "Job canceled by request")
+      return (True, "Job %s canceled" % self.id)
+
+    elif status == constants.JOB_STATUS_WAITLOCK:
+      # The worker will notice the new status and cancel the job
+      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
+      return (True, "Job %s will be canceled" % self.id)
+
+    else:
+      logging.debug("Job %s is no longer waiting in the queue", self.id)
+      return (False, "Job %s is no longer waiting in the queue" % self.id)
+
 
 class _OpExecCallbacks(mcpu.OpExecCbBase):
   def __init__(self, queue, job, op):
 
 class _OpExecCallbacks(mcpu.OpExecCbBase):
   def __init__(self, queue, job, op):
@@ -356,6 +470,16 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     self._job = job
     self._op = op
 
     self._job = job
     self._op = op
 
+  def _CheckCancel(self):
+    """Raises an exception to cancel the job if asked to.
+
+    """
+    # Cancel here if we were asked to
+    if self._op.status == constants.OP_STATUS_CANCELING:
+      logging.debug("Canceling opcode")
+      raise CancelJob()
+
+  @locking.ssynchronized(_QUEUE, shared=1)
   def NotifyStart(self):
     """Mark the opcode as running, not lock-waiting.
 
   def NotifyStart(self):
     """Mark the opcode as running, not lock-waiting.
 
@@ -365,21 +489,29 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
 
     """
     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
 
     """
-    self._queue.acquire()
-    try:
-      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
-                                 constants.OP_STATUS_CANCELING)
+    assert self._op in self._job.ops
+    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+                               constants.OP_STATUS_CANCELING)
 
 
-      # All locks are acquired by now
-      self._job.lock_status = None
+    # Cancel here if we were asked to
+    self._CheckCancel()
 
 
-      # Cancel here if we were asked to
-      if self._op.status == constants.OP_STATUS_CANCELING:
-        raise CancelJob()
+    logging.debug("Opcode is now running")
 
 
-      self._op.status = constants.OP_STATUS_RUNNING
-    finally:
-      self._queue.release()
+    self._op.status = constants.OP_STATUS_RUNNING
+    self._op.exec_timestamp = TimeStampNow()
+
+    # And finally replicate the job status
+    self._queue.UpdateJobUnlocked(self._job)
+
+  @locking.ssynchronized(_QUEUE, shared=1)
+  def _AppendFeedback(self, timestamp, log_type, log_msg):
+    """Internal feedback append function, with locks
+
+    """
+    self._job.log_serial += 1
+    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
+    self._queue.UpdateJobUnlocked(self._job, replicate=False)
 
   def Feedback(self, *args):
     """Append a log entry.
 
   def Feedback(self, *args):
     """Append a log entry.
@@ -396,24 +528,587 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     # The time is split to make serialization easier and not lose
     # precision.
     timestamp = utils.SplitTime(time.time())
     # The time is split to make serialization easier and not lose
     # precision.
     timestamp = utils.SplitTime(time.time())
+    self._AppendFeedback(timestamp, log_type, log_msg)
+
+  def CheckCancel(self):
+    """Check whether job has been cancelled.
+
+    """
+    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+                               constants.OP_STATUS_CANCELING)
+
+    # Cancel here if we were asked to
+    self._CheckCancel()
+
+
+class _JobChangesChecker(object):
+  def __init__(self, fields, prev_job_info, prev_log_serial):
+    """Initializes this class.
+
+    @type fields: list of strings
+    @param fields: Fields requested by LUXI client
+    @type prev_job_info: string
+    @param prev_job_info: previous job info, as passed by the LUXI client
+    @type prev_log_serial: string
+    @param prev_log_serial: previous job serial, as passed by the LUXI client
+
+    """
+    self._fields = fields
+    self._prev_job_info = prev_job_info
+    self._prev_log_serial = prev_log_serial
+
+  def __call__(self, job):
+    """Checks whether job has changed.
+
+    @type job: L{_QueuedJob}
+    @param job: Job object
 
 
-    self._queue.acquire()
+    """
+    status = job.CalcStatus()
+    job_info = job.GetInfo(self._fields)
+    log_entries = job.GetLogEntries(self._prev_log_serial)
+
+    # Serializing and deserializing data can cause type changes (e.g. from
+    # tuple to list) or precision loss. We're doing it here so that we get
+    # the same modifications as the data received from the client. Without
+    # this, the comparison afterwards might fail without the data being
+    # significantly different.
+    # TODO: we just deserialized from disk, investigate how to make sure that
+    # the job info and log entries are compatible to avoid this further step.
+    # TODO: Doing something like in testutils.py:UnifyValueType might be more
+    # efficient, though floats will be tricky
+    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
+    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
+
+    # Don't even try to wait if the job is no longer running, there will be
+    # no changes.
+    if (status not in (constants.JOB_STATUS_QUEUED,
+                       constants.JOB_STATUS_RUNNING,
+                       constants.JOB_STATUS_WAITLOCK) or
+        job_info != self._prev_job_info or
+        (log_entries and self._prev_log_serial != log_entries[0][0])):
+      logging.debug("Job %s changed", job.id)
+      return (job_info, log_entries)
+
+    return None
+
+
+class _JobFileChangesWaiter(object):
+  def __init__(self, filename):
+    """Initializes this class.
+
+    @type filename: string
+    @param filename: Path to job file
+    @raises errors.InotifyError: if the notifier cannot be setup
+
+    """
+    self._wm = pyinotify.WatchManager()
+    self._inotify_handler = \
+      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
+    self._notifier = \
+      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
     try:
     try:
-      self._job.log_serial += 1
-      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
+      self._inotify_handler.enable()
+    except Exception:
+      # pyinotify doesn't close file descriptors automatically
+      self._notifier.stop()
+      raise
 
 
-      self._job.change.notifyAll()
-    finally:
-      self._queue.release()
+  def _OnInotify(self, notifier_enabled):
+    """Callback for inotify.
 
 
-  def ReportLocks(self, msg):
-    """Write locking information to the job.
+    """
+    if not notifier_enabled:
+      self._inotify_handler.enable()
+
+  def Wait(self, timeout):
+    """Waits for the job file to change.
+
+    @type timeout: float
+    @param timeout: Timeout in seconds
+    @return: Whether there have been events
+
+    """
+    assert timeout >= 0
+    have_events = self._notifier.check_events(timeout * 1000)
+    if have_events:
+      self._notifier.read_events()
+    self._notifier.process_events()
+    return have_events
+
+  def Close(self):
+    """Closes underlying notifier and its file descriptor.
 
 
-    Called whenever the LU processor is waiting for a lock or has acquired one.
+    """
+    self._notifier.stop()
+
+
+class _JobChangesWaiter(object):
+  def __init__(self, filename):
+    """Initializes this class.
+
+    @type filename: string
+    @param filename: Path to job file
 
     """
 
     """
-    # Not getting the queue lock because this is a single assignment
-    self._job.lock_status = msg
+    self._filewaiter = None
+    self._filename = filename
+
+  def Wait(self, timeout):
+    """Waits for a job to change.
+
+    @type timeout: float
+    @param timeout: Timeout in seconds
+    @return: Whether there have been events
+
+    """
+    if self._filewaiter:
+      return self._filewaiter.Wait(timeout)
+
+    # Lazy setup: Avoid inotify setup cost when job file has already changed.
+    # If this point is reached, return immediately and let caller check the job
+    # file again in case there were changes since the last check. This avoids a
+    # race condition.
+    self._filewaiter = _JobFileChangesWaiter(self._filename)
+
+    return True
+
+  def Close(self):
+    """Closes underlying waiter.
+
+    """
+    if self._filewaiter:
+      self._filewaiter.Close()
+
+
+class _WaitForJobChangesHelper(object):
+  """Helper class using inotify to wait for changes in a job file.
+
+  This class takes a previous job status and serial, and alerts the client when
+  the current job status has changed.
+
+  """
+  @staticmethod
+  def _CheckForChanges(job_load_fn, check_fn):
+    job = job_load_fn()
+    if not job:
+      raise errors.JobLost()
+
+    result = check_fn(job)
+    if result is None:
+      raise utils.RetryAgain()
+
+    return result
+
+  def __call__(self, filename, job_load_fn,
+               fields, prev_job_info, prev_log_serial, timeout):
+    """Waits for changes on a job.
+
+    @type filename: string
+    @param filename: File on which to wait for changes
+    @type job_load_fn: callable
+    @param job_load_fn: Function to load job
+    @type fields: list of strings
+    @param fields: Which fields to check for changes
+    @type prev_job_info: list or None
+    @param prev_job_info: Last job information returned
+    @type prev_log_serial: int
+    @param prev_log_serial: Last job message serial number
+    @type timeout: float
+    @param timeout: maximum time to wait in seconds
+
+    """
+    try:
+      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
+      waiter = _JobChangesWaiter(filename)
+      try:
+        return utils.Retry(compat.partial(self._CheckForChanges,
+                                          job_load_fn, check_fn),
+                           utils.RETRY_REMAINING_TIME, timeout,
+                           wait_fn=waiter.Wait)
+      finally:
+        waiter.Close()
+    except (errors.InotifyError, errors.JobLost):
+      return None
+    except utils.RetryTimeout:
+      return constants.JOB_NOTCHANGED
+
+
+def _EncodeOpError(err):
+  """Encodes an error which occurred while processing an opcode.
+
+  """
+  if isinstance(err, errors.GenericError):
+    to_encode = err
+  else:
+    to_encode = errors.OpExecError(str(err))
+
+  return errors.EncodeException(to_encode)
+
+
+class _TimeoutStrategyWrapper:
+  def __init__(self, fn):
+    """Initializes this class.
+
+    """
+    self._fn = fn
+    self._next = None
+
+  def _Advance(self):
+    """Gets the next timeout if necessary.
+
+    """
+    if self._next is None:
+      self._next = self._fn()
+
+  def Peek(self):
+    """Returns the next timeout.
+
+    """
+    self._Advance()
+    return self._next
+
+  def Next(self):
+    """Returns the current timeout and advances the internal state.
+
+    """
+    self._Advance()
+    result = self._next
+    self._next = None
+    return result
+
+
+class _OpExecContext:
+  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
+    """Initializes this class.
+
+    """
+    self.op = op
+    self.index = index
+    self.log_prefix = log_prefix
+    self.summary = op.input.Summary()
+
+    self._timeout_strategy_factory = timeout_strategy_factory
+    self._ResetTimeoutStrategy()
+
+  def _ResetTimeoutStrategy(self):
+    """Creates a new timeout strategy.
+
+    """
+    self._timeout_strategy = \
+      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
+
+  def CheckPriorityIncrease(self):
+    """Checks whether priority can and should be increased.
+
+    Called when locks couldn't be acquired.
+
+    """
+    op = self.op
+
+    # Exhausted all retries and next round should not use blocking acquire
+    # for locks?
+    if (self._timeout_strategy.Peek() is None and
+        op.priority > constants.OP_PRIO_HIGHEST):
+      logging.debug("Increasing priority")
+      op.priority -= 1
+      self._ResetTimeoutStrategy()
+      return True
+
+    return False
+
+  def GetNextLockTimeout(self):
+    """Returns the next lock acquire timeout.
+
+    """
+    return self._timeout_strategy.Next()
+
+
+class _JobProcessor(object):
+  def __init__(self, queue, opexec_fn, job,
+               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
+    """Initializes this class.
+
+    """
+    self.queue = queue
+    self.opexec_fn = opexec_fn
+    self.job = job
+    self._timeout_strategy_factory = _timeout_strategy_factory
+
+  @staticmethod
+  def _FindNextOpcode(job, timeout_strategy_factory):
+    """Locates the next opcode to run.
+
+    @type job: L{_QueuedJob}
+    @param job: Job object
+    @param timeout_strategy_factory: Callable to create new timeout strategy
+
+    """
+    # Create some sort of a cache to speed up locating next opcode for future
+    # lookups
+    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
+    # pending and one for processed ops.
+    if job.ops_iter is None:
+      job.ops_iter = enumerate(job.ops)
+
+    # Find next opcode to run
+    while True:
+      try:
+        (idx, op) = job.ops_iter.next()
+      except StopIteration:
+        raise errors.ProgrammerError("Called for a finished job")
+
+      if op.status == constants.OP_STATUS_RUNNING:
+        # Found an opcode already marked as running
+        raise errors.ProgrammerError("Called for job marked as running")
+
+      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
+                             timeout_strategy_factory)
+
+      if op.status == constants.OP_STATUS_CANCELED:
+        # Cancelled jobs are handled by the caller
+        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
+                              for i in job.ops[idx:])
+
+      elif op.status in constants.OPS_FINALIZED:
+        # This is a job that was partially completed before master daemon
+        # shutdown, so it can be expected that some opcodes are already
+        # completed successfully (if any did error out, then the whole job
+        # should have been aborted and not resubmitted for processing).
+        logging.info("%s: opcode %s already processed, skipping",
+                     opctx.log_prefix, opctx.summary)
+        continue
+
+      return opctx
+
+  @staticmethod
+  def _MarkWaitlock(job, op):
+    """Marks an opcode as waiting for locks.
+
+    The job's start timestamp is also set if necessary.
+
+    @type job: L{_QueuedJob}
+    @param job: Job object
+    @type op: L{_QueuedOpCode}
+    @param op: Opcode object
+
+    """
+    assert op in job.ops
+    assert op.status in (constants.OP_STATUS_QUEUED,
+                         constants.OP_STATUS_WAITLOCK)
+
+    update = False
+
+    op.result = None
+
+    if op.status == constants.OP_STATUS_QUEUED:
+      op.status = constants.OP_STATUS_WAITLOCK
+      update = True
+
+    if op.start_timestamp is None:
+      op.start_timestamp = TimeStampNow()
+      update = True
+
+    if job.start_timestamp is None:
+      job.start_timestamp = op.start_timestamp
+      update = True
+
+    assert op.status == constants.OP_STATUS_WAITLOCK
+
+    return update
+
+  def _ExecOpCodeUnlocked(self, opctx):
+    """Processes one opcode and returns the result.
+
+    """
+    op = opctx.op
+
+    assert op.status == constants.OP_STATUS_WAITLOCK
+
+    timeout = opctx.GetNextLockTimeout()
+
+    try:
+      # Make sure not to hold queue lock while calling ExecOpCode
+      result = self.opexec_fn(op.input,
+                              _OpExecCallbacks(self.queue, self.job, op),
+                              timeout=timeout, priority=op.priority)
+    except mcpu.LockAcquireTimeout:
+      assert timeout is not None, "Received timeout for blocking acquire"
+      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
+
+      assert op.status in (constants.OP_STATUS_WAITLOCK,
+                           constants.OP_STATUS_CANCELING)
+
+      # Was job cancelled while we were waiting for the lock?
+      if op.status == constants.OP_STATUS_CANCELING:
+        return (constants.OP_STATUS_CANCELING, None)
+
+      # Stay in waitlock while trying to re-acquire lock
+      return (constants.OP_STATUS_WAITLOCK, None)
+    except CancelJob:
+      logging.exception("%s: Canceling job", opctx.log_prefix)
+      assert op.status == constants.OP_STATUS_CANCELING
+      return (constants.OP_STATUS_CANCELING, None)
+    except Exception, err: # pylint: disable-msg=W0703
+      logging.exception("%s: Caught exception in %s",
+                        opctx.log_prefix, opctx.summary)
+      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
+    else:
+      logging.debug("%s: %s successful",
+                    opctx.log_prefix, opctx.summary)
+      return (constants.OP_STATUS_SUCCESS, result)
+
+  def __call__(self, _nextop_fn=None):
+    """Continues execution of a job.
+
+    @param _nextop_fn: Callback function for tests
+    @rtype: bool
+    @return: True if job is finished, False if processor needs to be called
+             again
+
+    """
+    queue = self.queue
+    job = self.job
+
+    logging.debug("Processing job %s", job.id)
+
+    queue.acquire(shared=1)
+    try:
+      opcount = len(job.ops)
+
+      # Is a previous opcode still pending?
+      if job.cur_opctx:
+        opctx = job.cur_opctx
+        job.cur_opctx = None
+      else:
+        if __debug__ and _nextop_fn:
+          _nextop_fn()
+        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
+
+      op = opctx.op
+
+      # Consistency check
+      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
+                                     constants.OP_STATUS_CANCELING,
+                                     constants.OP_STATUS_CANCELED)
+                        for i in job.ops[opctx.index + 1:])
+
+      assert op.status in (constants.OP_STATUS_QUEUED,
+                           constants.OP_STATUS_WAITLOCK,
+                           constants.OP_STATUS_CANCELING,
+                           constants.OP_STATUS_CANCELED)
+
+      assert (op.priority <= constants.OP_PRIO_LOWEST and
+              op.priority >= constants.OP_PRIO_HIGHEST)
+
+      if op.status not in (constants.OP_STATUS_CANCELING,
+                           constants.OP_STATUS_CANCELED):
+        assert op.status in (constants.OP_STATUS_QUEUED,
+                             constants.OP_STATUS_WAITLOCK)
+
+        # Prepare to start opcode
+        if self._MarkWaitlock(job, op):
+          # Write to disk
+          queue.UpdateJobUnlocked(job)
+
+        assert op.status == constants.OP_STATUS_WAITLOCK
+        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
+        assert job.start_timestamp and op.start_timestamp
+
+        logging.info("%s: opcode %s waiting for locks",
+                     opctx.log_prefix, opctx.summary)
+
+        queue.release()
+        try:
+          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
+        finally:
+          queue.acquire(shared=1)
+
+        op.status = op_status
+        op.result = op_result
+
+        if op.status == constants.OP_STATUS_WAITLOCK:
+          # Couldn't get locks in time
+          assert not op.end_timestamp
+        else:
+          # Finalize opcode
+          op.end_timestamp = TimeStampNow()
+
+          if op.status == constants.OP_STATUS_CANCELING:
+            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
+                                  for i in job.ops[opctx.index:])
+          else:
+            assert op.status in constants.OPS_FINALIZED
+
+      if op.status == constants.OP_STATUS_WAITLOCK:
+        finalize = False
+
+        if opctx.CheckPriorityIncrease():
+          # Priority was changed, need to update on-disk file
+          queue.UpdateJobUnlocked(job)
+
+        # Keep around for another round
+        job.cur_opctx = opctx
+
+        assert (op.priority <= constants.OP_PRIO_LOWEST and
+                op.priority >= constants.OP_PRIO_HIGHEST)
+
+        # In no case must the status be finalized here
+        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
+
+      else:
+        # Ensure all opcodes so far have been successful
+        assert (opctx.index == 0 or
+                compat.all(i.status == constants.OP_STATUS_SUCCESS
+                           for i in job.ops[:opctx.index]))
+
+        # Reset context
+        job.cur_opctx = None
+
+        if op.status == constants.OP_STATUS_SUCCESS:
+          finalize = False
+
+        elif op.status == constants.OP_STATUS_ERROR:
+          # Ensure failed opcode has an exception as its result
+          assert errors.GetEncodedError(job.ops[opctx.index].result)
+
+          to_encode = errors.OpExecError("Preceding opcode failed")
+          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                _EncodeOpError(to_encode))
+          finalize = True
+
+          # Consistency check
+          assert compat.all(i.status == constants.OP_STATUS_ERROR and
+                            errors.GetEncodedError(i.result)
+                            for i in job.ops[opctx.index:])
+
+        elif op.status == constants.OP_STATUS_CANCELING:
+          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                                "Job canceled by request")
+          finalize = True
+
+        elif op.status == constants.OP_STATUS_CANCELED:
+          finalize = True
+
+        else:
+          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
+
+        # Finalizing or last opcode?
+        if finalize or opctx.index == (opcount - 1):
+          # All opcodes have been run, finalize job
+          job.end_timestamp = TimeStampNow()
+
+        # Write to disk. If the job status is final, this is the final write
+        # allowed. Once the file has been written, it can be archived anytime.
+        queue.UpdateJobUnlocked(job)
+
+        if finalize or opctx.index == (opcount - 1):
+          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
+          return True
+
+      return False
+    finally:
+      queue.release()
 
 
 class _JobQueueWorker(workerpool.BaseWorker):
 
 
 class _JobQueueWorker(workerpool.BaseWorker):
@@ -423,112 +1118,23 @@ class _JobQueueWorker(workerpool.BaseWorker):
   def RunTask(self, job): # pylint: disable-msg=W0221
     """Job executor.
 
   def RunTask(self, job): # pylint: disable-msg=W0221
     """Job executor.
 
-    This functions processes a job. It is closely tied to the _QueuedJob and
-    _QueuedOpCode classes.
+    This functions processes a job. It is closely tied to the L{_QueuedJob} and
+    L{_QueuedOpCode} classes.
 
     @type job: L{_QueuedJob}
     @param job: the job to be processed
 
     """
 
     @type job: L{_QueuedJob}
     @param job: the job to be processed
 
     """
-    logging.info("Worker %s processing job %s",
-                  self.worker_id, job.id)
-    proc = mcpu.Processor(self.pool.queue.context, job.id)
     queue = job.queue
     queue = job.queue
-    try:
-      try:
-        count = len(job.ops)
-        for idx, op in enumerate(job.ops):
-          op_summary = op.input.Summary()
-          if op.status == constants.OP_STATUS_SUCCESS:
-            # this is a job that was partially completed before master
-            # daemon shutdown, so it can be expected that some opcodes
-            # are already completed successfully (if any did error
-            # out, then the whole job should have been aborted and not
-            # resubmitted for processing)
-            logging.info("Op %s/%s: opcode %s already processed, skipping",
-                         idx + 1, count, op_summary)
-            continue
-          try:
-            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
-                         op_summary)
-
-            queue.acquire()
-            try:
-              if op.status == constants.OP_STATUS_CANCELED:
-                raise CancelJob()
-              assert op.status == constants.OP_STATUS_QUEUED
-              op.status = constants.OP_STATUS_WAITLOCK
-              op.result = None
-              op.start_timestamp = TimeStampNow()
-              if idx == 0: # first opcode
-                job.start_timestamp = op.start_timestamp
-              queue.UpdateJobUnlocked(job)
-
-              input_opcode = op.input
-            finally:
-              queue.release()
-
-            # Make sure not to hold queue lock while calling ExecOpCode
-            result = proc.ExecOpCode(input_opcode,
-                                     _OpExecCallbacks(queue, job, op))
-
-            queue.acquire()
-            try:
-              op.status = constants.OP_STATUS_SUCCESS
-              op.result = result
-              op.end_timestamp = TimeStampNow()
-              queue.UpdateJobUnlocked(job)
-            finally:
-              queue.release()
-
-            logging.info("Op %s/%s: Successfully finished opcode %s",
-                         idx + 1, count, op_summary)
-          except CancelJob:
-            # Will be handled further up
-            raise
-          except Exception, err:
-            queue.acquire()
-            try:
-              try:
-                op.status = constants.OP_STATUS_ERROR
-                if isinstance(err, errors.GenericError):
-                  op.result = errors.EncodeException(err)
-                else:
-                  op.result = str(err)
-                op.end_timestamp = TimeStampNow()
-                logging.info("Op %s/%s: Error in opcode %s: %s",
-                             idx + 1, count, op_summary, err)
-              finally:
-                queue.UpdateJobUnlocked(job)
-            finally:
-              queue.release()
-            raise
-
-      except CancelJob:
-        queue.acquire()
-        try:
-          queue.CancelJobUnlocked(job)
-        finally:
-          queue.release()
-      except errors.GenericError, err:
-        logging.exception("Ganeti exception")
-      except:
-        logging.exception("Unhandled exception")
-    finally:
-      queue.acquire()
-      try:
-        try:
-          job.lock_status = None
-          job.end_timestamp = TimeStampNow()
-          queue.UpdateJobUnlocked(job)
-        finally:
-          job_id = job.id
-          status = job.CalcStatus()
-      finally:
-        queue.release()
+    assert queue == self.pool.queue
+
+    self.SetTaskName("Job%s" % job.id)
 
 
-      logging.info("Worker %s finished job %s, status = %s",
-                   self.worker_id, job_id, status)
+    proc = mcpu.Processor(queue.context, job.id)
+
+    if not _JobProcessor(queue, proc.ExecOpCode, job)():
+      # Schedule again
+      raise workerpool.DeferTask(priority=job.CalcPriority())
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
@@ -536,7 +1142,8 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
 
   """
   def __init__(self, queue):
 
   """
   def __init__(self, queue):
-    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
+    super(_JobQueueWorkerPool, self).__init__("JobQueue",
+                                              JOBQUEUE_THREADS,
                                               _JobQueueWorker)
     self.queue = queue
 
                                               _JobQueueWorker)
     self.queue = queue
 
@@ -548,12 +1155,12 @@ def _RequireOpenQueue(fn):
   functions usually called from other classes. Note that this should
   be applied only to methods (not plain functions), since it expects
   that the decorated function is called with a first argument that has
   functions usually called from other classes. Note that this should
   be applied only to methods (not plain functions), since it expects
   that the decorated function is called with a first argument that has
-  a '_queue_lock' argument.
+  a '_queue_filelock' argument.
 
 
-  @warning: Use this decorator only after utils.LockedMethod!
+  @warning: Use this decorator only after locking.ssynchronized
 
   Example::
 
   Example::
-    @utils.LockedMethod
+    @locking.ssynchronized(_LOCK)
     @_RequireOpenQueue
     def Example(self):
       pass
     @_RequireOpenQueue
     def Example(self):
       pass
@@ -561,7 +1168,7 @@ def _RequireOpenQueue(fn):
   """
   def wrapper(self, *args, **kwargs):
     # pylint: disable-msg=W0212
   """
   def wrapper(self, *args, **kwargs):
     # pylint: disable-msg=W0212
-    assert self._queue_lock is not None, "Queue should be open"
+    assert self._queue_filelock is not None, "Queue should be open"
     return fn(self, *args, **kwargs)
   return wrapper
 
     return fn(self, *args, **kwargs)
   return wrapper
 
@@ -589,15 +1196,21 @@ class JobQueue(object):
     """
     self.context = context
     self._memcache = weakref.WeakValueDictionary()
     """
     self.context = context
     self._memcache = weakref.WeakValueDictionary()
-    self._my_hostname = utils.HostInfo().name
+    self._my_hostname = netutils.Hostname.GetSysName()
+
+    # The Big JobQueue lock. If a code block or method acquires it in shared
+    # mode safe it must guarantee concurrency with all the code acquiring it in
+    # shared mode, including itself. In order not to acquire it at all
+    # concurrency must be guaranteed with all code acquiring it in shared mode
+    # and all code acquiring it exclusively.
+    self._lock = locking.SharedLock("JobQueue")
 
 
-    # Locking
-    self._lock = threading.Lock()
     self.acquire = self._lock.acquire
     self.release = self._lock.release
 
     self.acquire = self._lock.acquire
     self.release = self._lock.release
 
-    # Initialize
-    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
+    # Initialize the queue, and acquire the filelock.
+    # This ensures no other process is working on the job queue.
+    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
 
     # Read serial file
     self._last_serial = jstore.ReadSerial()
 
     # Read serial file
     self._last_serial = jstore.ReadSerial()
@@ -610,62 +1223,79 @@ class JobQueue(object):
                        if n.master_candidate)
 
     # Remove master node
                        if n.master_candidate)
 
     # Remove master node
-    try:
-      del self._nodes[self._my_hostname]
-    except KeyError:
-      pass
+    self._nodes.pop(self._my_hostname, None)
 
     # TODO: Check consistency across nodes
 
 
     # TODO: Check consistency across nodes
 
+    self._queue_size = 0
+    self._UpdateQueueSizeUnlocked()
+    self._drained = jstore.CheckDrainFlag()
+
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
     try:
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
     try:
-      # We need to lock here because WorkerPool.AddTask() may start a job while
-      # we're still doing our work.
-      self.acquire()
-      try:
-        logging.info("Inspecting job queue")
-
-        all_job_ids = self._GetJobIDsUnlocked()
-        jobs_count = len(all_job_ids)
-        lastinfo = time.time()
-        for idx, job_id in enumerate(all_job_ids):
-          # Give an update every 1000 jobs or 10 seconds
-          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
-              idx == (jobs_count - 1)):
-            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
-                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
-            lastinfo = time.time()
-
-          job = self._LoadJobUnlocked(job_id)
-
-          # a failure in loading the job can cause 'None' to be returned
-          if job is None:
-            continue
-
-          status = job.CalcStatus()
-
-          if status in (constants.JOB_STATUS_QUEUED, ):
-            self._wpool.AddTask(job)
-
-          elif status in (constants.JOB_STATUS_RUNNING,
-                          constants.JOB_STATUS_WAITLOCK,
-                          constants.JOB_STATUS_CANCELING):
-            logging.warning("Unfinished job %s found: %s", job.id, job)
-            try:
-              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
-                                    "Unclean master daemon shutdown")
-            finally:
-              self.UpdateJobUnlocked(job)
-
-        logging.info("Job queue inspection finished")
-      finally:
-        self.release()
+      self._InspectQueue()
     except:
       self._wpool.TerminateWorkers()
       raise
 
     except:
       self._wpool.TerminateWorkers()
       raise
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  def _InspectQueue(self):
+    """Loads the whole job queue and resumes unfinished jobs.
+
+    This function needs the lock here because WorkerPool.AddTask() may start a
+    job while we're still doing our work.
+
+    """
+    logging.info("Inspecting job queue")
+
+    restartjobs = []
+
+    all_job_ids = self._GetJobIDsUnlocked()
+    jobs_count = len(all_job_ids)
+    lastinfo = time.time()
+    for idx, job_id in enumerate(all_job_ids):
+      # Give an update every 1000 jobs or 10 seconds
+      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
+          idx == (jobs_count - 1)):
+        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
+                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
+        lastinfo = time.time()
+
+      job = self._LoadJobUnlocked(job_id)
+
+      # a failure in loading the job can cause 'None' to be returned
+      if job is None:
+        continue
+
+      status = job.CalcStatus()
+
+      if status == constants.JOB_STATUS_QUEUED:
+        restartjobs.append(job)
+
+      elif status in (constants.JOB_STATUS_RUNNING,
+                      constants.JOB_STATUS_WAITLOCK,
+                      constants.JOB_STATUS_CANCELING):
+        logging.warning("Unfinished job %s found: %s", job.id, job)
+
+        if status == constants.JOB_STATUS_WAITLOCK:
+          # Restart job
+          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
+          restartjobs.append(job)
+        else:
+          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                "Unclean master daemon shutdown")
+
+        self.UpdateJobUnlocked(job)
+
+    if restartjobs:
+      logging.info("Restarting %s jobs", len(restartjobs))
+      self._EnqueueJobs(restartjobs)
+
+    logging.info("Job queue inspection finished")
+
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def AddNode(self, node):
     """Register a new node with the queue.
   @_RequireOpenQueue
   def AddNode(self, node):
     """Register a new node with the queue.
@@ -710,7 +1340,7 @@ class JobQueue(object):
 
     self._nodes[node_name] = node.primary_ip
 
 
     self._nodes[node_name] = node.primary_ip
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def RemoveNode(self, node_name):
     """Callback called when removing nodes from the cluster.
   @_RequireOpenQueue
   def RemoveNode(self, node_name):
     """Callback called when removing nodes from the cluster.
@@ -719,11 +1349,7 @@ class JobQueue(object):
     @param node_name: the name of the node to remove
 
     """
     @param node_name: the name of the node to remove
 
     """
-    try:
-      # The queue is removed by the "leave node" RPC call.
-      del self._nodes[node_name]
-    except KeyError:
-      pass
+    self._nodes.pop(node_name, None)
 
   @staticmethod
   def _CheckRpcResult(result, nodes, failmsg):
 
   @staticmethod
   def _CheckRpcResult(result, nodes, failmsg):
@@ -765,11 +1391,12 @@ class JobQueue(object):
         names and the second one with the node addresses
 
     """
         names and the second one with the node addresses
 
     """
+    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
     name_list = self._nodes.keys()
     addr_list = [self._nodes[name] for name in name_list]
     return name_list, addr_list
 
     name_list = self._nodes.keys()
     addr_list = [self._nodes[name] for name in name_list]
     return name_list, addr_list
 
-  def _WriteAndReplicateFileUnlocked(self, file_name, data):
+  def _UpdateJobQueueFile(self, file_name, data, replicate):
     """Writes a file locally and then replicates it to all nodes.
 
     This function will replace the contents of a file on the local
     """Writes a file locally and then replicates it to all nodes.
 
     This function will replace the contents of a file on the local
@@ -779,14 +1406,18 @@ class JobQueue(object):
     @param file_name: the path of the file to be replicated
     @type data: str
     @param data: the new contents of the file
     @param file_name: the path of the file to be replicated
     @type data: str
     @param data: the new contents of the file
+    @type replicate: boolean
+    @param replicate: whether to spread the changes to the remote nodes
 
     """
 
     """
-    utils.WriteFile(file_name, data=data)
+    getents = runtime.GetEnts()
+    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
+                    gid=getents.masterd_gid)
 
 
-    names, addrs = self._GetNodeIp()
-    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
-    self._CheckRpcResult(result, self._nodes,
-                         "Updating %s" % file_name)
+    if replicate:
+      names, addrs = self._GetNodeIp()
+      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
+      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
 
   def _RenameFilesUnlocked(self, rename):
     """Renames a file locally and then replicate the change.
 
   def _RenameFilesUnlocked(self, rename):
     """Renames a file locally and then replicate the change.
@@ -856,8 +1487,8 @@ class JobQueue(object):
     serial = self._last_serial + count
 
     # Write to file
     serial = self._last_serial + count
 
     # Write to file
-    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
-                                        "%s\n" % serial)
+    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
+                             "%s\n" % serial, True)
 
     result = [self._FormatJobID(v)
               for v in range(self._last_serial, serial + 1)]
 
     result = [self._FormatJobID(v)
               for v in range(self._last_serial, serial + 1)]
@@ -876,7 +1507,7 @@ class JobQueue(object):
     @return: the path to the job file
 
     """
     @return: the path to the job file
 
     """
-    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
+    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
 
   @classmethod
   def _GetArchivedJobPath(cls, job_id):
 
   @classmethod
   def _GetArchivedJobPath(cls, job_id):
@@ -888,56 +1519,31 @@ class JobQueue(object):
     @return: the path to the archived job file
 
     """
     @return: the path to the archived job file
 
     """
-    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
-    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
-
-  @classmethod
-  def _ExtractJobID(cls, name):
-    """Extract the job id from a filename.
-
-    @type name: str
-    @param name: the job filename
-    @rtype: job id or None
-    @return: the job id corresponding to the given filename,
-        or None if the filename does not represent a valid
-        job file
-
-    """
-    m = cls._RE_JOB_FILE.match(name)
-    if m:
-      return m.group(1)
-    else:
-      return None
+    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
+                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
 
 
-  def _GetJobIDsUnlocked(self, archived=False):
+  def _GetJobIDsUnlocked(self, sort=True):
     """Return all known job IDs.
 
     """Return all known job IDs.
 
-    If the parameter archived is True, archived jobs IDs will be
-    included. Currently this argument is unused.
-
     The method only looks at disk because it's a requirement that all
     jobs are present on disk (so in the _memcache we don't have any
     extra IDs).
 
     The method only looks at disk because it's a requirement that all
     jobs are present on disk (so in the _memcache we don't have any
     extra IDs).
 
+    @type sort: boolean
+    @param sort: perform sorting on the returned job ids
     @rtype: list
     @return: the list of job IDs
 
     """
     @rtype: list
     @return: the list of job IDs
 
     """
-    # pylint: disable-msg=W0613
-    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
-    jlist = utils.NiceSort(jlist)
+    jlist = []
+    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
+      m = self._RE_JOB_FILE.match(filename)
+      if m:
+        jlist.append(m.group(1))
+    if sort:
+      jlist = utils.NiceSort(jlist)
     return jlist
 
     return jlist
 
-  def _ListJobFiles(self):
-    """Returns the list of current job files.
-
-    @rtype: list
-    @return: the list of job file names
-
-    """
-    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
-            if self._RE_JOB_FILE.match(name)]
-
   def _LoadJobUnlocked(self, job_id):
     """Loads a job from the disk or memory.
 
   def _LoadJobUnlocked(self, job_id):
     """Loads a job from the disk or memory.
 
@@ -955,77 +1561,92 @@ class JobQueue(object):
       logging.debug("Found job %s in memcache", job_id)
       return job
 
       logging.debug("Found job %s in memcache", job_id)
       return job
 
-    filepath = self._GetJobPath(job_id)
-    logging.debug("Loading job from %s", filepath)
-    try:
-      raw_data = utils.ReadFile(filepath)
-    except IOError, err:
-      if err.errno in (errno.ENOENT, ):
-        return None
-      raise
-
-    data = serializer.LoadJson(raw_data)
-
     try:
     try:
-      job = _QueuedJob.Restore(self, data)
-    except Exception, err: # pylint: disable-msg=W0703
+      job = self._LoadJobFromDisk(job_id)
+      if job is None:
+        return job
+    except errors.JobFileCorrupted:
+      old_path = self._GetJobPath(job_id)
       new_path = self._GetArchivedJobPath(job_id)
       new_path = self._GetArchivedJobPath(job_id)
-      if filepath == new_path:
+      if old_path == new_path:
         # job already archived (future case)
         logging.exception("Can't parse job %s", job_id)
       else:
         # non-archived case
         logging.exception("Can't parse job %s, will archive.", job_id)
         # job already archived (future case)
         logging.exception("Can't parse job %s", job_id)
       else:
         # non-archived case
         logging.exception("Can't parse job %s, will archive.", job_id)
-        self._RenameFilesUnlocked([(filepath, new_path)])
+        self._RenameFilesUnlocked([(old_path, new_path)])
       return None
 
     self._memcache[job_id] = job
     logging.debug("Added job %s to the cache", job_id)
     return job
 
       return None
 
     self._memcache[job_id] = job
     logging.debug("Added job %s to the cache", job_id)
     return job
 
-  def _GetJobsUnlocked(self, job_ids):
-    """Return a list of jobs based on their IDs.
+  def _LoadJobFromDisk(self, job_id):
+    """Load the given job file from disk.
 
 
-    @type job_ids: list
-    @param job_ids: either an empty list (meaning all jobs),
-        or a list of job IDs
-    @rtype: list
-    @return: the list of job objects
+    Given a job file, read, load and restore it in a _QueuedJob format.
+
+    @type job_id: string
+    @param job_id: job identifier
+    @rtype: L{_QueuedJob} or None
+    @return: either None or the job object
 
     """
 
     """
-    if not job_ids:
-      job_ids = self._GetJobIDsUnlocked()
+    filepath = self._GetJobPath(job_id)
+    logging.debug("Loading job from %s", filepath)
+    try:
+      raw_data = utils.ReadFile(filepath)
+    except EnvironmentError, err:
+      if err.errno in (errno.ENOENT, ):
+        return None
+      raise
+
+    try:
+      data = serializer.LoadJson(raw_data)
+      job = _QueuedJob.Restore(self, data)
+    except Exception, err: # pylint: disable-msg=W0703
+      raise errors.JobFileCorrupted(err)
 
 
-    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
+    return job
 
 
-  @staticmethod
-  def _IsQueueMarkedDrain():
-    """Check if the queue is marked from drain.
+  def SafeLoadJobFromDisk(self, job_id):
+    """Load the given job file from disk.
 
 
-    This currently uses the queue drain file, which makes it a
-    per-node flag. In the future this can be moved to the config file.
+    Given a job file, read, load and restore it in a _QueuedJob format.
+    In case of error reading the job, it gets returned as None, and the
+    exception is logged.
 
 
-    @rtype: boolean
-    @return: True of the job queue is marked for draining
+    @type job_id: string
+    @param job_id: job identifier
+    @rtype: L{_QueuedJob} or None
+    @return: either None or the job object
 
     """
 
     """
-    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
+    try:
+      return self._LoadJobFromDisk(job_id)
+    except (errors.JobFileCorrupted, EnvironmentError):
+      logging.exception("Can't load/parse job %s", job_id)
+      return None
 
 
-  @staticmethod
-  def SetDrainFlag(drain_flag):
-    """Sets the drain flag for the queue.
+  def _UpdateQueueSizeUnlocked(self):
+    """Update the queue size.
 
 
-    This is similar to the function L{backend.JobQueueSetDrainFlag},
-    and in the future we might merge them.
+    """
+    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
+
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  def SetDrainFlag(self, drain_flag):
+    """Sets the drain flag for the queue.
 
     @type drain_flag: boolean
     @param drain_flag: Whether to set or unset the drain flag
 
     """
 
     @type drain_flag: boolean
     @param drain_flag: Whether to set or unset the drain flag
 
     """
-    if drain_flag:
-      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
-    else:
-      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
+    jstore.SetDrainFlag(drain_flag)
+
+    self._drained = drain_flag
+
     return True
 
   @_RequireOpenQueue
     return True
 
   @_RequireOpenQueue
@@ -1039,39 +1660,41 @@ class JobQueue(object):
     @param job_id: the job ID for the new job
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
     @param job_id: the job ID for the new job
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
-    @rtype: job ID
-    @return: the job ID of the newly created job
-    @raise errors.JobQueueDrainError: if the job is marked for draining
+    @rtype: L{_QueuedJob}
+    @return: the job object to be queued
+    @raise errors.JobQueueDrainError: if the job queue is marked for draining
+    @raise errors.JobQueueFull: if the job queue has too many jobs in it
+    @raise errors.GenericError: If an opcode is not valid
 
     """
 
     """
-    if self._IsQueueMarkedDrain():
+    # Ok when sharing the big job queue lock, as the drain file is created when
+    # the lock is exclusive.
+    if self._drained:
       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
 
       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
 
-    # Check job queue size
-    size = len(self._ListJobFiles())
-    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
-      # TODO: Autoarchive jobs. Make sure it's not done on every job
-      # submission, though.
-      #size = ...
-      pass
-
-    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
+    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()
 
     job = _QueuedJob(self, job_id, ops)
 
       raise errors.JobQueueFull()
 
     job = _QueuedJob(self, job_id, ops)
 
+    # Check priority
+    for idx, op in enumerate(job.ops):
+      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
+        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
+        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
+                                  " are %s" % (idx, op.priority, allowed))
+
     # Write to disk
     self.UpdateJobUnlocked(job)
 
     # Write to disk
     self.UpdateJobUnlocked(job)
 
+    self._queue_size += 1
+
     logging.debug("Adding new job %s to the cache", job_id)
     self._memcache[job_id] = job
 
     logging.debug("Adding new job %s to the cache", job_id)
     self._memcache[job_id] = job
 
-    # Add to worker pool
-    self._wpool.AddTask(job)
-
-    return job.id
+    return job
 
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def SubmitJob(self, ops):
     """Create and store a new job.
   @_RequireOpenQueue
   def SubmitJob(self, ops):
     """Create and store a new job.
@@ -1080,9 +1703,10 @@ class JobQueue(object):
 
     """
     job_id = self._NewSerialsUnlocked(1)[0]
 
     """
     job_id = self._NewSerialsUnlocked(1)[0]
-    return self._SubmitJobUnlocked(job_id, ops)
+    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
+    return job_id
 
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def SubmitManyJobs(self, jobs):
     """Create and store multiple jobs.
   @_RequireOpenQueue
   def SubmitManyJobs(self, jobs):
     """Create and store multiple jobs.
@@ -1091,20 +1715,34 @@ class JobQueue(object):
 
     """
     results = []
 
     """
     results = []
+    added_jobs = []
     all_job_ids = self._NewSerialsUnlocked(len(jobs))
     for job_id, ops in zip(all_job_ids, jobs):
       try:
     all_job_ids = self._NewSerialsUnlocked(len(jobs))
     for job_id, ops in zip(all_job_ids, jobs):
       try:
-        data = self._SubmitJobUnlocked(job_id, ops)
+        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
         status = True
         status = True
+        data = job_id
       except errors.GenericError, err:
         data = str(err)
         status = False
       results.append((status, data))
 
       except errors.GenericError, err:
         data = str(err)
         status = False
       results.append((status, data))
 
+    self._EnqueueJobs(added_jobs)
+
     return results
 
     return results
 
+  def _EnqueueJobs(self, jobs):
+    """Helper function to add jobs to worker pool's queue.
+
+    @type jobs: list
+    @param jobs: List of all jobs
+
+    """
+    self._wpool.AddManyTasks([(job, ) for job in jobs],
+                             priority=[job.CalcPriority() for job in jobs])
+
   @_RequireOpenQueue
   @_RequireOpenQueue
-  def UpdateJobUnlocked(self, job):
+  def UpdateJobUnlocked(self, job, replicate=True):
     """Update a job's on disk storage.
 
     After a job has been modified, this function needs to be called in
     """Update a job's on disk storage.
 
     After a job has been modified, this function needs to be called in
@@ -1113,18 +1751,15 @@ class JobQueue(object):
 
     @type job: L{_QueuedJob}
     @param job: the changed job
 
     @type job: L{_QueuedJob}
     @param job: the changed job
+    @type replicate: boolean
+    @param replicate: whether to replicate the change to remote nodes
 
     """
     filename = self._GetJobPath(job.id)
     data = serializer.DumpJson(job.Serialize(), indent=False)
     logging.debug("Writing job %s to %s", job.id, filename)
 
     """
     filename = self._GetJobPath(job.id)
     data = serializer.DumpJson(job.Serialize(), indent=False)
     logging.debug("Writing job %s to %s", job.id, filename)
-    self._WriteAndReplicateFileUnlocked(filename, data)
+    self._UpdateJobQueueFile(filename, data, replicate)
 
 
-    # Notify waiters about potential changes
-    job.change.notifyAll()
-
-  @utils.LockedMethod
-  @_RequireOpenQueue
   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                         timeout):
     """Waits for changes in a job.
   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                         timeout):
     """Waits for changes in a job.
@@ -1138,7 +1773,7 @@ class JobQueue(object):
     @type prev_log_serial: int
     @param prev_log_serial: Last job message serial number
     @type timeout: float
     @type prev_log_serial: int
     @param prev_log_serial: Last job message serial number
     @type timeout: float
-    @param timeout: maximum time to wait
+    @param timeout: maximum time to wait in seconds
     @rtype: tuple (job info, log entries)
     @return: a tuple of the job information as required via
         the fields parameter, and the log entries as a list
     @rtype: tuple (job info, log entries)
     @return: a tuple of the job information as required via
         the fields parameter, and the log entries as a list
@@ -1149,46 +1784,14 @@ class JobQueue(object):
         as such by the clients
 
     """
         as such by the clients
 
     """
-    job = self._LoadJobUnlocked(job_id)
-    if not job:
-      logging.debug("Job %s not found", job_id)
-      return None
+    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
 
 
-    def _CheckForChanges():
-      logging.debug("Waiting for changes in job %s", job_id)
+    helper = _WaitForJobChangesHelper()
 
 
-      status = job.CalcStatus()
-      job_info = self._GetJobInfoUnlocked(job, fields)
-      log_entries = job.GetLogEntries(prev_log_serial)
-
-      # Serializing and deserializing data can cause type changes (e.g. from
-      # tuple to list) or precision loss. We're doing it here so that we get
-      # the same modifications as the data received from the client. Without
-      # this, the comparison afterwards might fail without the data being
-      # significantly different.
-      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
-      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
-
-      # Don't even try to wait if the job is no longer running, there will be
-      # no changes.
-      if (status not in (constants.JOB_STATUS_QUEUED,
-                         constants.JOB_STATUS_RUNNING,
-                         constants.JOB_STATUS_WAITLOCK) or
-          prev_job_info != job_info or
-          (log_entries and prev_log_serial != log_entries[0][0])):
-        logging.debug("Job %s changed", job_id)
-        return (job_info, log_entries)
-
-      raise utils.RetryAgain()
-
-    try:
-      # Setting wait function to release the queue lock while waiting
-      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
-                         wait_fn=job.change.wait)
-    except utils.RetryTimeout:
-      return constants.JOB_NOTCHANGED
+    return helper(self._GetJobPath(job_id), load_fn,
+                  fields, prev_job_info, prev_log_serial, timeout)
 
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def CancelJob(self, job_id):
     """Cancels a job.
   @_RequireOpenQueue
   def CancelJob(self, job_id):
     """Cancels a job.
@@ -1206,36 +1809,13 @@ class JobQueue(object):
       logging.debug("Job %s not found", job_id)
       return (False, "Job %s not found" % job_id)
 
       logging.debug("Job %s not found", job_id)
       return (False, "Job %s not found" % job_id)
 
-    job_status = job.CalcStatus()
-
-    if job_status not in (constants.JOB_STATUS_QUEUED,
-                          constants.JOB_STATUS_WAITLOCK):
-      logging.debug("Job %s is no longer waiting in the queue", job.id)
-      return (False, "Job %s is no longer waiting in the queue" % job.id)
-
-    if job_status == constants.JOB_STATUS_QUEUED:
-      self.CancelJobUnlocked(job)
-      return (True, "Job %s canceled" % job.id)
-
-    elif job_status == constants.JOB_STATUS_WAITLOCK:
-      # The worker will notice the new status and cancel the job
-      try:
-        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
-      finally:
-        self.UpdateJobUnlocked(job)
-      return (True, "Job %s will be canceled" % job.id)
-
-  @_RequireOpenQueue
-  def CancelJobUnlocked(self, job):
-    """Marks a job as canceled.
+    (success, msg) = job.Cancel()
 
 
-    """
-    try:
-      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
-                            "Job canceled by request")
-    finally:
+    if success:
       self.UpdateJobUnlocked(job)
 
       self.UpdateJobUnlocked(job)
 
+    return (success, msg)
+
   @_RequireOpenQueue
   def _ArchiveJobsUnlocked(self, jobs):
     """Archives jobs.
   @_RequireOpenQueue
   def _ArchiveJobsUnlocked(self, jobs):
     """Archives jobs.
@@ -1249,9 +1829,7 @@ class JobQueue(object):
     archive_jobs = []
     rename_files = []
     for job in jobs:
     archive_jobs = []
     rename_files = []
     for job in jobs:
-      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
-                                  constants.JOB_STATUS_SUCCESS,
-                                  constants.JOB_STATUS_ERROR):
+      if job.CalcStatus() not in constants.JOBS_FINALIZED:
         logging.debug("Job %s is not yet done", job.id)
         continue
 
         logging.debug("Job %s is not yet done", job.id)
         continue
 
@@ -1267,9 +1845,14 @@ class JobQueue(object):
     logging.debug("Successfully archived job(s) %s",
                   utils.CommaJoin(job.id for job in archive_jobs))
 
     logging.debug("Successfully archived job(s) %s",
                   utils.CommaJoin(job.id for job in archive_jobs))
 
+    # Since we haven't quite checked, above, if we succeeded or failed renaming
+    # the files, we update the cached queue size from the filesystem. When we
+    # get around to fix the TODO: above, we can use the number of actually
+    # archived jobs to fix this.
+    self._UpdateQueueSizeUnlocked()
     return len(archive_jobs)
 
     return len(archive_jobs)
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def ArchiveJob(self, job_id):
     """Archives a job.
   @_RequireOpenQueue
   def ArchiveJob(self, job_id):
     """Archives a job.
@@ -1291,7 +1874,7 @@ class JobQueue(object):
 
     return self._ArchiveJobsUnlocked([job]) == 1
 
 
     return self._ArchiveJobsUnlocked([job]) == 1
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def AutoArchiveJobs(self, age, timeout):
     """Archives all jobs based on age.
   @_RequireOpenQueue
   def AutoArchiveJobs(self, age, timeout):
     """Archives all jobs based on age.
@@ -1312,10 +1895,10 @@ class JobQueue(object):
     archived_count = 0
     last_touched = 0
 
     archived_count = 0
     last_touched = 0
 
-    all_job_ids = self._GetJobIDsUnlocked(archived=False)
+    all_job_ids = self._GetJobIDsUnlocked()
     pending = []
     for idx, job_id in enumerate(all_job_ids):
     pending = []
     for idx, job_id in enumerate(all_job_ids):
-      last_touched = idx
+      last_touched = idx + 1
 
       # Not optimal because jobs could be pending
       # TODO: Measure average duration for job archival and take number of
 
       # Not optimal because jobs could be pending
       # TODO: Measure average duration for job archival and take number of
@@ -1345,62 +1928,11 @@ class JobQueue(object):
     if pending:
       archived_count += self._ArchiveJobsUnlocked(pending)
 
     if pending:
       archived_count += self._ArchiveJobsUnlocked(pending)
 
-    return (archived_count, len(all_job_ids) - last_touched - 1)
-
-  @staticmethod
-  def _GetJobInfoUnlocked(job, fields):
-    """Returns information about a job.
+    return (archived_count, len(all_job_ids) - last_touched)
 
 
-    @type job: L{_QueuedJob}
-    @param job: the job which we query
-    @type fields: list
-    @param fields: names of fields to return
-    @rtype: list
-    @return: list with one element for each field
-    @raise errors.OpExecError: when an invalid field
-        has been passed
-
-    """
-    row = []
-    for fname in fields:
-      if fname == "id":
-        row.append(job.id)
-      elif fname == "status":
-        row.append(job.CalcStatus())
-      elif fname == "ops":
-        row.append([op.input.__getstate__() for op in job.ops])
-      elif fname == "opresult":
-        row.append([op.result for op in job.ops])
-      elif fname == "opstatus":
-        row.append([op.status for op in job.ops])
-      elif fname == "oplog":
-        row.append([op.log for op in job.ops])
-      elif fname == "opstart":
-        row.append([op.start_timestamp for op in job.ops])
-      elif fname == "opend":
-        row.append([op.end_timestamp for op in job.ops])
-      elif fname == "received_ts":
-        row.append(job.received_timestamp)
-      elif fname == "start_ts":
-        row.append(job.start_timestamp)
-      elif fname == "end_ts":
-        row.append(job.end_timestamp)
-      elif fname == "lock_status":
-        row.append(job.lock_status)
-      elif fname == "summary":
-        row.append([op.input.Summary() for op in job.ops])
-      else:
-        raise errors.OpExecError("Invalid job query field '%s'" % fname)
-    return row
-
-  @utils.LockedMethod
-  @_RequireOpenQueue
   def QueryJobs(self, job_ids, fields):
     """Returns a list of jobs in queue.
 
   def QueryJobs(self, job_ids, fields):
     """Returns a list of jobs in queue.
 
-    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
-    processing for each job.
-
     @type job_ids: list
     @param job_ids: sequence of job identifiers or None for all
     @type fields: list
     @type job_ids: list
     @param job_ids: sequence of job identifiers or None for all
     @type fields: list
@@ -1411,16 +1943,23 @@ class JobQueue(object):
 
     """
     jobs = []
 
     """
     jobs = []
+    list_all = False
+    if not job_ids:
+      # Since files are added to/removed from the queue atomically, there's no
+      # risk of getting the job ids in an inconsistent state.
+      job_ids = self._GetJobIDsUnlocked()
+      list_all = True
 
 
-    for job in self._GetJobsUnlocked(job_ids):
-      if job is None:
+    for job_id in job_ids:
+      job = self.SafeLoadJobFromDisk(job_id)
+      if job is not None:
+        jobs.append(job.GetInfo(fields))
+      elif not list_all:
         jobs.append(None)
         jobs.append(None)
-      else:
-        jobs.append(self._GetJobInfoUnlocked(job, fields))
 
     return jobs
 
 
     return jobs
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def Shutdown(self):
     """Stops the job queue.
   @_RequireOpenQueue
   def Shutdown(self):
     """Stops the job queue.
@@ -1430,5 +1969,5 @@ class JobQueue(object):
     """
     self._wpool.TerminateWorkers()
 
     """
     self._wpool.TerminateWorkers()
 
-    self._queue_lock.Close()
-    self._queue_lock = None
+    self._queue_filelock.Close()
+    self._queue_filelock = None