Disable synchronous (locking) queries
[ganeti-local] / lib / jqueue.py
index 54a3562..3364a93 100644 (file)
 
 """Module implementing the job queue handling.
 
 
 """Module implementing the job queue handling.
 
-Locking:
-There's a single, large lock in the JobQueue class. It's used by all other
-classes in this module.
+Locking: there's a single, large lock in the L{JobQueue} class. It's
+used by all other classes in this module.
+
+@var JOBQUEUE_THREADS: the number of worker threads we start for
+    processing jobs
 
 """
 
 
 """
 
@@ -45,23 +47,46 @@ from ganeti import utils
 from ganeti import jstore
 from ganeti import rpc
 
 from ganeti import jstore
 from ganeti import rpc
 
-from ganeti.rpc import RpcRunner
 
 JOBQUEUE_THREADS = 25
 
 JOBQUEUE_THREADS = 25
+JOBS_PER_ARCHIVE_DIRECTORY = 10000
+
+
+class CancelJob(Exception):
+  """Special exception to cancel a job.
+
+  """
 
 
 def TimeStampNow():
 
 
 def TimeStampNow():
+  """Returns the current timestamp.
+
+  @rtype: tuple
+  @return: the current time in the (seconds, microseconds) format
+
+  """
   return utils.SplitTime(time.time())
 
 
 class _QueuedOpCode(object):
   """Encasulates an opcode object.
 
   return utils.SplitTime(time.time())
 
 
 class _QueuedOpCode(object):
   """Encasulates an opcode object.
 
-  The 'log' attribute holds the execution log and consists of tuples
-  of the form (log_serial, timestamp, level, message).
+  @ivar log: holds the execution log and consists of tuples
+  of the form C{(log_serial, timestamp, level, message)}
+  @ivar input: the OpCode we encapsulate
+  @ivar status: the current status
+  @ivar result: the result of the LU execution
+  @ivar start_timestamp: timestamp for the start of the execution
+  @ivar stop_timestamp: timestamp for the end of the execution
 
   """
   def __init__(self, op):
 
   """
   def __init__(self, op):
+    """Constructor for the _QuededOpCode.
+
+    @type op: L{opcodes.OpCode}
+    @param op: the opcode we encapsulate
+
+    """
     self.input = op
     self.status = constants.OP_STATUS_QUEUED
     self.result = None
     self.input = op
     self.status = constants.OP_STATUS_QUEUED
     self.result = None
@@ -71,6 +96,14 @@ class _QueuedOpCode(object):
 
   @classmethod
   def Restore(cls, state):
 
   @classmethod
   def Restore(cls, state):
+    """Restore the _QueuedOpCode from the serialized form.
+
+    @type state: dict
+    @param state: the serialized state
+    @rtype: _QueuedOpCode
+    @return: a new _QueuedOpCode instance
+
+    """
     obj = _QueuedOpCode.__new__(cls)
     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
     obj.status = state["status"]
     obj = _QueuedOpCode.__new__(cls)
     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
     obj.status = state["status"]
@@ -81,6 +114,12 @@ class _QueuedOpCode(object):
     return obj
 
   def Serialize(self):
     return obj
 
   def Serialize(self):
