Add DCStatus data type for the data collectors
[ganeti-local] / lib / jqueue.py
index 833e61c..9752f93 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
 #
 #
 
-# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -29,15 +29,16 @@ used by all other classes in this module.
 
 """
 
 
 """
 
-import os
 import logging
 import errno
 import logging
 import errno
-import re
 import time
 import weakref
 import time
 import weakref
+import threading
+import itertools
+import operator
 
 try:
 
 try:
-  # pylint: disable-msg=E0611
+  # pylint: disable=E0611
   from pyinotify import pyinotify
 except ImportError:
   import pyinotify
   from pyinotify import pyinotify
 except ImportError:
   import pyinotify
@@ -56,15 +57,22 @@ from ganeti import rpc
 from ganeti import runtime
 from ganeti import netutils
 from ganeti import compat
 from ganeti import runtime
 from ganeti import netutils
 from ganeti import compat
+from ganeti import ht
+from ganeti import query
+from ganeti import qlang
+from ganeti import pathutils
+from ganeti import vcluster
 
 
 JOBQUEUE_THREADS = 25
 
 
 JOBQUEUE_THREADS = 25
-JOBS_PER_ARCHIVE_DIRECTORY = 10000
 
 # member lock names to be passed to @ssynchronized decorator
 _LOCK = "_lock"
 _QUEUE = "_queue"
 
 
 # member lock names to be passed to @ssynchronized decorator
 _LOCK = "_lock"
 _QUEUE = "_queue"
 
+#: Retrieves "id" attribute
+_GetIdAttr = operator.attrgetter("id")
+
 
 class CancelJob(Exception):
   """Special exception to cancel a job.
 
 class CancelJob(Exception):
   """Special exception to cancel a job.
@@ -72,6 +80,12 @@ class CancelJob(Exception):
   """
 
 
   """
 
 
+class QueueShutdown(Exception):
+  """Special exception to abort a job when the job queue is shutting down.
+
+  """
+
+
 def TimeStampNow():
   """Returns the current timestamp.
 
 def TimeStampNow():
   """Returns the current timestamp.
 
@@ -82,6 +96,33 @@ def TimeStampNow():
   return utils.SplitTime(time.time())
 
 
   return utils.SplitTime(time.time())
 
 
+def _CallJqUpdate(runner, names, file_name, content):
+  """Updates job queue file after virtualizing filename.
+
+  """
+  virt_file_name = vcluster.MakeVirtualPath(file_name)
+  return runner.call_jobqueue_update(names, virt_file_name, content)
+
+
+class _SimpleJobQuery:
+  """Wrapper for job queries.
+
+  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
+
+  """
+  def __init__(self, fields):
+    """Initializes this class.
+
+    """
+    self._query = query.Query(query.JOB_FIELDS, fields)
+
+  def __call__(self, job):
+    """Executes a job query using cached field list.
+
+    """
+    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
+
+
 class _QueuedOpCode(object):
   """Encapsulates an opcode object.
 
 class _QueuedOpCode(object):
   """Encapsulates an opcode object.
 
@@ -100,7 +141,7 @@ class _QueuedOpCode(object):
                "__weakref__"]
 
   def __init__(self, op):
                "__weakref__"]
 
   def __init__(self, op):
-    """Constructor for the _QuededOpCode.
+    """Initializes instances of this class.
 
     @type op: L{opcodes.OpCode}
     @param op: the opcode we encapsulate
 
     @type op: L{opcodes.OpCode}
     @param op: the opcode we encapsulate
@@ -173,14 +214,15 @@ class _QueuedJob(object):
   @ivar received_timestamp: the timestamp for when the job was received
   @ivar start_timestmap: the timestamp for start of execution
   @ivar end_timestamp: the timestamp for end of execution
   @ivar received_timestamp: the timestamp for when the job was received
   @ivar start_timestmap: the timestamp for start of execution
   @ivar end_timestamp: the timestamp for end of execution
+  @ivar writable: Whether the job is allowed to be modified
 
   """
 
   """
-  # pylint: disable-msg=W0212
+  # pylint: disable=W0212
   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
                "received_timestamp", "start_timestamp", "end_timestamp",
   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
                "received_timestamp", "start_timestamp", "end_timestamp",
-               "__weakref__"]
+               "__weakref__", "processor_lock", "writable", "archived"]
 
 
-  def __init__(self, queue, job_id, ops):
+  def __init__(self, queue, job_id, ops, writable):
     """Constructor for the _QueuedJob.
 
     @type queue: L{JobQueue}
     """Constructor for the _QueuedJob.
 
     @type queue: L{JobQueue}
@@ -190,29 +232,41 @@ class _QueuedJob(object):
     @type ops: list
     @param ops: the list of opcodes we hold, which will be encapsulated
         in _QueuedOpCodes
     @type ops: list
     @param ops: the list of opcodes we hold, which will be encapsulated
         in _QueuedOpCodes
+    @type writable: bool
+    @param writable: Whether job can be modified
 
     """
     if not ops:
       raise errors.GenericError("A job needs at least one opcode")
 
     self.queue = queue
 
     """
     if not ops:
       raise errors.GenericError("A job needs at least one opcode")
 
     self.queue = queue
-    self.id = job_id
+    self.id = int(job_id)
     self.ops = [_QueuedOpCode(op) for op in ops]
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
     self.end_timestamp = None
     self.ops = [_QueuedOpCode(op) for op in ops]
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
     self.end_timestamp = None
+    self.archived = False
+
+    self._InitInMemory(self, writable)
 
 
-    self._InitInMemory(self)
+    assert not self.archived, "New jobs can not be marked as archived"
 
   @staticmethod
 
   @staticmethod
-  def _InitInMemory(obj):
+  def _InitInMemory(obj, writable):
     """Initializes in-memory variables.
 
     """
     """Initializes in-memory variables.
 
     """
+    obj.writable = writable
     obj.ops_iter = None
     obj.cur_opctx = None
 
     obj.ops_iter = None
     obj.cur_opctx = None
 
+    # Read-only jobs are not processed and therefore don't need a lock
+    if writable:
+      obj.processor_lock = threading.Lock()
+    else:
+      obj.processor_lock = None
+
   def __repr__(self):
     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
               "id=%s" % self.id,
   def __repr__(self):
     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
               "id=%s" % self.id,
@@ -221,23 +275,28 @@ class _QueuedJob(object):
     return "<%s at %#x>" % (" ".join(status), id(self))
 
   @classmethod
     return "<%s at %#x>" % (" ".join(status), id(self))
 
   @classmethod
-  def Restore(cls, queue, state):
+  def Restore(cls, queue, state, writable, archived):
     """Restore a _QueuedJob from serialized state:
 
     @type queue: L{JobQueue}
     @param queue: to which queue the restored job belongs
     @type state: dict
     @param state: the serialized state
     """Restore a _QueuedJob from serialized state:
 
     @type queue: L{JobQueue}
     @param queue: to which queue the restored job belongs
     @type state: dict
     @param state: the serialized state
+    @type writable: bool
+    @param writable: Whether job can be modified
+    @type archived: bool
+    @param archived: Whether job was already archived
     @rtype: _JobQueue
     @return: the restored _JobQueue instance
 
     """
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
     @rtype: _JobQueue
     @return: the restored _JobQueue instance
 
     """
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
-    obj.id = state["id"]
+    obj.id = int(state["id"])
     obj.received_timestamp = state.get("received_timestamp", None)
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
     obj.received_timestamp = state.get("received_timestamp", None)
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
+    obj.archived = archived
 
     obj.ops = []
     obj.log_serial = 0
 
     obj.ops = []
     obj.log_serial = 0
@@ -247,7 +306,7 @@ class _QueuedJob(object):
         obj.log_serial = max(obj.log_serial, log_entry[0])
       obj.ops.append(op)
 
         obj.log_serial = max(obj.log_serial, log_entry[0])
       obj.ops.append(op)
 
-    cls._InitInMemory(obj)
+    cls._InitInMemory(obj, writable)
 
     return obj
 
 
     return obj
 
@@ -299,8 +358,8 @@ class _QueuedJob(object):
 
       if op.status == constants.OP_STATUS_QUEUED:
         pass
 
       if op.status == constants.OP_STATUS_QUEUED:
         pass
-      elif op.status == constants.OP_STATUS_WAITLOCK:
-        status = constants.JOB_STATUS_WAITLOCK
+      elif op.status == constants.OP_STATUS_WAITING:
+        status = constants.JOB_STATUS_WAITING
       elif op.status == constants.OP_STATUS_RUNNING:
         status = constants.JOB_STATUS_RUNNING
       elif op.status == constants.OP_STATUS_CANCELING:
       elif op.status == constants.OP_STATUS_RUNNING:
         status = constants.JOB_STATUS_RUNNING
       elif op.status == constants.OP_STATUS_CANCELING:
@@ -370,41 +429,7 @@ class _QueuedJob(object):
         has been passed
 
     """
         has been passed
 
     """
