Merge branch 'stable-2.9' into stable-2.10
[ganeti-local] / lib / jqueue.py
index 3cf3b42..6f7bebd 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -35,6 +35,7 @@ import time
 import weakref
 import threading
 import itertools
+import operator
 
 try:
   # pylint: disable=E0611
@@ -48,6 +49,7 @@ from ganeti import serializer
 from ganeti import workerpool
 from ganeti import locking
 from ganeti import opcodes
+from ganeti import opcodes_base
 from ganeti import errors
 from ganeti import mcpu
 from ganeti import utils
@@ -57,15 +59,21 @@ from ganeti import runtime
 from ganeti import netutils
 from ganeti import compat
 from ganeti import ht
+from ganeti import query
+from ganeti import qlang
+from ganeti import pathutils
+from ganeti import vcluster
 
 
 JOBQUEUE_THREADS = 25
-JOBS_PER_ARCHIVE_DIRECTORY = 10000
 
 # member lock names to be passed to @ssynchronized decorator
 _LOCK = "_lock"
 _QUEUE = "_queue"
 
+#: Retrieves "id" attribute
+_GetIdAttr = operator.attrgetter("id")
+
 
 class CancelJob(Exception):
   """Special exception to cancel a job.
@@ -73,6 +81,12 @@ class CancelJob(Exception):
   """
 
 
+class QueueShutdown(Exception):
+  """Special exception to abort a job when the job queue is shutting down.
+
+  """
+
+
 def TimeStampNow():
   """Returns the current timestamp.
 
@@ -83,6 +97,33 @@ def TimeStampNow():
   return utils.SplitTime(time.time())
 
 
+def _CallJqUpdate(runner, names, file_name, content):
+  """Updates job queue file after virtualizing filename.
+
+  """
+  virt_file_name = vcluster.MakeVirtualPath(file_name)
+  return runner.call_jobqueue_update(names, virt_file_name, content)
+
+
+class _SimpleJobQuery:
+  """Wrapper for job queries.
+
+  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
+
+  """
+  def __init__(self, fields):
+    """Initializes this class.
+
+    """
+    self._query = query.Query(query.JOB_FIELDS, fields)
+
+  def __call__(self, job):
+    """Executes a job query using cached field list.
+
+    """
+    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
+
+
 class _QueuedOpCode(object):
   """Encapsulates an opcode object.
 
@@ -101,7 +142,7 @@ class _QueuedOpCode(object):
                "__weakref__"]
 
   def __init__(self, op):
-    """Constructor for the _QuededOpCode.
+    """Initializes instances of this class.
 
     @type op: L{opcodes.OpCode}
     @param op: the opcode we encapsulate
@@ -180,7 +221,23 @@ class _QueuedJob(object):
   # pylint: disable=W0212
   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
                "received_timestamp", "start_timestamp", "end_timestamp",
-               "__weakref__", "processor_lock", "writable"]
+               "__weakref__", "processor_lock", "writable", "archived"]
+
+  def _AddReasons(self):
+    """Extend the reason trail
+
+    Add the reason for all the opcodes of this job to be executed.
+
+    """
+    count = 0
+    for queued_op in self.ops:
+      op = queued_op.input
+      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__)
+      reason_text = "job=%d;index=%d" % (self.id, count)
+      reason = getattr(op, "reason", [])
+      reason.append((reason_src, reason_text, utils.EpochNano()))
+      op.reason = reason
+      count = count + 1
 
   def __init__(self, queue, job_id, ops, writable):
     """Constructor for the _QueuedJob.
@@ -200,15 +257,19 @@ class _QueuedJob(object):
       raise errors.GenericError("A job needs at least one opcode")
 
     self.queue = queue
-    self.id = job_id
+    self.id = int(job_id)
     self.ops = [_QueuedOpCode(op) for op in ops]
+    self._AddReasons()
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
     self.end_timestamp = None
+    self.archived = False
 
     self._InitInMemory(self, writable)
 
+    assert not self.archived, "New jobs can not be marked as archived"
+
   @staticmethod
   def _InitInMemory(obj, writable):
     """Initializes in-memory variables.
@@ -232,7 +293,7 @@ class _QueuedJob(object):
     return "<%s at %#x>" % (" ".join(status), id(self))
 
   @classmethod
-  def Restore(cls, queue, state, writable):
+  def Restore(cls, queue, state, writable, archived):
     """Restore a _QueuedJob from serialized state:
 
     @type queue: L{JobQueue}
@@ -241,16 +302,19 @@ class _QueuedJob(object):
     @param state: the serialized state
     @type writable: bool
     @param writable: Whether job can be modified
+    @type archived: bool
+    @param archived: Whether job was already archived
     @rtype: _JobQueue
     @return: the restored _JobQueue instance
 
     """
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
-    obj.id = state["id"]
+    obj.id = int(state["id"])
     obj.received_timestamp = state.get("received_timestamp", None)
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
+    obj.archived = archived
 
     obj.ops = []
     obj.log_serial = 0