+    """Serializes this _QueuedOpCode.
+
+    @rtype: dict
+    @return: the dictionary holding the serialized state
+
+    """
     return {
       "input": self.input.__getstate__(),
       "status": self.status,
     return {
       "input": self.input.__getstate__(),
       "status": self.status,
@@ -94,13 +133,39 @@ class _QueuedOpCode(object):
 class _QueuedJob(object):
   """In-memory job representation.
 
 class _QueuedJob(object):
   """In-memory job representation.
 
-  This is what we use to track the user-submitted jobs. Locking must be taken
-  care of by users of this class.
+  This is what we use to track the user-submitted jobs. Locking must
+  be taken care of by users of this class.
+
+  @type queue: L{JobQueue}
+  @ivar queue: the parent queue
+  @ivar id: the job ID
+  @type ops: list
+  @ivar ops: the list of _QueuedOpCode that constitute the job
+  @type run_op_index: int
+  @ivar run_op_index: the currently executing opcode, or -1 if
+      we didn't yet start executing
+  @type log_serial: int
+  @ivar log_serial: holds the index for the next log entry
+  @ivar received_timestamp: the timestamp for when the job was received
+  @ivar start_timestmap: the timestamp for start of execution
+  @ivar end_timestamp: the timestamp for end of execution
+  @ivar change: a Condition variable we use for waiting for job changes
 
   """
   def __init__(self, queue, job_id, ops):
 
   """
   def __init__(self, queue, job_id, ops):
+    """Constructor for the _QueuedJob.
+
+    @type queue: L{JobQueue}
+    @param queue: our parent queue
+    @type job_id: job_id
+    @param job_id: our job id
+    @type ops: list
+    @param ops: the list of opcodes we hold, which will be encapsulated
+        in _QueuedOpCodes
+
+    """
     if not ops:
     if not ops:
-      # TODO
+      # TODO: use a better exception
       raise Exception("No opcodes")
 
     self.queue = queue
       raise Exception("No opcodes")
 
     self.queue = queue
@@ -117,6 +182,16 @@ class _QueuedJob(object):
 
   @classmethod
   def Restore(cls, queue, state):
 
   @classmethod
   def Restore(cls, queue, state):
+    """Restore a _QueuedJob from serialized state:
+
+    @type queue: L{JobQueue}
+    @param queue: to which queue the restored job belongs
+    @type state: dict
+    @param state: the serialized state
+    @rtype: _JobQueue
+    @return: the restored _JobQueue instance
+
+    """
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
     obj.id = state["id"]
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
     obj.id = state["id"]
@@ -139,6 +214,12 @@ class _QueuedJob(object):
     return obj
 
   def Serialize(self):
     return obj
 
   def Serialize(self):
+    """Serialize the _JobQueue instance.
+
+    @rtype: dict
+    @return: the serialized state
+
+    """
     return {
       "id": self.id,
       "ops": [op.Serialize() for op in self.ops],
     return {
       "id": self.id,
       "ops": [op.Serialize() for op in self.ops],
@@ -149,6 +230,27 @@ class _QueuedJob(object):
       }
 
   def CalcStatus(self):
       }
 
   def CalcStatus(self):
+    """Compute the status of this job.
+
+    This function iterates over all the _QueuedOpCodes in the job and
+    based on their status, computes the job status.
+
+    The algorithm is:
+      - if we find a cancelled, or finished with error, the job
+        status will be the same
+      - otherwise, the last opcode with the status one of:
+          - waitlock
+          - canceling
+          - running
+
+        will determine the job status
+
+      - otherwise, it means either all opcodes are queued, or success,
+        and the job status will be the same
+
+    @return: the job status
+
+    """
     status = constants.JOB_STATUS_QUEUED
 
     all_success = True
     status = constants.JOB_STATUS_QUEUED
 
     all_success = True
@@ -164,6 +266,9 @@ class _QueuedJob(object):
         status = constants.JOB_STATUS_WAITLOCK
       elif op.status == constants.OP_STATUS_RUNNING:
         status = constants.JOB_STATUS_RUNNING
         status = constants.JOB_STATUS_WAITLOCK
       elif op.status == constants.OP_STATUS_RUNNING:
         status = constants.JOB_STATUS_RUNNING
+      elif op.status == constants.OP_STATUS_CANCELING:
+        status = constants.JOB_STATUS_CANCELING
+        break
       elif op.status == constants.OP_STATUS_ERROR:
         status = constants.JOB_STATUS_ERROR
         # The whole job fails if one opcode failed
       elif op.status == constants.OP_STATUS_ERROR:
         status = constants.JOB_STATUS_ERROR
         # The whole job fails if one opcode failed
@@ -178,6 +283,16 @@ class _QueuedJob(object):
     return status
 
   def GetLogEntries(self, newer_than):
     return status
 
   def GetLogEntries(self, newer_than):
+    """Selectively returns the log entries.
+
+    @type newer_than: None or int
+    @param newer_than: if this is None, return all log enties,
+        otherwise return only the log entries with serial higher
+        than this value
+    @rtype: list
+    @return: the list of the log entries selected
+
+    """
     if newer_than is None:
       serial = -1
     else:
     if newer_than is None:
       serial = -1
     else:
@@ -185,12 +300,15 @@ class _QueuedJob(object):
 
     entries = []
     for op in self.ops:
 
     entries = []
     for op in self.ops:
-      entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))
+      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
 
     return entries
 
 
 class _JobQueueWorker(workerpool.BaseWorker):
 
     return entries
 
 
 class _JobQueueWorker(workerpool.BaseWorker):
+  """The actual job workers.
+
+  """
   def _NotifyStart(self):
     """Mark the opcode as running, not lock-waiting.
 
   def _NotifyStart(self):
     """Mark the opcode as running, not lock-waiting.
 
@@ -205,6 +323,13 @@ class _JobQueueWorker(workerpool.BaseWorker):
 
     self.queue.acquire()
     try:
 
     self.queue.acquire()
     try:
+      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
+                                    constants.OP_STATUS_CANCELING)
+
+      # Cancel here if we were asked to
+      if self.opcode.status == constants.OP_STATUS_CANCELING:
+        raise CancelJob()
+
       self.opcode.status = constants.OP_STATUS_RUNNING
     finally:
       self.queue.release()
       self.opcode.status = constants.OP_STATUS_RUNNING
     finally:
       self.queue.release()
@@ -215,8 +340,11 @@ class _JobQueueWorker(workerpool.BaseWorker):
     This functions processes a job. It is closely tied to the _QueuedJob and
     _QueuedOpCode classes.
 
     This functions processes a job. It is closely tied to the _QueuedJob and
     _QueuedOpCode classes.
 
+    @type job: L{_QueuedJob}
+    @param job: the job to be processed
+
     """
     """
-    logging.debug("Worker %s processing job %s",
+    logging.info("Worker %s processing job %s",
                   self.worker_id, job.id)
     proc = mcpu.Processor(self.pool.queue.context)
     self.queue = queue = job.queue
                   self.worker_id, job.id)
     proc = mcpu.Processor(self.pool.queue.context)
     self.queue = queue = job.queue
@@ -224,11 +352,16 @@ class _JobQueueWorker(workerpool.BaseWorker):
       try:
         count = len(job.ops)
         for idx, op in enumerate(job.ops):
       try:
         count = len(job.ops)
         for idx, op in enumerate(job.ops):
+          op_summary = op.input.Summary()
           try:
           try:
-            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
+            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
+                         op_summary)
 
             queue.acquire()
             try:
 
             queue.acquire()
             try:
+              if op.status == constants.OP_STATUS_CANCELED:
+                raise CancelJob()
+              assert op.status == constants.OP_STATUS_QUEUED
               job.run_op_index = idx
               op.status = constants.OP_STATUS_WAITLOCK
               op.result = None
               job.run_op_index = idx
               op.status = constants.OP_STATUS_WAITLOCK
               op.result = None
@@ -279,8 +412,11 @@ class _JobQueueWorker(workerpool.BaseWorker):
             finally:
               queue.release()
 
             finally:
               queue.release()
 
-            logging.debug("Op %s/%s: Successfully finished %s",
-                          idx + 1, count, op)
+            logging.info("Op %s/%s: Successfully finished opcode %s",
+                         idx + 1, count, op_summary)
+          except CancelJob:
+            # Will be handled further up
+            raise
           except Exception, err:
             queue.acquire()
             try:
           except Exception, err:
             queue.acquire()
             try:
@@ -288,13 +424,20 @@ class _JobQueueWorker(workerpool.BaseWorker):
                 op.status = constants.OP_STATUS_ERROR
                 op.result = str(err)
                 op.end_timestamp = TimeStampNow()
                 op.status = constants.OP_STATUS_ERROR
                 op.result = str(err)
                 op.end_timestamp = TimeStampNow()
-                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
+                logging.info("Op %s/%s: Error in opcode %s: %s",
+                             idx + 1, count, op_summary, err)
               finally:
                 queue.UpdateJobUnlocked(job)
             finally:
               queue.release()
             raise
 
               finally:
                 queue.UpdateJobUnlocked(job)
             finally:
               queue.release()
             raise
 
+      except CancelJob:
+        queue.acquire()
+        try:
+          queue.CancelJobUnlocked(job)
+        finally:
+          queue.release()
       except errors.GenericError, err:
         logging.exception("Ganeti exception")
       except:
       except errors.GenericError, err:
         logging.exception("Ganeti exception")
       except:
@@ -311,11 +454,14 @@ class _JobQueueWorker(workerpool.BaseWorker):
           status = job.CalcStatus()
       finally:
         queue.release()
           status = job.CalcStatus()
       finally:
         queue.release()
-      logging.debug("Worker %s finished job %s, status = %s",
-                    self.worker_id, job_id, status)
+      logging.info("Worker %s finished job %s, status = %s",
+                   self.worker_id, job_id, status)
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
+  """Simple class implementing a job-processing workerpool.
+
+  """
   def __init__(self, queue):
     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                               _JobQueueWorker)
   def __init__(self, queue):
     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                               _JobQueueWorker)
@@ -323,17 +469,22 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
 
 
 class JobQueue(object):
 
 
 class JobQueue(object):
+  """Quue used to manaage the jobs.
+
+  @cvar _RE_JOB_FILE: regex matching the valid job file names
+
+  """
   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
 
   def _RequireOpenQueue(fn):
     """Decorator for "public" functions.
 
   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
 
   def _RequireOpenQueue(fn):
     """Decorator for "public" functions.
 
-    This function should be used for all "public" functions. That is, functions
-    usually called from other classes.
+    This function should be used for all 'public' functions. That is,
+    functions usually called from other classes.
 
 
-    Important: Use this decorator only after utils.LockedMethod!
+    @warning: Use this decorator only after utils.LockedMethod!
 
 
-    Example:
+    Example::
       @utils.LockedMethod
       @_RequireOpenQueue
       def Example(self):
       @utils.LockedMethod
       @_RequireOpenQueue
       def Example(self):
@@ -346,6 +497,18 @@ class JobQueue(object):
     return wrapper
 
   def __init__(self, context):
     return wrapper
 
   def __init__(self, context):
+    """Constructor for JobQueue.
+
+    The constructor will initialize the job queue object and then
+    start loading the current jobs from disk, either for starting them
+    (if they were queue) or for aborting them (if they were already
+    running).
+
+    @type context: GanetiContext
+    @param context: the context object for access to the configuration
+        data and other ganeti objects
+
+    """
     self.context = context
     self._memcache = weakref.WeakValueDictionary()
     self._my_hostname = utils.HostInfo().name
     self.context = context
     self._memcache = weakref.WeakValueDictionary()
     self._my_hostname = utils.HostInfo().name
@@ -364,48 +527,87 @@ class JobQueue(object):
                                            " check in jstore and here")
 
     # Get initial list of nodes
                                            " check in jstore and here")
 
     # Get initial list of nodes
-    self._nodes = set(self.context.cfg.GetNodeList())
+    self._nodes = dict((n.name, n.primary_ip)
+                       for n in self.context.cfg.GetAllNodesInfo().values()
+                       if n.master_candidate)
 
     # Remove master node
     try:
 
     # Remove master node
     try:
-      self._nodes.remove(self._my_hostname)
-    except ValueError:
+      del self._nodes[self._my_hostname]
+    except KeyError:
       pass
 
     # TODO: Check consistency across nodes
 
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
       pass
 
     # TODO: Check consistency across nodes
 
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
-
-    # We need to lock here because WorkerPool.AddTask() may start a job while
-    # we're still doing our work.
-    self.acquire()
     try:
     try:
-      for job in self._GetJobsUnlocked(None):
-        status = job.CalcStatus()
+      # We need to lock here because WorkerPool.AddTask() may start a job while
+      # we're still doing our work.
+      self.acquire()
+      try:
+        logging.info("Inspecting job queue")
 
 
-        if status in (constants.JOB_STATUS_QUEUED, ):
-          self._wpool.AddTask(job)
+        all_job_ids = self._GetJobIDsUnlocked()
+        jobs_count = len(all_job_ids)
+        lastinfo = time.time()
+        for idx, job_id in enumerate(all_job_ids):
+          # Give an update every 1000 jobs or 10 seconds
+          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
+              idx == (jobs_count - 1)):
+            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
+                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
+            lastinfo = time.time()
 
 
-        elif status in (constants.JOB_STATUS_RUNNING,
-                        constants.JOB_STATUS_WAITLOCK):
-          logging.warning("Unfinished job %s found: %s", job.id, job)
-          try:
-            for op in job.ops:
-              op.status = constants.OP_STATUS_ERROR
-              op.result = "Unclean master daemon shutdown"
-          finally:
-            self.UpdateJobUnlocked(job)
-    finally:
-      self.release()
+          job = self._LoadJobUnlocked(job_id)
+
+          # a failure in loading the job can cause 'None' to be returned
+          if job is None:
+            continue
+
+          status = job.CalcStatus()
+
+          if status in (constants.JOB_STATUS_QUEUED, ):
+            self._wpool.AddTask(job)
+
+          elif status in (constants.JOB_STATUS_RUNNING,
+                          constants.JOB_STATUS_WAITLOCK,
+                          constants.JOB_STATUS_CANCELING):
+            logging.warning("Unfinished job %s found: %s", job.id, job)
+            try:
+              for op in job.ops:
+                op.status = constants.OP_STATUS_ERROR
+                op.result = "Unclean master daemon shutdown"
+            finally:
+              self.UpdateJobUnlocked(job)
+
+        logging.info("Job queue inspection finished")
+      finally:
+        self.release()
+    except:
+      self._wpool.TerminateWorkers()
+      raise
 
   @utils.LockedMethod
   @_RequireOpenQueue
 
   @utils.LockedMethod
   @_RequireOpenQueue
-  def AddNode(self, node_name):
+  def AddNode(self, node):
+    """Register a new node with the queue.
+
+    @type node: L{objects.Node}
+    @param node: the node object to be added
+
+    """
+    node_name = node.name
     assert node_name != self._my_hostname
 
     # Clean queue directory on added node
     assert node_name != self._my_hostname
 
     # Clean queue directory on added node
-    RpcRunner.call_jobqueue_purge(node_name)
+    rpc.RpcRunner.call_jobqueue_purge(node_name)
+
+    if not node.master_candidate:
+      # remove if existing, ignoring errors
+      self._nodes.pop(node_name, None)
+      # and skip the replication of the job ids
+      return
 
     # Upload the whole queue excluding archived jobs
     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
 
     # Upload the whole queue excluding archived jobs
     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
@@ -421,22 +623,43 @@ class JobQueue(object):
       finally:
         fd.close()
 
       finally:
         fd.close()
 
-      result = RpcRunner.call_jobqueue_update([node_name], file_name, content)
+      result = rpc.RpcRunner.call_jobqueue_update([node_name],
+                                                  [node.primary_ip],
+                                                  file_name, content)
       if not result[node_name]:
         logging.error("Failed to upload %s to %s", file_name, node_name)
 
       if not result[node_name]:
         logging.error("Failed to upload %s to %s", file_name, node_name)
 
-    self._nodes.add(node_name)
+    self._nodes[node_name] = node.primary_ip
 
   @utils.LockedMethod
   @_RequireOpenQueue
   def RemoveNode(self, node_name):
 
   @utils.LockedMethod
   @_RequireOpenQueue
   def RemoveNode(self, node_name):
+    """Callback called when removing nodes from the cluster.
+
+    @type node_name: str
+    @param node_name: the name of the node to remove
+
+    """
     try:
       # The queue is removed by the "leave node" RPC call.
     try:
       # The queue is removed by the "leave node" RPC call.
-      self._nodes.remove(node_name)
+      del self._nodes[node_name]
     except KeyError:
       pass
 
   def _CheckRpcResult(self, result, nodes, failmsg):
     except KeyError:
       pass
 
   def _CheckRpcResult(self, result, nodes, failmsg):
+    """Verifies the status of an RPC call.
+
+    Since we aim to keep consistency should this node (the current
+    master) fail, we will log errors if our rpc fail, and especially
+    log the case when more than half of the nodes failes.
+
+    @param result: the data as returned from the rpc call
+    @type nodes: list
+    @param nodes: the list of nodes we made the call to
+    @type failmsg: str
+    @param failmsg: the identifier to be used for logging
+
+    """
     failed = []
     success = []
 
     failed = []
     success = []
 
@@ -454,24 +677,69 @@ class JobQueue(object):
       # TODO: Handle failing nodes
       logging.error("More than half of the nodes failed")
 
       # TODO: Handle failing nodes
       logging.error("More than half of the nodes failed")
 
+  def _GetNodeIp(self):
+    """Helper for returning the node name/ip list.
+
+    @rtype: (list, list)
+    @return: a tuple of two lists, the first one with the node
+        names and the second one with the node addresses
+
+    """
+    name_list = self._nodes.keys()
+    addr_list = [self._nodes[name] for name in name_list]
+    return name_list, addr_list
+
   def _WriteAndReplicateFileUnlocked(self, file_name, data):
     """Writes a file locally and then replicates it to all nodes.
 
   def _WriteAndReplicateFileUnlocked(self, file_name, data):
     """Writes a file locally and then replicates it to all nodes.
 
+    This function will replace the contents of a file on the local
+    node and then replicate it to all the other nodes we have.
+
+    @type file_name: str
+    @param file_name: the path of the file to be replicated
+    @type data: str
+    @param data: the new contents of the file
+
     """
     utils.WriteFile(file_name, data=data)
 
     """
     utils.WriteFile(file_name, data=data)
 
-    result = RpcRunner.call_jobqueue_update(self._nodes, file_name, data)
+    names, addrs = self._GetNodeIp()
+    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
     self._CheckRpcResult(result, self._nodes,
                          "Updating %s" % file_name)
 
     self._CheckRpcResult(result, self._nodes,
                          "Updating %s" % file_name)
 
-  def _RenameFileUnlocked(self, old, new):
-    os.rename(old, new)
+  def _RenameFilesUnlocked(self, rename):
+    """Renames a file locally and then replicate the change.
 
 
-    result = RpcRunner.call_jobqueue_rename(self._nodes, old, new)
-    self._CheckRpcResult(result, self._nodes,
-                         "Moving %s to %s" % (old, new))
+    This function will rename a file in the local queue directory
+    and then replicate this rename to all the other nodes we have.
+
+    @type rename: list of (old, new)
+    @param rename: List containing tuples mapping old to new names
+
+    """
+    # Rename them locally
+    for old, new in rename:
+      utils.RenameFile(old, new, mkdir=True)
+
+    # ... and on all nodes
+    names, addrs = self._GetNodeIp()
+    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
+    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
   def _FormatJobID(self, job_id):
 
   def _FormatJobID(self, job_id):
+    """Convert a job ID to string format.
+
+    Currently this just does C{str(job_id)} after performing some
+    checks, but if we want to change the job id format this will
+    abstract this change.
+
+    @type job_id: int or long
+    @param job_id: the numeric job id
+    @rtype: str
+    @return: the formatted job id
+
+    """
     if not isinstance(job_id, (int, long)):
       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
     if job_id < 0:
     if not isinstance(job_id, (int, long)):
       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
     if job_id < 0:
@@ -479,12 +747,25 @@ class JobQueue(object):
 
     return str(job_id)
 
 
     return str(job_id)
 
+  @classmethod
+  def _GetArchiveDirectory(cls, job_id):
+    """Returns the archive directory for a job.
+
+    @type job_id: str
+    @param job_id: Job identifier
+    @rtype: str
+    @return: Directory name
+
+    """
+    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
+
   def _NewSerialUnlocked(self):
     """Generates a new job identifier.
 
     Job identifiers are unique during the lifetime of a cluster.
 
   def _NewSerialUnlocked(self):
     """Generates a new job identifier.
 
     Job identifiers are unique during the lifetime of a cluster.
 
-    Returns: A string representing the job identifier.
+    @rtype: str
+    @return: a string representing the job identifier.
 
     """
     # New number
 
     """
     # New number
@@ -501,14 +782,41 @@ class JobQueue(object):
 
   @staticmethod
   def _GetJobPath(job_id):
 
   @staticmethod
   def _GetJobPath(job_id):
+    """Returns the job file for a given job id.
+
+    @type job_id: str
+    @param job_id: the job identifier
+    @rtype: str
+    @return: the path to the job file
+
+    """
     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
 
     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
 
-  @staticmethod
-  def _GetArchivedJobPath(job_id):
-    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
+  @classmethod
+  def _GetArchivedJobPath(cls, job_id):
+    """Returns the archived job file for a give job id.
+
+    @type job_id: str
+    @param job_id: the job identifier
+    @rtype: str
+    @return: the path to the archived job file
+
+    """
+    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
+    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
 
   @classmethod
   def _ExtractJobID(cls, name):
 
   @classmethod
   def _ExtractJobID(cls, name):
+    """Extract the job id from a filename.
+
+    @type name: str
+    @param name: the job filename
+    @rtype: job id or None
+    @return: the job id corresponding to the given filename,
+        or None if the filename does not represent a valid
+        job file
+
+    """
     m = cls._RE_JOB_FILE.match(name)
     if m:
       return m.group(1)
     m = cls._RE_JOB_FILE.match(name)
     if m:
       return m.group(1)
@@ -525,16 +833,36 @@ class JobQueue(object):
     jobs are present on disk (so in the _memcache we don't have any
     extra IDs).
 
     jobs are present on disk (so in the _memcache we don't have any
     extra IDs).
 
+    @rtype: list
+    @return: the list of job IDs
+
     """
     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
     jlist = utils.NiceSort(jlist)
     return jlist
 
   def _ListJobFiles(self):
     """
     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
     jlist = utils.NiceSort(jlist)
     return jlist
 
   def _ListJobFiles(self):
+    """Returns the list of current job files.
+
+    @rtype: list
+    @return: the list of job file names
+
+    """
     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
             if self._RE_JOB_FILE.match(name)]
 
   def _LoadJobUnlocked(self, job_id):
     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
             if self._RE_JOB_FILE.match(name)]
 
   def _LoadJobUnlocked(self, job_id):
+    """Loads a job from the disk or memory.
+
+    Given a job id, this will return the cached job object if
+    existing, or try to load the job from the disk. If loading from
+    disk, it will also add the job to the cache.
+
+    @param job_id: the job id
+    @rtype: L{_QueuedJob} or None
+    @return: either None or the job object
+
+    """
     job = self._memcache.get(job_id, None)
     if job:
       logging.debug("Found job %s in memcache", job_id)
     job = self._memcache.get(job_id, None)
     if job:
       logging.debug("Found job %s in memcache", job_id)
@@ -553,17 +881,68 @@ class JobQueue(object):
     finally:
       fd.close()
 
     finally:
       fd.close()
 
-    job = _QueuedJob.Restore(self, data)
+    try:
+      job = _QueuedJob.Restore(self, data)
+    except Exception, err:
+      new_path = self._GetArchivedJobPath(job_id)
+      if filepath == new_path:
+        # job already archived (future case)
+        logging.exception("Can't parse job %s", job_id)
+      else:
+        # non-archived case
+        logging.exception("Can't parse job %s, will archive.", job_id)
+        self._RenameFilesUnlocked([(filepath, new_path)])
+      return None
+
     self._memcache[job_id] = job
     logging.debug("Added job %s to the cache", job_id)
     return job
 
   def _GetJobsUnlocked(self, job_ids):
     self._memcache[job_id] = job
     logging.debug("Added job %s to the cache", job_id)
     return job
 
   def _GetJobsUnlocked(self, job_ids):
+    """Return a list of jobs based on their IDs.
+
+    @type job_ids: list
+    @param job_ids: either an empty list (meaning all jobs),
+        or a list of job IDs
+    @rtype: list
+    @return: the list of job objects
+
+    """
     if not job_ids:
       job_ids = self._GetJobIDsUnlocked()
 
     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
 
     if not job_ids:
       job_ids = self._GetJobIDsUnlocked()
 
     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
 
+  @staticmethod
+  def _IsQueueMarkedDrain():
+    """Check if the queue is marked from drain.
+
+    This currently uses the queue drain file, which makes it a
+    per-node flag. In the future this can be moved to the config file.
+
+    @rtype: boolean
+    @return: True of the job queue is marked for draining
+
+    """
+    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
+
+  @staticmethod
+  def SetDrainFlag(drain_flag):
+    """Sets the drain flag for the queue.
+
+    This is similar to the function L{backend.JobQueueSetDrainFlag},
+    and in the future we might merge them.
+
+    @type drain_flag: boolean
+    @param drain_flag: wheter to set or unset the drain flag
+
+    """
+    if drain_flag:
+      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
+    else:
+      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
+    return True
+
   @utils.LockedMethod
   @_RequireOpenQueue
   def SubmitJob(self, ops):
   @utils.LockedMethod
   @_RequireOpenQueue
   def SubmitJob(self, ops):
@@ -574,8 +953,25 @@ class JobQueue(object):
 
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
 
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
+    @rtype: job ID
+    @return: the job ID of the newly created job
+    @raise errors.JobQueueDrainError: if the job is marked for draining
 
     """
 
     """
+    if self._IsQueueMarkedDrain():
+      raise errors.JobQueueDrainError()
+
+    # Check job queue size
+    size = len(self._ListJobFiles())
+    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
+      # TODO: Autoarchive jobs. Make sure it's not done on every job
+      # submission, though.
+      #size = ...
+      pass
+
+    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
+      raise errors.JobQueueFull()
+
     # Get job identifier
     job_id = self._NewSerialUnlocked()
     job = _QueuedJob(self, job_id, ops)
     # Get job identifier
     job_id = self._NewSerialUnlocked()
     job = _QueuedJob(self, job_id, ops)
@@ -593,6 +989,16 @@ class JobQueue(object):
 
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
 
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
+    """Update a job's on disk storage.
+
+    After a job has been modified, this function needs to be called in
+    order to write the changes to disk and replicate them to the other
+    nodes.
+
+    @type job: L{_QueuedJob}
+    @param job: the changed job
+
+    """
     filename = self._GetJobPath(job.id)
     data = serializer.DumpJson(job.Serialize(), indent=False)
     logging.debug("Writing job %s to %s", job.id, filename)
     filename = self._GetJobPath(job.id)
     data = serializer.DumpJson(job.Serialize(), indent=False)
     logging.debug("Writing job %s to %s", job.id, filename)
@@ -617,6 +1023,14 @@ class JobQueue(object):
     @param prev_log_serial: Last job message serial number
     @type timeout: float
     @param timeout: maximum time to wait
     @param prev_log_serial: Last job message serial number
     @type timeout: float
     @param timeout: maximum time to wait
+    @rtype: tuple (job info, log entries)
+    @return: a tuple of the job information as required via
+        the fields parameter, and the log entries as a list
+
+        if the job has not changed and the timeout has expired,
+        we instead return a special value,
+        L{constants.JOB_NOTCHANGED}, which should be interpreted
+        as such by the clients
 
     """
     logging.debug("Waiting for changes in job %s", job_id)
 
     """
     logging.debug("Waiting for changes in job %s", job_id)
@@ -668,70 +1082,109 @@ class JobQueue(object):
   def CancelJob(self, job_id):
     """Cancels a job.
 
   def CancelJob(self, job_id):
     """Cancels a job.
 
+    This will only succeed if the job has not started yet.
+
     @type job_id: string
     @type job_id: string
-    @param job_id: Job ID of job to be cancelled.
+    @param job_id: job ID of job to be cancelled.
 
     """
 
     """
-    logging.debug("Cancelling job %s", job_id)
+    logging.info("Cancelling job %s", job_id)
 
     job = self._LoadJobUnlocked(job_id)
     if not job:
       logging.debug("Job %s not found", job_id)
 
     job = self._LoadJobUnlocked(job_id)
     if not job:
       logging.debug("Job %s not found", job_id)
-      return
+      return (False, "Job %s not found" % job_id)
 
 
-    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
+    job_status = job.CalcStatus()
+
+    if job_status not in (constants.JOB_STATUS_QUEUED,
+                          constants.JOB_STATUS_WAITLOCK):
       logging.debug("Job %s is no longer in the queue", job.id)
       logging.debug("Job %s is no longer in the queue", job.id)
-      return
+      return (False, "Job %s is no longer in the queue" % job.id)
+
+    if job_status == constants.JOB_STATUS_QUEUED:
+      self.CancelJobUnlocked(job)
+      return (True, "Job %s canceled" % job.id)
 
 
+    elif job_status == constants.JOB_STATUS_WAITLOCK:
+      # The worker will notice the new status and cancel the job
+      try:
+        for op in job.ops:
+          op.status = constants.OP_STATUS_CANCELING
+      finally:
+        self.UpdateJobUnlocked(job)
+      return (True, "Job %s will be canceled" % job.id)
+
+  @_RequireOpenQueue
+  def CancelJobUnlocked(self, job):
+    """Marks a job as canceled.
+
+    """
     try:
       for op in job.ops:
     try:
       for op in job.ops:
-        op.status = constants.OP_STATUS_ERROR
-        op.result = "Job cancelled by request"
+        op.status = constants.OP_STATUS_CANCELED
+        op.result = "Job canceled by request"
     finally:
       self.UpdateJobUnlocked(job)
 
   @_RequireOpenQueue
     finally:
       self.UpdateJobUnlocked(job)
 
   @_RequireOpenQueue
-  def _ArchiveJobUnlocked(self, job_id):
-    """Archives a job.
+  def _ArchiveJobsUnlocked(self, jobs):
+    """Archives jobs.
 
 
-    @type job_id: string
-    @param job_id: Job ID of job to be archived.
+    @type jobs: list of L{_QueuedJob}
+    @param jobs: Job objects
+    @rtype: int
+    @return: Number of archived jobs
 
     """
 
     """
-    logging.info("Archiving job %s", job_id)
+    archive_jobs = []
+    rename_files = []
+    for job in jobs:
+      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
+                                  constants.JOB_STATUS_SUCCESS,
+                                  constants.JOB_STATUS_ERROR):
+        logging.debug("Job %s is not yet done", job.id)
+        continue
 
 
-    job = self._LoadJobUnlocked(job_id)
-    if not job:
-      logging.debug("Job %s not found", job_id)
-      return
+      archive_jobs.append(job)
 
 
-    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
-                                constants.JOB_STATUS_SUCCESS,
-                                constants.JOB_STATUS_ERROR):
-      logging.debug("Job %s is not yet done", job.id)
-      return
+      old = self._GetJobPath(job.id)
+      new = self._GetArchivedJobPath(job.id)
+      rename_files.append((old, new))
 
 
-    old = self._GetJobPath(job.id)
-    new = self._GetArchivedJobPath(job.id)
+    # TODO: What if 1..n files fail to rename?
+    self._RenameFilesUnlocked(rename_files)
 
 
-    self._RenameFileUnlocked(old, new)
+    logging.debug("Successfully archived job(s) %s",
+                  ", ".join(job.id for job in archive_jobs))
 
 
-    logging.debug("Successfully archived job %s", job.id)
+    return len(archive_jobs)
 
   @utils.LockedMethod
   @_RequireOpenQueue
   def ArchiveJob(self, job_id):
     """Archives a job.
 
 
   @utils.LockedMethod
   @_RequireOpenQueue
   def ArchiveJob(self, job_id):
     """Archives a job.
 
+    This is just a wrapper over L{_ArchiveJobsUnlocked}.
+
     @type job_id: string
     @param job_id: Job ID of job to be archived.
     @type job_id: string
     @param job_id: Job ID of job to be archived.
+    @rtype: bool
+    @return: Whether job was archived
 
     """
 
     """
-    return self._ArchiveJobUnlocked(job_id)
+    logging.info("Archiving job %s", job_id)
+
+    job = self._LoadJobUnlocked(job_id)
+    if not job:
+      logging.debug("Job %s not found", job_id)
+      return False
+
+    return self._ArchiveJobsUnlocked([job]) == 1
 
   @utils.LockedMethod
   @_RequireOpenQueue
 
   @utils.LockedMethod
   @_RequireOpenQueue
-  def AutoArchiveJobs(self, age):
+  def AutoArchiveJobs(self, age, timeout):
     """Archives all jobs based on age.
 
     The method will archive all jobs which are older than the age
     """Archives all jobs based on age.
 
     The method will archive all jobs which are older than the age
@@ -746,24 +1199,58 @@ class JobQueue(object):
     logging.info("Archiving jobs with age more than %s seconds", age)
 
     now = time.time()
     logging.info("Archiving jobs with age more than %s seconds", age)
 
     now = time.time()
-    for jid in self._GetJobIDsUnlocked(archived=False):
-      job = self._LoadJobUnlocked(jid)
-      if job.CalcStatus() not in (constants.OP_STATUS_SUCCESS,
-                                  constants.OP_STATUS_ERROR,
-                                  constants.OP_STATUS_CANCELED):
-        continue
-      if job.end_timestamp is None:
-        if job.start_timestamp is None:
-          job_age = job.received_timestamp
+    end_time = now + timeout
+    archived_count = 0
+    last_touched = 0
+
+    all_job_ids = self._GetJobIDsUnlocked(archived=False)
+    pending = []
+    for idx, job_id in enumerate(all_job_ids):
+      last_touched = idx
+
+      # Not optimal because jobs could be pending
+      # TODO: Measure average duration for job archival and take number of
+      # pending jobs into account.
+      if time.time() > end_time:
+        break
+
+      # Returns None if the job failed to load
+      job = self._LoadJobUnlocked(job_id)
+      if job:
+        if job.end_timestamp is None:
+          if job.start_timestamp is None:
+            job_age = job.received_timestamp
+          else:
+            job_age = job.start_timestamp
         else:
         else:
-          job_age = job.start_timestamp
-      else:
-        job_age = job.end_timestamp
+          job_age = job.end_timestamp
+
+        if age == -1 or now - job_age[0] > age:
+          pending.append(job)
+
+          # Archive 10 jobs at a time
+          if len(pending) >= 10:
+            archived_count += self._ArchiveJobsUnlocked(pending)
+            pending = []
 
 
-      if age == -1 or now - job_age[0] > age:
-        self._ArchiveJobUnlocked(jid)
+    if pending:
+      archived_count += self._ArchiveJobsUnlocked(pending)
+
+    return (archived_count, len(all_job_ids) - last_touched - 1)
 
   def _GetJobInfoUnlocked(self, job, fields):
 
   def _GetJobInfoUnlocked(self, job, fields):
+    """Returns information about a job.
+
+    @type job: L{_QueuedJob}
+    @param job: the job which we query
+    @type fields: list
+    @param fields: names of fields to return
+    @rtype: list
+    @return: list with one element for each field
+    @raise errors.OpExecError: when an invalid field
+        has been passed
+
+    """
     row = []
     for fname in fields:
       if fname == "id":
     row = []
     for fname in fields:
       if fname == "id":
@@ -799,9 +1286,16 @@ class JobQueue(object):
   def QueryJobs(self, job_ids, fields):
     """Returns a list of jobs in queue.
 
   def QueryJobs(self, job_ids, fields):
     """Returns a list of jobs in queue.
 
-    Args:
-    - job_ids: Sequence of job identifiers or None for all
-    - fields: Names of fields to return
+    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
+    processing for each job.
+
+    @type job_ids: list
+    @param job_ids: sequence of job identifiers or None for all
+    @type fields: list
+    @param fields: names of fields to return
+    @rtype: list
+    @return: list one element per job, each element being list with
+        the requested fields
 
     """
     jobs = []
 
     """
     jobs = []
@@ -819,6 +1313,8 @@ class JobQueue(object):
   def Shutdown(self):
     """Stops the job queue.
 
   def Shutdown(self):
     """Stops the job queue.
 
+    This shutdowns all the worker threads an closes the queue.
+
     """
     self._wpool.TerminateWorkers()
 
     """
     self._wpool.TerminateWorkers()