-    row = []
-    for fname in fields:
-      if fname == "id":
-        row.append(self.id)
-      elif fname == "status":
-        row.append(self.CalcStatus())
-      elif fname == "priority":
-        row.append(self.CalcPriority())
-      elif fname == "ops":
-        row.append([op.input.__getstate__() for op in self.ops])
-      elif fname == "opresult":
-        row.append([op.result for op in self.ops])
-      elif fname == "opstatus":
-        row.append([op.status for op in self.ops])
-      elif fname == "oplog":
-        row.append([op.log for op in self.ops])
-      elif fname == "opstart":
-        row.append([op.start_timestamp for op in self.ops])
-      elif fname == "opexec":
-        row.append([op.exec_timestamp for op in self.ops])
-      elif fname == "opend":
-        row.append([op.end_timestamp for op in self.ops])
-      elif fname == "oppriority":
-        row.append([op.priority for op in self.ops])
-      elif fname == "received_ts":
-        row.append(self.received_timestamp)
-      elif fname == "start_ts":
-        row.append(self.start_timestamp)
-      elif fname == "end_ts":
-        row.append(self.end_timestamp)
-      elif fname == "summary":
-        row.append([op.input.Summary() for op in self.ops])
-      else:
-        raise errors.OpExecError("Invalid self query field '%s'" % fname)
-    return row
+    return _SimpleJobQuery(fields)(self)
 
   def MarkUnfinishedOps(self, status, result):
     """Mark unfinished opcodes with a given status and result.
 
   def MarkUnfinishedOps(self, status, result):
     """Mark unfinished opcodes with a given status and result.
@@ -448,7 +473,7 @@ class _QueuedJob(object):
       self.Finalize()
       return (True, "Job %s canceled" % self.id)
 
       self.Finalize()
       return (True, "Job %s canceled" % self.id)
 
-    elif status == constants.JOB_STATUS_WAITLOCK:
+    elif status == constants.JOB_STATUS_WAITING:
       # The worker will notice the new status and cancel the job
       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
       return (True, "Job %s will be canceled" % self.id)
       # The worker will notice the new status and cancel the job
       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
       return (True, "Job %s will be canceled" % self.id)
@@ -457,6 +482,50 @@ class _QueuedJob(object):
       logging.debug("Job %s is no longer waiting in the queue", self.id)
       return (False, "Job %s is no longer waiting in the queue" % self.id)
 
       logging.debug("Job %s is no longer waiting in the queue", self.id)
       return (False, "Job %s is no longer waiting in the queue" % self.id)
 
+  def ChangePriority(self, priority):
+    """Changes the job priority.
+
+    @type priority: int
+    @param priority: New priority
+    @rtype: tuple; (bool, string)
+    @return: Boolean describing whether job's priority was successfully changed
+      and a text message
+
+    """
+    status = self.CalcStatus()
+
+    if status in constants.JOBS_FINALIZED:
+      return (False, "Job %s is finished" % self.id)
+    elif status == constants.JOB_STATUS_CANCELING:
+      return (False, "Job %s is cancelling" % self.id)
+    else:
+      assert status in (constants.JOB_STATUS_QUEUED,
+                        constants.JOB_STATUS_WAITING,
+                        constants.JOB_STATUS_RUNNING)
+
+      changed = False
+      for op in self.ops:
+        if (op.status == constants.OP_STATUS_RUNNING or
+            op.status in constants.OPS_FINALIZED):
+          assert not changed, \
+            ("Found opcode for which priority should not be changed after"
+             " priority has been changed for previous opcodes")
+          continue
+
+        assert op.status in (constants.OP_STATUS_QUEUED,
+                             constants.OP_STATUS_WAITING)
+
+        changed = True
+
+        # Set new priority (doesn't modify opcode input)
+        op.priority = priority
+
+      if changed:
+        return (True, ("Priorities of pending opcodes for job %s have been"
+                       " changed to %s" % (self.id, priority)))
+      else:
+        return (False, "Job %s had no pending opcodes" % self.id)
+
 
 class _OpExecCallbacks(mcpu.OpExecCbBase):
   def __init__(self, queue, job, op):
 
 class _OpExecCallbacks(mcpu.OpExecCbBase):
   def __init__(self, queue, job, op):
@@ -487,6 +556,11 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
       logging.debug("Canceling opcode")
       raise CancelJob()
 
       logging.debug("Canceling opcode")
       raise CancelJob()
 
+    # See if queue is shutting down
+    if not self._queue.AcceptingJobsUnlocked():
+      logging.debug("Queue is shutting down")
+      raise QueueShutdown()
+
   @locking.ssynchronized(_QUEUE, shared=1)
   def NotifyStart(self):
     """Mark the opcode as running, not lock-waiting.
   @locking.ssynchronized(_QUEUE, shared=1)
   def NotifyStart(self):
     """Mark the opcode as running, not lock-waiting.
@@ -494,11 +568,11 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     This is called from the mcpu code as a notifier function, when the LU is
     finally about to start the Exec() method. Of course, to have end-user
     visible results, the opcode must be initially (before calling into
     This is called from the mcpu code as a notifier function, when the LU is
     finally about to start the Exec() method. Of course, to have end-user
     visible results, the opcode must be initially (before calling into
-    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
+    Processor.ExecOpCode) set to OP_STATUS_WAITING.
 
     """
     assert self._op in self._job.ops
 
     """
     assert self._op in self._job.ops
-    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+    assert self._op.status in (constants.OP_STATUS_WAITING,
                                constants.OP_STATUS_CANCELING)
 
     # Cancel here if we were asked to
                                constants.OP_STATUS_CANCELING)
 
     # Cancel here if we were asked to
@@ -538,16 +612,27 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     timestamp = utils.SplitTime(time.time())
     self._AppendFeedback(timestamp, log_type, log_msg)
 
     timestamp = utils.SplitTime(time.time())
     self._AppendFeedback(timestamp, log_type, log_msg)
 
-  def CheckCancel(self):
-    """Check whether job has been cancelled.
+  def CurrentPriority(self):
+    """Returns current priority for opcode.
 
     """
 
     """
-    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+    assert self._op.status in (constants.OP_STATUS_WAITING,
                                constants.OP_STATUS_CANCELING)
 
     # Cancel here if we were asked to
     self._CheckCancel()
 
                                constants.OP_STATUS_CANCELING)
 
     # Cancel here if we were asked to
     self._CheckCancel()
 
+    return self._op.priority
+
+  def SubmitManyJobs(self, jobs):
+    """Submits jobs for processing.
+
+    See L{JobQueue.SubmitManyJobs}.
+
+    """
+    # Locking is done in job queue
+    return self._queue.SubmitManyJobs(jobs)
+
 
 class _JobChangesChecker(object):
   def __init__(self, fields, prev_job_info, prev_log_serial):
 
 class _JobChangesChecker(object):
   def __init__(self, fields, prev_job_info, prev_log_serial):
@@ -561,7 +646,7 @@ class _JobChangesChecker(object):
     @param prev_log_serial: previous job serial, as passed by the LUXI client
 
     """
     @param prev_log_serial: previous job serial, as passed by the LUXI client
 
     """
-    self._fields = fields
+    self._squery = _SimpleJobQuery(fields)
     self._prev_job_info = prev_job_info
     self._prev_log_serial = prev_log_serial
 
     self._prev_job_info = prev_job_info
     self._prev_log_serial = prev_log_serial
 
@@ -572,8 +657,10 @@ class _JobChangesChecker(object):
     @param job: Job object
 
     """
     @param job: Job object
 
     """
+    assert not job.writable, "Expected read-only job"
+
     status = job.CalcStatus()
     status = job.CalcStatus()
-    job_info = job.GetInfo(self._fields)
+    job_info = self._squery(job)
     log_entries = job.GetLogEntries(self._prev_log_serial)
 
     # Serializing and deserializing data can cause type changes (e.g. from
     log_entries = job.GetLogEntries(self._prev_log_serial)
 
     # Serializing and deserializing data can cause type changes (e.g. from
@@ -592,7 +679,7 @@ class _JobChangesChecker(object):
     # no changes.
     if (status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
     # no changes.
     if (status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
-                       constants.JOB_STATUS_WAITLOCK) or
+                       constants.JOB_STATUS_WAITING) or
         job_info != self._prev_job_info or
         (log_entries and self._prev_log_serial != log_entries[0][0])):
       logging.debug("Job %s changed", job.id)
         job_info != self._prev_job_info or
         (log_entries and self._prev_log_serial != log_entries[0][0])):
       logging.debug("Job %s changed", job.id)
@@ -602,7 +689,7 @@ class _JobChangesChecker(object):
 
 
 class _JobFileChangesWaiter(object):
 
 
 class _JobFileChangesWaiter(object):
-  def __init__(self, filename):
+  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
     """Initializes this class.
 
     @type filename: string
     """Initializes this class.
 
     @type filename: string
@@ -610,7 +697,7 @@ class _JobFileChangesWaiter(object):
     @raises errors.InotifyError: if the notifier cannot be setup
 
     """
     @raises errors.InotifyError: if the notifier cannot be setup
 
     """
-    self._wm = pyinotify.WatchManager()
+    self._wm = _inotify_wm_cls()
     self._inotify_handler = \
       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
     self._notifier = \
     self._inotify_handler = \
       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
     self._notifier = \
@@ -652,7 +739,7 @@ class _JobFileChangesWaiter(object):
 
 
 class _JobChangesWaiter(object):
 
 
 class _JobChangesWaiter(object):
-  def __init__(self, filename):
+  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
     """Initializes this class.
 
     @type filename: string
     """Initializes this class.
 
     @type filename: string
@@ -661,6 +748,7 @@ class _JobChangesWaiter(object):
     """
     self._filewaiter = None
     self._filename = filename
     """
     self._filewaiter = None
     self._filename = filename
+    self._waiter_cls = _waiter_cls
 
   def Wait(self, timeout):
     """Waits for a job to change.
 
   def Wait(self, timeout):
     """Waits for a job to change.