@@ -383,41 +447,7 @@ class _QueuedJob(object):
         has been passed
 
     """
-    row = []
-    for fname in fields:
-      if fname == "id":
-        row.append(self.id)
-      elif fname == "status":
-        row.append(self.CalcStatus())
-      elif fname == "priority":
-        row.append(self.CalcPriority())
-      elif fname == "ops":
-        row.append([op.input.__getstate__() for op in self.ops])
-      elif fname == "opresult":
-        row.append([op.result for op in self.ops])
-      elif fname == "opstatus":
-        row.append([op.status for op in self.ops])
-      elif fname == "oplog":
-        row.append([op.log for op in self.ops])
-      elif fname == "opstart":
-        row.append([op.start_timestamp for op in self.ops])
-      elif fname == "opexec":
-        row.append([op.exec_timestamp for op in self.ops])
-      elif fname == "opend":
-        row.append([op.end_timestamp for op in self.ops])
-      elif fname == "oppriority":
-        row.append([op.priority for op in self.ops])
-      elif fname == "received_ts":
-        row.append(self.received_timestamp)
-      elif fname == "start_ts":
-        row.append(self.start_timestamp)
-      elif fname == "end_ts":
-        row.append(self.end_timestamp)
-      elif fname == "summary":
-        row.append([op.input.Summary() for op in self.ops])
-      else:
-        raise errors.OpExecError("Invalid self query field '%s'" % fname)
-    return row
+    return _SimpleJobQuery(fields)(self)
 
   def MarkUnfinishedOps(self, status, result):
     """Mark unfinished opcodes with a given status and result.
@@ -470,6 +500,50 @@ class _QueuedJob(object):
       logging.debug("Job %s is no longer waiting in the queue", self.id)
       return (False, "Job %s is no longer waiting in the queue" % self.id)
 
+  def ChangePriority(self, priority):
+    """Changes the job priority.
+
+    @type priority: int
+    @param priority: New priority
+    @rtype: tuple; (bool, string)
+    @return: Boolean describing whether job's priority was successfully changed
+      and a text message
+
+    """
+    status = self.CalcStatus()
+
+    if status in constants.JOBS_FINALIZED:
+      return (False, "Job %s is finished" % self.id)
+    elif status == constants.JOB_STATUS_CANCELING:
+      return (False, "Job %s is cancelling" % self.id)
+    else:
+      assert status in (constants.JOB_STATUS_QUEUED,
+                        constants.JOB_STATUS_WAITING,
+                        constants.JOB_STATUS_RUNNING)
+
+      changed = False
+      for op in self.ops:
+        if (op.status == constants.OP_STATUS_RUNNING or
+            op.status in constants.OPS_FINALIZED):
+          assert not changed, \
+            ("Found opcode for which priority should not be changed after"
+             " priority has been changed for previous opcodes")
+          continue
+
+        assert op.status in (constants.OP_STATUS_QUEUED,
+                             constants.OP_STATUS_WAITING)
+
+        changed = True
+
+        # Set new priority (doesn't modify opcode input)
+        op.priority = priority
+
+      if changed:
+        return (True, ("Priorities of pending opcodes for job %s have been"
+                       " changed to %s" % (self.id, priority)))
+      else:
+        return (False, "Job %s had no pending opcodes" % self.id)
+
 
 class _OpExecCallbacks(mcpu.OpExecCbBase):
   def __init__(self, queue, job, op):
@@ -500,6 +574,11 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
       logging.debug("Canceling opcode")
       raise CancelJob()
 
+    # See if queue is shutting down
+    if not self._queue.AcceptingJobsUnlocked():
+      logging.debug("Queue is shutting down")
+      raise QueueShutdown()
+
   @locking.ssynchronized(_QUEUE, shared=1)
   def NotifyStart(self):
     """Mark the opcode as running, not lock-waiting.
@@ -551,8 +630,8 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     timestamp = utils.SplitTime(time.time())
     self._AppendFeedback(timestamp, log_type, log_msg)
 
-  def CheckCancel(self):
-    """Check whether job has been cancelled.
+  def CurrentPriority(self):
+    """Returns current priority for opcode.
 
     """
     assert self._op.status in (constants.OP_STATUS_WAITING,
@@ -561,6 +640,8 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     # Cancel here if we were asked to
     self._CheckCancel()
 
+    return self._op.priority
+
   def SubmitManyJobs(self, jobs):
     """Submits jobs for processing.
 
@@ -583,7 +664,7 @@ class _JobChangesChecker(object):
     @param prev_log_serial: previous job serial, as passed by the LUXI client
 
     """
-    self._fields = fields
+    self._squery = _SimpleJobQuery(fields)
     self._prev_job_info = prev_job_info
     self._prev_log_serial = prev_log_serial
 
@@ -597,7 +678,7 @@ class _JobChangesChecker(object):
     assert not job.writable, "Expected read-only job"
 
     status = job.CalcStatus()
-    job_info = job.GetInfo(self._fields)
+    job_info = self._squery(job)
     log_entries = job.GetLogEntries(self._prev_log_serial)
 
     # Serializing and deserializing data can cause type changes (e.g. from
@@ -626,7 +707,7 @@ class _JobChangesChecker(object):
 
 
 class _JobFileChangesWaiter(object):
-  def __init__(self, filename):
+  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
     """Initializes this class.
 
     @type filename: string
@@ -634,7 +715,7 @@ class _JobFileChangesWaiter(object):
     @raises errors.InotifyError: if the notifier cannot be setup
 
     """
-    self._wm = pyinotify.WatchManager()
+    self._wm = _inotify_wm_cls()
     self._inotify_handler = \
       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
     self._notifier = \
@@ -676,7 +757,7 @@ class _JobFileChangesWaiter(object):
 
 
 class _JobChangesWaiter(object):
-  def __init__(self, filename):
+  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
     """Initializes this class.
 
     @type filename: string
@@ -685,6 +766,7 @@ class _JobChangesWaiter(object):
     """
     self._filewaiter = None
     self._filename = filename
+    self._waiter_cls = _waiter_cls
 
   def Wait(self, timeout):
     """Waits for a job to change.
@@ -701,7 +783,7 @@ class _JobChangesWaiter(object):
     # If this point is reached, return immediately and let caller check the job
     # file again in case there were changes since the last check. This avoids a
     # race condition.
-    self._filewaiter = _JobFileChangesWaiter(self._filename)
+    self._filewaiter = self._waiter_cls(self._filename)
 
     return True
 
@@ -739,7 +821,8 @@ class _WaitForJobChangesHelper(object):
     return result
 
   def __call__(self, filename, job_load_fn,
-               fields, prev_job_info, prev_log_serial, timeout):
+               fields, prev_job_info, prev_log_serial, timeout,
+               _waiter_cls=_JobChangesWaiter):
     """Waits for changes on a job.
 
     @type filename: string
@@ -759,7 +842,7 @@ class _WaitForJobChangesHelper(object):
     counter = itertools.count()
     try:
       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
-      waiter = _JobChangesWaiter(filename)
+      waiter = _waiter_cls(filename)
       try:
         return utils.Retry(compat.partial(self._CheckForChanges,
                                           counter, job_load_fn, check_fn),
@@ -767,7 +850,7 @@ class _WaitForJobChangesHelper(object):
                            wait_fn=waiter.Wait)
       finally:
         waiter.Close()
-    except (errors.InotifyError, errors.JobLost):
+    except errors.JobLost:
       return None
     except utils.RetryTimeout:
       return constants.JOB_NOTCHANGED
@@ -828,7 +911,7 @@ class _OpExecContext:
     self.summary = op.input.Summary()
 
     # Create local copy to modify
-    if getattr(op.input, opcodes.DEPEND_ATTR, None):
+    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
       self.jobdeps = op.input.depends[:]
     else:
       self.jobdeps = None
@@ -1030,7 +1113,7 @@ class _JobProcessor(object):
       # Make sure not to hold queue lock while calling ExecOpCode
       result = self.opexec_fn(op.input,
                               _OpExecCallbacks(self.queue, self.job, op),
-                              timeout=timeout, priority=op.priority)
+                              timeout=timeout)
     except mcpu.LockAcquireTimeout:
       assert timeout is not None, "Received timeout for blocking acquire"
       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
@@ -1042,12 +1125,25 @@ class _JobProcessor(object):
       if op.status == constants.OP_STATUS_CANCELING:
         return (constants.OP_STATUS_CANCELING, None)
 
+      # Queue is shutting down, return to queued
+      if not self.queue.AcceptingJobsUnlocked():
+        return (constants.OP_STATUS_QUEUED, None)
+
       # Stay in waitlock while trying to re-acquire lock
       return (constants.OP_STATUS_WAITING, None)
     except CancelJob:
       logging.exception("%s: Canceling job", opctx.log_prefix)
       assert op.status == constants.OP_STATUS_CANCELING
       return (constants.OP_STATUS_CANCELING, None)
+
+    except QueueShutdown:
+      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
+
+      assert op.status == constants.OP_STATUS_WAITING
+
+      # Job hadn't been started yet, so it should return to the queue
+      return (constants.OP_STATUS_QUEUED, None)
+
     except Exception, err: # pylint: disable=W0703
       logging.exception("%s: Caught exception in %s",
                         opctx.log_prefix, opctx.summary)
@@ -1145,8 +1241,10 @@ class _JobProcessor(object):
 
           assert not waitjob
 
-        if op.status == constants.OP_STATUS_WAITING:
-          # Couldn't get locks in time
+        if op.status in (constants.OP_STATUS_WAITING,
+                         constants.OP_STATUS_QUEUED):
+          # waiting: Couldn't get locks in time
+          # queued: Queue is shutting down
           assert not op.end_timestamp
         else:
           # Finalize opcode
@@ -1158,7 +1256,19 @@ class _JobProcessor(object):
           else:
             assert op.status in constants.OPS_FINALIZED
 
-      if op.status == constants.OP_STATUS_WAITING or waitjob:
+      if op.status == constants.OP_STATUS_QUEUED:
+        # Queue is shutting down
+        assert not waitjob
+
+        finalize = False
+
+        # Reset context
+        job.cur_opctx = None
+
+        # In no case must the status be finalized here
+        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
+
+      elif op.status == constants.OP_STATUS_WAITING or waitjob:
         finalize = False
 
         if not waitjob and opctx.CheckPriorityIncrease():
@@ -1237,6 +1347,29 @@ class _JobProcessor(object):
       queue.release()
 
 
+def _EvaluateJobProcessorResult(depmgr, job, result):
+  """Looks at a result from L{_JobProcessor} for a job.
+
+  To be used in a L{_JobQueueWorker}.
+
+  """
+  if result == _JobProcessor.FINISHED:
+    # Notify waiting jobs
+    depmgr.NotifyWaiters(job.id)
+
+  elif result == _JobProcessor.DEFER:
+    # Schedule again
+    raise workerpool.DeferTask(priority=job.CalcPriority())
+
+  elif result == _JobProcessor.WAITDEP:
+    # No-op, dependency manager will re-schedule
+    pass
+
+  else:
+    raise errors.ProgrammerError("Job processor returned unknown status %s" %
+                                 (result, ))
+
+
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
 
@@ -1277,23 +1410,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                     proc.ExecOpCode)
 
-    result = _JobProcessor(queue, wrap_execop_fn, job)()
-
-    if result == _JobProcessor.FINISHED:
-      # Notify waiting jobs
-      queue.depmgr.NotifyWaiters(job.id)
-
-    elif result == _JobProcessor.DEFER:
-      # Schedule again
-      raise workerpool.DeferTask(priority=job.CalcPriority())
-
-    elif result == _JobProcessor.WAITDEP:
-      # No-op, dependency manager will re-schedule
-      pass
-
-    else:
-      raise errors.ProgrammerError("Job processor returned unknown status %s" %
-                                   (result, ))
+    _EvaluateJobProcessorResult(queue.depmgr, job,
+                                _JobProcessor(queue, wrap_execop_fn, job)())
 
   @staticmethod
   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
@@ -1390,14 +1508,14 @@ class _JobDependencyManager:
 
     @type job: L{_QueuedJob}
     @param job: Job object
-    @type dep_job_id: string
+    @type dep_job_id: int
     @param dep_job_id: ID of dependency job
     @type dep_status: list
     @param dep_status: Required status
 
     """
-    assert ht.TString(job.id)
-    assert ht.TString(dep_job_id)
+    assert ht.TJobId(job.id)
+    assert ht.TJobId(dep_job_id)
     assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
 
     if job.id == dep_job_id:
@@ -1439,28 +1557,39 @@ class _JobDependencyManager:
               " not one of '%s' as required" %
               (dep_job_id, status, utils.CommaJoin(dep_status)))
 
-  @locking.ssynchronized(_LOCK)
+  def _RemoveEmptyWaitersUnlocked(self):
+    """Remove all jobs without actual waiters.
+
+    """
+    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
+                   if not waiters]:
+      del self._waiters[job_id]
+
   def NotifyWaiters(self, job_id):
     """Notifies all jobs waiting for a certain job ID.
 
-    @type job_id: string
+    @attention: Do not call until L{CheckAndRegister} returned a status other
+      than C{WAITDEP} for C{job_id}, or behaviour is undefined
+    @type job_id: int
     @param job_id: Job ID
 
     """
-    assert ht.TString(job_id)
+    assert ht.TJobId(job_id)
+
+    self._lock.acquire()
+    try:
+      self._RemoveEmptyWaitersUnlocked()
+
+      jobs = self._waiters.pop(job_id, None)
+    finally:
+      self._lock.release()
 
-    jobs = self._waiters.pop(job_id, None)
     if jobs:
       # Re-add jobs to workerpool
       logging.debug("Re-adding %s jobs which were waiting for job %s",
                     len(jobs), job_id)
       self._enqueue_fn(jobs)
 
-    # Remove all jobs without actual waiters
-    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
-                   if not waiters]:
-      del self._waiters[job_id]
-
 
 def _RequireOpenQueue(fn):
   """Decorator for "public" functions.
@@ -1487,6 +1616,31 @@ def _RequireOpenQueue(fn):
   return wrapper
 
 
+def _RequireNonDrainedQueue(fn):
+  """Decorator checking for a non-drained queue.
+
+  To be used with functions submitting new jobs.
+
+  """
+  def wrapper(self, *args, **kwargs):
+    """Wrapper function.
+
+    @raise errors.JobQueueDrainError: if the job queue is marked for draining
+
+    """
+    # Ok when sharing the big job queue lock, as the drain file is created when
+    # the lock is exclusive.
+    # Needs access to protected member, pylint: disable=W0212
+    if self._drained:
+      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
+
+    if not self._accepting_jobs:
+      raise errors.JobQueueError("Job queue is shutting down, refusing job")
+
+    return fn(self, *args, **kwargs)
+  return wrapper
+
+
 class JobQueue(object):
   """Queue used to manage the jobs.
 
@@ -1518,6 +1672,9 @@ class JobQueue(object):
     self.acquire = self._lock.acquire
     self.release = self._lock.release
 
+    # Accept jobs by default
+    self._accepting_jobs = True
+
     # Initialize the queue, and acquire the filelock.
     # This ensures no other process is working on the job queue.
     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
@@ -1537,8 +1694,9 @@ class JobQueue(object):
 
     # TODO: Check consistency across nodes
 
-    self._queue_size = 0
+    self._queue_size = None
     self._UpdateQueueSizeUnlocked()
+    assert ht.TInt(self._queue_size)
     self._drained = jstore.CheckDrainFlag()
 
     # Job dependencies
@@ -1599,8 +1757,9 @@ class JobQueue(object):
           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
           restartjobs.append(job)
         else:
+          to_encode = errors.OpExecError("Unclean master daemon shutdown")
           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
-                                "Unclean master daemon shutdown")
+                                _EncodeOpError(to_encode))
           job.Finalize()
 
         self.UpdateJobUnlocked(job)
@@ -1611,6 +1770,12 @@ class JobQueue(object):
 
     logging.info("Job queue inspection finished")
 
+  def _GetRpc(self, address_list):
+    """Gets RPC runner with context.
+
+    """
+    return rpc.JobQueueRunner(self.context, address_list)
+
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def AddNode(self, node):
@@ -1624,7 +1789,7 @@ class JobQueue(object):
     assert node_name != self._my_hostname
 
     # Clean queue directory on added node
-    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
+    result = self._GetRpc(None).call_jobqueue_purge(node_name)
     msg = result.fail_msg
     if msg:
       logging.warning("Cannot cleanup queue directory on node %s: %s",
@@ -1640,20 +1805,31 @@ class JobQueue(object):
     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
 
     # Upload current serial file
-    files.append(constants.JOB_QUEUE_SERIAL_FILE)
+    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
+
+    # Static address list
+    addrs = [node.primary_ip]
 
     for file_name in files:
       # Read file content
       content = utils.ReadFile(file_name)
 
-      result = rpc.RpcRunner.call_jobqueue_update([node_name],
-                                                  [node.primary_ip],
-                                                  file_name, content)
+      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
+                             file_name, content)
       msg = result[node_name].fail_msg
       if msg:
         logging.error("Failed to upload file %s to node %s: %s",
                       file_name, node_name, msg)
 
+    # Set queue drained flag
+    result = \
+      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
+                                                       self._drained)
+    msg = result[node_name].fail_msg
+    if msg:
+      logging.error("Failed to set queue drained flag on node %s: %s",
+                    node_name, msg)
+
     self._nodes[node_name] = node.primary_ip
 
   @locking.ssynchronized(_LOCK)
@@ -1728,11 +1904,12 @@ class JobQueue(object):
     """
     getents = runtime.GetEnts()
     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
-                    gid=getents.masterd_gid)
+                    gid=getents.daemons_gid,
+                    mode=constants.JOB_QUEUE_FILES_PERMS)
 
     if replicate:
       names, addrs = self._GetNodeIp()
-      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
+      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
 
   def _RenameFilesUnlocked(self, rename):
@@ -1751,42 +1928,9 @@ class JobQueue(object):
 
     # ... and on all nodes
     names, addrs = self._GetNodeIp()
-    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
+    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
-  @staticmethod
-  def _FormatJobID(job_id):
-    """Convert a job ID to string format.
-
-    Currently this just does C{str(job_id)} after performing some
-    checks, but if we want to change the job id format this will
-    abstract this change.
-
-    @type job_id: int or long
-    @param job_id: the numeric job id
-    @rtype: str
-    @return: the formatted job id
-
-    """
-    if not isinstance(job_id, (int, long)):
-      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
-    if job_id < 0:
-      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
-
-    return str(job_id)
-
-  @classmethod
-  def _GetArchiveDirectory(cls, job_id):
-    """Returns the archive directory for a job.
-
-    @type job_id: str
-    @param job_id: Job identifier
-    @rtype: str
-    @return: Directory name
-
-    """
-    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
-
   def _NewSerialsUnlocked(self, count):
     """Generates a new job identifier.
 
@@ -1794,19 +1938,20 @@ class JobQueue(object):
 
     @type count: integer
     @param count: how many serials to return
-    @rtype: str
-    @return: a string representing the job identifier.
+    @rtype: list of int
+    @return: a list of job identifiers.
 
     """
-    assert count > 0
+    assert ht.TNonNegativeInt(count)
+
     # New number
     serial = self._last_serial + count
 
     # Write to file
-    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
+    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                              "%s\n" % serial, True)
 
-    result = [self._FormatJobID(v)
+    result = [jstore.FormatJobID(v)
               for v in range(self._last_serial + 1, serial + 1)]
 
     # Keep it only if we were able to write the file
@@ -1826,10 +1971,10 @@ class JobQueue(object):
     @return: the path to the job file
 
     """
-    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
+    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
 
-  @classmethod
-  def _GetArchivedJobPath(cls, job_id):
+  @staticmethod
+  def _GetArchivedJobPath(job_id):
     """Returns the archived job file for a give job id.
 
     @type job_id: str
@@ -1838,11 +1983,30 @@ class JobQueue(object):
     @return: the path to the archived job file
 
     """
-    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
-                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
+    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
+                          jstore.GetArchiveDirectory(job_id),
+                          "job-%s" % job_id)
 
   @staticmethod
-  def _GetJobIDsUnlocked(sort=True):
+  def _DetermineJobDirectories(archived):
+    """Build list of directories containing job files.
+
+    @type archived: bool
+    @param archived: Whether to include directories for archived jobs
+    @rtype: list
+
+    """
+    result = [pathutils.QUEUE_DIR]
+
+    if archived:
+      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
+      result.extend(map(compat.partial(utils.PathJoin, archive_path),
+                        utils.ListVisibleFiles(archive_path)))
+
+    return result
+
+  @classmethod
+  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
     """Return all known job IDs.
 
     The method only looks at disk because it's a requirement that all
@@ -1856,12 +2020,15 @@ class JobQueue(object):
 
     """
     jlist = []
-    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
-      m = constants.JOB_FILE_RE.match(filename)
-      if m:
-        jlist.append(m.group(1))
+
+    for path in cls._DetermineJobDirectories(archived):
+      for filename in utils.ListVisibleFiles(path):
+        m = constants.JOB_FILE_RE.match(filename)
+        if m:
+          jlist.append(int(m.group(1)))
+
     if sort:
-      jlist = utils.NiceSort(jlist)
+      jlist.sort()
     return jlist
 
   def _LoadJobUnlocked(self, job_id):
@@ -1871,11 +2038,14 @@ class JobQueue(object):
     existing, or try to load the job from the disk. If loading from
     disk, it will also add the job to the cache.
 
+    @type job_id: int
     @param job_id: the job id
     @rtype: L{_QueuedJob} or None
     @return: either None or the job object
 
     """
+    assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"
+
     job = self._memcache.get(job_id, None)
     if job:
       logging.debug("Found job %s in memcache", job_id)
@@ -1909,7 +2079,7 @@ class JobQueue(object):
 
     Given a job file, read, load and restore it in a _QueuedJob format.
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: job identifier
     @type try_archived: bool
     @param try_archived: Whether to try loading an archived job
@@ -1917,15 +2087,15 @@ class JobQueue(object):
     @return: either None or the job object
 
     """
-    path_functions = [(self._GetJobPath, True)]
+    path_functions = [(self._GetJobPath, False)]
 
     if try_archived:
-      path_functions.append((self._GetArchivedJobPath, False))
+      path_functions.append((self._GetArchivedJobPath, True))
 
     raw_data = None
-    writable_default = None
+    archived = None
 
-    for (fn, writable_default) in path_functions:
+    for (fn, archived) in path_functions:
       filepath = fn(job_id)
       logging.debug("Loading job from %s", filepath)
       try:
@@ -1940,11 +2110,11 @@ class JobQueue(object):
       return None
 
     if writable is None:
-      writable = writable_default
+      writable = not archived
 
     try:
       data = serializer.LoadJson(raw_data)
-      job = _QueuedJob.Restore(self, data, writable)
+      job = _QueuedJob.Restore(self, data, writable, archived)
     except Exception, err: # pylint: disable=W0703
       raise errors.JobFileCorrupted(err)
 
@@ -1957,7 +2127,7 @@ class JobQueue(object):
     In case of error reading the job, it gets returned as None, and the
     exception is logged.
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: job identifier
     @type try_archived: bool
     @param try_archived: Whether to try loading an archived job
@@ -1986,10 +2156,18 @@ class JobQueue(object):
     @param drain_flag: Whether to set or unset the drain flag
 
     """
+    # Change flag locally
     jstore.SetDrainFlag(drain_flag)
 
     self._drained = drain_flag
 
+    # ... and on all nodes
+    (names, addrs) = self._GetNodeIp()
+    result = \
+      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
+    self._CheckRpcResult(result, self._nodes,
+                         "Setting queue drain flag to %s" % drain_flag)
+
     return True
 
   @_RequireOpenQueue
@@ -2005,33 +2183,28 @@ class JobQueue(object):
     @param ops: The list of OpCodes that will become the new job.
     @rtype: L{_QueuedJob}
     @return: the job object to be queued
-    @raise errors.JobQueueDrainError: if the job queue is marked for draining
     @raise errors.JobQueueFull: if the job queue has too many jobs in it
     @raise errors.GenericError: If an opcode is not valid
 
     """
-    # Ok when sharing the big job queue lock, as the drain file is created when
-    # the lock is exclusive.
-    if self._drained:
-      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
-
     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()
 
     job = _QueuedJob(self, job_id, ops, True)
 
-    # Check priority
     for idx, op in enumerate(job.ops):
+      # Check priority
       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                   " are %s" % (idx, op.priority, allowed))
 
-      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
-      if not opcodes.TNoRelativeJobDependencies(dependencies):
+      # Check job dependencies
+      dependencies = getattr(op.input, opcodes_base.DEPEND_ATTR, None)
+      if not opcodes_base.TNoRelativeJobDependencies(dependencies):
         raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                   " match %s: %s" %
-                                  (idx, opcodes.TNoRelativeJobDependencies,
+                                  (idx, opcodes_base.TNoRelativeJobDependencies,
                                    dependencies))
 
     # Write to disk
@@ -2046,6 +2219,7 @@ class JobQueue(object):
 
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
+  @_RequireNonDrainedQueue
   def SubmitJob(self, ops):
     """Create and store a new job.
 
@@ -2058,6 +2232,20 @@ class JobQueue(object):
 
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
+  def SubmitJobToDrainedQueue(self, ops):
+    """Forcefully create and store a new job.
+
+    Do so, even if the job queue is drained.
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    (job_id, ) = self._NewSerialsUnlocked(1)
+    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
+    return job_id
+
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  @_RequireNonDrainedQueue
   def SubmitManyJobs(self, jobs):
     """Create and store multiple jobs.
 
@@ -2089,8 +2277,10 @@ class JobQueue(object):
     @param resolve_fn: Function to resolve a relative job ID
     @type deps: list
     @param deps: Dependencies
-    @rtype: list
-    @return: Resolved dependencies
+    @rtype: tuple; (boolean, string or list)
+    @return: If successful (first tuple item), the returned list contains
+      resolved job IDs along with the requested status; if not successful,
+      the second element is an error message
 
     """
     result = []
@@ -2125,7 +2315,7 @@ class JobQueue(object):
 
     for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
       for op in ops:
-        if getattr(op, opcodes.DEPEND_ATTR, None):
+        if getattr(op, opcodes_base.DEPEND_ATTR, None):
           (status, data) = \
             self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                          op.depends)
@@ -2169,19 +2359,17 @@ class JobQueue(object):
     """
     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
     self._wpool.AddManyTasks([(job, ) for job in jobs],
-                             priority=[job.CalcPriority() for job in jobs])
+                             priority=[job.CalcPriority() for job in jobs],
+                             task_id=map(_GetIdAttr, jobs))
 
   def _GetJobStatusForDependencies(self, job_id):
     """Gets the status of a job for dependencies.
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: Job ID
     @raise errors.JobLost: If job can't be found
 
     """
-    if not isinstance(job_id, basestring):
-      job_id = self._FormatJobID(job_id)
-
     # Not using in-memory cache as doing so would require an exclusive lock
 
     # Try to load from disk
@@ -2212,9 +2400,10 @@ class JobQueue(object):
       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
       assert (finalized ^ (job.end_timestamp is None))
       assert job.writable, "Can't update read-only job"
+      assert not job.archived, "Can't update archived job"
 
     filename = self._GetJobPath(job.id)
-    data = serializer.DumpJson(job.Serialize(), indent=False)
+    data = serializer.DumpJson(job.Serialize())
     logging.debug("Writing job %s to %s", job.id, filename)
     self._UpdateJobQueueFile(filename, data, replicate)
 
@@ -2222,7 +2411,7 @@ class JobQueue(object):
                         timeout):
     """Waits for changes in a job.
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: Job identifier
     @type fields: list of strings
     @param fields: Which fields to check for changes
@@ -2242,7 +2431,7 @@ class JobQueue(object):
         as such by the clients
 
     """
-    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
+    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                              writable=False)
 
     helper = _WaitForJobChangesHelper()
@@ -2257,20 +2446,64 @@ class JobQueue(object):
 
     This will only succeed if the job has not started yet.
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: job ID of job to be cancelled.
 
     """
     logging.info("Cancelling job %s", job_id)
 
+    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
+
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  def ChangeJobPriority(self, job_id, priority):
+    """Changes a job's priority.
+
+    @type job_id: int
+    @param job_id: ID of the job whose priority should be changed
+    @type priority: int
+    @param priority: New priority
+
+    """
+    logging.info("Changing priority of job %s to %s", job_id, priority)
+
+    if priority not in constants.OP_PRIO_SUBMIT_VALID:
+      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
+      raise errors.GenericError("Invalid priority %s, allowed are %s" %
+                                (priority, allowed))
+
+    def fn(job):
+      (success, msg) = job.ChangePriority(priority)
+
+      if success:
+        try:
+          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
+        except workerpool.NoSuchTask:
+          logging.debug("Job %s is not in workerpool at this time", job.id)
+
+      return (success, msg)
+
+    return self._ModifyJobUnlocked(job_id, fn)
+
+  def _ModifyJobUnlocked(self, job_id, mod_fn):
+    """Modifies a job.
+
+    @type job_id: int
+    @param job_id: Job ID
+    @type mod_fn: callable
+    @param mod_fn: Modifying function, receiving job object as parameter,
+      returning tuple of (status boolean, message string)
+
+    """
     job = self._LoadJobUnlocked(job_id)
     if not job:
       logging.debug("Job %s not found", job_id)
       return (False, "Job %s not found" % job_id)
 
-    assert job.writable, "Can't cancel read-only job"
+    assert job.writable, "Can't modify read-only job"
+    assert not job.archived, "Can't modify archived job"
 
-    (success, msg) = job.Cancel()
+    (success, msg) = mod_fn(job)
 
     if success:
       # If the job was finalized (e.g. cancelled), this is the final write
@@ -2293,6 +2526,7 @@ class JobQueue(object):
     rename_files = []
     for job in jobs:
       assert job.writable, "Can't archive read-only job"
+      assert not job.archived, "Can't cancel archived job"
 
       if job.CalcStatus() not in constants.JOBS_FINALIZED:
         logging.debug("Job %s is not yet done", job.id)
@@ -2324,7 +2558,7 @@ class JobQueue(object):
 
     This is just a wrapper over L{_ArchiveJobsUnlocked}.
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: Job ID of job to be archived.
     @rtype: bool
     @return: Whether job was archived
@@ -2395,7 +2629,47 @@ class JobQueue(object):
 
     return (archived_count, len(all_job_ids) - last_touched)
 
-  def QueryJobs(self, job_ids, fields):
+  def _Query(self, fields, qfilter):
+    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
+                       namefield="id")
+
+    # Archived jobs are only looked at if the "archived" field is referenced
+    # either as a requested field or in the filter. By default archived jobs
+    # are ignored.
+    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
+
+    job_ids = qobj.RequestedNames()
+
+    list_all = (job_ids is None)
+
+    if list_all:
+      # Since files are added to/removed from the queue atomically, there's no
+      # risk of getting the job ids in an inconsistent state.
+      job_ids = self._GetJobIDsUnlocked(archived=include_archived)
+
+    jobs = []
+
+    for job_id in job_ids:
+      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
+      if job is not None or not list_all:
+        jobs.append((job_id, job))
+
+    return (qobj, jobs, list_all)
+
+  def QueryJobs(self, fields, qfilter):
+    """Returns a list of jobs in queue.
+
+    @type fields: sequence
+    @param fields: List of wanted fields
+    @type qfilter: None or query2 filter (list)
+    @param qfilter: Query filter
+
+    """
+    (qobj, ctx, _) = self._Query(fields, qfilter)
+
+    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
+
+  def OldStyleQueryJobs(self, job_ids, fields):
     """Returns a list of jobs in queue.
 
     @type job_ids: list
@@ -2407,22 +2681,49 @@ class JobQueue(object):
         the requested fields
 
     """
-    jobs = []
-    list_all = False
-    if not job_ids:
-      # Since files are added to/removed from the queue atomically, there's no
-      # risk of getting the job ids in an inconsistent state.
-      job_ids = self._GetJobIDsUnlocked()
-      list_all = True
+    # backwards compat:
+    job_ids = [int(jid) for jid in job_ids]
+    qfilter = qlang.MakeSimpleFilter("id", job_ids)
 
-    for job_id in job_ids:
-      job = self.SafeLoadJobFromDisk(job_id, True)
-      if job is not None:
-        jobs.append(job.GetInfo(fields))
-      elif not list_all:
-        jobs.append(None)
+    (qobj, ctx, _) = self._Query(fields, qfilter)
+
+    return qobj.OldStyleQuery(ctx, sort_by_name=False)
+
+  @locking.ssynchronized(_LOCK)
+  def PrepareShutdown(self):
+    """Prepare to stop the job queue.
 
-    return jobs
+    Disables execution of jobs in the workerpool and returns whether there are
+    any jobs currently running. If the latter is the case, the job queue is not
+    yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
+    be called without interfering with any job. Queued and unfinished jobs will
+    be resumed next time.
+
+    Once this function has been called no new job submissions will be accepted
+    (see L{_RequireNonDrainedQueue}).
+
+    @rtype: bool
+    @return: Whether there are any running jobs
+
+    """
+    if self._accepting_jobs:
+      self._accepting_jobs = False
+
+      # Tell worker pool to stop processing pending tasks
+      self._wpool.SetActive(False)
+
+    return self._wpool.HasRunningTasks()
+
+  def AcceptingJobsUnlocked(self):
+    """Returns whether jobs are accepted.
+
+    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
+    queue is shutting down.
+
+    @rtype: bool
+
+    """
+    return self._accepting_jobs
 
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue