Include hvparams in ssconf files
[ganeti-local] / lib / jqueue.py
index 3b34cbe..110d386 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
 #
 #
 
-# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -35,6 +35,7 @@ import time
 import weakref
 import threading
 import itertools
 import weakref
 import threading
 import itertools
+import operator
 
 try:
   # pylint: disable=E0611
 
 try:
   # pylint: disable=E0611
@@ -59,6 +60,8 @@ from ganeti import compat
 from ganeti import ht
 from ganeti import query
 from ganeti import qlang
 from ganeti import ht
 from ganeti import query
 from ganeti import qlang
+from ganeti import pathutils
+from ganeti import vcluster
 
 
 JOBQUEUE_THREADS = 25
 
 
 JOBQUEUE_THREADS = 25
@@ -67,6 +70,9 @@ JOBQUEUE_THREADS = 25
 _LOCK = "_lock"
 _QUEUE = "_queue"
 
 _LOCK = "_lock"
 _QUEUE = "_queue"
 
+#: Retrieves "id" attribute
+_GetIdAttr = operator.attrgetter("id")
+
 
 class CancelJob(Exception):
   """Special exception to cancel a job.
 
 class CancelJob(Exception):
   """Special exception to cancel a job.
@@ -74,6 +80,12 @@ class CancelJob(Exception):
   """
 
 
   """
 
 
+class QueueShutdown(Exception):
+  """Special exception to abort a job when the job queue is shutting down.
+
+  """
+
+
 def TimeStampNow():
   """Returns the current timestamp.
 
 def TimeStampNow():
   """Returns the current timestamp.
 
@@ -84,6 +96,14 @@ def TimeStampNow():
   return utils.SplitTime(time.time())
 
 
   return utils.SplitTime(time.time())
 
 
+def _CallJqUpdate(runner, names, file_name, content):
+  """Updates job queue file after virtualizing filename.
+
+  """
+  virt_file_name = vcluster.MakeVirtualPath(file_name)
+  return runner.call_jobqueue_update(names, virt_file_name, content)
+
+
 class _SimpleJobQuery:
   """Wrapper for job queries.
 
 class _SimpleJobQuery:
   """Wrapper for job queries.
 
@@ -200,7 +220,23 @@ class _QueuedJob(object):
   # pylint: disable=W0212
   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
                "received_timestamp", "start_timestamp", "end_timestamp",
   # pylint: disable=W0212
   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
                "received_timestamp", "start_timestamp", "end_timestamp",
-               "__weakref__", "processor_lock", "writable"]
+               "__weakref__", "processor_lock", "writable", "archived"]
+
+  def _AddReasons(self):
+    """Extend the reason trail
+
+    Add the reason for all the opcodes of this job to be executed.
+
+    """
+    count = 0
+    for queued_op in self.ops:
+      op = queued_op.input
+      reason_src = opcodes.NameToReasonSrc(op.__class__.__name__)
+      reason_text = "job=%d;index=%d" % (self.id, count)
+      reason = getattr(op, "reason", [])
+      reason.append((reason_src, reason_text, utils.EpochNano()))
+      op.reason = reason
+      count = count + 1
 
   def __init__(self, queue, job_id, ops, writable):
     """Constructor for the _QueuedJob.
 
   def __init__(self, queue, job_id, ops, writable):
     """Constructor for the _QueuedJob.
@@ -220,15 +256,19 @@ class _QueuedJob(object):
       raise errors.GenericError("A job needs at least one opcode")
 
     self.queue = queue
       raise errors.GenericError("A job needs at least one opcode")
 
     self.queue = queue
-    self.id = job_id
+    self.id = int(job_id)
     self.ops = [_QueuedOpCode(op) for op in ops]
     self.ops = [_QueuedOpCode(op) for op in ops]
+    self._AddReasons()
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
     self.end_timestamp = None
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
     self.end_timestamp = None
+    self.archived = False
 
     self._InitInMemory(self, writable)
 
 
     self._InitInMemory(self, writable)
 
+    assert not self.archived, "New jobs can not be marked as archived"
+
   @staticmethod
   def _InitInMemory(obj, writable):
     """Initializes in-memory variables.
   @staticmethod
   def _InitInMemory(obj, writable):
     """Initializes in-memory variables.
@@ -252,7 +292,7 @@ class _QueuedJob(object):
     return "<%s at %#x>" % (" ".join(status), id(self))
 
   @classmethod
     return "<%s at %#x>" % (" ".join(status), id(self))
 
   @classmethod