@@ -677,7 +765,7 @@ class _JobChangesWaiter(object):
     # If this point is reached, return immediately and let caller check the job
     # file again in case there were changes since the last check. This avoids a
     # race condition.
     # If this point is reached, return immediately and let caller check the job
     # file again in case there were changes since the last check. This avoids a
     # race condition.
-    self._filewaiter = _JobFileChangesWaiter(self._filename)
+    self._filewaiter = self._waiter_cls(self._filename)
 
     return True
 
 
     return True
 
@@ -697,7 +785,13 @@ class _WaitForJobChangesHelper(object):
 
   """
   @staticmethod
 
   """
   @staticmethod
-  def _CheckForChanges(job_load_fn, check_fn):
+  def _CheckForChanges(counter, job_load_fn, check_fn):
+    if counter.next() > 0:
+      # If this isn't the first check the job is given some more time to change
+      # again. This gives better performance for jobs generating many
+      # changes/messages.
+      time.sleep(0.1)
+
     job = job_load_fn()
     if not job:
       raise errors.JobLost()
     job = job_load_fn()
     if not job:
       raise errors.JobLost()
@@ -709,7 +803,8 @@ class _WaitForJobChangesHelper(object):
     return result
 
   def __call__(self, filename, job_load_fn,
     return result
 
   def __call__(self, filename, job_load_fn,
-               fields, prev_job_info, prev_log_serial, timeout):
+               fields, prev_job_info, prev_log_serial, timeout,
+               _waiter_cls=_JobChangesWaiter):
     """Waits for changes on a job.
 
     @type filename: string
     """Waits for changes on a job.
 
     @type filename: string
@@ -726,17 +821,18 @@ class _WaitForJobChangesHelper(object):
     @param timeout: maximum time to wait in seconds
 
     """
     @param timeout: maximum time to wait in seconds
 
     """
+    counter = itertools.count()
     try:
       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
     try:
       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
-      waiter = _JobChangesWaiter(filename)
+      waiter = _waiter_cls(filename)
       try:
         return utils.Retry(compat.partial(self._CheckForChanges,
       try:
         return utils.Retry(compat.partial(self._CheckForChanges,
-                                          job_load_fn, check_fn),
+                                          counter, job_load_fn, check_fn),
                            utils.RETRY_REMAINING_TIME, timeout,
                            wait_fn=waiter.Wait)
       finally:
         waiter.Close()
                            utils.RETRY_REMAINING_TIME, timeout,
                            wait_fn=waiter.Wait)
       finally:
         waiter.Close()
-    except (errors.InotifyError, errors.JobLost):
+    except errors.JobLost:
       return None
     except utils.RetryTimeout:
       return constants.JOB_NOTCHANGED
       return None
     except utils.RetryTimeout:
       return constants.JOB_NOTCHANGED
@@ -796,6 +892,12 @@ class _OpExecContext:
     self.log_prefix = log_prefix
     self.summary = op.input.Summary()
 
     self.log_prefix = log_prefix
     self.summary = op.input.Summary()
 
+    # Create local copy to modify
+    if getattr(op.input, opcodes.DEPEND_ATTR, None):
+      self.jobdeps = op.input.depends[:]
+    else:
+      self.jobdeps = None
+
     self._timeout_strategy_factory = timeout_strategy_factory
     self._ResetTimeoutStrategy()
 
     self._timeout_strategy_factory = timeout_strategy_factory
     self._ResetTimeoutStrategy()
 
@@ -833,6 +935,10 @@ class _OpExecContext:
 
 
 class _JobProcessor(object):
 
 
 class _JobProcessor(object):
+  (DEFER,
+   WAITDEP,
+   FINISHED) = range(1, 4)
+
   def __init__(self, queue, opexec_fn, job,
                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
     """Initializes this class.
   def __init__(self, queue, opexec_fn, job,
                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
     """Initializes this class.
@@ -897,14 +1003,14 @@ class _JobProcessor(object):
     """
     assert op in job.ops
     assert op.status in (constants.OP_STATUS_QUEUED,
     """
     assert op in job.ops
     assert op.status in (constants.OP_STATUS_QUEUED,
-                         constants.OP_STATUS_WAITLOCK)
+                         constants.OP_STATUS_WAITING)
 
     update = False
 
     op.result = None
 
     if op.status == constants.OP_STATUS_QUEUED:
 
     update = False
 
     op.result = None
 
     if op.status == constants.OP_STATUS_QUEUED:
-      op.status = constants.OP_STATUS_WAITLOCK
+      op.status = constants.OP_STATUS_WAITING
       update = True
 
     if op.start_timestamp is None:
       update = True
 
     if op.start_timestamp is None:
@@ -915,17 +1021,73 @@ class _JobProcessor(object):
       job.start_timestamp = op.start_timestamp
       update = True
 
       job.start_timestamp = op.start_timestamp
       update = True
 
-    assert op.status == constants.OP_STATUS_WAITLOCK
+    assert op.status == constants.OP_STATUS_WAITING
 
     return update
 
 
     return update
 
+  @staticmethod
+  def _CheckDependencies(queue, job, opctx):
+    """Checks if an opcode has dependencies and if so, processes them.
+
+    @type queue: L{JobQueue}
+    @param queue: Queue object
+    @type job: L{_QueuedJob}
+    @param job: Job object
+    @type opctx: L{_OpExecContext}
+    @param opctx: Opcode execution context
+    @rtype: bool
+    @return: Whether opcode will be re-scheduled by dependency tracker
+
+    """
+    op = opctx.op
+
+    result = False
+
+    while opctx.jobdeps:
+      (dep_job_id, dep_status) = opctx.jobdeps[0]
+
+      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
+                                                          dep_status)
+      assert ht.TNonEmptyString(depmsg), "No dependency message"
+
+      logging.info("%s: %s", opctx.log_prefix, depmsg)
+
+      if depresult == _JobDependencyManager.CONTINUE:
+        # Remove dependency and continue
+        opctx.jobdeps.pop(0)
+
+      elif depresult == _JobDependencyManager.WAIT:
+        # Need to wait for notification, dependency tracker will re-add job
+        # to workerpool
+        result = True
+        break
+
+      elif depresult == _JobDependencyManager.CANCEL:
+        # Job was cancelled, cancel this job as well
+        job.Cancel()
+        assert op.status == constants.OP_STATUS_CANCELING
+        break
+
+      elif depresult in (_JobDependencyManager.WRONGSTATUS,
+                         _JobDependencyManager.ERROR):
+        # Job failed or there was an error, this job must fail
+        op.status = constants.OP_STATUS_ERROR
+        op.result = _EncodeOpError(errors.OpExecError(depmsg))
+        break
+
+      else:
+        raise errors.ProgrammerError("Unknown dependency result '%s'" %
+                                     depresult)
+
+    return result
+
   def _ExecOpCodeUnlocked(self, opctx):
     """Processes one opcode and returns the result.
 
     """
     op = opctx.op
 
   def _ExecOpCodeUnlocked(self, opctx):
     """Processes one opcode and returns the result.
 
     """
     op = opctx.op
 
-    assert op.status == constants.OP_STATUS_WAITLOCK
+    assert op.status == constants.OP_STATUS_WAITING
 
     timeout = opctx.GetNextLockTimeout()
 
 
     timeout = opctx.GetNextLockTimeout()
 
@@ -933,25 +1095,38 @@ class _JobProcessor(object):
       # Make sure not to hold queue lock while calling ExecOpCode
       result = self.opexec_fn(op.input,
                               _OpExecCallbacks(self.queue, self.job, op),
       # Make sure not to hold queue lock while calling ExecOpCode
       result = self.opexec_fn(op.input,
                               _OpExecCallbacks(self.queue, self.job, op),
-                              timeout=timeout, priority=op.priority)
+                              timeout=timeout)
     except mcpu.LockAcquireTimeout:
       assert timeout is not None, "Received timeout for blocking acquire"
       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
 
     except mcpu.LockAcquireTimeout:
       assert timeout is not None, "Received timeout for blocking acquire"
       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
 
-      assert op.status in (constants.OP_STATUS_WAITLOCK,
+      assert op.status in (constants.OP_STATUS_WAITING,
                            constants.OP_STATUS_CANCELING)
 
       # Was job cancelled while we were waiting for the lock?
       if op.status == constants.OP_STATUS_CANCELING:
         return (constants.OP_STATUS_CANCELING, None)
 
                            constants.OP_STATUS_CANCELING)
 
       # Was job cancelled while we were waiting for the lock?
       if op.status == constants.OP_STATUS_CANCELING:
         return (constants.OP_STATUS_CANCELING, None)
 
+      # Queue is shutting down, return to queued
+      if not self.queue.AcceptingJobsUnlocked():
+        return (constants.OP_STATUS_QUEUED, None)
+
       # Stay in waitlock while trying to re-acquire lock
       # Stay in waitlock while trying to re-acquire lock
-      return (constants.OP_STATUS_WAITLOCK, None)
+      return (constants.OP_STATUS_WAITING, None)
     except CancelJob:
       logging.exception("%s: Canceling job", opctx.log_prefix)
       assert op.status == constants.OP_STATUS_CANCELING
       return (constants.OP_STATUS_CANCELING, None)
     except CancelJob:
       logging.exception("%s: Canceling job", opctx.log_prefix)
       assert op.status == constants.OP_STATUS_CANCELING
       return (constants.OP_STATUS_CANCELING, None)
-    except Exception, err: # pylint: disable-msg=W0703
+
+    except QueueShutdown:
+      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
+
+      assert op.status == constants.OP_STATUS_WAITING
+
+      # Job hadn't been started yet, so it should return to the queue
+      return (constants.OP_STATUS_QUEUED, None)
+
+    except Exception, err: # pylint: disable=W0703
       logging.exception("%s: Caught exception in %s",
                         opctx.log_prefix, opctx.summary)
       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
       logging.exception("%s: Caught exception in %s",
                         opctx.log_prefix, opctx.summary)
       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
@@ -964,9 +1139,9 @@ class _JobProcessor(object):
     """Continues execution of a job.
 
     @param _nextop_fn: Callback function for tests
     """Continues execution of a job.
 
     @param _nextop_fn: Callback function for tests
-    @rtype: bool
-    @return: True if job is finished, False if processor needs to be called
-             again
+    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
+      be deferred and C{WAITDEP} if the dependency manager
+      (L{_JobDependencyManager}) will re-schedule the job when appropriate
 
     """
     queue = self.queue
 
     """
     queue = self.queue
@@ -978,9 +1153,11 @@ class _JobProcessor(object):
     try:
       opcount = len(job.ops)
 
     try:
       opcount = len(job.ops)
 
+      assert job.writable, "Expected writable job"
+
       # Don't do anything for finalized jobs
       if job.CalcStatus() in constants.JOBS_FINALIZED:
       # Don't do anything for finalized jobs
       if job.CalcStatus() in constants.JOBS_FINALIZED:
-        return True
+        return self.FINISHED
 
       # Is a previous opcode still pending?
       if job.cur_opctx:
 
       # Is a previous opcode still pending?
       if job.cur_opctx:
@@ -999,39 +1176,57 @@ class _JobProcessor(object):
                         for i in job.ops[opctx.index + 1:])
 
       assert op.status in (constants.OP_STATUS_QUEUED,
                         for i in job.ops[opctx.index + 1:])
 
       assert op.status in (constants.OP_STATUS_QUEUED,
-                           constants.OP_STATUS_WAITLOCK,
+                           constants.OP_STATUS_WAITING,
                            constants.OP_STATUS_CANCELING)
 
       assert (op.priority <= constants.OP_PRIO_LOWEST and
               op.priority >= constants.OP_PRIO_HIGHEST)
 
                            constants.OP_STATUS_CANCELING)
 
       assert (op.priority <= constants.OP_PRIO_LOWEST and
               op.priority >= constants.OP_PRIO_HIGHEST)
 
+      waitjob = None
+
       if op.status != constants.OP_STATUS_CANCELING:
         assert op.status in (constants.OP_STATUS_QUEUED,
       if op.status != constants.OP_STATUS_CANCELING:
         assert op.status in (constants.OP_STATUS_QUEUED,
-                             constants.OP_STATUS_WAITLOCK)
+                             constants.OP_STATUS_WAITING)
 
         # Prepare to start opcode
         if self._MarkWaitlock(job, op):
           # Write to disk
           queue.UpdateJobUnlocked(job)
 
 
         # Prepare to start opcode
         if self._MarkWaitlock(job, op):
           # Write to disk
           queue.UpdateJobUnlocked(job)
 
-        assert op.status == constants.OP_STATUS_WAITLOCK
-        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
+        assert op.status == constants.OP_STATUS_WAITING
+        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
         assert job.start_timestamp and op.start_timestamp
         assert job.start_timestamp and op.start_timestamp
+        assert waitjob is None
 
 
-        logging.info("%s: opcode %s waiting for locks",
-                     opctx.log_prefix, opctx.summary)
+        # Check if waiting for a job is necessary
+        waitjob = self._CheckDependencies(queue, job, opctx)
 
 
-        queue.release()
-        try:
-          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
-        finally:
-          queue.acquire(shared=1)
+        assert op.status in (constants.OP_STATUS_WAITING,
+                             constants.OP_STATUS_CANCELING,
+                             constants.OP_STATUS_ERROR)
+
+        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
+                                         constants.OP_STATUS_ERROR)):
+          logging.info("%s: opcode %s waiting for locks",
+                       opctx.log_prefix, opctx.summary)
+
+          assert not opctx.jobdeps, "Not all dependencies were removed"
+
+          queue.release()
+          try:
+            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
+          finally:
+            queue.acquire(shared=1)
+
+          op.status = op_status
+          op.result = op_result
 
 
-        op.status = op_status
-        op.result = op_result
+          assert not waitjob
 
 
-        if op.status == constants.OP_STATUS_WAITLOCK:
-          # Couldn't get locks in time
+        if op.status in (constants.OP_STATUS_WAITING,
+                         constants.OP_STATUS_QUEUED):
+          # waiting: Couldn't get locks in time
+          # queued: Queue is shutting down
           assert not op.end_timestamp
         else:
           # Finalize opcode
           assert not op.end_timestamp
         else:
           # Finalize opcode
@@ -1043,10 +1238,22 @@ class _JobProcessor(object):
           else:
             assert op.status in constants.OPS_FINALIZED
 
           else:
             assert op.status in constants.OPS_FINALIZED
 
-      if op.status == constants.OP_STATUS_WAITLOCK:
+      if op.status == constants.OP_STATUS_QUEUED:
+        # Queue is shutting down
+        assert not waitjob
+
         finalize = False
 
         finalize = False
 
-        if opctx.CheckPriorityIncrease():
+        # Reset context
+        job.cur_opctx = None
+
+        # In no case must the status be finalized here
+        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
+
+      elif op.status == constants.OP_STATUS_WAITING or waitjob:
+        finalize = False
+
+        if not waitjob and opctx.CheckPriorityIncrease():
           # Priority was changed, need to update on-disk file
           queue.UpdateJobUnlocked(job)
 
           # Priority was changed, need to update on-disk file
           queue.UpdateJobUnlocked(job)
 
@@ -1057,7 +1264,7 @@ class _JobProcessor(object):
                 op.priority >= constants.OP_PRIO_HIGHEST)
 
         # In no case must the status be finalized here
                 op.priority >= constants.OP_PRIO_HIGHEST)
 
         # In no case must the status be finalized here
-        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
+        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
 
       else:
         # Ensure all opcodes so far have been successful
 
       else:
         # Ensure all opcodes so far have been successful
@@ -1105,39 +1312,118 @@ class _JobProcessor(object):
         # allowed. Once the file has been written, it can be archived anytime.
         queue.UpdateJobUnlocked(job)
 
         # allowed. Once the file has been written, it can be archived anytime.
         queue.UpdateJobUnlocked(job)
 
+        assert not waitjob
+
         if finalize:
           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
         if finalize:
           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
-          return True
+          return self.FINISHED
 
 
-      return False
+      assert not waitjob or queue.depmgr.JobWaiting(job)
+
+      if waitjob:
+        return self.WAITDEP
+      else:
+        return self.DEFER
     finally:
     finally:
+      assert job.writable, "Job became read-only while being processed"
       queue.release()
 
 
       queue.release()
 
 
+def _EvaluateJobProcessorResult(depmgr, job, result):
+  """Looks at a result from L{_JobProcessor} for a job.
+
+  To be used in a L{_JobQueueWorker}.
+
+  """
+  if result == _JobProcessor.FINISHED:
+    # Notify waiting jobs
+    depmgr.NotifyWaiters(job.id)
+
+  elif result == _JobProcessor.DEFER:
+    # Schedule again
+    raise workerpool.DeferTask(priority=job.CalcPriority())
+
+  elif result == _JobProcessor.WAITDEP:
+    # No-op, dependency manager will re-schedule
+    pass
+
+  else:
+    raise errors.ProgrammerError("Job processor returned unknown status %s" %
+                                 (result, ))
+
+
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
 
   """
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
 
   """
-  def RunTask(self, job): # pylint: disable-msg=W0221
+  def RunTask(self, job): # pylint: disable=W0221
     """Job executor.
 
     """Job executor.
 
-    This functions processes a job. It is closely tied to the L{_QueuedJob} and
-    L{_QueuedOpCode} classes.
-
     @type job: L{_QueuedJob}
     @param job: the job to be processed
 
     """
     @type job: L{_QueuedJob}
     @param job: the job to be processed
 
     """
+    assert job.writable, "Expected writable job"
+
+    # Ensure only one worker is active on a single job. If a job registers for
+    # a dependency job, and the other job notifies before the first worker is
+    # done, the job can end up in the tasklist more than once.
+    job.processor_lock.acquire()
+    try:
+      return self._RunTaskInner(job)
+    finally:
+      job.processor_lock.release()
+
+  def _RunTaskInner(self, job):
+    """Executes a job.
+
+    Must be called with per-job lock acquired.
+
+    """
     queue = job.queue
     assert queue == self.pool.queue
 
     queue = job.queue
     assert queue == self.pool.queue
 
-    self.SetTaskName("Job%s" % job.id)
+    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
+    setname_fn(None)
 
     proc = mcpu.Processor(queue.context, job.id)
 
 
     proc = mcpu.Processor(queue.context, job.id)
 
-    if not _JobProcessor(queue, proc.ExecOpCode, job)():
-      # Schedule again
-      raise workerpool.DeferTask(priority=job.CalcPriority())
+    # Create wrapper for setting thread name
+    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
+                                    proc.ExecOpCode)
+
+    _EvaluateJobProcessorResult(queue.depmgr, job,
+                                _JobProcessor(queue, wrap_execop_fn, job)())
+
+  @staticmethod
+  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
+    """Updates the worker thread name to include a short summary of the opcode.
+
+    @param setname_fn: Callable setting worker thread name
+    @param execop_fn: Callable for executing opcode (usually
+                      L{mcpu.Processor.ExecOpCode})
+
+    """
+    setname_fn(op)
+    try:
+      return execop_fn(op, *args, **kwargs)
+    finally:
+      setname_fn(None)
+
+  @staticmethod
+  def _GetWorkerName(job, op):
+    """Sets the worker thread name.
+
+    @type job: L{_QueuedJob}
+    @type op: L{opcodes.OpCode}
+
+    """
+    parts = ["Job%s" % job.id]
+
+    if op:
+      parts.append(op.TinySummary())
+
+    return "/".join(parts)
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
@@ -1145,12 +1431,148 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
 
   """
   def __init__(self, queue):
 
   """
   def __init__(self, queue):
-    super(_JobQueueWorkerPool, self).__init__("JobQueue",
+    super(_JobQueueWorkerPool, self).__init__("Jq",
                                               JOBQUEUE_THREADS,
                                               _JobQueueWorker)
     self.queue = queue
 
 
                                               JOBQUEUE_THREADS,
                                               _JobQueueWorker)
     self.queue = queue
 
 
+class _JobDependencyManager:
+  """Keeps track of job dependencies.
+
+  """
+  (WAIT,
+   ERROR,
+   CANCEL,
+   CONTINUE,
+   WRONGSTATUS) = range(1, 6)
+
+  def __init__(self, getstatus_fn, enqueue_fn):
+    """Initializes this class.
+
+    """
+    self._getstatus_fn = getstatus_fn
+    self._enqueue_fn = enqueue_fn
+
+    self._waiters = {}
+    self._lock = locking.SharedLock("JobDepMgr")
+
+  @locking.ssynchronized(_LOCK, shared=1)
+  def GetLockInfo(self, requested): # pylint: disable=W0613
+    """Retrieves information about waiting jobs.
+
+    @type requested: set
+    @param requested: Requested information, see C{query.LQ_*}
+
+    """
+    # No need to sort here, that's being done by the lock manager and query
+    # library. There are no priorities for notifying jobs, hence all show up as
+    # one item under "pending".
+    return [("job/%s" % job_id, None, None,
+             [("job", [job.id for job in waiters])])
+            for job_id, waiters in self._waiters.items()
+            if waiters]
+
+  @locking.ssynchronized(_LOCK, shared=1)
+  def JobWaiting(self, job):
+    """Checks if a job is waiting.
+
+    """
+    return compat.any(job in jobs
+                      for jobs in self._waiters.values())
+
+  @locking.ssynchronized(_LOCK)
+  def CheckAndRegister(self, job, dep_job_id, dep_status):
+    """Checks if a dependency job has the requested status.
+
+    If the other job is not yet in a finalized status, the calling job will be
+    notified (re-added to the workerpool) at a later point.
+
+    @type job: L{_QueuedJob}
+    @param job: Job object
+    @type dep_job_id: int
+    @param dep_job_id: ID of dependency job
+    @type dep_status: list
+    @param dep_status: Required status
+
+    """
+    assert ht.TJobId(job.id)
+    assert ht.TJobId(dep_job_id)
+    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
+
+    if job.id == dep_job_id:
+      return (self.ERROR, "Job can't depend on itself")
+
+    # Get status of dependency job
+    try:
+      status = self._getstatus_fn(dep_job_id)
+    except errors.JobLost, err:
+      return (self.ERROR, "Dependency error: %s" % err)
+
+    assert status in constants.JOB_STATUS_ALL
+
+    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
+
+    if status not in constants.JOBS_FINALIZED:
+      # Register for notification and wait for job to finish
+      job_id_waiters.add(job)
+      return (self.WAIT,
+              "Need to wait for job %s, wanted status '%s'" %
+              (dep_job_id, dep_status))
+
+    # Remove from waiters list
+    if job in job_id_waiters:
+      job_id_waiters.remove(job)
+
+    if (status == constants.JOB_STATUS_CANCELED and
+        constants.JOB_STATUS_CANCELED not in dep_status):
+      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
+
+    elif not dep_status or status in dep_status:
+      return (self.CONTINUE,
+              "Dependency job %s finished with status '%s'" %
+              (dep_job_id, status))
+
+    else:
+      return (self.WRONGSTATUS,
+              "Dependency job %s finished with status '%s',"
+              " not one of '%s' as required" %
+              (dep_job_id, status, utils.CommaJoin(dep_status)))
+
+  def _RemoveEmptyWaitersUnlocked(self):
+    """Remove all jobs without actual waiters.
+
+    """
+    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
+                   if not waiters]:
+      del self._waiters[job_id]
+
+  def NotifyWaiters(self, job_id):
+    """Notifies all jobs waiting for a certain job ID.
+
+    @attention: Do not call until L{CheckAndRegister} returned a status other
+      than C{WAITDEP} for C{job_id}, or behaviour is undefined
+    @type job_id: int
+    @param job_id: Job ID
+
+    """
+    assert ht.TJobId(job_id)
+
+    self._lock.acquire()
+    try:
+      self._RemoveEmptyWaitersUnlocked()
+
+      jobs = self._waiters.pop(job_id, None)
+    finally:
+      self._lock.release()
+
+    if jobs:
+      # Re-add jobs to workerpool
+      logging.debug("Re-adding %s jobs which were waiting for job %s",
+                    len(jobs), job_id)
+      self._enqueue_fn(jobs)
+
+
 def _RequireOpenQueue(fn):
   """Decorator for "public" functions.
 
 def _RequireOpenQueue(fn):
   """Decorator for "public" functions.
 
@@ -1170,20 +1592,41 @@ def _RequireOpenQueue(fn):
 
   """
   def wrapper(self, *args, **kwargs):
 
   """
   def wrapper(self, *args, **kwargs):
-    # pylint: disable-msg=W0212
+    # pylint: disable=W0212
     assert self._queue_filelock is not None, "Queue should be open"
     return fn(self, *args, **kwargs)
   return wrapper
 
 
     assert self._queue_filelock is not None, "Queue should be open"
     return fn(self, *args, **kwargs)
   return wrapper
 
 
-class JobQueue(object):
-  """Queue used to manage the jobs.
+def _RequireNonDrainedQueue(fn):
+  """Decorator checking for a non-drained queue.
 
 
-  @cvar _RE_JOB_FILE: regex matching the valid job file names
+  To be used with functions submitting new jobs.
 
   """
 
   """
-  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
+  def wrapper(self, *args, **kwargs):
+    """Wrapper function.
+
+    @raise errors.JobQueueDrainError: if the job queue is marked for draining
+
+    """
+    # Ok when sharing the big job queue lock, as the drain file is created when
+    # the lock is exclusive.
+    # Needs access to protected member, pylint: disable=W0212
+    if self._drained:
+      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
+
+    if not self._accepting_jobs:
+      raise errors.JobQueueError("Job queue is shutting down, refusing job")
+
+    return fn(self, *args, **kwargs)
+  return wrapper
+
+
+class JobQueue(object):
+  """Queue used to manage the jobs.
 
 
+  """
   def __init__(self, context):
     """Constructor for JobQueue.
 
   def __init__(self, context):
     """Constructor for JobQueue.
 
@@ -1211,6 +1654,9 @@ class JobQueue(object):
     self.acquire = self._lock.acquire
     self.release = self._lock.release
 
     self.acquire = self._lock.acquire
     self.release = self._lock.release
 
+    # Accept jobs by default
+    self._accepting_jobs = True
+
     # Initialize the queue, and acquire the filelock.
     # This ensures no other process is working on the job queue.
     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
     # Initialize the queue, and acquire the filelock.
     # This ensures no other process is working on the job queue.
     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
@@ -1230,9 +1676,15 @@ class JobQueue(object):
 
     # TODO: Check consistency across nodes
 
 
     # TODO: Check consistency across nodes
 
-    self._queue_size = 0
+    self._queue_size = None
     self._UpdateQueueSizeUnlocked()
     self._UpdateQueueSizeUnlocked()
-    self._drained = self._IsQueueMarkedDrain()
+    assert ht.TInt(self._queue_size)
+    self._drained = jstore.CheckDrainFlag()
+
+    # Job dependencies
+    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
+                                        self._EnqueueJobs)
+    self.context.glm.AddToLockMonitor(self.depmgr)
 
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
 
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
@@ -1278,26 +1730,33 @@ class JobQueue(object):
         restartjobs.append(job)
 
       elif status in (constants.JOB_STATUS_RUNNING,
         restartjobs.append(job)
 
       elif status in (constants.JOB_STATUS_RUNNING,
-                      constants.JOB_STATUS_WAITLOCK,
+                      constants.JOB_STATUS_WAITING,
                       constants.JOB_STATUS_CANCELING):
         logging.warning("Unfinished job %s found: %s", job.id, job)
 
                       constants.JOB_STATUS_CANCELING):
         logging.warning("Unfinished job %s found: %s", job.id, job)
 
-        if status == constants.JOB_STATUS_WAITLOCK:
+        if status == constants.JOB_STATUS_WAITING:
           # Restart job
           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
           restartjobs.append(job)
         else:
           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                 "Unclean master daemon shutdown")
           # Restart job
           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
           restartjobs.append(job)
         else:
           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                 "Unclean master daemon shutdown")
+          job.Finalize()
 
         self.UpdateJobUnlocked(job)
 
     if restartjobs:
       logging.info("Restarting %s jobs", len(restartjobs))
 
         self.UpdateJobUnlocked(job)
 
     if restartjobs:
       logging.info("Restarting %s jobs", len(restartjobs))
-      self._EnqueueJobs(restartjobs)
+      self._EnqueueJobsUnlocked(restartjobs)
 
     logging.info("Job queue inspection finished")
 
 
     logging.info("Job queue inspection finished")
 
+  def _GetRpc(self, address_list):
+    """Gets RPC runner with context.
+
+    """
+    return rpc.JobQueueRunner(self.context, address_list)
+
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def AddNode(self, node):
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def AddNode(self, node):
@@ -1311,7 +1770,7 @@ class JobQueue(object):
     assert node_name != self._my_hostname
 
     # Clean queue directory on added node
     assert node_name != self._my_hostname
 
     # Clean queue directory on added node
-    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
+    result = self._GetRpc(None).call_jobqueue_purge(node_name)
     msg = result.fail_msg
     if msg:
       logging.warning("Cannot cleanup queue directory on node %s: %s",
     msg = result.fail_msg
     if msg:
       logging.warning("Cannot cleanup queue directory on node %s: %s",
@@ -1327,20 +1786,31 @@ class JobQueue(object):
     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
 
     # Upload current serial file
     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
 
     # Upload current serial file
-    files.append(constants.JOB_QUEUE_SERIAL_FILE)
+    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
+
+    # Static address list
+    addrs = [node.primary_ip]
 
     for file_name in files:
       # Read file content
       content = utils.ReadFile(file_name)
 
 
     for file_name in files:
       # Read file content
       content = utils.ReadFile(file_name)
 
-      result = rpc.RpcRunner.call_jobqueue_update([node_name],
-                                                  [node.primary_ip],
-                                                  file_name, content)
+      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
+                             file_name, content)
       msg = result[node_name].fail_msg
       if msg:
         logging.error("Failed to upload file %s to node %s: %s",
                       file_name, node_name, msg)
 
       msg = result[node_name].fail_msg
       if msg:
         logging.error("Failed to upload file %s to node %s: %s",
                       file_name, node_name, msg)
 
+    # Set queue drained flag
+    result = \
+      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
+                                                       self._drained)
+    msg = result[node_name].fail_msg
+    if msg:
+      logging.error("Failed to set queue drained flag on node %s: %s",
+                    node_name, msg)
+
     self._nodes[node_name] = node.primary_ip
 
   @locking.ssynchronized(_LOCK)
     self._nodes[node_name] = node.primary_ip
 
   @locking.ssynchronized(_LOCK)
@@ -1419,7 +1889,7 @@ class JobQueue(object):
 
     if replicate:
       names, addrs = self._GetNodeIp()
 
     if replicate:
       names, addrs = self._GetNodeIp()
-      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
+      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
 
   def _RenameFilesUnlocked(self, rename):
       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
 
   def _RenameFilesUnlocked(self, rename):
@@ -1438,42 +1908,9 @@ class JobQueue(object):
 
     # ... and on all nodes
     names, addrs = self._GetNodeIp()
 
     # ... and on all nodes
     names, addrs = self._GetNodeIp()
-    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
+    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
-  @staticmethod
-  def _FormatJobID(job_id):
-    """Convert a job ID to string format.
-
-    Currently this just does C{str(job_id)} after performing some
-    checks, but if we want to change the job id format this will
-    abstract this change.
-
-    @type job_id: int or long
-    @param job_id: the numeric job id
-    @rtype: str
-    @return: the formatted job id
-
-    """
-    if not isinstance(job_id, (int, long)):
-      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
-    if job_id < 0:
-      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
-
-    return str(job_id)
-
-  @classmethod
-  def _GetArchiveDirectory(cls, job_id):
-    """Returns the archive directory for a job.
-
-    @type job_id: str
-    @param job_id: Job identifier
-    @rtype: str
-    @return: Directory name
-
-    """
-    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
-
   def _NewSerialsUnlocked(self, count):
     """Generates a new job identifier.
 
   def _NewSerialsUnlocked(self, count):
     """Generates a new job identifier.
 
@@ -1481,19 +1918,20 @@ class JobQueue(object):
 
     @type count: integer
     @param count: how many serials to return
 
     @type count: integer
     @param count: how many serials to return
-    @rtype: str
-    @return: a string representing the job identifier.
+    @rtype: list of int
+    @return: a list of job identifiers.
 
     """
 
     """
-    assert count > 0
+    assert ht.TNonNegativeInt(count)
+
     # New number
     serial = self._last_serial + count
 
     # Write to file
     # New number
     serial = self._last_serial + count
 
     # Write to file
-    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
+    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                              "%s\n" % serial, True)
 
                              "%s\n" % serial, True)
 
-    result = [self._FormatJobID(v)
+    result = [jstore.FormatJobID(v)
               for v in range(self._last_serial + 1, serial + 1)]
 
     # Keep it only if we were able to write the file
               for v in range(self._last_serial + 1, serial + 1)]
 
     # Keep it only if we were able to write the file
@@ -1513,10 +1951,10 @@ class JobQueue(object):
     @return: the path to the job file
 
     """
     @return: the path to the job file
 
     """
-    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
+    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
 
 
-  @classmethod
-  def _GetArchivedJobPath(cls, job_id):
+  @staticmethod
+  def _GetArchivedJobPath(job_id):
     """Returns the archived job file for a give job id.
 
     @type job_id: str
     """Returns the archived job file for a give job id.
 
     @type job_id: str
@@ -1525,10 +1963,30 @@ class JobQueue(object):
     @return: the path to the archived job file
 
     """
     @return: the path to the archived job file
 
     """
-    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
-                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
+    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
+                          jstore.GetArchiveDirectory(job_id),
+                          "job-%s" % job_id)
+
+  @staticmethod
+  def _DetermineJobDirectories(archived):
+    """Build list of directories containing job files.
+
+    @type archived: bool
+    @param archived: Whether to include directories for archived jobs
+    @rtype: list
+
+    """
+    result = [pathutils.QUEUE_DIR]
 
 
-  def _GetJobIDsUnlocked(self, sort=True):
+    if archived:
+      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
+      result.extend(map(compat.partial(utils.PathJoin, archive_path),
+                        utils.ListVisibleFiles(archive_path)))
+
+    return result
+
+  @classmethod
+  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
     """Return all known job IDs.
 
     The method only looks at disk because it's a requirement that all
     """Return all known job IDs.
 
     The method only looks at disk because it's a requirement that all
@@ -1542,12 +2000,15 @@ class JobQueue(object):
 
     """
     jlist = []
 
     """
     jlist = []
-    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
-      m = self._RE_JOB_FILE.match(filename)
-      if m:
-        jlist.append(m.group(1))
+
+    for path in cls._DetermineJobDirectories(archived):
+      for filename in utils.ListVisibleFiles(path):
+        m = constants.JOB_FILE_RE.match(filename)
+        if m:
+          jlist.append(int(m.group(1)))
+
     if sort:
     if sort:
-      jlist = utils.NiceSort(jlist)
+      jlist.sort()
     return jlist
 
   def _LoadJobUnlocked(self, job_id):
     return jlist
 
   def _LoadJobUnlocked(self, job_id):
@@ -1557,6 +2018,7 @@ class JobQueue(object):
     existing, or try to load the job from the disk. If loading from
     disk, it will also add the job to the cache.
 
     existing, or try to load the job from the disk. If loading from
     disk, it will also add the job to the cache.
 
+    @type job_id: int
     @param job_id: the job id
     @rtype: L{_QueuedJob} or None
     @return: either None or the job object
     @param job_id: the job id
     @rtype: L{_QueuedJob} or None
     @return: either None or the job object
@@ -1565,10 +2027,11 @@ class JobQueue(object):
     job = self._memcache.get(job_id, None)
     if job:
       logging.debug("Found job %s in memcache", job_id)
     job = self._memcache.get(job_id, None)
     if job:
       logging.debug("Found job %s in memcache", job_id)
+      assert job.writable, "Found read-only job in memcache"
       return job
 
     try:
       return job
 
     try:
-      job = self._LoadJobFromDisk(job_id)
+      job = self._LoadJobFromDisk(job_id, False)
       if job is None:
         return job
     except errors.JobFileCorrupted:
       if job is None:
         return job
     except errors.JobFileCorrupted:
@@ -1583,70 +2046,79 @@ class JobQueue(object):
         self._RenameFilesUnlocked([(old_path, new_path)])
       return None
 
         self._RenameFilesUnlocked([(old_path, new_path)])
       return None
 
+    assert job.writable, "Job just loaded is not writable"
+
     self._memcache[job_id] = job
     logging.debug("Added job %s to the cache", job_id)
     return job
 
     self._memcache[job_id] = job
     logging.debug("Added job %s to the cache", job_id)
     return job
 
-  def _LoadJobFromDisk(self, job_id):
+  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
     """Load the given job file from disk.
 
     Given a job file, read, load and restore it in a _QueuedJob format.
 
     """Load the given job file from disk.
 
     Given a job file, read, load and restore it in a _QueuedJob format.
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: job identifier
     @param job_id: job identifier
+    @type try_archived: bool
+    @param try_archived: Whether to try loading an archived job
     @rtype: L{_QueuedJob} or None
     @return: either None or the job object
 
     """
     @rtype: L{_QueuedJob} or None
     @return: either None or the job object
 
     """
-    filepath = self._GetJobPath(job_id)
-    logging.debug("Loading job from %s", filepath)
-    try:
-      raw_data = utils.ReadFile(filepath)
-    except EnvironmentError, err:
-      if err.errno in (errno.ENOENT, ):
-        return None
-      raise
+    path_functions = [(self._GetJobPath, False)]
+
+    if try_archived:
+      path_functions.append((self._GetArchivedJobPath, True))
+
+    raw_data = None
+    archived = None
+
+    for (fn, archived) in path_functions:
+      filepath = fn(job_id)
+      logging.debug("Loading job from %s", filepath)
+      try:
+        raw_data = utils.ReadFile(filepath)
+      except EnvironmentError, err:
+        if err.errno != errno.ENOENT:
+          raise
+      else:
+        break
+
+    if not raw_data:
+      return None
+
+    if writable is None:
+      writable = not archived
 
     try:
       data = serializer.LoadJson(raw_data)
 
     try:
       data = serializer.LoadJson(raw_data)
-      job = _QueuedJob.Restore(self, data)
-    except Exception, err: # pylint: disable-msg=W0703
+      job = _QueuedJob.Restore(self, data, writable, archived)
+    except Exception, err: # pylint: disable=W0703
       raise errors.JobFileCorrupted(err)
 
     return job
 
       raise errors.JobFileCorrupted(err)
 
     return job
 
-  def SafeLoadJobFromDisk(self, job_id):
+  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
     """Load the given job file from disk.
 
     Given a job file, read, load and restore it in a _QueuedJob format.
     In case of error reading the job, it gets returned as None, and the
     exception is logged.
 
     """Load the given job file from disk.
 
     Given a job file, read, load and restore it in a _QueuedJob format.
     In case of error reading the job, it gets returned as None, and the
     exception is logged.
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: job identifier
     @param job_id: job identifier
+    @type try_archived: bool
+    @param try_archived: Whether to try loading an archived job
     @rtype: L{_QueuedJob} or None
     @return: either None or the job object
 
     """
     try:
     @rtype: L{_QueuedJob} or None
     @return: either None or the job object
 
     """
     try:
-      return self._LoadJobFromDisk(job_id)
+      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
     except (errors.JobFileCorrupted, EnvironmentError):
       logging.exception("Can't load/parse job %s", job_id)
       return None
 
     except (errors.JobFileCorrupted, EnvironmentError):
       logging.exception("Can't load/parse job %s", job_id)
       return None
 
-  @staticmethod
-  def _IsQueueMarkedDrain():
-    """Check if the queue is marked from drain.
-
-    This currently uses the queue drain file, which makes it a
-    per-node flag. In the future this can be moved to the config file.
-
-    @rtype: boolean
-    @return: True of the job queue is marked for draining
-
-    """
-    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
-
   def _UpdateQueueSizeUnlocked(self):
     """Update the queue size.
 
   def _UpdateQueueSizeUnlocked(self):
     """Update the queue size.
 
@@ -1662,16 +2134,18 @@ class JobQueue(object):
     @param drain_flag: Whether to set or unset the drain flag
 
     """
     @param drain_flag: Whether to set or unset the drain flag
 
     """
-    getents = runtime.GetEnts()
-
-    if drain_flag:
-      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
-                      uid=getents.masterd_uid, gid=getents.masterd_gid)
-    else:
-      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
+    # Change flag locally
+    jstore.SetDrainFlag(drain_flag)
 
     self._drained = drain_flag
 
 
     self._drained = drain_flag
 
+    # ... and on all nodes
+    (names, addrs) = self._GetNodeIp()
+    result = \
+      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
+    self._CheckRpcResult(result, self._nodes,
+                         "Setting queue drain flag to %s" % drain_flag)
+
     return True
 
   @_RequireOpenQueue
     return True
 
   @_RequireOpenQueue
@@ -1687,28 +2161,30 @@ class JobQueue(object):
     @param ops: The list of OpCodes that will become the new job.
     @rtype: L{_QueuedJob}
     @return: the job object to be queued
     @param ops: The list of OpCodes that will become the new job.
     @rtype: L{_QueuedJob}
     @return: the job object to be queued
-    @raise errors.JobQueueDrainError: if the job queue is marked for draining
     @raise errors.JobQueueFull: if the job queue has too many jobs in it
     @raise errors.GenericError: If an opcode is not valid
 
     """
     @raise errors.JobQueueFull: if the job queue has too many jobs in it
     @raise errors.GenericError: If an opcode is not valid
 
     """
-    # Ok when sharing the big job queue lock, as the drain file is created when
-    # the lock is exclusive.
-    if self._drained:
-      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
-
     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()
 
     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()
 
-    job = _QueuedJob(self, job_id, ops)
+    job = _QueuedJob(self, job_id, ops, True)
 
 
-    # Check priority
     for idx, op in enumerate(job.ops):
     for idx, op in enumerate(job.ops):
+      # Check priority
       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                   " are %s" % (idx, op.priority, allowed))
 
       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                   " are %s" % (idx, op.priority, allowed))
 
+      # Check job dependencies
+      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
+      if not opcodes.TNoRelativeJobDependencies(dependencies):
+        raise errors.GenericError("Opcode %s has invalid dependencies, must"
+                                  " match %s: %s" %
+                                  (idx, opcodes.TNoRelativeJobDependencies,
+                                   dependencies))
+
     # Write to disk
     self.UpdateJobUnlocked(job)
 
     # Write to disk
     self.UpdateJobUnlocked(job)
 
@@ -1721,41 +2197,115 @@ class JobQueue(object):
 
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
 
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
+  @_RequireNonDrainedQueue
   def SubmitJob(self, ops):
     """Create and store a new job.
 
     @see: L{_SubmitJobUnlocked}
 
     """
   def SubmitJob(self, ops):
     """Create and store a new job.
 
     @see: L{_SubmitJobUnlocked}
 
     """
-    job_id = self._NewSerialsUnlocked(1)[0]
-    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
+    (job_id, ) = self._NewSerialsUnlocked(1)
+    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
     return job_id
 
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
     return job_id
 
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
+  @_RequireNonDrainedQueue
   def SubmitManyJobs(self, jobs):
     """Create and store multiple jobs.
 
     @see: L{_SubmitJobUnlocked}
 
     """
   def SubmitManyJobs(self, jobs):
     """Create and store multiple jobs.
 
     @see: L{_SubmitJobUnlocked}
 
     """
-    results = []
-    added_jobs = []
     all_job_ids = self._NewSerialsUnlocked(len(jobs))
     all_job_ids = self._NewSerialsUnlocked(len(jobs))
-    for job_id, ops in zip(all_job_ids, jobs):
-      try:
-        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
-        status = True
-        data = job_id
-      except errors.GenericError, err:
-        data = str(err)
-        status = False
-      results.append((status, data))
 
 
-    self._EnqueueJobs(added_jobs)
+    (results, added_jobs) = \
+      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
+
+    self._EnqueueJobsUnlocked(added_jobs)
 
     return results
 
 
     return results
 
+  @staticmethod
+  def _FormatSubmitError(msg, ops):
+    """Formats errors which occurred while submitting a job.
+
+    """
+    return ("%s; opcodes %s" %
+            (msg, utils.CommaJoin(op.Summary() for op in ops)))
+
+  @staticmethod
+  def _ResolveJobDependencies(resolve_fn, deps):
+    """Resolves relative job IDs in dependencies.
+
+    @type resolve_fn: callable
+    @param resolve_fn: Function to resolve a relative job ID
+    @type deps: list
+    @param deps: Dependencies
+    @rtype: tuple; (boolean, string or list)
+    @return: If successful (first tuple item), the returned list contains
+      resolved job IDs along with the requested status; if not successful,
+      the second element is an error message
+
+    """
+    result = []
+
+    for (dep_job_id, dep_status) in deps:
+      if ht.TRelativeJobId(dep_job_id):
+        assert ht.TInt(dep_job_id) and dep_job_id < 0
+        try:
+          job_id = resolve_fn(dep_job_id)
+        except IndexError:
+          # Abort
+          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
+      else:
+        job_id = dep_job_id
+
+      result.append((job_id, dep_status))
+
+    return (True, result)
+
+  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
+    """Create and store multiple jobs.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    results = []
+    added_jobs = []
+
+    def resolve_fn(job_idx, reljobid):
+      assert reljobid < 0
+      return (previous_job_ids + job_ids[:job_idx])[reljobid]
+
+    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
+      for op in ops:
+        if getattr(op, opcodes.DEPEND_ATTR, None):
+          (status, data) = \
+            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
+                                         op.depends)
+          if not status:
+            # Abort resolving dependencies
+            assert ht.TNonEmptyString(data), "No error message"
+            break
+          # Use resolved dependencies
+          op.depends = data
+      else:
+        try:
+          job = self._SubmitJobUnlocked(job_id, ops)
+        except errors.GenericError, err:
+          status = False
+          data = self._FormatSubmitError(str(err), ops)
+        else:
+          status = True
+          data = job_id
+          added_jobs.append(job)
+
+      results.append((status, data))
+
+    return (results, added_jobs)
+
+  @locking.ssynchronized(_LOCK)
   def _EnqueueJobs(self, jobs):
     """Helper function to add jobs to worker pool's queue.
 
   def _EnqueueJobs(self, jobs):
     """Helper function to add jobs to worker pool's queue.
 
@@ -1763,8 +2313,39 @@ class JobQueue(object):
     @param jobs: List of all jobs
 
     """
     @param jobs: List of all jobs
 
     """
+    return self._EnqueueJobsUnlocked(jobs)
+
+  def _EnqueueJobsUnlocked(self, jobs):
+    """Helper function to add jobs to worker pool's queue.
+
+    @type jobs: list
+    @param jobs: List of all jobs
+
+    """
+    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
     self._wpool.AddManyTasks([(job, ) for job in jobs],
     self._wpool.AddManyTasks([(job, ) for job in jobs],
-                             priority=[job.CalcPriority() for job in jobs])
+                             priority=[job.CalcPriority() for job in jobs],
+                             task_id=map(_GetIdAttr, jobs))
+
+  def _GetJobStatusForDependencies(self, job_id):
+    """Gets the status of a job for dependencies.
+
+    @type job_id: int
+    @param job_id: Job ID
+    @raise errors.JobLost: If job can't be found
+
+    """
+    # Not using in-memory cache as doing so would require an exclusive lock
+
+    # Try to load from disk
+    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
+
+    assert not job.writable, "Got writable job" # pylint: disable=E1101
+
+    if job:
+      return job.CalcStatus()
+
+    raise errors.JobLost("Job %s not found" % job_id)
 
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job, replicate=True):
 
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job, replicate=True):
@@ -1783,9 +2364,11 @@ class JobQueue(object):
     if __debug__:
       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
       assert (finalized ^ (job.end_timestamp is None))
     if __debug__:
       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
       assert (finalized ^ (job.end_timestamp is None))
+      assert job.writable, "Can't update read-only job"
+      assert not job.archived, "Can't update archived job"
 
     filename = self._GetJobPath(job.id)
 
     filename = self._GetJobPath(job.id)
-    data = serializer.DumpJson(job.Serialize(), indent=False)
+    data = serializer.DumpJson(job.Serialize())
     logging.debug("Writing job %s to %s", job.id, filename)
     self._UpdateJobQueueFile(filename, data, replicate)
 
     logging.debug("Writing job %s to %s", job.id, filename)
     self._UpdateJobQueueFile(filename, data, replicate)
 
@@ -1793,7 +2376,7 @@ class JobQueue(object):
                         timeout):
     """Waits for changes in a job.
 
                         timeout):
     """Waits for changes in a job.
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: Job identifier
     @type fields: list of strings
     @param fields: Which fields to check for changes
     @param job_id: Job identifier
     @type fields: list of strings
     @param fields: Which fields to check for changes
@@ -1813,7 +2396,8 @@ class JobQueue(object):
         as such by the clients
 
     """
         as such by the clients
 
     """
-    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
+    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
+                             writable=False)
 
     helper = _WaitForJobChangesHelper()
 
 
     helper = _WaitForJobChangesHelper()
 
@@ -1827,18 +2411,64 @@ class JobQueue(object):
 
     This will only succeed if the job has not started yet.
 
 
     This will only succeed if the job has not started yet.
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: job ID of job to be cancelled.
 
     """
     logging.info("Cancelling job %s", job_id)
 
     @param job_id: job ID of job to be cancelled.
 
     """
     logging.info("Cancelling job %s", job_id)
 
+    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
+
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  def ChangeJobPriority(self, job_id, priority):
+    """Changes a job's priority.
+
+    @type job_id: int
+    @param job_id: ID of the job whose priority should be changed
+    @type priority: int
+    @param priority: New priority
+
+    """
+    logging.info("Changing priority of job %s to %s", job_id, priority)
+
+    if priority not in constants.OP_PRIO_SUBMIT_VALID:
+      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
+      raise errors.GenericError("Invalid priority %s, allowed are %s" %
+                                (priority, allowed))
+
+    def fn(job):
+      (success, msg) = job.ChangePriority(priority)
+
+      if success:
+        try:
+          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
+        except workerpool.NoSuchTask:
+          logging.debug("Job %s is not in workerpool at this time", job.id)
+
+      return (success, msg)
+
+    return self._ModifyJobUnlocked(job_id, fn)
+
+  def _ModifyJobUnlocked(self, job_id, mod_fn):
+    """Modifies a job.
+
+    @type job_id: int
+    @param job_id: Job ID
+    @type mod_fn: callable
+    @param mod_fn: Modifying function, receiving job object as parameter,
+      returning tuple of (status boolean, message string)
+
+    """
     job = self._LoadJobUnlocked(job_id)
     if not job:
       logging.debug("Job %s not found", job_id)
       return (False, "Job %s not found" % job_id)
 
     job = self._LoadJobUnlocked(job_id)
     if not job:
       logging.debug("Job %s not found", job_id)
       return (False, "Job %s not found" % job_id)
 
-    (success, msg) = job.Cancel()
+    assert job.writable, "Can't modify read-only job"
+    assert not job.archived, "Can't modify archived job"
+
+    (success, msg) = mod_fn(job)
 
     if success:
       # If the job was finalized (e.g. cancelled), this is the final write
 
     if success:
       # If the job was finalized (e.g. cancelled), this is the final write
@@ -1860,6 +2490,9 @@ class JobQueue(object):
     archive_jobs = []
     rename_files = []
     for job in jobs:
     archive_jobs = []
     rename_files = []
     for job in jobs:
+      assert job.writable, "Can't archive read-only job"
+      assert not job.archived, "Can't cancel archived job"
+
       if job.CalcStatus() not in constants.JOBS_FINALIZED:
         logging.debug("Job %s is not yet done", job.id)
         continue
       if job.CalcStatus() not in constants.JOBS_FINALIZED:
         logging.debug("Job %s is not yet done", job.id)
         continue
@@ -1890,7 +2523,7 @@ class JobQueue(object):
 
     This is just a wrapper over L{_ArchiveJobsUnlocked}.
 
 
     This is just a wrapper over L{_ArchiveJobsUnlocked}.
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: Job ID of job to be archived.
     @rtype: bool
     @return: Whether job was archived
     @param job_id: Job ID of job to be archived.
     @rtype: bool
     @return: Whether job was archived
@@ -1961,7 +2594,47 @@ class JobQueue(object):
 
     return (archived_count, len(all_job_ids) - last_touched)
 
 
     return (archived_count, len(all_job_ids) - last_touched)
 
-  def QueryJobs(self, job_ids, fields):
+  def _Query(self, fields, qfilter):
+    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
+                       namefield="id")
+
+    # Archived jobs are only looked at if the "archived" field is referenced
+    # either as a requested field or in the filter. By default archived jobs
+    # are ignored.
+    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
+
+    job_ids = qobj.RequestedNames()
+
+    list_all = (job_ids is None)
+
+    if list_all:
+      # Since files are added to/removed from the queue atomically, there's no
+      # risk of getting the job ids in an inconsistent state.
+      job_ids = self._GetJobIDsUnlocked(archived=include_archived)
+
+    jobs = []
+
+    for job_id in job_ids:
+      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
+      if job is not None or not list_all:
+        jobs.append((job_id, job))
+
+    return (qobj, jobs, list_all)
+
+  def QueryJobs(self, fields, qfilter):
+    """Returns a list of jobs in queue.
+
+    @type fields: sequence
+    @param fields: List of wanted fields
+    @type qfilter: None or query2 filter (list)
+    @param qfilter: Query filter
+
+    """
+    (qobj, ctx, _) = self._Query(fields, qfilter)
+
+    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
+
+  def OldStyleQueryJobs(self, job_ids, fields):
     """Returns a list of jobs in queue.
 
     @type job_ids: list
     """Returns a list of jobs in queue.
 
     @type job_ids: list
@@ -1973,22 +2646,49 @@ class JobQueue(object):
         the requested fields
 
     """
         the requested fields
 
     """
-    jobs = []
-    list_all = False
-    if not job_ids:
-      # Since files are added to/removed from the queue atomically, there's no
-      # risk of getting the job ids in an inconsistent state.
-      job_ids = self._GetJobIDsUnlocked()
-      list_all = True
+    # backwards compat:
+    job_ids = [int(jid) for jid in job_ids]
+    qfilter = qlang.MakeSimpleFilter("id", job_ids)
 
 
-    for job_id in job_ids:
-      job = self.SafeLoadJobFromDisk(job_id)
-      if job is not None:
-        jobs.append(job.GetInfo(fields))
-      elif not list_all:
-        jobs.append(None)
+    (qobj, ctx, _) = self._Query(fields, qfilter)
+
+    return qobj.OldStyleQuery(ctx, sort_by_name=False)
+
+  @locking.ssynchronized(_LOCK)
+  def PrepareShutdown(self):
+    """Prepare to stop the job queue.
+
+    Disables execution of jobs in the workerpool and returns whether there are
+    any jobs currently running. If the latter is the case, the job queue is not
+    yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
+    be called without interfering with any job. Queued and unfinished jobs will
+    be resumed next time.
+
+    Once this function has been called no new job submissions will be accepted
+    (see L{_RequireNonDrainedQueue}).
 
 
-    return jobs
+    @rtype: bool
+    @return: Whether there are any running jobs
+
+    """
+    if self._accepting_jobs:
+      self._accepting_jobs = False
+
+      # Tell worker pool to stop processing pending tasks
+      self._wpool.SetActive(False)
+
+    return self._wpool.HasRunningTasks()
+
+  def AcceptingJobsUnlocked(self):
+    """Returns whether jobs are accepted.
+
+    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
+    queue is shutting down.
+
+    @rtype: bool
+
+    """
+    return self._accepting_jobs
 
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
 
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue