drive_del after device_del in disk hot-remove
[ganeti-local] / lib / jqueue.py
index dbfc49b..9e9d988 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
 #
 #
 
-# Copyright (C) 2006, 2007, 2008 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -29,27 +29,50 @@ used by all other classes in this module.
 
 """
 
 
 """
 
-import os
 import logging
 import logging
-import threading
 import errno
 import errno
-import re
 import time
 import weakref
 import time
 import weakref
+import threading
+import itertools
+import operator
 
 
+try:
+  # pylint: disable=E0611
+  from pyinotify import pyinotify
+except ImportError:
+  import pyinotify
+
+from ganeti import asyncnotifier
 from ganeti import constants
 from ganeti import serializer
 from ganeti import workerpool
 from ganeti import constants
 from ganeti import serializer
 from ganeti import workerpool
+from ganeti import locking
 from ganeti import opcodes
 from ganeti import opcodes
+from ganeti import opcodes_base
 from ganeti import errors
 from ganeti import mcpu
 from ganeti import utils
 from ganeti import jstore
 from ganeti import rpc
 from ganeti import errors
 from ganeti import mcpu
 from ganeti import utils
 from ganeti import jstore
 from ganeti import rpc
+from ganeti import runtime
+from ganeti import netutils
+from ganeti import compat
+from ganeti import ht
+from ganeti import query
+from ganeti import qlang
+from ganeti import pathutils
+from ganeti import vcluster
 
 
 JOBQUEUE_THREADS = 25
 
 
 JOBQUEUE_THREADS = 25
-JOBS_PER_ARCHIVE_DIRECTORY = 10000
+
+# member lock names to be passed to @ssynchronized decorator
+_LOCK = "_lock"
+_QUEUE = "_queue"
+
+#: Retrieves "id" attribute
+_GetIdAttr = operator.attrgetter("id")
 
 
 class CancelJob(Exception):
 
 
 class CancelJob(Exception):
@@ -58,6 +81,12 @@ class CancelJob(Exception):
   """
 
 
   """
 
 
+class QueueShutdown(Exception):
+  """Special exception to abort a job when the job queue is shutting down.
+
+  """
+
+
 def TimeStampNow():
   """Returns the current timestamp.
 
 def TimeStampNow():
   """Returns the current timestamp.
 
@@ -68,8 +97,35 @@ def TimeStampNow():
   return utils.SplitTime(time.time())
 
 
   return utils.SplitTime(time.time())
 
 
+def _CallJqUpdate(runner, names, file_name, content):
+  """Updates job queue file after virtualizing filename.
+
+  """
+  virt_file_name = vcluster.MakeVirtualPath(file_name)
+  return runner.call_jobqueue_update(names, virt_file_name, content)
+
+
+class _SimpleJobQuery:
+  """Wrapper for job queries.
+
+  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
+
+  """
+  def __init__(self, fields):
+    """Initializes this class.
+
+    """
+    self._query = query.Query(query.JOB_FIELDS, fields)
+
+  def __call__(self, job):
+    """Executes a job query using cached field list.
+
+    """
+    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
+
+
 class _QueuedOpCode(object):
 class _QueuedOpCode(object):
-  """Encasulates an opcode object.
+  """Encapsulates an opcode object.
 
   @ivar log: holds the execution log and consists of tuples
   of the form C{(log_serial, timestamp, level, message)}
 
   @ivar log: holds the execution log and consists of tuples
   of the form C{(log_serial, timestamp, level, message)}
@@ -77,11 +133,16 @@ class _QueuedOpCode(object):
   @ivar status: the current status
   @ivar result: the result of the LU execution
   @ivar start_timestamp: timestamp for the start of the execution
   @ivar status: the current status
   @ivar result: the result of the LU execution
   @ivar start_timestamp: timestamp for the start of the execution
+  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
   @ivar stop_timestamp: timestamp for the end of the execution
 
   """
   @ivar stop_timestamp: timestamp for the end of the execution
 
   """
+  __slots__ = ["input", "status", "result", "log", "priority",
+               "start_timestamp", "exec_timestamp", "end_timestamp",
+               "__weakref__"]
+
   def __init__(self, op):
   def __init__(self, op):
-    """Constructor for the _QuededOpCode.
+    """Initializes instances of this class.
 
     @type op: L{opcodes.OpCode}
     @param op: the opcode we encapsulate
 
     @type op: L{opcodes.OpCode}
     @param op: the opcode we encapsulate
@@ -92,8 +153,12 @@ class _QueuedOpCode(object):
     self.result = None
     self.log = []
     self.start_timestamp = None
     self.result = None
     self.log = []
     self.start_timestamp = None
+    self.exec_timestamp = None
     self.end_timestamp = None
 
     self.end_timestamp = None
 
+    # Get initial priority (it might change during the lifetime of this opcode)
+    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
+
   @classmethod
   def Restore(cls, state):
     """Restore the _QueuedOpCode from the serialized form.
   @classmethod
   def Restore(cls, state):
     """Restore the _QueuedOpCode from the serialized form.
@@ -110,7 +175,9 @@ class _QueuedOpCode(object):
     obj.result = state["result"]
     obj.log = state["log"]
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.result = state["result"]
     obj.log = state["log"]
     obj.start_timestamp = state.get("start_timestamp", None)
+    obj.exec_timestamp = state.get("exec_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
+    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
     return obj
 
   def Serialize(self):
     return obj
 
   def Serialize(self):
@@ -126,7 +193,9 @@ class _QueuedOpCode(object):
       "result": self.result,
       "log": self.log,
       "start_timestamp": self.start_timestamp,
       "result": self.result,
       "log": self.log,
       "start_timestamp": self.start_timestamp,
+      "exec_timestamp": self.exec_timestamp,
       "end_timestamp": self.end_timestamp,
       "end_timestamp": self.end_timestamp,
+      "priority": self.priority,
       }
 
 
       }
 
 
@@ -141,18 +210,36 @@ class _QueuedJob(object):
   @ivar id: the job ID
   @type ops: list
   @ivar ops: the list of _QueuedOpCode that constitute the job
   @ivar id: the job ID
   @type ops: list
   @ivar ops: the list of _QueuedOpCode that constitute the job
-  @type run_op_index: int
-  @ivar run_op_index: the currently executing opcode, or -1 if
-      we didn't yet start executing
   @type log_serial: int
   @ivar log_serial: holds the index for the next log entry
   @ivar received_timestamp: the timestamp for when the job was received
   @ivar start_timestmap: the timestamp for start of execution
   @ivar end_timestamp: the timestamp for end of execution
   @type log_serial: int
   @ivar log_serial: holds the index for the next log entry
   @ivar received_timestamp: the timestamp for when the job was received
   @ivar start_timestmap: the timestamp for start of execution
   @ivar end_timestamp: the timestamp for end of execution
-  @ivar change: a Condition variable we use for waiting for job changes
+  @ivar writable: Whether the job is allowed to be modified
 
   """
 
   """
-  def __init__(self, queue, job_id, ops):
+  # pylint: disable=W0212
+  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
+               "received_timestamp", "start_timestamp", "end_timestamp",
+               "__weakref__", "processor_lock", "writable", "archived"]
+
+  def _AddReasons(self):
+    """Extend the reason trail
+
+    Add the reason for all the opcodes of this job to be executed.
+
+    """
+    count = 0
+    for queued_op in self.ops:
+      op = queued_op.input
+      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__)
+      reason_text = "job=%d;index=%d" % (self.id, count)
+      reason = getattr(op, "reason", [])
+      reason.append((reason_src, reason_text, utils.EpochNano()))
+      op.reason = reason
+      count = count + 1
+
+  def __init__(self, queue, job_id, ops, writable):
     """Constructor for the _QueuedJob.
 
     @type queue: L{JobQueue}
     """Constructor for the _QueuedJob.
 
     @type queue: L{JobQueue}
@@ -162,43 +249,72 @@ class _QueuedJob(object):
     @type ops: list
     @param ops: the list of opcodes we hold, which will be encapsulated
         in _QueuedOpCodes
     @type ops: list
     @param ops: the list of opcodes we hold, which will be encapsulated
         in _QueuedOpCodes
+    @type writable: bool
+    @param writable: Whether job can be modified
 
     """
     if not ops:
 
     """
     if not ops:
-      # TODO: use a better exception
-      raise Exception("No opcodes")
+      raise errors.GenericError("A job needs at least one opcode")
 
     self.queue = queue
 
     self.queue = queue
-    self.id = job_id
+    self.id = int(job_id)
     self.ops = [_QueuedOpCode(op) for op in ops]
     self.ops = [_QueuedOpCode(op) for op in ops]
-    self.run_op_index = -1
+    self._AddReasons()
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
     self.end_timestamp = None
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
     self.end_timestamp = None
+    self.archived = False
+
+    self._InitInMemory(self, writable)
+
+    assert not self.archived, "New jobs can not be marked as archived"
+
+  @staticmethod
+  def _InitInMemory(obj, writable):
+    """Initializes in-memory variables.
+
+    """
+    obj.writable = writable
+    obj.ops_iter = None
+    obj.cur_opctx = None
+
+    # Read-only jobs are not processed and therefore don't need a lock
+    if writable:
+      obj.processor_lock = threading.Lock()
+    else:
+      obj.processor_lock = None
+
+  def __repr__(self):
+    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
+              "id=%s" % self.id,
+              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
 
 
-    # Condition to wait for changes
-    self.change = threading.Condition(self.queue._lock)
+    return "<%s at %#x>" % (" ".join(status), id(self))
 
   @classmethod
 
   @classmethod
-  def Restore(cls, queue, state):
+  def Restore(cls, queue, state, writable, archived):
     """Restore a _QueuedJob from serialized state:
 
     @type queue: L{JobQueue}
     @param queue: to which queue the restored job belongs
     @type state: dict
     @param state: the serialized state
     """Restore a _QueuedJob from serialized state:
 
     @type queue: L{JobQueue}
     @param queue: to which queue the restored job belongs
     @type state: dict
     @param state: the serialized state
+    @type writable: bool
+    @param writable: Whether job can be modified
+    @type archived: bool
+    @param archived: Whether job was already archived
     @rtype: _JobQueue
     @return: the restored _JobQueue instance
 
     """
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
     @rtype: _JobQueue
     @return: the restored _JobQueue instance
 
     """
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
-    obj.id = state["id"]
-    obj.run_op_index = state["run_op_index"]
+    obj.id = int(state["id"])
     obj.received_timestamp = state.get("received_timestamp", None)
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
     obj.received_timestamp = state.get("received_timestamp", None)
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
+    obj.archived = archived
 
     obj.ops = []
     obj.log_serial = 0
 
     obj.ops = []
     obj.log_serial = 0
@@ -208,8 +324,7 @@ class _QueuedJob(object):
         obj.log_serial = max(obj.log_serial, log_entry[0])
       obj.ops.append(op)
 
         obj.log_serial = max(obj.log_serial, log_entry[0])
       obj.ops.append(op)
 
-    # Condition to wait for changes
-    obj.change = threading.Condition(obj.queue._lock)
+    cls._InitInMemory(obj, writable)
 
     return obj
 
 
     return obj
 
@@ -223,7 +338,6 @@ class _QueuedJob(object):
     return {
       "id": self.id,
       "ops": [op.Serialize() for op in self.ops],
     return {
       "id": self.id,
       "ops": [op.Serialize() for op in self.ops],
-      "run_op_index": self.run_op_index,
       "start_timestamp": self.start_timestamp,
       "end_timestamp": self.end_timestamp,
       "received_timestamp": self.received_timestamp,
       "start_timestamp": self.start_timestamp,
       "end_timestamp": self.end_timestamp,
       "received_timestamp": self.received_timestamp,
@@ -262,8 +376,8 @@ class _QueuedJob(object):
 
       if op.status == constants.OP_STATUS_QUEUED:
         pass
 
       if op.status == constants.OP_STATUS_QUEUED:
         pass
-      elif op.status == constants.OP_STATUS_WAITLOCK:
-        status = constants.JOB_STATUS_WAITLOCK
+      elif op.status == constants.OP_STATUS_WAITING:
+        status = constants.JOB_STATUS_WAITING
       elif op.status == constants.OP_STATUS_RUNNING:
         status = constants.JOB_STATUS_RUNNING
       elif op.status == constants.OP_STATUS_CANCELING:
       elif op.status == constants.OP_STATUS_RUNNING:
         status = constants.JOB_STATUS_RUNNING
       elif op.status == constants.OP_STATUS_CANCELING:
@@ -282,11 +396,29 @@ class _QueuedJob(object):
 
     return status
 
 
     return status
 
+  def CalcPriority(self):
+    """Gets the current priority for this job.
+
+    Only unfinished opcodes are considered. When all are done, the default
+    priority is used.
+
+    @rtype: int
+
+    """
+    priorities = [op.priority for op in self.ops
+                  if op.status not in constants.OPS_FINALIZED]
+
+    if not priorities:
+      # All opcodes are done, assume default priority
+      return constants.OP_PRIO_DEFAULT
+
+    return min(priorities)
+
   def GetLogEntries(self, newer_than):
     """Selectively returns the log entries.
 
     @type newer_than: None or int
   def GetLogEntries(self, newer_than):
     """Selectively returns the log entries.
 
     @type newer_than: None or int
-    @param newer_than: if this is None, return all log enties,
+    @param newer_than: if this is None, return all log entries,
         otherwise return only the log entries with serial higher
         than this value
     @rtype: list
         otherwise return only the log entries with serial higher
         than this value
     @rtype: list
@@ -304,547 +436,1599 @@ class _QueuedJob(object):
 
     return entries
 
 
     return entries
 
+  def GetInfo(self, fields):
+    """Returns information about a job.
 
 
-class _JobQueueWorker(workerpool.BaseWorker):
-  """The actual job workers.
+    @type fields: list
+    @param fields: names of fields to return
+    @rtype: list
+    @return: list with one element for each field
+    @raise errors.OpExecError: when an invalid field
+        has been passed
 
 
-  """
-  def _NotifyStart(self):
-    """Mark the opcode as running, not lock-waiting.
+    """
+    return _SimpleJobQuery(fields)(self)
 
 
-    This is called from the mcpu code as a notifier function, when the
-    LU is finally about to start the Exec() method. Of course, to have
-    end-user visible results, the opcode must be initially (before
-    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
+  def MarkUnfinishedOps(self, status, result):
+    """Mark unfinished opcodes with a given status and result.
 
 
-    """
-    assert self.queue, "Queue attribute is missing"
-    assert self.opcode, "Opcode attribute is missing"
+    This is an utility function for marking all running or waiting to
+    be run opcodes with a given status. Opcodes which are already
+    finalised are not changed.
 
 
-    self.queue.acquire()
-    try:
-      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
-                                    constants.OP_STATUS_CANCELING)
+    @param status: a given opcode status
+    @param result: the opcode result
 
 
-      # Cancel here if we were asked to
-      if self.opcode.status == constants.OP_STATUS_CANCELING:
-        raise CancelJob()
+    """
+    not_marked = True
+    for op in self.ops:
+      if op.status in constants.OPS_FINALIZED:
+        assert not_marked, "Finalized opcodes found after non-finalized ones"
+        continue
+      op.status = status
+      op.result = result
+      not_marked = False
 
 
-      self.opcode.status = constants.OP_STATUS_RUNNING
-    finally:
-      self.queue.release()
+  def Finalize(self):
+    """Marks the job as finalized.
 
 
-  def RunTask(self, job):
-    """Job executor.
+    """
+    self.end_timestamp = TimeStampNow()
 
 
-    This functions processes a job. It is closely tied to the _QueuedJob and
-    _QueuedOpCode classes.
+  def Cancel(self):
+    """Marks job as canceled/-ing if possible.
 
 
-    @type job: L{_QueuedJob}
-    @param job: the job to be processed
+    @rtype: tuple; (bool, string)
+    @return: Boolean describing whether job was successfully canceled or marked
+      as canceling and a text message
 
     """
 
     """
-    logging.debug("Worker %s processing job %s",
-                  self.worker_id, job.id)
-    proc = mcpu.Processor(self.pool.queue.context)
-    self.queue = queue = job.queue
-    try:
-      try:
-        count = len(job.ops)
-        for idx, op in enumerate(job.ops):
-          try:
-            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
-
-            queue.acquire()
-            try:
-              assert op.status == constants.OP_STATUS_QUEUED
-              job.run_op_index = idx
-              op.status = constants.OP_STATUS_WAITLOCK
-              op.result = None
-              op.start_timestamp = TimeStampNow()
-              if idx == 0: # first opcode
-                job.start_timestamp = op.start_timestamp
-              queue.UpdateJobUnlocked(job)
-
-              input_opcode = op.input
-            finally:
-              queue.release()
-
-            def _Log(*args):
-              """Append a log entry.
-
-              """
-              assert len(args) < 3
-
-              if len(args) == 1:
-                log_type = constants.ELOG_MESSAGE
-                log_msg = args[0]
-              else:
-                log_type, log_msg = args
-
-              # The time is split to make serialization easier and not lose
-              # precision.
-              timestamp = utils.SplitTime(time.time())
-
-              queue.acquire()
-              try:
-                job.log_serial += 1
-                op.log.append((job.log_serial, timestamp, log_type, log_msg))
-
-                job.change.notifyAll()
-              finally:
-                queue.release()
-
-            # Make sure not to hold lock while _Log is called
-            self.opcode = op
-            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
-
-            queue.acquire()
-            try:
-              op.status = constants.OP_STATUS_SUCCESS
-              op.result = result
-              op.end_timestamp = TimeStampNow()
-              queue.UpdateJobUnlocked(job)
-            finally:
-              queue.release()
-
-            logging.debug("Op %s/%s: Successfully finished %s",
-                          idx + 1, count, op)
-          except CancelJob:
-            # Will be handled further up
-            raise
-          except Exception, err:
-            queue.acquire()
-            try:
-              try:
-                op.status = constants.OP_STATUS_ERROR
-                op.result = str(err)
-                op.end_timestamp = TimeStampNow()
-                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
-              finally:
-                queue.UpdateJobUnlocked(job)
-            finally:
-              queue.release()
-            raise
-
-      except CancelJob:
-        queue.acquire()
-        try:
-          queue.CancelJobUnlocked(job)
-        finally:
-          queue.release()
-      except errors.GenericError, err:
-        logging.exception("Ganeti exception")
-      except:
-        logging.exception("Unhandled exception")
-    finally:
-      queue.acquire()
-      try:
-        try:
-          job.run_op_idx = -1
-          job.end_timestamp = TimeStampNow()
-          queue.UpdateJobUnlocked(job)
-        finally:
-          job_id = job.id
-          status = job.CalcStatus()
-      finally:
-        queue.release()
-      logging.debug("Worker %s finished job %s, status = %s",
-                    self.worker_id, job_id, status)
-
+    status = self.CalcStatus()
 
 
-class _JobQueueWorkerPool(workerpool.WorkerPool):
-  """Simple class implementing a job-processing workerpool.
+    if status == constants.JOB_STATUS_QUEUED:
+      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                             "Job canceled by request")
+      self.Finalize()
+      return (True, "Job %s canceled" % self.id)
 
 
-  """
-  def __init__(self, queue):
-    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
-                                              _JobQueueWorker)
-    self.queue = queue
+    elif status == constants.JOB_STATUS_WAITING:
+      # The worker will notice the new status and cancel the job
+      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
+      return (True, "Job %s will be canceled" % self.id)
 
 
+    else:
+      logging.debug("Job %s is no longer waiting in the queue", self.id)
+      return (False, "Job %s is no longer waiting in the queue" % self.id)
 
 
-class JobQueue(object):
-  """Quue used to manaage the jobs.
+  def ChangePriority(self, priority):
+    """Changes the job priority.
 
 
-  @cvar _RE_JOB_FILE: regex matching the valid job file names
+    @type priority: int
+    @param priority: New priority
+    @rtype: tuple; (bool, string)
+    @return: Boolean describing whether job's priority was successfully changed
+      and a text message
 
 
-  """
-  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
+    """
+    status = self.CalcStatus()
 
 
-  def _RequireOpenQueue(fn):
-    """Decorator for "public" functions.
+    if status in constants.JOBS_FINALIZED:
+      return (False, "Job %s is finished" % self.id)
+    elif status == constants.JOB_STATUS_CANCELING:
+      return (False, "Job %s is cancelling" % self.id)
+    else:
+      assert status in (constants.JOB_STATUS_QUEUED,
+                        constants.JOB_STATUS_WAITING,
+                        constants.JOB_STATUS_RUNNING)
+
+      changed = False
+      for op in self.ops:
+        if (op.status == constants.OP_STATUS_RUNNING or
+            op.status in constants.OPS_FINALIZED):
+          assert not changed, \
+            ("Found opcode for which priority should not be changed after"
+             " priority has been changed for previous opcodes")
+          continue
+
+        assert op.status in (constants.OP_STATUS_QUEUED,
+                             constants.OP_STATUS_WAITING)
+
+        changed = True
+
+        # Set new priority (doesn't modify opcode input)
+        op.priority = priority
+
+      if changed:
+        return (True, ("Priorities of pending opcodes for job %s have been"
+                       " changed to %s" % (self.id, priority)))
+      else:
+        return (False, "Job %s had no pending opcodes" % self.id)
 
 
-    This function should be used for all 'public' functions. That is,
-    functions usually called from other classes.
 
 
-    @warning: Use this decorator only after utils.LockedMethod!
+class _OpExecCallbacks(mcpu.OpExecCbBase):
+  def __init__(self, queue, job, op):
+    """Initializes this class.
 
 
-    Example::
-      @utils.LockedMethod
-      @_RequireOpenQueue
-      def Example(self):
-        pass
+    @type queue: L{JobQueue}
+    @param queue: Job queue
+    @type job: L{_QueuedJob}
+    @param job: Job object
+    @type op: L{_QueuedOpCode}
+    @param op: OpCode
 
     """
 
     """
-    def wrapper(self, *args, **kwargs):
-      assert self._queue_lock is not None, "Queue should be open"
-      return fn(self, *args, **kwargs)
-    return wrapper
-
-  def __init__(self, context):
-    """Constructor for JobQueue.
+    assert queue, "Queue is missing"
+    assert job, "Job is missing"
+    assert op, "Opcode is missing"
 
 
-    The constructor will initialize the job queue object and then
-    start loading the current jobs from disk, either for starting them
-    (if they were queue) or for aborting them (if they were already
-    running).
+    self._queue = queue
+    self._job = job
+    self._op = op
 
 
-    @type context: GanetiContext
-    @param context: the context object for access to the configuration
-        data and other ganeti objects
+  def _CheckCancel(self):
+    """Raises an exception to cancel the job if asked to.
 
     """
 
     """
-    self.context = context
-    self._memcache = weakref.WeakValueDictionary()
-    self._my_hostname = utils.HostInfo().name
-
-    # Locking
-    self._lock = threading.Lock()
-    self.acquire = self._lock.acquire
-    self.release = self._lock.release
+    # Cancel here if we were asked to
+    if self._op.status == constants.OP_STATUS_CANCELING:
+      logging.debug("Canceling opcode")
+      raise CancelJob()
+
+    # See if queue is shutting down
+    if not self._queue.AcceptingJobsUnlocked():
+      logging.debug("Queue is shutting down")
+      raise QueueShutdown()
+
+  @locking.ssynchronized(_QUEUE, shared=1)
+  def NotifyStart(self):
+    """Mark the opcode as running, not lock-waiting.
 
 
-    # Initialize
-    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
+    This is called from the mcpu code as a notifier function, when the LU is
+    finally about to start the Exec() method. Of course, to have end-user
+    visible results, the opcode must be initially (before calling into
+    Processor.ExecOpCode) set to OP_STATUS_WAITING.
 
 
-    # Read serial file
-    self._last_serial = jstore.ReadSerial()
-    assert self._last_serial is not None, ("Serial file was modified between"
-                                           " check in jstore and here")
+    """
+    assert self._op in self._job.ops
+    assert self._op.status in (constants.OP_STATUS_WAITING,
+                               constants.OP_STATUS_CANCELING)
 
 
-    # Get initial list of nodes
-    self._nodes = dict((n.name, n.primary_ip)
-                       for n in self.context.cfg.GetAllNodesInfo().values()
-                       if n.master_candidate)
+    # Cancel here if we were asked to
+    self._CheckCancel()
 
 
-    # Remove master node
-    try:
-      del self._nodes[self._my_hostname]
-    except KeyError:
-      pass
+    logging.debug("Opcode is now running")
 
 
-    # TODO: Check consistency across nodes
+    self._op.status = constants.OP_STATUS_RUNNING
+    self._op.exec_timestamp = TimeStampNow()
 
 
-    # Setup worker pool
-    self._wpool = _JobQueueWorkerPool(self)
-    try:
-      # We need to lock here because WorkerPool.AddTask() may start a job while
-      # we're still doing our work.
-      self.acquire()
-      try:
-        logging.info("Inspecting job queue")
+    # And finally replicate the job status
+    self._queue.UpdateJobUnlocked(self._job)
 
 
-        all_job_ids = self._GetJobIDsUnlocked()
-        jobs_count = len(all_job_ids)
-        lastinfo = time.time()
-        for idx, job_id in enumerate(all_job_ids):
-          # Give an update every 1000 jobs or 10 seconds
-          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
-              idx == (jobs_count - 1)):
-            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
-                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
-            lastinfo = time.time()
-
-          job = self._LoadJobUnlocked(job_id)
-
-          # a failure in loading the job can cause 'None' to be returned
-          if job is None:
-            continue
-
-          status = job.CalcStatus()
-
-          if status in (constants.JOB_STATUS_QUEUED, ):
-            self._wpool.AddTask(job)
-
-          elif status in (constants.JOB_STATUS_RUNNING,
-                          constants.JOB_STATUS_WAITLOCK,
-                          constants.JOB_STATUS_CANCELING):
-            logging.warning("Unfinished job %s found: %s", job.id, job)
-            try:
-              for op in job.ops:
-                op.status = constants.OP_STATUS_ERROR
-                op.result = "Unclean master daemon shutdown"
-            finally:
-              self.UpdateJobUnlocked(job)
-
-        logging.info("Job queue inspection finished")
-      finally:
-        self.release()
-    except:
-      self._wpool.TerminateWorkers()
-      raise
+  @locking.ssynchronized(_QUEUE, shared=1)
+  def _AppendFeedback(self, timestamp, log_type, log_msg):
+    """Internal feedback append function, with locks
 
 
-  @utils.LockedMethod
-  @_RequireOpenQueue
-  def AddNode(self, node):
-    """Register a new node with the queue.
+    """
+    self._job.log_serial += 1
+    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
+    self._queue.UpdateJobUnlocked(self._job, replicate=False)
 
 
-    @type node: L{objects.Node}
-    @param node: the node object to be added
+  def Feedback(self, *args):
+    """Append a log entry.
 
     """
 
     """
-    node_name = node.name
-    assert node_name != self._my_hostname
-
-    # Clean queue directory on added node
-    rpc.RpcRunner.call_jobqueue_purge(node_name)
+    assert len(args) < 3
 
 
-    if not node.master_candidate:
-      # remove if existing, ignoring errors
-      self._nodes.pop(node_name, None)
-      # and skip the replication of the job ids
-      return
+    if len(args) == 1:
+      log_type = constants.ELOG_MESSAGE
+      log_msg = args[0]
+    else:
+      (log_type, log_msg) = args
 
 
-    # Upload the whole queue excluding archived jobs
-    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
+    # The time is split to make serialization easier and not lose
+    # precision.
+    timestamp = utils.SplitTime(time.time())
+    self._AppendFeedback(timestamp, log_type, log_msg)
 
 
-    # Upload current serial file
-    files.append(constants.JOB_QUEUE_SERIAL_FILE)
+  def CurrentPriority(self):
+    """Returns current priority for opcode.
 
 
-    for file_name in files:
-      # Read file content
-      fd = open(file_name, "r")
-      try:
-        content = fd.read()
-      finally:
-        fd.close()
+    """
+    assert self._op.status in (constants.OP_STATUS_WAITING,
+                               constants.OP_STATUS_CANCELING)
 
 
-      result = rpc.RpcRunner.call_jobqueue_update([node_name],
-                                                  [node.primary_ip],
-                                                  file_name, content)
-      if not result[node_name]:
-        logging.error("Failed to upload %s to %s", file_name, node_name)
+    # Cancel here if we were asked to
+    self._CheckCancel()
 
 
-    self._nodes[node_name] = node.primary_ip
+    return self._op.priority
 
 
-  @utils.LockedMethod
-  @_RequireOpenQueue
-  def RemoveNode(self, node_name):
-    """Callback called when removing nodes from the cluster.
+  def SubmitManyJobs(self, jobs):
+    """Submits jobs for processing.
 
 
-    @type node_name: str
-    @param node_name: the name of the node to remove
+    See L{JobQueue.SubmitManyJobs}.
 
     """
 
     """
-    try:
-      # The queue is removed by the "leave node" RPC call.
-      del self._nodes[node_name]
-    except KeyError:
-      pass
+    # Locking is done in job queue
+    return self._queue.SubmitManyJobs(jobs)
 
 
-  def _CheckRpcResult(self, result, nodes, failmsg):
-    """Verifies the status of an RPC call.
 
 
-    Since we aim to keep consistency should this node (the current
-    master) fail, we will log errors if our rpc fail, and especially
-    log the case when more than half of the nodes failes.
+class _JobChangesChecker(object):
+  def __init__(self, fields, prev_job_info, prev_log_serial):
+    """Initializes this class.
 
 
-    @param result: the data as returned from the rpc call
-    @type nodes: list
-    @param nodes: the list of nodes we made the call to
-    @type failmsg: str
-    @param failmsg: the identifier to be used for logging
+    @type fields: list of strings
+    @param fields: Fields requested by LUXI client
+    @type prev_job_info: string
+    @param prev_job_info: previous job info, as passed by the LUXI client
+    @type prev_log_serial: string
+    @param prev_log_serial: previous job serial, as passed by the LUXI client
 
     """
 
     """
-    failed = []
-    success = []
+    self._squery = _SimpleJobQuery(fields)
+    self._prev_job_info = prev_job_info
+    self._prev_log_serial = prev_log_serial
 
 
-    for node in nodes:
-      if result[node]:
-        success.append(node)
-      else:
-        failed.append(node)
-
-    if failed:
-      logging.error("%s failed on %s", failmsg, ", ".join(failed))
-
-    # +1 for the master node
-    if (len(success) + 1) < len(failed):
-      # TODO: Handle failing nodes
-      logging.error("More than half of the nodes failed")
-
-  def _GetNodeIp(self):
-    """Helper for returning the node name/ip list.
+  def __call__(self, job):
+    """Checks whether job has changed.
 
 
-    @rtype: (list, list)
-    @return: a tuple of two lists, the first one with the node
-        names and the second one with the node addresses
+    @type job: L{_QueuedJob}
+    @param job: Job object
 
     """
 
     """
-    name_list = self._nodes.keys()
-    addr_list = [self._nodes[name] for name in name_list]
-    return name_list, addr_list
-
-  def _WriteAndReplicateFileUnlocked(self, file_name, data):
-    """Writes a file locally and then replicates it to all nodes.
+    assert not job.writable, "Expected read-only job"
+
+    status = job.CalcStatus()
+    job_info = self._squery(job)
+    log_entries = job.GetLogEntries(self._prev_log_serial)
+
+    # Serializing and deserializing data can cause type changes (e.g. from
+    # tuple to list) or precision loss. We're doing it here so that we get
+    # the same modifications as the data received from the client. Without
+    # this, the comparison afterwards might fail without the data being
+    # significantly different.
+    # TODO: we just deserialized from disk, investigate how to make sure that
+    # the job info and log entries are compatible to avoid this further step.
+    # TODO: Doing something like in testutils.py:UnifyValueType might be more
+    # efficient, though floats will be tricky
+    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
+    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
+
+    # Don't even try to wait if the job is no longer running, there will be
+    # no changes.
+    if (status not in (constants.JOB_STATUS_QUEUED,
+                       constants.JOB_STATUS_RUNNING,
+                       constants.JOB_STATUS_WAITING) or
+        job_info != self._prev_job_info or
+        (log_entries and self._prev_log_serial != log_entries[0][0])):
+      logging.debug("Job %s changed", job.id)
+      return (job_info, log_entries)
+
+    return None
+
+
+class _JobFileChangesWaiter(object):
+  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
+    """Initializes this class.
+
+    @type filename: string
+    @param filename: Path to job file
+    @raises errors.InotifyError: if the notifier cannot be setup
 
 
-    This function will replace the contents of a file on the local
-    node and then replicate it to all the other nodes we have.
+    """
+    self._wm = _inotify_wm_cls()
+    self._inotify_handler = \
+      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
+    self._notifier = \
+      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
+    try:
+      self._inotify_handler.enable()
+    except Exception:
+      # pyinotify doesn't close file descriptors automatically
+      self._notifier.stop()
+      raise
 
 
-    @type file_name: str
-    @param file_name: the path of the file to be replicated
-    @type data: str
-    @param data: the new contents of the file
+  def _OnInotify(self, notifier_enabled):
+    """Callback for inotify.
 
     """
 
     """
-    utils.WriteFile(file_name, data=data)
+    if not notifier_enabled:
+      self._inotify_handler.enable()
 
 
-    names, addrs = self._GetNodeIp()
-    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
-    self._CheckRpcResult(result, self._nodes,
-                         "Updating %s" % file_name)
+  def Wait(self, timeout):
+    """Waits for the job file to change.
 
 
-  def _RenameFileUnlocked(self, old, new):
-    """Renames a file locally and then replicate the change.
+    @type timeout: float
+    @param timeout: Timeout in seconds
+    @return: Whether there have been events
 
 
-    This function will rename a file in the local queue directory
-    and then replicate this rename to all the other nodes we have.
+    """
+    assert timeout >= 0
+    have_events = self._notifier.check_events(timeout * 1000)
+    if have_events:
+      self._notifier.read_events()
+    self._notifier.process_events()
+    return have_events
 
 
-    @type old: str
-    @param old: the current name of the file
-    @type new: str
-    @param new: the new name of the file
+  def Close(self):
+    """Closes underlying notifier and its file descriptor.
 
     """
 
     """
-    utils.RenameFile(old, new, mkdir=True)
-
-    names, addrs = self._GetNodeIp()
-    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
-    self._CheckRpcResult(result, self._nodes,
-                         "Moving %s to %s" % (old, new))
+    self._notifier.stop()
 
 
-  def _FormatJobID(self, job_id):
-    """Convert a job ID to string format.
 
 
-    Currently this just does C{str(job_id)} after performing some
-    checks, but if we want to change the job id format this will
-    abstract this change.
+class _JobChangesWaiter(object):
+  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
+    """Initializes this class.
 
 
-    @type job_id: int or long
-    @param job_id: the numeric job id
-    @rtype: str
-    @return: the formatted job id
+    @type filename: string
+    @param filename: Path to job file
 
     """
 
     """
-    if not isinstance(job_id, (int, long)):
-      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
-    if job_id < 0:
-      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
-
-    return str(job_id)
+    self._filewaiter = None
+    self._filename = filename
+    self._waiter_cls = _waiter_cls
 
 
-  @classmethod
-  def _GetArchiveDirectory(cls, job_id):
-    """Returns the archive directory for a job.
+  def Wait(self, timeout):
+    """Waits for a job to change.
 
 
-    @type job_id: str
-    @param job_id: Job identifier
-    @rtype: str
-    @return: Directory name
+    @type timeout: float
+    @param timeout: Timeout in seconds
+    @return: Whether there have been events
 
     """
 
     """
-    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
+    if self._filewaiter:
+      return self._filewaiter.Wait(timeout)
 
 
-  def _NewSerialUnlocked(self):
-    """Generates a new job identifier.
+    # Lazy setup: Avoid inotify setup cost when job file has already changed.
+    # If this point is reached, return immediately and let caller check the job
+    # file again in case there were changes since the last check. This avoids a
+    # race condition.
+    self._filewaiter = self._waiter_cls(self._filename)
 
 
-    Job identifiers are unique during the lifetime of a cluster.
+    return True
 
 
-    @rtype: str
-    @return: a string representing the job identifier.
+  def Close(self):
+    """Closes underlying waiter.
 
     """
 
     """
-    # New number
-    serial = self._last_serial + 1
+    if self._filewaiter:
+      self._filewaiter.Close()
 
 
-    # Write to file
-    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
-                                        "%s\n" % serial)
 
 
-    # Keep it only if we were able to write the file
-    self._last_serial = serial
+class _WaitForJobChangesHelper(object):
+  """Helper class using inotify to wait for changes in a job file.
 
 
-    return self._FormatJobID(serial)
+  This class takes a previous job status and serial, and alerts the client when
+  the current job status has changed.
 
 
+  """
   @staticmethod
   @staticmethod
-  def _GetJobPath(job_id):
-    """Returns the job file for a given job id.
+  def _CheckForChanges(counter, job_load_fn, check_fn):
+    if counter.next() > 0:
+      # If this isn't the first check the job is given some more time to change
+      # again. This gives better performance for jobs generating many
+      # changes/messages.
+      time.sleep(0.1)
+
+    job = job_load_fn()
+    if not job:
+      raise errors.JobLost()
 
 
-    @type job_id: str
-    @param job_id: the job identifier
-    @rtype: str
-    @return: the path to the job file
+    result = check_fn(job)
+    if result is None:
+      raise utils.RetryAgain()
 
 
-    """
-    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
+    return result
 
 
-  @classmethod
-  def _GetArchivedJobPath(cls, job_id):
-    """Returns the archived job file for a give job id.
+  def __call__(self, filename, job_load_fn,
+               fields, prev_job_info, prev_log_serial, timeout,
+               _waiter_cls=_JobChangesWaiter):
+    """Waits for changes on a job.
 
 
-    @type job_id: str
-    @param job_id: the job identifier
-    @rtype: str
-    @return: the path to the archived job file
+    @type filename: string
+    @param filename: File on which to wait for changes
+    @type job_load_fn: callable
+    @param job_load_fn: Function to load job
+    @type fields: list of strings
+    @param fields: Which fields to check for changes
+    @type prev_job_info: list or None
+    @param prev_job_info: Last job information returned
+    @type prev_log_serial: int
+    @param prev_log_serial: Last job message serial number
+    @type timeout: float
+    @param timeout: maximum time to wait in seconds
 
     """
 
     """
-    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
-    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
-
-  @classmethod
-  def _ExtractJobID(cls, name):
-    """Extract the job id from a filename.
+    counter = itertools.count()
+    try:
+      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
+      waiter = _waiter_cls(filename)
+      try:
+        return utils.Retry(compat.partial(self._CheckForChanges,
+                                          counter, job_load_fn, check_fn),
+                           utils.RETRY_REMAINING_TIME, timeout,
+                           wait_fn=waiter.Wait)
+      finally:
+        waiter.Close()
+    except errors.JobLost:
+      return None
+    except utils.RetryTimeout:
+      return constants.JOB_NOTCHANGED
+
+
+def _EncodeOpError(err):
+  """Encodes an error which occurred while processing an opcode.
+
+  """
+  if isinstance(err, errors.GenericError):
+    to_encode = err
+  else:
+    to_encode = errors.OpExecError(str(err))
+
+  return errors.EncodeException(to_encode)
+
+
+class _TimeoutStrategyWrapper:
+  def __init__(self, fn):
+    """Initializes this class.
+
+    """
+    self._fn = fn
+    self._next = None
+
+  def _Advance(self):
+    """Gets the next timeout if necessary.
+
+    """
+    if self._next is None:
+      self._next = self._fn()
+
+  def Peek(self):
+    """Returns the next timeout.
+
+    """
+    self._Advance()
+    return self._next
+
+  def Next(self):
+    """Returns the current timeout and advances the internal state.
+
+    """
+    self._Advance()
+    result = self._next
+    self._next = None
+    return result
+
+
+class _OpExecContext:
+  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
+    """Initializes this class.
+
+    """
+    self.op = op
+    self.index = index
+    self.log_prefix = log_prefix
+    self.summary = op.input.Summary()
+
+    # Create local copy to modify
+    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
+      self.jobdeps = op.input.depends[:]
+    else:
+      self.jobdeps = None
+
+    self._timeout_strategy_factory = timeout_strategy_factory
+    self._ResetTimeoutStrategy()
+
+  def _ResetTimeoutStrategy(self):
+    """Creates a new timeout strategy.
+
+    """
+    self._timeout_strategy = \
+      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
+
+  def CheckPriorityIncrease(self):
+    """Checks whether priority can and should be increased.
+
+    Called when locks couldn't be acquired.
+
+    """
+    op = self.op
+
+    # Exhausted all retries and next round should not use blocking acquire
+    # for locks?
+    if (self._timeout_strategy.Peek() is None and
+        op.priority > constants.OP_PRIO_HIGHEST):
+      logging.debug("Increasing priority")
+      op.priority -= 1
+      self._ResetTimeoutStrategy()
+      return True
+
+    return False
+
+  def GetNextLockTimeout(self):
+    """Returns the next lock acquire timeout.
+
+    """
+    return self._timeout_strategy.Next()
+
+
+class _JobProcessor(object):
+  (DEFER,
+   WAITDEP,
+   FINISHED) = range(1, 4)
+
+  def __init__(self, queue, opexec_fn, job,
+               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
+    """Initializes this class.
+
+    """
+    self.queue = queue
+    self.opexec_fn = opexec_fn
+    self.job = job
+    self._timeout_strategy_factory = _timeout_strategy_factory
+
+  @staticmethod
+  def _FindNextOpcode(job, timeout_strategy_factory):
+    """Locates the next opcode to run.
+
+    @type job: L{_QueuedJob}
+    @param job: Job object
+    @param timeout_strategy_factory: Callable to create new timeout strategy
+
+    """
+    # Create some sort of a cache to speed up locating next opcode for future
+    # lookups
+    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
+    # pending and one for processed ops.
+    if job.ops_iter is None:
+      job.ops_iter = enumerate(job.ops)
+
+    # Find next opcode to run
+    while True:
+      try:
+        (idx, op) = job.ops_iter.next()
+      except StopIteration:
+        raise errors.ProgrammerError("Called for a finished job")
+
+      if op.status == constants.OP_STATUS_RUNNING:
+        # Found an opcode already marked as running
+        raise errors.ProgrammerError("Called for job marked as running")
+
+      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
+                             timeout_strategy_factory)
+
+      if op.status not in constants.OPS_FINALIZED:
+        return opctx
+
+      # This is a job that was partially completed before master daemon
+      # shutdown, so it can be expected that some opcodes are already
+      # completed successfully (if any did error out, then the whole job
+      # should have been aborted and not resubmitted for processing).
+      logging.info("%s: opcode %s already processed, skipping",
+                   opctx.log_prefix, opctx.summary)
+
+  @staticmethod
+  def _MarkWaitlock(job, op):
+    """Marks an opcode as waiting for locks.
+
+    The job's start timestamp is also set if necessary.
+
+    @type job: L{_QueuedJob}
+    @param job: Job object
+    @type op: L{_QueuedOpCode}
+    @param op: Opcode object
+
+    """
+    assert op in job.ops
+    assert op.status in (constants.OP_STATUS_QUEUED,
+                         constants.OP_STATUS_WAITING)
+
+    update = False
+
+    op.result = None
+
+    if op.status == constants.OP_STATUS_QUEUED:
+      op.status = constants.OP_STATUS_WAITING
+      update = True
+
+    if op.start_timestamp is None:
+      op.start_timestamp = TimeStampNow()
+      update = True
+
+    if job.start_timestamp is None:
+      job.start_timestamp = op.start_timestamp
+      update = True
+
+    assert op.status == constants.OP_STATUS_WAITING
+
+    return update
+
+  @staticmethod
+  def _CheckDependencies(queue, job, opctx):
+    """Checks if an opcode has dependencies and if so, processes them.
+
+    @type queue: L{JobQueue}
+    @param queue: Queue object
+    @type job: L{_QueuedJob}
+    @param job: Job object
+    @type opctx: L{_OpExecContext}
+    @param opctx: Opcode execution context
+    @rtype: bool
+    @return: Whether opcode will be re-scheduled by dependency tracker
+
+    """
+    op = opctx.op
+
+    result = False
+
+    while opctx.jobdeps:
+      (dep_job_id, dep_status) = opctx.jobdeps[0]
+
+      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
+                                                          dep_status)
+      assert ht.TNonEmptyString(depmsg), "No dependency message"
+
+      logging.info("%s: %s", opctx.log_prefix, depmsg)
+
+      if depresult == _JobDependencyManager.CONTINUE:
+        # Remove dependency and continue
+        opctx.jobdeps.pop(0)
+
+      elif depresult == _JobDependencyManager.WAIT:
+        # Need to wait for notification, dependency tracker will re-add job
+        # to workerpool
+        result = True
+        break
+
+      elif depresult == _JobDependencyManager.CANCEL:
+        # Job was cancelled, cancel this job as well
+        job.Cancel()
+        assert op.status == constants.OP_STATUS_CANCELING
+        break
+
+      elif depresult in (_JobDependencyManager.WRONGSTATUS,
+                         _JobDependencyManager.ERROR):
+        # Job failed or there was an error, this job must fail
+        op.status = constants.OP_STATUS_ERROR
+        op.result = _EncodeOpError(errors.OpExecError(depmsg))
+        break
+
+      else:
+        raise errors.ProgrammerError("Unknown dependency result '%s'" %
+                                     depresult)
+
+    return result
+
+  def _ExecOpCodeUnlocked(self, opctx):
+    """Processes one opcode and returns the result.
+
+    """
+    op = opctx.op
+
+    assert op.status == constants.OP_STATUS_WAITING
+
+    timeout = opctx.GetNextLockTimeout()
+
+    try:
+      # Make sure not to hold queue lock while calling ExecOpCode
+      result = self.opexec_fn(op.input,
+                              _OpExecCallbacks(self.queue, self.job, op),
+                              timeout=timeout)
+    except mcpu.LockAcquireTimeout:
+      assert timeout is not None, "Received timeout for blocking acquire"
+      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
+
+      assert op.status in (constants.OP_STATUS_WAITING,
+                           constants.OP_STATUS_CANCELING)
+
+      # Was job cancelled while we were waiting for the lock?
+      if op.status == constants.OP_STATUS_CANCELING:
+        return (constants.OP_STATUS_CANCELING, None)
+
+      # Queue is shutting down, return to queued
+      if not self.queue.AcceptingJobsUnlocked():
+        return (constants.OP_STATUS_QUEUED, None)
+
+      # Stay in waitlock while trying to re-acquire lock
+      return (constants.OP_STATUS_WAITING, None)
+    except CancelJob:
+      logging.exception("%s: Canceling job", opctx.log_prefix)
+      assert op.status == constants.OP_STATUS_CANCELING
+      return (constants.OP_STATUS_CANCELING, None)
+
+    except QueueShutdown:
+      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
+
+      assert op.status == constants.OP_STATUS_WAITING
+
+      # Job hadn't been started yet, so it should return to the queue
+      return (constants.OP_STATUS_QUEUED, None)
+
+    except Exception, err: # pylint: disable=W0703
+      logging.exception("%s: Caught exception in %s",
+                        opctx.log_prefix, opctx.summary)
+      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
+    else:
+      logging.debug("%s: %s successful",
+                    opctx.log_prefix, opctx.summary)
+      return (constants.OP_STATUS_SUCCESS, result)
+
+  def __call__(self, _nextop_fn=None):
+    """Continues execution of a job.
+
+    @param _nextop_fn: Callback function for tests
+    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
+      be deferred and C{WAITDEP} if the dependency manager
+      (L{_JobDependencyManager}) will re-schedule the job when appropriate
+
+    """
+    queue = self.queue
+    job = self.job
+
+    logging.debug("Processing job %s", job.id)
+
+    queue.acquire(shared=1)
+    try:
+      opcount = len(job.ops)
+
+      assert job.writable, "Expected writable job"
+
+      # Don't do anything for finalized jobs
+      if job.CalcStatus() in constants.JOBS_FINALIZED:
+        return self.FINISHED
+
+      # Is a previous opcode still pending?
+      if job.cur_opctx:
+        opctx = job.cur_opctx
+        job.cur_opctx = None
+      else:
+        if __debug__ and _nextop_fn:
+          _nextop_fn()
+        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
+
+      op = opctx.op
+
+      # Consistency check
+      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
+                                     constants.OP_STATUS_CANCELING)
+                        for i in job.ops[opctx.index + 1:])
+
+      assert op.status in (constants.OP_STATUS_QUEUED,
+                           constants.OP_STATUS_WAITING,
+                           constants.OP_STATUS_CANCELING)
+
+      assert (op.priority <= constants.OP_PRIO_LOWEST and
+              op.priority >= constants.OP_PRIO_HIGHEST)
+
+      waitjob = None
+
+      if op.status != constants.OP_STATUS_CANCELING:
+        assert op.status in (constants.OP_STATUS_QUEUED,
+                             constants.OP_STATUS_WAITING)
+
+        # Prepare to start opcode
+        if self._MarkWaitlock(job, op):
+          # Write to disk
+          queue.UpdateJobUnlocked(job)
+
+        assert op.status == constants.OP_STATUS_WAITING
+        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
+        assert job.start_timestamp and op.start_timestamp
+        assert waitjob is None
+
+        # Check if waiting for a job is necessary
+        waitjob = self._CheckDependencies(queue, job, opctx)
+
+        assert op.status in (constants.OP_STATUS_WAITING,
+                             constants.OP_STATUS_CANCELING,
+                             constants.OP_STATUS_ERROR)
+
+        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
+                                         constants.OP_STATUS_ERROR)):
+          logging.info("%s: opcode %s waiting for locks",
+                       opctx.log_prefix, opctx.summary)
+
+          assert not opctx.jobdeps, "Not all dependencies were removed"
+
+          queue.release()
+          try:
+            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
+          finally:
+            queue.acquire(shared=1)
+
+          op.status = op_status
+          op.result = op_result
+
+          assert not waitjob
+
+        if op.status in (constants.OP_STATUS_WAITING,
+                         constants.OP_STATUS_QUEUED):
+          # waiting: Couldn't get locks in time
+          # queued: Queue is shutting down
+          assert not op.end_timestamp
+        else:
+          # Finalize opcode
+          op.end_timestamp = TimeStampNow()
+
+          if op.status == constants.OP_STATUS_CANCELING:
+            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
+                                  for i in job.ops[opctx.index:])
+          else:
+            assert op.status in constants.OPS_FINALIZED
+
+      if op.status == constants.OP_STATUS_QUEUED:
+        # Queue is shutting down
+        assert not waitjob
+
+        finalize = False
+
+        # Reset context
+        job.cur_opctx = None
+
+        # In no case must the status be finalized here
+        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
+
+      elif op.status == constants.OP_STATUS_WAITING or waitjob:
+        finalize = False
+
+        if not waitjob and opctx.CheckPriorityIncrease():
+          # Priority was changed, need to update on-disk file
+          queue.UpdateJobUnlocked(job)
+
+        # Keep around for another round
+        job.cur_opctx = opctx
+
+        assert (op.priority <= constants.OP_PRIO_LOWEST and
+                op.priority >= constants.OP_PRIO_HIGHEST)
+
+        # In no case must the status be finalized here
+        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
+
+      else:
+        # Ensure all opcodes so far have been successful
+        assert (opctx.index == 0 or
+                compat.all(i.status == constants.OP_STATUS_SUCCESS
+                           for i in job.ops[:opctx.index]))
+
+        # Reset context
+        job.cur_opctx = None
+
+        if op.status == constants.OP_STATUS_SUCCESS:
+          finalize = False
+
+        elif op.status == constants.OP_STATUS_ERROR:
+          # Ensure failed opcode has an exception as its result
+          assert errors.GetEncodedError(job.ops[opctx.index].result)
+
+          to_encode = errors.OpExecError("Preceding opcode failed")
+          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                _EncodeOpError(to_encode))
+          finalize = True
+
+          # Consistency check
+          assert compat.all(i.status == constants.OP_STATUS_ERROR and
+                            errors.GetEncodedError(i.result)
+                            for i in job.ops[opctx.index:])
+
+        elif op.status == constants.OP_STATUS_CANCELING:
+          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                                "Job canceled by request")
+          finalize = True
+
+        else:
+          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
+
+        if opctx.index == (opcount - 1):
+          # Finalize on last opcode
+          finalize = True
+
+        if finalize:
+          # All opcodes have been run, finalize job
+          job.Finalize()
+
+        # Write to disk. If the job status is final, this is the final write
+        # allowed. Once the file has been written, it can be archived anytime.
+        queue.UpdateJobUnlocked(job)
+
+        assert not waitjob
+
+        if finalize:
+          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
+          return self.FINISHED
+
+      assert not waitjob or queue.depmgr.JobWaiting(job)
+
+      if waitjob:
+        return self.WAITDEP
+      else:
+        return self.DEFER
+    finally:
+      assert job.writable, "Job became read-only while being processed"
+      queue.release()
+
+
+def _EvaluateJobProcessorResult(depmgr, job, result):
+  """Looks at a result from L{_JobProcessor} for a job.
+
+  To be used in a L{_JobQueueWorker}.
+
+  """
+  if result == _JobProcessor.FINISHED:
+    # Notify waiting jobs
+    depmgr.NotifyWaiters(job.id)
+
+  elif result == _JobProcessor.DEFER:
+    # Schedule again
+    raise workerpool.DeferTask(priority=job.CalcPriority())
+
+  elif result == _JobProcessor.WAITDEP:
+    # No-op, dependency manager will re-schedule
+    pass
+
+  else:
+    raise errors.ProgrammerError("Job processor returned unknown status %s" %
+                                 (result, ))
+
+
+class _JobQueueWorker(workerpool.BaseWorker):
+  """The actual job workers.
+
+  """
+  def RunTask(self, job): # pylint: disable=W0221
+    """Job executor.
+
+    @type job: L{_QueuedJob}
+    @param job: the job to be processed
+
+    """
+    assert job.writable, "Expected writable job"
+
+    # Ensure only one worker is active on a single job. If a job registers for
+    # a dependency job, and the other job notifies before the first worker is
+    # done, the job can end up in the tasklist more than once.
+    job.processor_lock.acquire()
+    try:
+      return self._RunTaskInner(job)
+    finally:
+      job.processor_lock.release()
+
+  def _RunTaskInner(self, job):
+    """Executes a job.
+
+    Must be called with per-job lock acquired.
+
+    """
+    queue = job.queue
+    assert queue == self.pool.queue
+
+    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
+    setname_fn(None)
+
+    proc = mcpu.Processor(queue.context, job.id)
+
+    # Create wrapper for setting thread name
+    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
+                                    proc.ExecOpCode)
+
+    _EvaluateJobProcessorResult(queue.depmgr, job,
+                                _JobProcessor(queue, wrap_execop_fn, job)())
+
+  @staticmethod
+  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
+    """Updates the worker thread name to include a short summary of the opcode.
+
+    @param setname_fn: Callable setting worker thread name
+    @param execop_fn: Callable for executing opcode (usually
+                      L{mcpu.Processor.ExecOpCode})
+
+    """
+    setname_fn(op)
+    try:
+      return execop_fn(op, *args, **kwargs)
+    finally:
+      setname_fn(None)
+
+  @staticmethod
+  def _GetWorkerName(job, op):
+    """Sets the worker thread name.
+
+    @type job: L{_QueuedJob}
+    @type op: L{opcodes.OpCode}
+
+    """
+    parts = ["Job%s" % job.id]
+
+    if op:
+      parts.append(op.TinySummary())
+
+    return "/".join(parts)
+
+
+class _JobQueueWorkerPool(workerpool.WorkerPool):
+  """Simple class implementing a job-processing workerpool.
+
+  """
+  def __init__(self, queue):
+    super(_JobQueueWorkerPool, self).__init__("Jq",
+                                              JOBQUEUE_THREADS,
+                                              _JobQueueWorker)
+    self.queue = queue
+
+
+class _JobDependencyManager:
+  """Keeps track of job dependencies.
+
+  """
+  (WAIT,
+   ERROR,
+   CANCEL,
+   CONTINUE,
+   WRONGSTATUS) = range(1, 6)
+
+  def __init__(self, getstatus_fn, enqueue_fn):
+    """Initializes this class.
+
+    """
+    self._getstatus_fn = getstatus_fn
+    self._enqueue_fn = enqueue_fn
+
+    self._waiters = {}
+    self._lock = locking.SharedLock("JobDepMgr")
+
+  @locking.ssynchronized(_LOCK, shared=1)
+  def GetLockInfo(self, requested): # pylint: disable=W0613
+    """Retrieves information about waiting jobs.
+
+    @type requested: set
+    @param requested: Requested information, see C{query.LQ_*}
+
+    """
+    # No need to sort here, that's being done by the lock manager and query
+    # library. There are no priorities for notifying jobs, hence all show up as
+    # one item under "pending".
+    return [("job/%s" % job_id, None, None,
+             [("job", [job.id for job in waiters])])
+            for job_id, waiters in self._waiters.items()
+            if waiters]
+
+  @locking.ssynchronized(_LOCK, shared=1)
+  def JobWaiting(self, job):
+    """Checks if a job is waiting.
+
+    """
+    return compat.any(job in jobs
+                      for jobs in self._waiters.values())
+
+  @locking.ssynchronized(_LOCK)
+  def CheckAndRegister(self, job, dep_job_id, dep_status):
+    """Checks if a dependency job has the requested status.
+
+    If the other job is not yet in a finalized status, the calling job will be
+    notified (re-added to the workerpool) at a later point.
+
+    @type job: L{_QueuedJob}
+    @param job: Job object
+    @type dep_job_id: int
+    @param dep_job_id: ID of dependency job
+    @type dep_status: list
+    @param dep_status: Required status
+
+    """
+    assert ht.TJobId(job.id)
+    assert ht.TJobId(dep_job_id)
+    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
+
+    if job.id == dep_job_id:
+      return (self.ERROR, "Job can't depend on itself")
+
+    # Get status of dependency job
+    try:
+      status = self._getstatus_fn(dep_job_id)
+    except errors.JobLost, err:
+      return (self.ERROR, "Dependency error: %s" % err)
+
+    assert status in constants.JOB_STATUS_ALL
+
+    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
+
+    if status not in constants.JOBS_FINALIZED:
+      # Register for notification and wait for job to finish
+      job_id_waiters.add(job)
+      return (self.WAIT,
+              "Need to wait for job %s, wanted status '%s'" %
+              (dep_job_id, dep_status))
+
+    # Remove from waiters list
+    if job in job_id_waiters:
+      job_id_waiters.remove(job)
+
+    if (status == constants.JOB_STATUS_CANCELED and
+        constants.JOB_STATUS_CANCELED not in dep_status):
+      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
+
+    elif not dep_status or status in dep_status:
+      return (self.CONTINUE,
+              "Dependency job %s finished with status '%s'" %
+              (dep_job_id, status))
+
+    else:
+      return (self.WRONGSTATUS,
+              "Dependency job %s finished with status '%s',"
+              " not one of '%s' as required" %
+              (dep_job_id, status, utils.CommaJoin(dep_status)))
+
+  def _RemoveEmptyWaitersUnlocked(self):
+    """Remove all jobs without actual waiters.
+
+    """
+    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
+                   if not waiters]:
+      del self._waiters[job_id]
+
+  def NotifyWaiters(self, job_id):
+    """Notifies all jobs waiting for a certain job ID.
+
+    @attention: Do not call until L{CheckAndRegister} returned a status other
+      than C{WAITDEP} for C{job_id}, or behaviour is undefined
+    @type job_id: int
+    @param job_id: Job ID
+
+    """
+    assert ht.TJobId(job_id)
+
+    self._lock.acquire()
+    try:
+      self._RemoveEmptyWaitersUnlocked()
+
+      jobs = self._waiters.pop(job_id, None)
+    finally:
+      self._lock.release()
+
+    if jobs:
+      # Re-add jobs to workerpool
+      logging.debug("Re-adding %s jobs which were waiting for job %s",
+                    len(jobs), job_id)
+      self._enqueue_fn(jobs)
+
+
+def _RequireOpenQueue(fn):
+  """Decorator for "public" functions.
+
+  This function should be used for all 'public' functions. That is,
+  functions usually called from other classes. Note that this should
+  be applied only to methods (not plain functions), since it expects
+  that the decorated function is called with a first argument that has
+  a '_queue_filelock' argument.
+
+  @warning: Use this decorator only after locking.ssynchronized
+
+  Example::
+    @locking.ssynchronized(_LOCK)
+    @_RequireOpenQueue
+    def Example(self):
+      pass
+
+  """
+  def wrapper(self, *args, **kwargs):
+    # pylint: disable=W0212
+    assert self._queue_filelock is not None, "Queue should be open"
+    return fn(self, *args, **kwargs)
+  return wrapper
+
+
+def _RequireNonDrainedQueue(fn):
+  """Decorator checking for a non-drained queue.
+
+  To be used with functions submitting new jobs.
+
+  """
+  def wrapper(self, *args, **kwargs):
+    """Wrapper function.
+
+    @raise errors.JobQueueDrainError: if the job queue is marked for draining
+
+    """
+    # Ok when sharing the big job queue lock, as the drain file is created when
+    # the lock is exclusive.
+    # Needs access to protected member, pylint: disable=W0212
+    if self._drained:
+      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
+
+    if not self._accepting_jobs:
+      raise errors.JobQueueError("Job queue is shutting down, refusing job")
+
+    return fn(self, *args, **kwargs)
+  return wrapper
+
+
+class JobQueue(object):
+  """Queue used to manage the jobs.
+
+  """
+  def __init__(self, context):
+    """Constructor for JobQueue.
+
+    The constructor will initialize the job queue object and then
+    start loading the current jobs from disk, either for starting them
+    (if they were queue) or for aborting them (if they were already
+    running).
+
+    @type context: GanetiContext
+    @param context: the context object for access to the configuration
+        data and other ganeti objects
+
+    """
+    self.context = context
+    self._memcache = weakref.WeakValueDictionary()
+    self._my_hostname = netutils.Hostname.GetSysName()
+
+    # The Big JobQueue lock. If a code block or method acquires it in shared
+    # mode safe it must guarantee concurrency with all the code acquiring it in
+    # shared mode, including itself. In order not to acquire it at all
+    # concurrency must be guaranteed with all code acquiring it in shared mode
+    # and all code acquiring it exclusively.
+    self._lock = locking.SharedLock("JobQueue")
+
+    self.acquire = self._lock.acquire
+    self.release = self._lock.release
+
+    # Accept jobs by default
+    self._accepting_jobs = True
+
+    # Initialize the queue, and acquire the filelock.
+    # This ensures no other process is working on the job queue.
+    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
+
+    # Read serial file
+    self._last_serial = jstore.ReadSerial()
+    assert self._last_serial is not None, ("Serial file was modified between"
+                                           " check in jstore and here")
+
+    # Get initial list of nodes
+    self._nodes = dict((n.name, n.primary_ip)
+                       for n in self.context.cfg.GetAllNodesInfo().values()
+                       if n.master_candidate)
+
+    # Remove master node
+    self._nodes.pop(self._my_hostname, None)
+
+    # TODO: Check consistency across nodes
+
+    self._queue_size = None
+    self._UpdateQueueSizeUnlocked()
+    assert ht.TInt(self._queue_size)
+    self._drained = jstore.CheckDrainFlag()
+
+    # Job dependencies
+    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
+                                        self._EnqueueJobs)
+    self.context.glm.AddToLockMonitor(self.depmgr)
+
+    # Setup worker pool
+    self._wpool = _JobQueueWorkerPool(self)
+    try:
+      self._InspectQueue()
+    except:
+      self._wpool.TerminateWorkers()
+      raise
+
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  def _InspectQueue(self):
+    """Loads the whole job queue and resumes unfinished jobs.
+
+    This function needs the lock here because WorkerPool.AddTask() may start a
+    job while we're still doing our work.
+
+    """
+    logging.info("Inspecting job queue")
+
+    restartjobs = []
+
+    all_job_ids = self._GetJobIDsUnlocked()
+    jobs_count = len(all_job_ids)
+    lastinfo = time.time()
+    for idx, job_id in enumerate(all_job_ids):
+      # Give an update every 1000 jobs or 10 seconds
+      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
+          idx == (jobs_count - 1)):
+        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
+                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
+        lastinfo = time.time()
+
+      job = self._LoadJobUnlocked(job_id)
+
+      # a failure in loading the job can cause 'None' to be returned
+      if job is None:
+        continue
+
+      status = job.CalcStatus()
+
+      if status == constants.JOB_STATUS_QUEUED:
+        restartjobs.append(job)
+
+      elif status in (constants.JOB_STATUS_RUNNING,
+                      constants.JOB_STATUS_WAITING,
+                      constants.JOB_STATUS_CANCELING):
+        logging.warning("Unfinished job %s found: %s", job.id, job)
+
+        if status == constants.JOB_STATUS_WAITING:
+          # Restart job
+          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
+          restartjobs.append(job)
+        else:
+          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                "Unclean master daemon shutdown")
+          job.Finalize()
+
+        self.UpdateJobUnlocked(job)
+
+    if restartjobs:
+      logging.info("Restarting %s jobs", len(restartjobs))
+      self._EnqueueJobsUnlocked(restartjobs)
+
+    logging.info("Job queue inspection finished")
+
+  def _GetRpc(self, address_list):
+    """Gets RPC runner with context.
+
+    """
+    return rpc.JobQueueRunner(self.context, address_list)
+
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  def AddNode(self, node):
+    """Register a new node with the queue.
+
+    @type node: L{objects.Node}
+    @param node: the node object to be added
+
+    """
+    node_name = node.name
+    assert node_name != self._my_hostname
+
+    # Clean queue directory on added node
+    result = self._GetRpc(None).call_jobqueue_purge(node_name)
+    msg = result.fail_msg
+    if msg:
+      logging.warning("Cannot cleanup queue directory on node %s: %s",
+                      node_name, msg)
+
+    if not node.master_candidate:
+      # remove if existing, ignoring errors
+      self._nodes.pop(node_name, None)
+      # and skip the replication of the job ids
+      return
+
+    # Upload the whole queue excluding archived jobs
+    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
+
+    # Upload current serial file
+    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
+
+    # Static address list
+    addrs = [node.primary_ip]
+
+    for file_name in files:
+      # Read file content
+      content = utils.ReadFile(file_name)
+
+      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
+                             file_name, content)
+      msg = result[node_name].fail_msg
+      if msg:
+        logging.error("Failed to upload file %s to node %s: %s",
+                      file_name, node_name, msg)
+
+    # Set queue drained flag
+    result = \
+      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
+                                                       self._drained)
+    msg = result[node_name].fail_msg
+    if msg:
+      logging.error("Failed to set queue drained flag on node %s: %s",
+                    node_name, msg)
+
+    self._nodes[node_name] = node.primary_ip
+
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  def RemoveNode(self, node_name):
+    """Callback called when removing nodes from the cluster.
+
+    @type node_name: str
+    @param node_name: the name of the node to remove
+
+    """
+    self._nodes.pop(node_name, None)
+
+  @staticmethod
+  def _CheckRpcResult(result, nodes, failmsg):
+    """Verifies the status of an RPC call.
+
+    Since we aim to keep consistency should this node (the current
+    master) fail, we will log errors if our rpc fail, and especially
+    log the case when more than half of the nodes fails.
+
+    @param result: the data as returned from the rpc call
+    @type nodes: list
+    @param nodes: the list of nodes we made the call to
+    @type failmsg: str
+    @param failmsg: the identifier to be used for logging
+
+    """
+    failed = []
+    success = []
+
+    for node in nodes:
+      msg = result[node].fail_msg
+      if msg:
+        failed.append(node)
+        logging.error("RPC call %s (%s) failed on node %s: %s",
+                      result[node].call, failmsg, node, msg)
+      else:
+        success.append(node)
+
+    # +1 for the master node
+    if (len(success) + 1) < len(failed):
+      # TODO: Handle failing nodes
+      logging.error("More than half of the nodes failed")
+
+  def _GetNodeIp(self):
+    """Helper for returning the node name/ip list.
+
+    @rtype: (list, list)
+    @return: a tuple of two lists, the first one with the node
+        names and the second one with the node addresses
+
+    """
+    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
+    name_list = self._nodes.keys()
+    addr_list = [self._nodes[name] for name in name_list]
+    return name_list, addr_list
+
+  def _UpdateJobQueueFile(self, file_name, data, replicate):
+    """Writes a file locally and then replicates it to all nodes.
+
+    This function will replace the contents of a file on the local
+    node and then replicate it to all the other nodes we have.
+
+    @type file_name: str
+    @param file_name: the path of the file to be replicated
+    @type data: str
+    @param data: the new contents of the file
+    @type replicate: boolean
+    @param replicate: whether to spread the changes to the remote nodes
+
+    """
+    getents = runtime.GetEnts()
+    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
+                    gid=getents.daemons_gid,
+                    mode=constants.JOB_QUEUE_FILES_PERMS)
+
+    if replicate:
+      names, addrs = self._GetNodeIp()
+      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
+      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
+
+  def _RenameFilesUnlocked(self, rename):
+    """Renames a file locally and then replicate the change.
+
+    This function will rename a file in the local queue directory
+    and then replicate this rename to all the other nodes we have.
+
+    @type rename: list of (old, new)
+    @param rename: List containing tuples mapping old to new names
+
+    """
+    # Rename them locally
+    for old, new in rename:
+      utils.RenameFile(old, new, mkdir=True)
+
+    # ... and on all nodes
+    names, addrs = self._GetNodeIp()
+    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
+    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
+
+  def _NewSerialsUnlocked(self, count):
+    """Generates a new job identifier.
+
+    Job identifiers are unique during the lifetime of a cluster.
 
 
-    @type name: str
-    @param name: the job filename
-    @rtype: job id or None
-    @return: the job id corresponding to the given filename,
-        or None if the filename does not represent a valid
-        job file
+    @type count: integer
+    @param count: how many serials to return
+    @rtype: list of int
+    @return: a list of job identifiers.
 
     """
 
     """
-    m = cls._RE_JOB_FILE.match(name)
-    if m:
-      return m.group(1)
-    else:
-      return None
+    assert ht.TNonNegativeInt(count)
 
 
-  def _GetJobIDsUnlocked(self, archived=False):
-    """Return all known job IDs.
+    # New number
+    serial = self._last_serial + count
+
+    # Write to file
+    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
+                             "%s\n" % serial, True)
+
+    result = [jstore.FormatJobID(v)
+              for v in range(self._last_serial + 1, serial + 1)]
+
+    # Keep it only if we were able to write the file
+    self._last_serial = serial
+
+    assert len(result) == count
+
+    return result
+
+  @staticmethod
+  def _GetJobPath(job_id):
+    """Returns the job file for a given job id.
+
+    @type job_id: str
+    @param job_id: the job identifier
+    @rtype: str
+    @return: the path to the job file
+
+    """
+    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
+
+  @staticmethod
+  def _GetArchivedJobPath(job_id):
+    """Returns the archived job file for a give job id.
+
+    @type job_id: str
+    @param job_id: the job identifier
+    @rtype: str
+    @return: the path to the archived job file
+
+    """
+    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
+                          jstore.GetArchiveDirectory(job_id),
+                          "job-%s" % job_id)
+
+  @staticmethod
+  def _DetermineJobDirectories(archived):
+    """Build list of directories containing job files.
+
+    @type archived: bool
+    @param archived: Whether to include directories for archived jobs
+    @rtype: list
+
+    """
+    result = [pathutils.QUEUE_DIR]
+
+    if archived:
+      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
+      result.extend(map(compat.partial(utils.PathJoin, archive_path),
+                        utils.ListVisibleFiles(archive_path)))
+
+    return result
 
 
-    If the parameter archived is True, archived jobs IDs will be
-    included. Currently this argument is unused.
+  @classmethod
+  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
+    """Return all known job IDs.
 
     The method only looks at disk because it's a requirement that all
     jobs are present on disk (so in the _memcache we don't have any
     extra IDs).
 
 
     The method only looks at disk because it's a requirement that all
     jobs are present on disk (so in the _memcache we don't have any
     extra IDs).
 
+    @type sort: boolean
+    @param sort: perform sorting on the returned job ids
     @rtype: list
     @return: the list of job IDs
 
     """
     @rtype: list
     @return: the list of job IDs
 
     """
-    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
-    jlist = utils.NiceSort(jlist)
-    return jlist
-
-  def _ListJobFiles(self):
-    """Returns the list of current job files.
+    jlist = []
 
 
-    @rtype: list
-    @return: the list of job file names
+    for path in cls._DetermineJobDirectories(archived):
+      for filename in utils.ListVisibleFiles(path):
+        m = constants.JOB_FILE_RE.match(filename)
+        if m:
+          jlist.append(int(m.group(1)))
 
 
-    """
-    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
-            if self._RE_JOB_FILE.match(name)]
+    if sort:
+      jlist.sort()
+    return jlist
 
   def _LoadJobUnlocked(self, job_id):
     """Loads a job from the disk or memory.
 
   def _LoadJobUnlocked(self, job_id):
     """Loads a job from the disk or memory.
@@ -853,6 +2037,7 @@ class JobQueue(object):
     existing, or try to load the job from the disk. If loading from
     disk, it will also add the job to the cache.
 
     existing, or try to load the job from the disk. If loading from
     disk, it will also add the job to the cache.
 
+    @type job_id: int
     @param job_id: the job id
     @rtype: L{_QueuedJob} or None
     @return: either None or the job object
     @param job_id: the job id
     @rtype: L{_QueuedJob} or None
     @return: either None or the job object
@@ -861,129 +2046,341 @@ class JobQueue(object):
     job = self._memcache.get(job_id, None)
     if job:
       logging.debug("Found job %s in memcache", job_id)
     job = self._memcache.get(job_id, None)
     if job:
       logging.debug("Found job %s in memcache", job_id)
+      assert job.writable, "Found read-only job in memcache"
       return job
 
       return job
 
-    filepath = self._GetJobPath(job_id)
-    logging.debug("Loading job from %s", filepath)
-    try:
-      fd = open(filepath, "r")
-    except IOError, err:
-      if err.errno in (errno.ENOENT, ):
-        return None
-      raise
-    try:
-      data = serializer.LoadJson(fd.read())
-    finally:
-      fd.close()
-
     try:
     try:
-      job = _QueuedJob.Restore(self, data)
-    except Exception, err:
+      job = self._LoadJobFromDisk(job_id, False)
+      if job is None:
+        return job
+    except errors.JobFileCorrupted:
+      old_path = self._GetJobPath(job_id)
       new_path = self._GetArchivedJobPath(job_id)
       new_path = self._GetArchivedJobPath(job_id)
-      if filepath == new_path:
+      if old_path == new_path:
         # job already archived (future case)
         logging.exception("Can't parse job %s", job_id)
       else:
         # non-archived case
         logging.exception("Can't parse job %s, will archive.", job_id)
         # job already archived (future case)
         logging.exception("Can't parse job %s", job_id)
       else:
         # non-archived case
         logging.exception("Can't parse job %s, will archive.", job_id)
-        self._RenameFileUnlocked(filepath, new_path)
+        self._RenameFilesUnlocked([(old_path, new_path)])
       return None
 
       return None
 
+    assert job.writable, "Job just loaded is not writable"
+
     self._memcache[job_id] = job
     logging.debug("Added job %s to the cache", job_id)
     return job
 
     self._memcache[job_id] = job
     logging.debug("Added job %s to the cache", job_id)
     return job
 
-  def _GetJobsUnlocked(self, job_ids):
-    """Return a list of jobs based on their IDs.
+  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
+    """Load the given job file from disk.
 
 
-    @type job_ids: list
-    @param job_ids: either an empty list (meaning all jobs),
-        or a list of job IDs
-    @rtype: list
-    @return: the list of job objects
+    Given a job file, read, load and restore it in a _QueuedJob format.
+
+    @type job_id: int
+    @param job_id: job identifier
+    @type try_archived: bool
+    @param try_archived: Whether to try loading an archived job
+    @rtype: L{_QueuedJob} or None
+    @return: either None or the job object
 
     """
 
     """
-    if not job_ids:
-      job_ids = self._GetJobIDsUnlocked()
+    path_functions = [(self._GetJobPath, False)]
 
 
-    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
+    if try_archived:
+      path_functions.append((self._GetArchivedJobPath, True))
 
 
-  @staticmethod
-  def _IsQueueMarkedDrain():
-    """Check if the queue is marked from drain.
+    raw_data = None
+    archived = None
+
+    for (fn, archived) in path_functions:
+      filepath = fn(job_id)
+      logging.debug("Loading job from %s", filepath)
+      try:
+        raw_data = utils.ReadFile(filepath)
+      except EnvironmentError, err:
+        if err.errno != errno.ENOENT:
+          raise
+      else:
+        break
+
+    if not raw_data:
+      return None
+
+    if writable is None:
+      writable = not archived
+
+    try:
+      data = serializer.LoadJson(raw_data)
+      job = _QueuedJob.Restore(self, data, writable, archived)
+    except Exception, err: # pylint: disable=W0703
+      raise errors.JobFileCorrupted(err)
+
+    return job
+
+  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
+    """Load the given job file from disk.
 
 
-    This currently uses the queue drain file, which makes it a
-    per-node flag. In the future this can be moved to the config file.
+    Given a job file, read, load and restore it in a _QueuedJob format.
+    In case of error reading the job, it gets returned as None, and the
+    exception is logged.
 
 
-    @rtype: boolean
-    @return: True of the job queue is marked for draining
+    @type job_id: int
+    @param job_id: job identifier
+    @type try_archived: bool
+    @param try_archived: Whether to try loading an archived job
+    @rtype: L{_QueuedJob} or None
+    @return: either None or the job object
 
     """
 
     """
-    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
+    try:
+      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
+    except (errors.JobFileCorrupted, EnvironmentError):
+      logging.exception("Can't load/parse job %s", job_id)
+      return None
 
 
-  @staticmethod
-  def SetDrainFlag(drain_flag):
-    """Sets the drain flag for the queue.
+  def _UpdateQueueSizeUnlocked(self):
+    """Update the queue size.
+
+    """
+    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
 
 
-    This is similar to the function L{backend.JobQueueSetDrainFlag},
-    and in the future we might merge them.
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  def SetDrainFlag(self, drain_flag):
+    """Sets the drain flag for the queue.
 
     @type drain_flag: boolean
 
     @type drain_flag: boolean
-    @param drain_flag: wheter to set or unset the drain flag
+    @param drain_flag: Whether to set or unset the drain flag
 
     """
 
     """
-    if drain_flag:
-      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
-    else:
-      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
+    # Change flag locally
+    jstore.SetDrainFlag(drain_flag)
+
+    self._drained = drain_flag
+
+    # ... and on all nodes
+    (names, addrs) = self._GetNodeIp()
+    result = \
+      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
+    self._CheckRpcResult(result, self._nodes,
+                         "Setting queue drain flag to %s" % drain_flag)
+
     return True
 
     return True
 
-  @utils.LockedMethod
   @_RequireOpenQueue
   @_RequireOpenQueue
-  def SubmitJob(self, ops):
+  def _SubmitJobUnlocked(self, job_id, ops):
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
     queue, in order for it to be picked up by the queue processors.
 
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
     queue, in order for it to be picked up by the queue processors.
 
+    @type job_id: job ID
+    @param job_id: the job ID for the new job
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
-    @rtype: job ID
-    @return: the job ID of the newly created job
-    @raise errors.JobQueueDrainError: if the job is marked for draining
+    @rtype: L{_QueuedJob}
+    @return: the job object to be queued
+    @raise errors.JobQueueFull: if the job queue has too many jobs in it
+    @raise errors.GenericError: If an opcode is not valid
 
     """
 
     """
-    if self._IsQueueMarkedDrain():
-      raise errors.JobQueueDrainError()
+    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
+      raise errors.JobQueueFull()
 
 
-    # Check job queue size
-    size = len(self._ListJobFiles())
-    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
-      # TODO: Autoarchive jobs. Make sure it's not done on every job
-      # submission, though.
-      #size = ...
-      pass
+    job = _QueuedJob(self, job_id, ops, True)
 
 
-    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
-      raise errors.JobQueueFull()
+    for idx, op in enumerate(job.ops):
+      # Check priority
+      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
+        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
+        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
+                                  " are %s" % (idx, op.priority, allowed))
 
 
-    # Get job identifier
-    job_id = self._NewSerialUnlocked()
-    job = _QueuedJob(self, job_id, ops)
+      # Check job dependencies
+      dependencies = getattr(op.input, opcodes_base.DEPEND_ATTR, None)
+      if not opcodes_base.TNoRelativeJobDependencies(dependencies):
+        raise errors.GenericError("Opcode %s has invalid dependencies, must"
+                                  " match %s: %s" %
+                                  (idx, opcodes_base.TNoRelativeJobDependencies,
+                                   dependencies))
 
     # Write to disk
     self.UpdateJobUnlocked(job)
 
 
     # Write to disk
     self.UpdateJobUnlocked(job)
 
+    self._queue_size += 1
+
     logging.debug("Adding new job %s to the cache", job_id)
     self._memcache[job_id] = job
 
     logging.debug("Adding new job %s to the cache", job_id)
     self._memcache[job_id] = job
 
-    # Add to worker pool
-    self._wpool.AddTask(job)
+    return job
+
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  @_RequireNonDrainedQueue
+  def SubmitJob(self, ops):
+    """Create and store a new job.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    (job_id, ) = self._NewSerialsUnlocked(1)
+    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
+    return job_id
+
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  def SubmitJobToDrainedQueue(self, ops):
+    """Forcefully create and store a new job.
+
+    Do so, even if the job queue is drained.
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    (job_id, ) = self._NewSerialsUnlocked(1)
+    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
+    return job_id
+
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  @_RequireNonDrainedQueue
+  def SubmitManyJobs(self, jobs):
+    """Create and store multiple jobs.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    all_job_ids = self._NewSerialsUnlocked(len(jobs))
+
+    (results, added_jobs) = \
+      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
+
+    self._EnqueueJobsUnlocked(added_jobs)
+
+    return results
+
+  @staticmethod
+  def _FormatSubmitError(msg, ops):
+    """Formats errors which occurred while submitting a job.
+
+    """
+    return ("%s; opcodes %s" %
+            (msg, utils.CommaJoin(op.Summary() for op in ops)))
+
+  @staticmethod
+  def _ResolveJobDependencies(resolve_fn, deps):
+    """Resolves relative job IDs in dependencies.
+
+    @type resolve_fn: callable
+    @param resolve_fn: Function to resolve a relative job ID
+    @type deps: list
+    @param deps: Dependencies
+    @rtype: tuple; (boolean, string or list)
+    @return: If successful (first tuple item), the returned list contains
+      resolved job IDs along with the requested status; if not successful,
+      the second element is an error message
+
+    """
+    result = []
+
+    for (dep_job_id, dep_status) in deps:
+      if ht.TRelativeJobId(dep_job_id):
+        assert ht.TInt(dep_job_id) and dep_job_id < 0
+        try:
+          job_id = resolve_fn(dep_job_id)
+        except IndexError:
+          # Abort
+          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
+      else:
+        job_id = dep_job_id
+
+      result.append((job_id, dep_status))
+
+    return (True, result)
+
+  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
+    """Create and store multiple jobs.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    results = []
+    added_jobs = []
+
+    def resolve_fn(job_idx, reljobid):
+      assert reljobid < 0
+      return (previous_job_ids + job_ids[:job_idx])[reljobid]
+
+    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
+      for op in ops:
+        if getattr(op, opcodes_base.DEPEND_ATTR, None):
+          (status, data) = \
+            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
+                                         op.depends)
+          if not status:
+            # Abort resolving dependencies
+            assert ht.TNonEmptyString(data), "No error message"
+            break
+          # Use resolved dependencies
+          op.depends = data
+      else:
+        try:
+          job = self._SubmitJobUnlocked(job_id, ops)
+        except errors.GenericError, err:
+          status = False
+          data = self._FormatSubmitError(str(err), ops)
+        else:
+          status = True
+          data = job_id
+          added_jobs.append(job)
+
+      results.append((status, data))
+
+    return (results, added_jobs)
+
+  @locking.ssynchronized(_LOCK)
+  def _EnqueueJobs(self, jobs):
+    """Helper function to add jobs to worker pool's queue.
 
 
-    return job.id
+    @type jobs: list
+    @param jobs: List of all jobs
+
+    """
+    return self._EnqueueJobsUnlocked(jobs)
+
+  def _EnqueueJobsUnlocked(self, jobs):
+    """Helper function to add jobs to worker pool's queue.
+
+    @type jobs: list
+    @param jobs: List of all jobs
+
+    """
+    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
+    self._wpool.AddManyTasks([(job, ) for job in jobs],
+                             priority=[job.CalcPriority() for job in jobs],
+                             task_id=map(_GetIdAttr, jobs))
+
+  def _GetJobStatusForDependencies(self, job_id):
+    """Gets the status of a job for dependencies.
+
+    @type job_id: int
+    @param job_id: Job ID
+    @raise errors.JobLost: If job can't be found
+
+    """
+    # Not using in-memory cache as doing so would require an exclusive lock
+
+    # Try to load from disk
+    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
+
+    assert not job.writable, "Got writable job" # pylint: disable=E1101
+
+    if job:
+      return job.CalcStatus()
+
+    raise errors.JobLost("Job %s not found" % job_id)
 
   @_RequireOpenQueue
 
   @_RequireOpenQueue
-  def UpdateJobUnlocked(self, job):
+  def UpdateJobUnlocked(self, job, replicate=True):
     """Update a job's on disk storage.
 
     After a job has been modified, this function needs to be called in
     """Update a job's on disk storage.
 
     After a job has been modified, this function needs to be called in
@@ -992,23 +2389,26 @@ class JobQueue(object):
 
     @type job: L{_QueuedJob}
     @param job: the changed job
 
     @type job: L{_QueuedJob}
     @param job: the changed job
+    @type replicate: boolean
+    @param replicate: whether to replicate the change to remote nodes
 
     """
 
     """
+    if __debug__:
+      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
+      assert (finalized ^ (job.end_timestamp is None))
+      assert job.writable, "Can't update read-only job"
+      assert not job.archived, "Can't update archived job"
+
     filename = self._GetJobPath(job.id)
     filename = self._GetJobPath(job.id)
-    data = serializer.DumpJson(job.Serialize(), indent=False)
+    data = serializer.DumpJson(job.Serialize())
     logging.debug("Writing job %s to %s", job.id, filename)
     logging.debug("Writing job %s to %s", job.id, filename)
-    self._WriteAndReplicateFileUnlocked(filename, data)
+    self._UpdateJobQueueFile(filename, data, replicate)
 
 
-    # Notify waiters about potential changes
-    job.change.notifyAll()
-
-  @utils.LockedMethod
-  @_RequireOpenQueue
   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                         timeout):
     """Waits for changes in a job.
 
   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                         timeout):
     """Waits for changes in a job.
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: Job identifier
     @type fields: list of strings
     @param fields: Which fields to check for changes
     @param job_id: Job identifier
     @type fields: list of strings
     @param fields: Which fields to check for changes
@@ -1017,7 +2417,7 @@ class JobQueue(object):
     @type prev_log_serial: int
     @param prev_log_serial: Last job message serial number
     @type timeout: float
     @type prev_log_serial: int
     @param prev_log_serial: Last job message serial number
     @type timeout: float
-    @param timeout: maximum time to wait
+    @param timeout: maximum time to wait in seconds
     @rtype: tuple (job info, log entries)
     @return: a tuple of the job information as required via
         the fields parameter, and the log entries as a list
     @rtype: tuple (job info, log entries)
     @return: a tuple of the job information as required via
         the fields parameter, and the log entries as a list
@@ -1028,133 +2428,134 @@ class JobQueue(object):
         as such by the clients
 
     """
         as such by the clients
 
     """
-    logging.debug("Waiting for changes in job %s", job_id)
-    end_time = time.time() + timeout
-    while True:
-      delta_time = end_time - time.time()
-      if delta_time < 0:
-        return constants.JOB_NOTCHANGED
-
-      job = self._LoadJobUnlocked(job_id)
-      if not job:
-        logging.debug("Job %s not found", job_id)
-        break
-
-      status = job.CalcStatus()
-      job_info = self._GetJobInfoUnlocked(job, fields)
-      log_entries = job.GetLogEntries(prev_log_serial)
-
-      # Serializing and deserializing data can cause type changes (e.g. from
-      # tuple to list) or precision loss. We're doing it here so that we get
-      # the same modifications as the data received from the client. Without
-      # this, the comparison afterwards might fail without the data being
-      # significantly different.
-      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
-      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
-
-      if status not in (constants.JOB_STATUS_QUEUED,
-                        constants.JOB_STATUS_RUNNING,
-                        constants.JOB_STATUS_WAITLOCK):
-        # Don't even try to wait if the job is no longer running, there will be
-        # no changes.
-        break
-
-      if (prev_job_info != job_info or
-          (log_entries and prev_log_serial != log_entries[0][0])):
-        break
-
-      logging.debug("Waiting again")
+    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
+                             writable=False)
 
 
-      # Release the queue lock while waiting
-      job.change.wait(delta_time)
+    helper = _WaitForJobChangesHelper()
 
 
-    logging.debug("Job %s changed", job_id)
+    return helper(self._GetJobPath(job_id), load_fn,
+                  fields, prev_job_info, prev_log_serial, timeout)
 
 
-    return (job_info, log_entries)
-
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def CancelJob(self, job_id):
     """Cancels a job.
 
     This will only succeed if the job has not started yet.
 
   @_RequireOpenQueue
   def CancelJob(self, job_id):
     """Cancels a job.
 
     This will only succeed if the job has not started yet.
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: job ID of job to be cancelled.
 
     """
     logging.info("Cancelling job %s", job_id)
 
     @param job_id: job ID of job to be cancelled.
 
     """
     logging.info("Cancelling job %s", job_id)
 
-    job = self._LoadJobUnlocked(job_id)
-    if not job:
-      logging.debug("Job %s not found", job_id)
-      return (False, "Job %s not found" % job_id)
+    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
 
 
-    job_status = job.CalcStatus()
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  def ChangeJobPriority(self, job_id, priority):
+    """Changes a job's priority.
 
 
-    if job_status not in (constants.JOB_STATUS_QUEUED,
-                          constants.JOB_STATUS_WAITLOCK):
-      logging.debug("Job %s is no longer in the queue", job.id)
-      return (False, "Job %s is no longer in the queue" % job.id)
+    @type job_id: int
+    @param job_id: ID of the job whose priority should be changed
+    @type priority: int
+    @param priority: New priority
 
 
-    if job_status == constants.JOB_STATUS_QUEUED:
-      self.CancelJobUnlocked(job)
-      return (True, "Job %s canceled" % job.id)
+    """
+    logging.info("Changing priority of job %s to %s", job_id, priority)
 
 
-    elif job_status == constants.JOB_STATUS_WAITLOCK:
-      # The worker will notice the new status and cancel the job
-      try:
-        for op in job.ops:
-          op.status = constants.OP_STATUS_CANCELING
-      finally:
-        self.UpdateJobUnlocked(job)
-      return (True, "Job %s will be canceled" % job.id)
+    if priority not in constants.OP_PRIO_SUBMIT_VALID:
+      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
+      raise errors.GenericError("Invalid priority %s, allowed are %s" %
+                                (priority, allowed))
 
 
-  @_RequireOpenQueue
-  def CancelJobUnlocked(self, job):
-    """Marks a job as canceled.
+    def fn(job):
+      (success, msg) = job.ChangePriority(priority)
+
+      if success:
+        try:
+          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
+        except workerpool.NoSuchTask:
+          logging.debug("Job %s is not in workerpool at this time", job.id)
+
+      return (success, msg)
+
+    return self._ModifyJobUnlocked(job_id, fn)
+
+  def _ModifyJobUnlocked(self, job_id, mod_fn):
+    """Modifies a job.
+
+    @type job_id: int
+    @param job_id: Job ID
+    @type mod_fn: callable
+    @param mod_fn: Modifying function, receiving job object as parameter,
+      returning tuple of (status boolean, message string)
 
     """
 
     """
-    try:
-      for op in job.ops:
-        op.status = constants.OP_STATUS_ERROR
-        op.result = "Job canceled by request"
-    finally:
+    job = self._LoadJobUnlocked(job_id)
+    if not job:
+      logging.debug("Job %s not found", job_id)
+      return (False, "Job %s not found" % job_id)
+
+    assert job.writable, "Can't modify read-only job"
+    assert not job.archived, "Can't modify archived job"
+
+    (success, msg) = mod_fn(job)
+
+    if success:
+      # If the job was finalized (e.g. cancelled), this is the final write
+      # allowed. The job can be archived anytime.
       self.UpdateJobUnlocked(job)
 
       self.UpdateJobUnlocked(job)
 
+    return (success, msg)
+
   @_RequireOpenQueue
   @_RequireOpenQueue
-  def _ArchiveJobUnlocked(self, job):
-    """Archives a job.
+  def _ArchiveJobsUnlocked(self, jobs):
+    """Archives jobs.
 
 
-    @type job: L{_QueuedJob}
-    @param job: Job object
-    @rtype bool
-    @return Whether job was archived
+    @type jobs: list of L{_QueuedJob}
+    @param jobs: Job objects
+    @rtype: int
+    @return: Number of archived jobs
 
     """
 
     """
-    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
-                                constants.JOB_STATUS_SUCCESS,
-                                constants.JOB_STATUS_ERROR):
-      logging.debug("Job %s is not yet done", job.id)
-      return False
+    archive_jobs = []
+    rename_files = []
+    for job in jobs:
+      assert job.writable, "Can't archive read-only job"
+      assert not job.archived, "Can't cancel archived job"
+
+      if job.CalcStatus() not in constants.JOBS_FINALIZED:
+        logging.debug("Job %s is not yet done", job.id)
+        continue
 
 
-    old = self._GetJobPath(job.id)
-    new = self._GetArchivedJobPath(job.id)
+      archive_jobs.append(job)
 
 
-    self._RenameFileUnlocked(old, new)
+      old = self._GetJobPath(job.id)
+      new = self._GetArchivedJobPath(job.id)
+      rename_files.append((old, new))
 
 
-    logging.debug("Successfully archived job %s", job.id)
+    # TODO: What if 1..n files fail to rename?
+    self._RenameFilesUnlocked(rename_files)
 
 
-    return True
+    logging.debug("Successfully archived job(s) %s",
+                  utils.CommaJoin(job.id for job in archive_jobs))
+
+    # Since we haven't quite checked, above, if we succeeded or failed renaming
+    # the files, we update the cached queue size from the filesystem. When we
+    # get around to fix the TODO: above, we can use the number of actually
+    # archived jobs to fix this.
+    self._UpdateQueueSizeUnlocked()
+    return len(archive_jobs)
 
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def ArchiveJob(self, job_id):
     """Archives a job.
 
   @_RequireOpenQueue
   def ArchiveJob(self, job_id):
     """Archives a job.
 
-    This is just a wrapper over L{_ArchiveJobUnlocked}.
+    This is just a wrapper over L{_ArchiveJobsUnlocked}.
 
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: Job ID of job to be archived.
     @rtype: bool
     @return: Whether job was archived
     @param job_id: Job ID of job to be archived.
     @rtype: bool
     @return: Whether job was archived
@@ -1167,11 +2568,11 @@ class JobQueue(object):
       logging.debug("Job %s not found", job_id)
       return False
 
       logging.debug("Job %s not found", job_id)
       return False
 
-    return self._ArchiveJobUnlocked(job)
+    return self._ArchiveJobsUnlocked([job]) == 1
 
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   @_RequireOpenQueue
-  def AutoArchiveJobs(self, age):
+  def AutoArchiveJobs(self, age, timeout):
     """Archives all jobs based on age.
 
     The method will archive all jobs which are older than the age
     """Archives all jobs based on age.
 
     The method will archive all jobs which are older than the age
@@ -1186,73 +2587,87 @@ class JobQueue(object):
     logging.info("Archiving jobs with age more than %s seconds", age)
 
     now = time.time()
     logging.info("Archiving jobs with age more than %s seconds", age)
 
     now = time.time()
-    for job_id in self._GetJobIDsUnlocked(archived=False):
+    end_time = now + timeout
+    archived_count = 0
+    last_touched = 0
+
+    all_job_ids = self._GetJobIDsUnlocked()
+    pending = []
+    for idx, job_id in enumerate(all_job_ids):
+      last_touched = idx + 1
+
+      # Not optimal because jobs could be pending
+      # TODO: Measure average duration for job archival and take number of
+      # pending jobs into account.
+      if time.time() > end_time:
+        break
+
       # Returns None if the job failed to load
       job = self._LoadJobUnlocked(job_id)
       # Returns None if the job failed to load
       job = self._LoadJobUnlocked(job_id)
-      if not job:
-        continue
-
-      if job.end_timestamp is None:
-        if job.start_timestamp is None:
-          job_age = job.received_timestamp
+      if job:
+        if job.end_timestamp is None:
+          if job.start_timestamp is None:
+            job_age = job.received_timestamp
+          else:
+            job_age = job.start_timestamp
         else:
         else:
-          job_age = job.start_timestamp
-      else:
-        job_age = job.end_timestamp
+          job_age = job.end_timestamp
 
 
-      if age == -1 or now - job_age[0] > age:
-        self._ArchiveJobUnlocked(job)
+        if age == -1 or now - job_age[0] > age:
+          pending.append(job)
 
 
-  def _GetJobInfoUnlocked(self, job, fields):
-    """Returns information about a job.
+          # Archive 10 jobs at a time
+          if len(pending) >= 10:
+            archived_count += self._ArchiveJobsUnlocked(pending)
+            pending = []
 
 
-    @type job: L{_QueuedJob}
-    @param job: the job which we query
-    @type fields: list
-    @param fields: names of fields to return
-    @rtype: list
-    @return: list with one element for each field
-    @raise errors.OpExecError: when an invalid field
-        has been passed
+    if pending:
+      archived_count += self._ArchiveJobsUnlocked(pending)
 
 
-    """
-    row = []
-    for fname in fields:
-      if fname == "id":
-        row.append(job.id)
-      elif fname == "status":
-        row.append(job.CalcStatus())
-      elif fname == "ops":
-        row.append([op.input.__getstate__() for op in job.ops])
-      elif fname == "opresult":
-        row.append([op.result for op in job.ops])
-      elif fname == "opstatus":
-        row.append([op.status for op in job.ops])
-      elif fname == "oplog":
-        row.append([op.log for op in job.ops])
-      elif fname == "opstart":
-        row.append([op.start_timestamp for op in job.ops])
-      elif fname == "opend":
-        row.append([op.end_timestamp for op in job.ops])
-      elif fname == "received_ts":
-        row.append(job.received_timestamp)
-      elif fname == "start_ts":
-        row.append(job.start_timestamp)
-      elif fname == "end_ts":
-        row.append(job.end_timestamp)
-      elif fname == "summary":
-        row.append([op.input.Summary() for op in job.ops])
-      else:
-        raise errors.OpExecError("Invalid job query field '%s'" % fname)
-    return row
+    return (archived_count, len(all_job_ids) - last_touched)
 
 
-  @utils.LockedMethod
-  @_RequireOpenQueue
-  def QueryJobs(self, job_ids, fields):
+  def _Query(self, fields, qfilter):
+    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
+                       namefield="id")
+
+    # Archived jobs are only looked at if the "archived" field is referenced
+    # either as a requested field or in the filter. By default archived jobs
+    # are ignored.
+    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
+
+    job_ids = qobj.RequestedNames()
+
+    list_all = (job_ids is None)
+
+    if list_all:
+      # Since files are added to/removed from the queue atomically, there's no
+      # risk of getting the job ids in an inconsistent state.
+      job_ids = self._GetJobIDsUnlocked(archived=include_archived)
+
+    jobs = []
+
+    for job_id in job_ids:
+      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
+      if job is not None or not list_all:
+        jobs.append((job_id, job))
+
+    return (qobj, jobs, list_all)
+
+  def QueryJobs(self, fields, qfilter):
     """Returns a list of jobs in queue.
 
     """Returns a list of jobs in queue.
 
-    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
-    processing for each job.
+    @type fields: sequence
+    @param fields: List of wanted fields
+    @type qfilter: None or query2 filter (list)
+    @param qfilter: Query filter
+
+    """
+    (qobj, ctx, _) = self._Query(fields, qfilter)
+
+    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
+
+  def OldStyleQueryJobs(self, job_ids, fields):
+    """Returns a list of jobs in queue.
 
     @type job_ids: list
     @param job_ids: sequence of job identifiers or None for all
 
     @type job_ids: list
     @param job_ids: sequence of job identifiers or None for all
@@ -1263,17 +2678,51 @@ class JobQueue(object):
         the requested fields
 
     """
         the requested fields
 
     """
-    jobs = []
+    # backwards compat:
+    job_ids = [int(jid) for jid in job_ids]
+    qfilter = qlang.MakeSimpleFilter("id", job_ids)
 
 
-    for job in self._GetJobsUnlocked(job_ids):
-      if job is None:
-        jobs.append(None)
-      else:
-        jobs.append(self._GetJobInfoUnlocked(job, fields))
+    (qobj, ctx, _) = self._Query(fields, qfilter)
+
+    return qobj.OldStyleQuery(ctx, sort_by_name=False)
+
+  @locking.ssynchronized(_LOCK)
+  def PrepareShutdown(self):
+    """Prepare to stop the job queue.
+
+    Disables execution of jobs in the workerpool and returns whether there are
+    any jobs currently running. If the latter is the case, the job queue is not
+    yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
+    be called without interfering with any job. Queued and unfinished jobs will
+    be resumed next time.
+
+    Once this function has been called no new job submissions will be accepted
+    (see L{_RequireNonDrainedQueue}).
+
+    @rtype: bool
+    @return: Whether there are any running jobs
+
+    """
+    if self._accepting_jobs:
+      self._accepting_jobs = False
+
+      # Tell worker pool to stop processing pending tasks
+      self._wpool.SetActive(False)
+
+    return self._wpool.HasRunningTasks()
+
+  def AcceptingJobsUnlocked(self):
+    """Returns whether jobs are accepted.
 
 
-    return jobs
+    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
+    queue is shutting down.
+
+    @rtype: bool
+
+    """
+    return self._accepting_jobs
 
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def Shutdown(self):
     """Stops the job queue.
   @_RequireOpenQueue
   def Shutdown(self):
     """Stops the job queue.
@@ -1283,5 +2732,5 @@ class JobQueue(object):
     """
     self._wpool.TerminateWorkers()
 
     """
     self._wpool.TerminateWorkers()
 
-    self._queue_lock.Close()
-    self._queue_lock = None
+    self._queue_filelock.Close()
+    self._queue_filelock = None