-  def Restore(cls, queue, state, writable):
+  def Restore(cls, queue, state, writable, archived):
     """Restore a _QueuedJob from serialized state:
 
     @type queue: L{JobQueue}
     """Restore a _QueuedJob from serialized state:
 
     @type queue: L{JobQueue}
@@ -261,16 +301,19 @@ class _QueuedJob(object):
     @param state: the serialized state
     @type writable: bool
     @param writable: Whether job can be modified
     @param state: the serialized state
     @type writable: bool
     @param writable: Whether job can be modified
+    @type archived: bool
+    @param archived: Whether job was already archived
     @rtype: _JobQueue
     @return: the restored _JobQueue instance
 
     """
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
     @rtype: _JobQueue
     @return: the restored _JobQueue instance
 
     """
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
-    obj.id = state["id"]
+    obj.id = int(state["id"])
     obj.received_timestamp = state.get("received_timestamp", None)
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
     obj.received_timestamp = state.get("received_timestamp", None)
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
+    obj.archived = archived
 
     obj.ops = []
     obj.log_serial = 0
 
     obj.ops = []
     obj.log_serial = 0
@@ -456,6 +499,50 @@ class _QueuedJob(object):
       logging.debug("Job %s is no longer waiting in the queue", self.id)
       return (False, "Job %s is no longer waiting in the queue" % self.id)
 
       logging.debug("Job %s is no longer waiting in the queue", self.id)
       return (False, "Job %s is no longer waiting in the queue" % self.id)
 
+  def ChangePriority(self, priority):
+    """Changes the job priority.
+
+    @type priority: int
+    @param priority: New priority
+    @rtype: tuple; (bool, string)
+    @return: Boolean describing whether job's priority was successfully changed
+      and a text message
+
+    """
+    status = self.CalcStatus()
+
+    if status in constants.JOBS_FINALIZED:
+      return (False, "Job %s is finished" % self.id)
+    elif status == constants.JOB_STATUS_CANCELING:
+      return (False, "Job %s is cancelling" % self.id)
+    else:
+      assert status in (constants.JOB_STATUS_QUEUED,
+                        constants.JOB_STATUS_WAITING,
+                        constants.JOB_STATUS_RUNNING)
+
+      changed = False
+      for op in self.ops:
+        if (op.status == constants.OP_STATUS_RUNNING or
+            op.status in constants.OPS_FINALIZED):
+          assert not changed, \
+            ("Found opcode for which priority should not be changed after"
+             " priority has been changed for previous opcodes")
+          continue
+
+        assert op.status in (constants.OP_STATUS_QUEUED,
+                             constants.OP_STATUS_WAITING)
+
+        changed = True
+
+        # Set new priority (doesn't modify opcode input)
+        op.priority = priority
+
+      if changed:
+        return (True, ("Priorities of pending opcodes for job %s have been"
+                       " changed to %s" % (self.id, priority)))
+      else:
+        return (False, "Job %s had no pending opcodes" % self.id)
+
 
 class _OpExecCallbacks(mcpu.OpExecCbBase):
   def __init__(self, queue, job, op):
 
 class _OpExecCallbacks(mcpu.OpExecCbBase):
   def __init__(self, queue, job, op):
@@ -486,6 +573,11 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
       logging.debug("Canceling opcode")
       raise CancelJob()
 
       logging.debug("Canceling opcode")
       raise CancelJob()
 
+    # See if queue is shutting down
+    if not self._queue.AcceptingJobsUnlocked():
+      logging.debug("Queue is shutting down")
+      raise QueueShutdown()
+
   @locking.ssynchronized(_QUEUE, shared=1)
   def NotifyStart(self):
     """Mark the opcode as running, not lock-waiting.
   @locking.ssynchronized(_QUEUE, shared=1)
   def NotifyStart(self):
     """Mark the opcode as running, not lock-waiting.
@@ -537,8 +629,8 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     timestamp = utils.SplitTime(time.time())
     self._AppendFeedback(timestamp, log_type, log_msg)
 
     timestamp = utils.SplitTime(time.time())
     self._AppendFeedback(timestamp, log_type, log_msg)
 
-  def CheckCancel(self):
-    """Check whether job has been cancelled.
+  def CurrentPriority(self):
+    """Returns current priority for opcode.
 
     """
     assert self._op.status in (constants.OP_STATUS_WAITING,
 
     """
     assert self._op.status in (constants.OP_STATUS_WAITING,
@@ -547,6 +639,8 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     # Cancel here if we were asked to
     self._CheckCancel()
 
     # Cancel here if we were asked to
     self._CheckCancel()
 
+    return self._op.priority
+
   def SubmitManyJobs(self, jobs):
     """Submits jobs for processing.
 
   def SubmitManyJobs(self, jobs):
     """Submits jobs for processing.
 
@@ -612,7 +706,7 @@ class _JobChangesChecker(object):
 
 
 class _JobFileChangesWaiter(object):
 
 
 class _JobFileChangesWaiter(object):
-  def __init__(self, filename):
+  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
     """Initializes this class.
 
     @type filename: string
     """Initializes this class.
 
     @type filename: string
@@ -620,7 +714,7 @@ class _JobFileChangesWaiter(object):
     @raises errors.InotifyError: if the notifier cannot be setup
 
     """
     @raises errors.InotifyError: if the notifier cannot be setup
 
     """
-    self._wm = pyinotify.WatchManager()
+    self._wm = _inotify_wm_cls()
     self._inotify_handler = \
       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
     self._notifier = \
     self._inotify_handler = \
       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
     self._notifier = \
@@ -662,7 +756,7 @@ class _JobFileChangesWaiter(object):
 
 
 class _JobChangesWaiter(object):
 
 
 class _JobChangesWaiter(object):
-  def __init__(self, filename):
+  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
     """Initializes this class.
 
     @type filename: string
     """Initializes this class.
 
     @type filename: string
@@ -671,6 +765,7 @@ class _JobChangesWaiter(object):
     """
     self._filewaiter = None
     self._filename = filename
     """
     self._filewaiter = None
     self._filename = filename
+    self._waiter_cls = _waiter_cls
 
   def Wait(self, timeout):
     """Waits for a job to change.
 
   def Wait(self, timeout):
     """Waits for a job to change.
@@ -687,7 +782,7 @@ class _JobChangesWaiter(object):
     # If this point is reached, return immediately and let caller check the job
     # file again in case there were changes since the last check. This avoids a
     # race condition.
     # If this point is reached, return immediately and let caller check the job
     # file again in case there were changes since the last check. This avoids a
     # race condition.
-    self._filewaiter = _JobFileChangesWaiter(self._filename)
+    self._filewaiter = self._waiter_cls(self._filename)
 
     return True
 
 
     return True
 
@@ -725,7 +820,8 @@ class _WaitForJobChangesHelper(object):
     return result
 
   def __call__(self, filename, job_load_fn,
     return result
 
   def __call__(self, filename, job_load_fn,
-               fields, prev_job_info, prev_log_serial, timeout):
+               fields, prev_job_info, prev_log_serial, timeout,
+               _waiter_cls=_JobChangesWaiter):
     """Waits for changes on a job.
 
     @type filename: string
     """Waits for changes on a job.
 
     @type filename: string
@@ -745,7 +841,7 @@ class _WaitForJobChangesHelper(object):
     counter = itertools.count()
     try:
       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
     counter = itertools.count()
     try:
       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
-      waiter = _JobChangesWaiter(filename)
+      waiter = _waiter_cls(filename)
       try:
         return utils.Retry(compat.partial(self._CheckForChanges,
                                           counter, job_load_fn, check_fn),
       try:
         return utils.Retry(compat.partial(self._CheckForChanges,
                                           counter, job_load_fn, check_fn),
@@ -753,7 +849,7 @@ class _WaitForJobChangesHelper(object):
                            wait_fn=waiter.Wait)
       finally:
         waiter.Close()
                            wait_fn=waiter.Wait)
       finally:
         waiter.Close()
-    except (errors.InotifyError, errors.JobLost):
+    except errors.JobLost:
       return None
     except utils.RetryTimeout:
       return constants.JOB_NOTCHANGED
       return None
     except utils.RetryTimeout:
       return constants.JOB_NOTCHANGED
@@ -1016,7 +1112,7 @@ class _JobProcessor(object):
       # Make sure not to hold queue lock while calling ExecOpCode
       result = self.opexec_fn(op.input,
                               _OpExecCallbacks(self.queue, self.job, op),
       # Make sure not to hold queue lock while calling ExecOpCode
       result = self.opexec_fn(op.input,
                               _OpExecCallbacks(self.queue, self.job, op),
-                              timeout=timeout, priority=op.priority)
+                              timeout=timeout)
     except mcpu.LockAcquireTimeout:
       assert timeout is not None, "Received timeout for blocking acquire"
       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
     except mcpu.LockAcquireTimeout:
       assert timeout is not None, "Received timeout for blocking acquire"
       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
@@ -1028,12 +1124,25 @@ class _JobProcessor(object):
       if op.status == constants.OP_STATUS_CANCELING:
         return (constants.OP_STATUS_CANCELING, None)
 
       if op.status == constants.OP_STATUS_CANCELING:
         return (constants.OP_STATUS_CANCELING, None)
 
+      # Queue is shutting down, return to queued
+      if not self.queue.AcceptingJobsUnlocked():
+        return (constants.OP_STATUS_QUEUED, None)
+
       # Stay in waitlock while trying to re-acquire lock
       return (constants.OP_STATUS_WAITING, None)
     except CancelJob:
       logging.exception("%s: Canceling job", opctx.log_prefix)
       assert op.status == constants.OP_STATUS_CANCELING
       return (constants.OP_STATUS_CANCELING, None)
       # Stay in waitlock while trying to re-acquire lock
       return (constants.OP_STATUS_WAITING, None)
     except CancelJob:
       logging.exception("%s: Canceling job", opctx.log_prefix)
       assert op.status == constants.OP_STATUS_CANCELING
       return (constants.OP_STATUS_CANCELING, None)
+
+    except QueueShutdown:
+      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
+
+      assert op.status == constants.OP_STATUS_WAITING
+
+      # Job hadn't been started yet, so it should return to the queue
+      return (constants.OP_STATUS_QUEUED, None)
+
     except Exception, err: # pylint: disable=W0703
       logging.exception("%s: Caught exception in %s",
                         opctx.log_prefix, opctx.summary)
     except Exception, err: # pylint: disable=W0703
       logging.exception("%s: Caught exception in %s",
                         opctx.log_prefix, opctx.summary)
@@ -1131,8 +1240,10 @@ class _JobProcessor(object):
 
           assert not waitjob
 
 
           assert not waitjob
 
-        if op.status == constants.OP_STATUS_WAITING:
-          # Couldn't get locks in time
+        if op.status in (constants.OP_STATUS_WAITING,
+                         constants.OP_STATUS_QUEUED):
+          # waiting: Couldn't get locks in time
+          # queued: Queue is shutting down
           assert not op.end_timestamp
         else:
           # Finalize opcode
           assert not op.end_timestamp
         else:
           # Finalize opcode
@@ -1144,7 +1255,19 @@ class _JobProcessor(object):
           else:
             assert op.status in constants.OPS_FINALIZED
 
           else:
             assert op.status in constants.OPS_FINALIZED
 
-      if op.status == constants.OP_STATUS_WAITING or waitjob:
+      if op.status == constants.OP_STATUS_QUEUED:
+        # Queue is shutting down
+        assert not waitjob
+
+        finalize = False
+
+        # Reset context
+        job.cur_opctx = None
+
+        # In no case must the status be finalized here
+        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
+
+      elif op.status == constants.OP_STATUS_WAITING or waitjob:
         finalize = False
 
         if not waitjob and opctx.CheckPriorityIncrease():
         finalize = False
 
         if not waitjob and opctx.CheckPriorityIncrease():
@@ -1384,14 +1507,14 @@ class _JobDependencyManager:
 
     @type job: L{_QueuedJob}
     @param job: Job object
 
     @type job: L{_QueuedJob}
     @param job: Job object
-    @type dep_job_id: string
+    @type dep_job_id: int
     @param dep_job_id: ID of dependency job
     @type dep_status: list
     @param dep_status: Required status
 
     """
     @param dep_job_id: ID of dependency job
     @type dep_status: list
     @param dep_status: Required status
 
     """
-    assert ht.TString(job.id)
-    assert ht.TString(dep_job_id)
+    assert ht.TJobId(job.id)
+    assert ht.TJobId(dep_job_id)
     assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
 
     if job.id == dep_job_id:
     assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
 
     if job.id == dep_job_id:
@@ -1446,11 +1569,11 @@ class _JobDependencyManager:
 
     @attention: Do not call until L{CheckAndRegister} returned a status other
       than C{WAITDEP} for C{job_id}, or behaviour is undefined
 
     @attention: Do not call until L{CheckAndRegister} returned a status other
       than C{WAITDEP} for C{job_id}, or behaviour is undefined
-    @type job_id: string
+    @type job_id: int
     @param job_id: Job ID
 
     """
     @param job_id: Job ID
 
     """
-    assert ht.TString(job_id)
+    assert ht.TJobId(job_id)
 
     self._lock.acquire()
     try:
 
     self._lock.acquire()
     try:
@@ -1680,7 +1803,7 @@ class JobQueue(object):
     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
 
     # Upload current serial file
     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
 
     # Upload current serial file
-    files.append(constants.JOB_QUEUE_SERIAL_FILE)
+    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
 
     # Static address list
     addrs = [node.primary_ip]
 
     # Static address list
     addrs = [node.primary_ip]
@@ -1689,13 +1812,22 @@ class JobQueue(object):
       # Read file content
       content = utils.ReadFile(file_name)
 
       # Read file content
       content = utils.ReadFile(file_name)
 
-      result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
-                                                        content)
+      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
+                             file_name, content)
       msg = result[node_name].fail_msg
       if msg:
         logging.error("Failed to upload file %s to node %s: %s",
                       file_name, node_name, msg)
 
       msg = result[node_name].fail_msg
       if msg:
         logging.error("Failed to upload file %s to node %s: %s",
                       file_name, node_name, msg)
 
+    # Set queue drained flag
+    result = \
+      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
+                                                       self._drained)
+    msg = result[node_name].fail_msg
+    if msg:
+      logging.error("Failed to set queue drained flag on node %s: %s",
+                    node_name, msg)
+
     self._nodes[node_name] = node.primary_ip
 
   @locking.ssynchronized(_LOCK)
     self._nodes[node_name] = node.primary_ip
 
   @locking.ssynchronized(_LOCK)
@@ -1770,11 +1902,12 @@ class JobQueue(object):
     """
     getents = runtime.GetEnts()
     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
     """
     getents = runtime.GetEnts()
     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
-                    gid=getents.masterd_gid)
+                    gid=getents.daemons_gid,
+                    mode=constants.JOB_QUEUE_FILES_PERMS)
 
     if replicate:
       names, addrs = self._GetNodeIp()
 
     if replicate:
       names, addrs = self._GetNodeIp()
-      result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
+      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
 
   def _RenameFilesUnlocked(self, rename):
       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
 
   def _RenameFilesUnlocked(self, rename):
@@ -1803,17 +1936,17 @@ class JobQueue(object):
 
     @type count: integer
     @param count: how many serials to return
 
     @type count: integer
     @param count: how many serials to return
-    @rtype: str
-    @return: a string representing the job identifier.
+    @rtype: list of int
+    @return: a list of job identifiers.
 
     """
 
     """
-    assert ht.TPositiveInt(count)
+    assert ht.TNonNegativeInt(count)
 
     # New number
     serial = self._last_serial + count
 
     # Write to file
 
     # New number
     serial = self._last_serial + count
 
     # Write to file
-    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
+    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                              "%s\n" % serial, True)
 
     result = [jstore.FormatJobID(v)
                              "%s\n" % serial, True)
 
     result = [jstore.FormatJobID(v)
@@ -1836,7 +1969,7 @@ class JobQueue(object):
     @return: the path to the job file
 
     """
     @return: the path to the job file
 
     """
-    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
+    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
 
   @staticmethod
   def _GetArchivedJobPath(job_id):
 
   @staticmethod
   def _GetArchivedJobPath(job_id):
@@ -1848,12 +1981,30 @@ class JobQueue(object):
     @return: the path to the archived job file
 
     """
     @return: the path to the archived job file
 
     """
-    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
+    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                           jstore.GetArchiveDirectory(job_id),
                           "job-%s" % job_id)
 
   @staticmethod
                           jstore.GetArchiveDirectory(job_id),
                           "job-%s" % job_id)
 
   @staticmethod
-  def _GetJobIDsUnlocked(sort=True):
+  def _DetermineJobDirectories(archived):
+    """Build list of directories containing job files.
+
+    @type archived: bool
+    @param archived: Whether to include directories for archived jobs
+    @rtype: list
+
+    """
+    result = [pathutils.QUEUE_DIR]
+
+    if archived:
+      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
+      result.extend(map(compat.partial(utils.PathJoin, archive_path),
+                        utils.ListVisibleFiles(archive_path)))
+
+    return result
+
+  @classmethod
+  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
     """Return all known job IDs.
 
     The method only looks at disk because it's a requirement that all
     """Return all known job IDs.
 
     The method only looks at disk because it's a requirement that all
@@ -1867,12 +2018,15 @@ class JobQueue(object):
 
     """
     jlist = []
 
     """
     jlist = []
-    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
-      m = constants.JOB_FILE_RE.match(filename)
-      if m:
-        jlist.append(m.group(1))
+
+    for path in cls._DetermineJobDirectories(archived):
+      for filename in utils.ListVisibleFiles(path):
+        m = constants.JOB_FILE_RE.match(filename)
+        if m:
+          jlist.append(int(m.group(1)))
+
     if sort:
     if sort:
-      jlist = utils.NiceSort(jlist)
+      jlist.sort()
     return jlist
 
   def _LoadJobUnlocked(self, job_id):
     return jlist
 
   def _LoadJobUnlocked(self, job_id):
@@ -1882,6 +2036,7 @@ class JobQueue(object):
     existing, or try to load the job from the disk. If loading from
     disk, it will also add the job to the cache.
 
     existing, or try to load the job from the disk. If loading from
     disk, it will also add the job to the cache.
 
+    @type job_id: int
     @param job_id: the job id
     @rtype: L{_QueuedJob} or None
     @return: either None or the job object
     @param job_id: the job id
     @rtype: L{_QueuedJob} or None
     @return: either None or the job object
@@ -1920,7 +2075,7 @@ class JobQueue(object):
 
     Given a job file, read, load and restore it in a _QueuedJob format.
 
 
     Given a job file, read, load and restore it in a _QueuedJob format.
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: job identifier
     @type try_archived: bool
     @param try_archived: Whether to try loading an archived job
     @param job_id: job identifier
     @type try_archived: bool
     @param try_archived: Whether to try loading an archived job
@@ -1928,15 +2083,15 @@ class JobQueue(object):
     @return: either None or the job object
 
     """
     @return: either None or the job object
 
     """
-    path_functions = [(self._GetJobPath, True)]
+    path_functions = [(self._GetJobPath, False)]
 
     if try_archived:
 
     if try_archived:
-      path_functions.append((self._GetArchivedJobPath, False))
+      path_functions.append((self._GetArchivedJobPath, True))
 
     raw_data = None
 
     raw_data = None
-    writable_default = None
+    archived = None
 
 
-    for (fn, writable_default) in path_functions:
+    for (fn, archived) in path_functions:
       filepath = fn(job_id)
       logging.debug("Loading job from %s", filepath)
       try:
       filepath = fn(job_id)
       logging.debug("Loading job from %s", filepath)
       try:
@@ -1951,11 +2106,11 @@ class JobQueue(object):
       return None
 
     if writable is None:
       return None
 
     if writable is None:
-      writable = writable_default
+      writable = not archived
 
     try:
       data = serializer.LoadJson(raw_data)
 
     try:
       data = serializer.LoadJson(raw_data)
-      job = _QueuedJob.Restore(self, data, writable)
+      job = _QueuedJob.Restore(self, data, writable, archived)
     except Exception, err: # pylint: disable=W0703
       raise errors.JobFileCorrupted(err)
 
     except Exception, err: # pylint: disable=W0703
       raise errors.JobFileCorrupted(err)
 
@@ -1968,7 +2123,7 @@ class JobQueue(object):
     In case of error reading the job, it gets returned as None, and the
     exception is logged.
 
     In case of error reading the job, it gets returned as None, and the
     exception is logged.
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: job identifier
     @type try_archived: bool
     @param try_archived: Whether to try loading an archived job
     @param job_id: job identifier
     @type try_archived: bool
     @param try_archived: Whether to try loading an archived job
@@ -1997,10 +2152,18 @@ class JobQueue(object):
     @param drain_flag: Whether to set or unset the drain flag
 
     """
     @param drain_flag: Whether to set or unset the drain flag
 
     """
+    # Change flag locally
     jstore.SetDrainFlag(drain_flag)
 
     self._drained = drain_flag
 
     jstore.SetDrainFlag(drain_flag)
 
     self._drained = drain_flag
 
+    # ... and on all nodes
+    (names, addrs) = self._GetNodeIp()
+    result = \
+      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
+    self._CheckRpcResult(result, self._nodes,
+                         "Setting queue drain flag to %s" % drain_flag)
+
     return True
 
   @_RequireOpenQueue
     return True
 
   @_RequireOpenQueue
@@ -2025,13 +2188,14 @@ class JobQueue(object):
 
     job = _QueuedJob(self, job_id, ops, True)
 
 
     job = _QueuedJob(self, job_id, ops, True)
 
-    # Check priority
     for idx, op in enumerate(job.ops):
     for idx, op in enumerate(job.ops):
+      # Check priority
       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                   " are %s" % (idx, op.priority, allowed))
 
       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                   " are %s" % (idx, op.priority, allowed))
 
+      # Check job dependencies
       dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
       if not opcodes.TNoRelativeJobDependencies(dependencies):
         raise errors.GenericError("Opcode %s has invalid dependencies, must"
       dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
       if not opcodes.TNoRelativeJobDependencies(dependencies):
         raise errors.GenericError("Opcode %s has invalid dependencies, must"
@@ -2096,8 +2260,10 @@ class JobQueue(object):
     @param resolve_fn: Function to resolve a relative job ID
     @type deps: list
     @param deps: Dependencies
     @param resolve_fn: Function to resolve a relative job ID
     @type deps: list
     @param deps: Dependencies
-    @rtype: list
-    @return: Resolved dependencies
+    @rtype: tuple; (boolean, string or list)
+    @return: If successful (first tuple item), the returned list contains
+      resolved job IDs along with the requested status; if not successful,
+      the second element is an error message
 
     """
     result = []
 
     """
     result = []
@@ -2176,19 +2342,17 @@ class JobQueue(object):
     """
     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
     self._wpool.AddManyTasks([(job, ) for job in jobs],
     """
     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
     self._wpool.AddManyTasks([(job, ) for job in jobs],
-                             priority=[job.CalcPriority() for job in jobs])
+                             priority=[job.CalcPriority() for job in jobs],
+                             task_id=map(_GetIdAttr, jobs))
 
   def _GetJobStatusForDependencies(self, job_id):
     """Gets the status of a job for dependencies.
 
 
   def _GetJobStatusForDependencies(self, job_id):
     """Gets the status of a job for dependencies.
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: Job ID
     @raise errors.JobLost: If job can't be found
 
     """
     @param job_id: Job ID
     @raise errors.JobLost: If job can't be found
 
     """
-    if not isinstance(job_id, basestring):
-      job_id = jstore.FormatJobID(job_id)
-
     # Not using in-memory cache as doing so would require an exclusive lock
 
     # Try to load from disk
     # Not using in-memory cache as doing so would require an exclusive lock
 
     # Try to load from disk
@@ -2219,6 +2383,7 @@ class JobQueue(object):
       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
       assert (finalized ^ (job.end_timestamp is None))
       assert job.writable, "Can't update read-only job"
       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
       assert (finalized ^ (job.end_timestamp is None))
       assert job.writable, "Can't update read-only job"
+      assert not job.archived, "Can't update archived job"
 
     filename = self._GetJobPath(job.id)
     data = serializer.DumpJson(job.Serialize())
 
     filename = self._GetJobPath(job.id)
     data = serializer.DumpJson(job.Serialize())
@@ -2229,7 +2394,7 @@ class JobQueue(object):
                         timeout):
     """Waits for changes in a job.
 
                         timeout):
     """Waits for changes in a job.
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: Job identifier
     @type fields: list of strings
     @param fields: Which fields to check for changes
     @param job_id: Job identifier
     @type fields: list of strings
     @param fields: Which fields to check for changes
@@ -2249,7 +2414,7 @@ class JobQueue(object):
         as such by the clients
 
     """
         as such by the clients
 
     """
-    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
+    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                              writable=False)
 
     helper = _WaitForJobChangesHelper()
                              writable=False)
 
     helper = _WaitForJobChangesHelper()
@@ -2264,20 +2429,64 @@ class JobQueue(object):
 
     This will only succeed if the job has not started yet.
 
 
     This will only succeed if the job has not started yet.
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: job ID of job to be cancelled.
 
     """
     logging.info("Cancelling job %s", job_id)
 
     @param job_id: job ID of job to be cancelled.
 
     """
     logging.info("Cancelling job %s", job_id)
 
+    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
+
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  def ChangeJobPriority(self, job_id, priority):
+    """Changes a job's priority.
+
+    @type job_id: int
+    @param job_id: ID of the job whose priority should be changed
+    @type priority: int
+    @param priority: New priority
+
+    """
+    logging.info("Changing priority of job %s to %s", job_id, priority)
+
+    if priority not in constants.OP_PRIO_SUBMIT_VALID:
+      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
+      raise errors.GenericError("Invalid priority %s, allowed are %s" %
+                                (priority, allowed))
+
+    def fn(job):
+      (success, msg) = job.ChangePriority(priority)
+
+      if success:
+        try:
+          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
+        except workerpool.NoSuchTask:
+          logging.debug("Job %s is not in workerpool at this time", job.id)
+
+      return (success, msg)
+
+    return self._ModifyJobUnlocked(job_id, fn)
+
+  def _ModifyJobUnlocked(self, job_id, mod_fn):
+    """Modifies a job.
+
+    @type job_id: int
+    @param job_id: Job ID
+    @type mod_fn: callable
+    @param mod_fn: Modifying function, receiving job object as parameter,
+      returning tuple of (status boolean, message string)
+
+    """
     job = self._LoadJobUnlocked(job_id)
     if not job:
       logging.debug("Job %s not found", job_id)
       return (False, "Job %s not found" % job_id)
 
     job = self._LoadJobUnlocked(job_id)
     if not job:
       logging.debug("Job %s not found", job_id)
       return (False, "Job %s not found" % job_id)
 
-    assert job.writable, "Can't cancel read-only job"
+    assert job.writable, "Can't modify read-only job"
+    assert not job.archived, "Can't modify archived job"
 
 
-    (success, msg) = job.Cancel()
+    (success, msg) = mod_fn(job)
 
     if success:
       # If the job was finalized (e.g. cancelled), this is the final write
 
     if success:
       # If the job was finalized (e.g. cancelled), this is the final write
@@ -2300,6 +2509,7 @@ class JobQueue(object):
     rename_files = []
     for job in jobs:
       assert job.writable, "Can't archive read-only job"
     rename_files = []
     for job in jobs:
       assert job.writable, "Can't archive read-only job"
+      assert not job.archived, "Can't cancel archived job"
 
       if job.CalcStatus() not in constants.JOBS_FINALIZED:
         logging.debug("Job %s is not yet done", job.id)
 
       if job.CalcStatus() not in constants.JOBS_FINALIZED:
         logging.debug("Job %s is not yet done", job.id)
@@ -2331,7 +2541,7 @@ class JobQueue(object):
 
     This is just a wrapper over L{_ArchiveJobsUnlocked}.
 
 
     This is just a wrapper over L{_ArchiveJobsUnlocked}.
 
-    @type job_id: string
+    @type job_id: int
     @param job_id: Job ID of job to be archived.
     @rtype: bool
     @return: Whether job was archived
     @param job_id: Job ID of job to be archived.
     @rtype: bool
     @return: Whether job was archived
@@ -2406,6 +2616,11 @@ class JobQueue(object):
     qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                        namefield="id")
 
     qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                        namefield="id")
 
+    # Archived jobs are only looked at if the "archived" field is referenced
+    # either as a requested field or in the filter. By default archived jobs
+    # are ignored.
+    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
+
     job_ids = qobj.RequestedNames()
 
     list_all = (job_ids is None)
     job_ids = qobj.RequestedNames()
 
     list_all = (job_ids is None)
@@ -2413,7 +2628,7 @@ class JobQueue(object):
     if list_all:
       # Since files are added to/removed from the queue atomically, there's no
       # risk of getting the job ids in an inconsistent state.
     if list_all:
       # Since files are added to/removed from the queue atomically, there's no
       # risk of getting the job ids in an inconsistent state.
-      job_ids = self._GetJobIDsUnlocked()
+      job_ids = self._GetJobIDsUnlocked(archived=include_archived)
 
     jobs = []
 
 
     jobs = []
 
@@ -2433,9 +2648,9 @@ class JobQueue(object):
     @param qfilter: Query filter
 
     """
     @param qfilter: Query filter
 
     """
-    (qobj, ctx, sort_by_name) = self._Query(fields, qfilter)
+    (qobj, ctx, _) = self._Query(fields, qfilter)
 
 
-    return query.GetQueryResponse(qobj, ctx, sort_by_name=sort_by_name)
+    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
 
   def OldStyleQueryJobs(self, job_ids, fields):
     """Returns a list of jobs in queue.
 
   def OldStyleQueryJobs(self, job_ids, fields):
     """Returns a list of jobs in queue.
@@ -2449,11 +2664,13 @@ class JobQueue(object):
         the requested fields
 
     """
         the requested fields
 
     """
+    # backwards compat:
+    job_ids = [int(jid) for jid in job_ids]
     qfilter = qlang.MakeSimpleFilter("id", job_ids)
 
     qfilter = qlang.MakeSimpleFilter("id", job_ids)
 
-    (qobj, ctx, sort_by_name) = self._Query(fields, qfilter)
+    (qobj, ctx, _) = self._Query(fields, qfilter)
 
 
-    return qobj.OldStyleQuery(ctx, sort_by_name=sort_by_name)
+    return qobj.OldStyleQuery(ctx, sort_by_name=False)
 
   @locking.ssynchronized(_LOCK)
   def PrepareShutdown(self):
 
   @locking.ssynchronized(_LOCK)
   def PrepareShutdown(self):
@@ -2480,6 +2697,17 @@ class JobQueue(object):
 
     return self._wpool.HasRunningTasks()
 
 
     return self._wpool.HasRunningTasks()
 
+  def AcceptingJobsUnlocked(self):
+    """Returns whether jobs are accepted.
+
+    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
+    queue is shutting down.
+
+    @rtype: bool
+
+    """
+    return self._accepting_jobs
+
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def Shutdown(self):
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def Shutdown(self):